pcp
[Top] [All Lists]

Re: [pcp] pmie and system clock changes

To: Ken McDonell <kenj@xxxxxxxxxxxxxxxx>
Subject: Re: [pcp] pmie and system clock changes
From: Martin Hicks <mort@xxxxxxx>
Date: Tue, 11 May 2010 14:32:29 -0500
Cc: pcp@xxxxxxxxxxx
In-reply-to: <1273131342.22330.140.camel@xxxxxxxxxxxxxxxx>
References: <20100505201154.GX25390@xxxxxxxxxxxxxxxxxxxxxxxxx> <1273131342.22330.140.camel@xxxxxxxxxxxxxxxx>
User-agent: Mutt/1.5.17 (2007-11-01)
On Thu, May 06, 2010 at 05:35:42PM +1000, Ken McDonell wrote:
> I suspect we can solve this problem with a large hammer that will work
> on all platforms (Greg's suggestion won't apparently work on Mac OS X).
> 
> If the time ever goes backwards (not a small ntp adjustment, I mean
> now - last_delay is < now at the last sample, so time went back more
> than 2*last_delay), there is no point keeping any history and any notion
> of a "rate" over the last interval is pretty wobbly.

One place where I can see this breaking down is on a heavily loaded
machine (or with a lot of PMIE rules being evaluated???).  With a
fairly short delta I could see the rule being > 2*delta LATE in being
scheduled, in which case the code would think that there must have been
a clock change.

> 
> If the time ever goes forward by a lot (e.g. now is > now at last sample
> + 2*last_delay), there is no point keeping any history and any notion of
> a "rate" over the last interval is pretty wobbly.
> 
> Fortunately "no point keeping any history" looks a lot like reconnect to
> pmcd after lost the connection to pmcd ... so we may be able to leverage
> that logic.

Here's an early crack at this.  I'm definitely not handling the time
Window Specification correctly (maybe I just need to get rid of
reinitTime() and call initTime() again)

Does the invalidate() go far enough towards clearing metric history or
should this really be doing reinitMetric() like in the host reconnect
path?

I traded in a few timeval-related functions for those that are in
libpcp, rather than the reinvented ones in the pmie sources.

mh

PS wow are there ever a lot of global variables...


diff --git a/src/pmie/src/dstruct.c b/src/pmie/src/dstruct.c
index d440ea3..5f1373d 100644
--- a/src/pmie/src/dstruct.c
+++ b/src/pmie/src/dstruct.c
@@ -125,7 +125,7 @@ getReal(void)
     struct timeval t;
 
     gettimeofday(&t, NULL);
-    return realize(t);
+    return __pmtimevalToReal(&t);
 }
 
 
@@ -172,63 +172,53 @@ reflectTime(RealTime d)
 }
 
 
-/* convert RealTime to timeval */
-void
-unrealize(RealTime rt, struct timeval *tv)
+int initTime(void)
 {
-    tv->tv_sec = (time_t)rt;
-    tv->tv_usec = (int)(1000000 * (rt - tv->tv_sec));
-}
+    struct timeval tv, tv1, tv2;
+    char *msg;
+
+    if (archives) {
+       __pmtimevalFromReal(first, &tv1);
+       __pmtimevalFromReal(last, &tv2);
+    } else {
+       now = getReal() + 1.0;
+       reflectTime(dfltDelta);
+       __pmtimevalFromReal(now, &tv1);
+       tv2.tv_sec = INT_MAX;
+    }
 
+    if (pmParseTimeWindow(startFlag, stopFlag, alignFlag, offsetFlag,
+                         &tv1, &tv2,
+                         &tv, &tv2, &tv1,
+                         &msg) < 0) {
+       fputs(msg, stderr);
+       exit(1);
+    }
 
-/* convert RealTime to timespec */
-void
-unrealizenano(RealTime rt, struct timespec *ts)
-{
-    ts->tv_sec = (time_t)rt;
-    ts->tv_nsec = (int)(1000000000 * (rt - ts->tv_sec));
+    /* set run timing window */
+    start = __pmtimevalToReal(&tv1);
+    stop = __pmtimevalToReal(&tv2);
+    runTime = stop - start;
+
+    return 0;
 }
 
 
