diff --git a/src/collectl2pcp/collectl2pcp.c b/src/collectl2pcp/collectl2pcp.c
index c9755af..4bf6b58 100644
--- a/src/collectl2pcp/collectl2pcp.c
+++ b/src/collectl2pcp/collectl2pcp.c
@@ -248,12 +248,26 @@ main(int argc, char *argv[])
 	    fields_free(f);
 	}
 
-	/* final flush for this file */
+	/* final flush for this collectl input log */
 	if ((sts = timestamp_flush()) < 0) {
 	    fprintf(stderr, "Error: failed to write final timestamp: %s\n", pmiErrStr(sts));
 	    exit(1);
 	}
 
+	if (filenum < nfilelist-1) {
+	    /*
+	     * End of this collectl input log and there's more input files,
+	     * emit a <mark> record to signify a temporal gap since the
+	     * output PCP archive is effectively a merged archive.
+	     */
+	    if ((sts = pmiPutMark()) < 0) {
+		fprintf(stderr, "Error: failed to write <mark> record: %s\n", pmiErrStr(sts));
+		exit(1);
+	    }
+	    if (vflag)
+		fprintf(stderr, "End of collectl input file '%s', <mark> record emitted\n", infile);
+	}
+
 	if (gzipped)
 	    pclose(fp);
 	else
diff --git a/src/include/pcp/import.h b/src/include/pcp/import.h
index e535796..5e6940f 100644
--- a/src/include/pcp/import.h
+++ b/src/include/pcp/import.h
@@ -42,6 +42,7 @@ PMI_CALL extern int pmiGetHandle(const char *, const char *);
 PMI_CALL extern int pmiPutValueHandle(int, const char *);
 PMI_CALL extern int pmiWrite(int, int);
 PMI_CALL extern int pmiPutResult(const pmResult *);
+PMI_CALL extern int pmiPutMark(void);
 
 /* helper routines */
 PMI_CALL extern pmID pmiID(int, int, int);
diff --git a/src/libpcp_import/src/exports b/src/libpcp_import/src/exports
index a544262..d484e6e 100644
--- a/src/libpcp_import/src/exports
+++ b/src/libpcp_import/src/exports
@@ -27,2 +27,7 @@ PCP_IMPORT_1.0 {
   local: *;
 };
+
+PCP_IMPORT_1.1 {
+  global:
+    pmiPutMark;
+} PCP_IMPORT_1.0;
diff --git a/src/libpcp_import/src/import.c b/src/libpcp_import/src/import.c
index 71b8c7f..2ea08af 100644
--- a/src/libpcp_import/src/import.c
+++ b/src/libpcp_import/src/import.c
@@ -724,3 +724,49 @@ pmiPutResult(const pmResult *result)
 
     return current->last_sts = _pmi_put_result(current, current->result);
 }
+
+/*
+ * Append a <mark> record (a record with zero PMIDs) to the current
+ * context's output, stamped 1msec after current->last_stamp.
+ *
+ * Returns 0 on success, or when last_stamp is still zero (no record
+ * is written in that case), PM_ERR_NOCONTEXT when there is no current
+ * context, else -oserror() if the write fails or is short.
+ */
+int
+pmiPutMark(void)
+{
+    __pmLogCtl *lcp;
+    struct {
+	__pmPDU		hdr;
+	__pmTimeval	timestamp;	/* when returned */
+	int		numpmid;	/* zero PMIDs to follow */
+	__pmPDU		tail;
+    } mark;
+
+    if (current == NULL)
+	return PM_ERR_NOCONTEXT;
+
+    if (current->last_stamp.tv_sec == 0 && current->last_stamp.tv_usec == 0)
+	/* no earlier pmResult, no point adding a mark record */
+	return 0;
+    lcp = &current->logctl;
+
+    mark.hdr = htonl((int)sizeof(mark));
+    mark.tail = mark.hdr;
+    mark.timestamp.tv_sec = current->last_stamp.tv_sec;
+    mark.timestamp.tv_usec = current->last_stamp.tv_usec + 1000; /* + 1msec */
+    /* carry into tv_sec; use >= so exactly 1000000 usec is normalized too */
+    if (mark.timestamp.tv_usec >= 1000000) {
+	mark.timestamp.tv_usec -= 1000000;
+	mark.timestamp.tv_sec++;
+    }
+    mark.timestamp.tv_sec = htonl(mark.timestamp.tv_sec);
+    mark.timestamp.tv_usec = htonl(mark.timestamp.tv_usec);
+    mark.numpmid = htonl(0);
+
+    if (fwrite(&mark, 1, sizeof(mark), lcp->l_mfp) != sizeof(mark))
+	return -oserror();
+    else
+	return 0;
+}