-/* sleep until given RealTime */
-void
-sleepTight(RealTime sched)
+/* reinitialize time after the system clock changed */
+int reinitTime(void)
 {
-    RealTime   delay;  /* interval to sleep */
-    int                sts;
-#ifdef HAVE_WAITPID
-    pid_t      pid;
-
-    /* harvest terminated children */
-    while ((pid = waitpid(-1, &sts, WNOHANG)) > (pid_t)0) {
-#if PCP_DEBUG
-       if (pmDebug & DBG_TRACE_APPL2) {
-           fprintf(stderr, "sleepTight: wait: pid=%d done status=0x%x", (int)pid, sts);
-           if (WIFEXITED(sts))
-               fprintf(stderr, " exit=%d", WEXITSTATUS(sts));
-           if (WIFSIGNALED(sts))
-               fprintf(stderr, " signal=%d", WTERMSIG(sts));
-           fprintf(stderr, "\n");
-       }
-#endif
-       ;
-    }
-#endif
+    /* 'now' is stale -- from the old system clock */
+    RealTime elapsed = now - start;
 
-    if (!archives) {
-       struct timespec ts, tleft;
+    now = getReal();
+    reflectTime(dfltDelta);
 
-       delay = sched - getReal();
-       unrealizenano(delay, &ts);
-       for (;;) {      /* loop to catch early wakeup from nanosleep */
-           sts = nanosleep(&ts, &tleft);
-           if (sts == 0 || (sts < 0 && errno != EINTR))
-               break;
-           ts = tleft;
-       }
-    }
-}
+    start = now;
+    runTime -= elapsed;
+    stop = start + runTime;
 
+    return 0;
+}
 
 /***********************************************************************
  * ring buffer management
diff --git a/src/pmie/src/dstruct.h b/src/pmie/src/dstruct.h
index 550afb7..ae7d380 100644
--- a/src/pmie/src/dstruct.h
+++ b/src/pmie/src/dstruct.h
@@ -69,15 +69,12 @@ typedef double              RealTime;       /* wall clock time or interval */
 #define DELTA_DFLT     10              /* default sample interval */
 #define DELTA_MIN      0.1             /* minimum sample interval */
 
-/* convert timeval to RealTime */
-#define realize(t) (1.0e-6 * (RealTime)(t).tv_usec + (RealTime)(t).tv_sec)
-/* convert RealTime to timeval */
-void unrealize(RealTime, struct timeval *);
-
 RealTime getReal(void);                        /* return current time */
 void reflectTime(RealTime);            /* update time vars to reflect now */
-void sleepTight(RealTime);             /* sleep until given RealTime */
-
+int initTime(void);                    /* Init the various time variables */
+int reinitTime(void);                  /* reinitialize time variables after
+                                        * the system time changes drastically
+                                        */
 
 /***********************************************************************
  * evaluator functions
diff --git a/src/pmie/src/eval.c b/src/pmie/src/eval.c
index 7a0dbb8..1d37c19 100644
--- a/src/pmie/src/eval.c
+++ b/src/pmie/src/eval.c
@@ -30,7 +30,7 @@
  * scheduling
  ***********************************************************************/
 
-/* enter Task into task queue */
+/* enter Task into task queue, ordered by next wakeup time */
 static void
 enque(Task *t)
 {
@@ -725,42 +725,115 @@ findEval(Expr *x)
 }
 
 
+static int
+reapChildren(void)
+{
+#ifdef HAVE_WAITPID
+    pid_t      pid;
+    int                sts;
+
+    /* harvest terminated children */
+    while ((pid = waitpid(-1, &sts, WNOHANG)) > (pid_t)0) {
+#if PCP_DEBUG
+       if (pmDebug & DBG_TRACE_APPL2) {
+           fprintf(stderr, "sleepTight: wait: pid=%d done status=0x%x", (int)pid, sts);
+           if (WIFEXITED(sts))
+               fprintf(stderr, " exit=%d", WEXITSTATUS(sts));
+           if (WIFSIGNALED(sts))
+               fprintf(stderr, " signal=%d", WTERMSIG(sts));
+           fprintf(stderr, "\n");
+       }
+#endif
+       ;
+    }
+#endif
+
+    return 0;
+}
+
+
+/* determine if the system clock jumped */
+static int
+clockChanged(int wakeup, int delta)
+{
+    struct timeval tv;
+    RealTime n;
+
+    gettimeofday(&tv, NULL);
+    n = __pmtimevalToReal(&tv);
+
+    if (n > wakeup + 2*delta) {
+       return 1;
+    } else if (n < wakeup - 2*delta)
+       return -1;
+
+    return 0;
+}
+
+static void
+initSched(void)
+{
+    Task *t;
+
+    for (t = taskq; t; t = t->next) {
+       t->eval = t->epoch = start;
+       t->retry = 0;
+       t->tick = 0;
+    }
+}
+
 /* run evaluator */
 void
 run(void)
 {
+    int                sts;
     Task       *t;
 
     /* empty task queue */
     if (taskq == NULL)
        return;
 
-    /* initialize task scheduling */
-    t = taskq;
-    while (t) {
-       t->eval = t->epoch = start;
-       t->retry = 0;
-       t->tick = 0;
-       t = t->next;
-    }
+    initSched();
 
     /* evaluate and reschedule */
     t = taskq;
     for (;;) {
+       int do_retry = 0;
+       int next_wakeup, next_delta;
+       struct timeval tv;
+
        if (t->retry && t->retry < t->eval) {
+           do_retry = 1;
+           next_wakeup = t->retry;
+           next_delta  = RETRY;
+       } else {
+           next_wakeup = t->eval;
+           next_delta  = t->delta;
+       }
+       
+       if ((sts = clockChanged(next_wakeup, next_delta)) != 0) {
+           reinitTime();
+           initSched();
+           invalidate();
+           do_retry = 0;
+       }
+
+       reapChildren();
+       if (do_retry) {
            now = t->retry;
            if (now > stop)
                break;
-           sleepTight(t->retry);
+           __pmtimevalFromReal(t->retry, &tv);
+           __pmtimevalPause(tv);
            enable(t);
            t->retry = waiting(t) ? now + RETRY : 0;
-       }
-       else {
+       } else {
            now = t->eval;
            if (now > stop)
                break;
            reflectTime(t->delta);
-           sleepTight(t->eval);
+           __pmtimevalFromReal(t->eval, &tv);
+           __pmtimevalPause(tv);
            eval(t);
            t->tick++;
            t->eval = t->epoch + t->tick * t->delta;
diff --git a/src/pmie/src/pmie.c b/src/pmie/src/pmie.c
index f115282..63cb610 100644
--- a/src/pmie/src/pmie.c
+++ b/src/pmie/src/pmie.c
@@ -476,17 +476,15 @@ getargs(int argc, char *argv[])
     int                        c;
     int                        bflag = 0;
     Archive            *a;
-    struct timeval     tv, tv1, tv2;
+    struct timeval     tv;
     extern int         showTimeFlag;
     extern int         errs;           /* syntax errors from syntax.c */
 
     memset(&tv, 0, sizeof(tv));
-    memset(&tv1, 0, sizeof(tv1));
-    memset(&tv2, 0, sizeof(tv2));
     dstructInit();
 
     while ((c=getopt(argc, argv, "a:A:bc:CdD:efHh:j:l:n:O:S:t:T:vVWXxzZ:?")) != EOF) {
-        switch (c) {
+       switch (c) {
 
        case 'a':                       /* archives */
            if (dfltConn && dfltConn != PM_CONTEXT_ARCHIVE) {
@@ -579,7 +577,7 @@ getargs(int argc, char *argv[])
            dfltHost = optarg;
            break;
 
-        case 'j':                      /* stomp protocol (JMS) config */
+       case 'j':                       /* stomp protocol (JMS) config */
            stompfile = optarg;
            break;
 
@@ -594,7 +592,7 @@ getargs(int argc, char *argv[])
            isdaemon = 1;
            break;
 
-        case 'n':                      /* alternate namespace file */
+       case 'n':                       /* alternate namespace file */
            pmnsfile = optarg;
            break;
 
@@ -607,8 +605,8 @@ getargs(int argc, char *argv[])
            break;
 
        case 't':                       /* sample interval */
-           if (pmParseInterval(optarg, &tv1, &msg) == 0)
-               dfltDelta = realize(tv1);
+           if (pmParseInterval(optarg, &tv, &msg) == 0)
+               dfltDelta = __pmtimevalToReal(&tv);
            else {
                 fprintf(stderr, "%s: could not parse -t argument (%s)\n", pmProgname, optarg);
                fputs(msg, stderr);
@@ -748,21 +746,7 @@ getargs(int argc, char *argv[])
     reflectTime(dfltDelta);
 
     /* parse time window - just to check argument syntax */
-    unrealize(now, &tv1);
-    if (archives)
-       unrealize(last, &tv2);
-    else
-       tv2.tv_sec = INT_MAX;           /* sizeof(time_t) == sizeof(int) */
-    if (pmParseTimeWindow(startFlag, stopFlag, alignFlag, offsetFlag,
-                          &tv1, &tv2,
-                          &tv, &tv2, &tv1,
-                         &msg) < 0) {
-       fputs(msg, stderr);
-        exit(1);
-    }
-    start = realize(tv1);
-    stop = realize(tv2);
-    runTime = stop - start;
+    initTime();
 
     /* initialize PMAPI */
     if (pmnsfile != PM_NS_DEFAULT && (sts = pmLoadNameSpace(pmnsfile)) < 0) {
@@ -830,27 +814,7 @@ getargs(int argc, char *argv[])
        agentInit();                    /* initialize secret agent stuff */
 
     /* really parse time window */
-    if (!archives) {
-       now = getReal() + 1.0;
-       reflectTime(dfltDelta);
-    }
-    unrealize(now, &tv1);
-    if (archives)
-       unrealize(last, &tv2);
-    else
-       tv2.tv_sec = INT_MAX;
-    if (pmParseTimeWindow(startFlag, stopFlag, alignFlag, offsetFlag,
-                         &tv1, &tv2,
-                          &tv, &tv2, &tv1,
-                         &msg) < 0) {
-       fputs(msg, stderr);
-       exit(1);
-    }
-
-    /* set run timing window */
-    start = realize(tv1);
-    stop = realize(tv2);
-    runTime = stop - start;
+    initTime();
 }
 
 /***********************************************************************
@@ -890,7 +854,7 @@ interact(void)
                token = scanArg(finger);
                if (token) {
                    if (pmParseInterval(token, &tv1, &msg) == 0)
-                       runTime = realize(tv1);
+                       runTime = __pmtimevalToReal(&tv1);
                    else {
                        fputs(msg, stderr);
                        free(msg);
@@ -914,9 +878,9 @@ interact(void)
                     fprintf(stderr, "%s: error - argument required\n", pmProgname);
                    break;
                }
-               unrealize(start, &tv1);
+               __pmtimevalFromReal(start, &tv1);
                if (archives)
-                   unrealize(last, &tv2);
+                   __pmtimevalFromReal(last, &tv2);
                else
                    tv2.tv_sec = INT_MAX;
                if (__pmParseTime(token, &tv1, &tv2, &tv1, &msg) < 0) {
@@ -924,7 +888,7 @@ interact(void)
                    free(msg);
                    break;
                }
-               start = realize(tv1);
+               start = __pmtimevalToReal(&tv1);
                if (archives)
                    invalidate();
                break;
@@ -940,7 +904,7 @@ interact(void)
                    free(msg);
                    break;
                }
-               runTime = realize(tv1);
+               runTime = __pmtimevalToReal(&tv1);
                break;
            case 'q':
                quit = 1;
diff --git a/src/pmie/src/pragmatics.c b/src/pmie/src/pragmatics.c
index 2788b57..af33cb5 100644
--- a/src/pmie/src/pragmatics.c
+++ b/src/pmie/src/pragmatics.c
@@ -481,7 +481,7 @@ initArchive(Archive *a)
     n = (int) strlen(label.ll_hostname);
     a->hname = (char *) alloc(n + 1);
     strcpy(a->hname, label.ll_hostname);
-    a->first = realize(label.ll_start);
+    a->first = __pmtimevalToReal(&label.ll_start);
     if ((sts = pmGetArchiveEnd(&tv)) < 0) {
        fprintf(stderr, "%s: archive %s is corrupted\n"
                "pmGetArchiveEnd failed: %s\n",
@@ -489,7 +489,7 @@ initArchive(Archive *a)
        pmDestroyContext(handle);
        return 0;
     }
-    a->last = realize(tv);
+    a->last = __pmtimevalToReal(&tv);
 
     /* check for duplicate host */
     b = archives;
@@ -1131,7 +1131,7 @@ taskFetch(Task *t)
                            if (m->desc.pmid == r->vset[i]->pmid) {
                                if (r->vset[i]->numval > 0) {
                                    m->vset = r->vset[i];
-                                   m->stamp = realize(r->timestamp);
+                                   m->stamp = __pmtimevalToReal(&r->timestamp);
                                }
                                break;
                            }

<Prev in Thread] Current Thread [Next in Thread>