To: Nathan Scott <nathans@xxxxxxxxxx>
Subject: Re: [pcp] PCP Updates: Active Probing for __pmDiscoverServices() / pmfind
From: Dave Brolley <brolley@xxxxxxxxxx>
Date: Thu, 22 May 2014 15:32:04 -0400
Cc: PCP Mailing List <pcp@xxxxxxxxxxx>
Delivered-to: pcp@xxxxxxxxxxx
In-reply-to: <1264071097.9700484.1400487393003.JavaMail.zimbra@xxxxxxxxxx>
References: <5373D0D2.5090902@xxxxxxxxxx> <1264071097.9700484.1400487393003.JavaMail.zimbra@xxxxxxxxxx>
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20100101 Thunderbird/24.5.0
On 05/19/2014 04:16 AM, Nathan Scott wrote:
diff --git a/configure.in b/configure.in
index 7248293..0a32cfd 100644
--- a/configure.in
+++ b/configure.in
@@ -1559,6 +1559,12 @@ __thread int x;], [],
        AC_TRY_COMPILE([#include <pthread.h>], [pthread_barrier_t mybarrier;],
            AC_DEFINE(HAVE_PTHREAD_BARRIER_T) AC_MSG_RESULT(yes),
            AC_MSG_RESULT(no))
+
+       dnl Check if sem_t is defined in semephore.h
+       AC_MSG_CHECKING([for sem_t in semephore.h])

typo: semephore.h -> semaphore.h
Fixed.
diff --git a/src/include/pcp/impl.h b/src/include/pcp/impl.h
index edffa31..727390a 100644
--- a/src/include/pcp/impl.h
+++ b/src/include/pcp/impl.h
@@ -628,9 +628,11 @@ extern int      __pmSockAddrIsLoopBack(const __pmSockAddr *);
  extern int         __pmSockAddrIsInet(const __pmSockAddr *);
  extern int         __pmSockAddrIsIPv6(const __pmSockAddr *);
  extern int         __pmSockAddrIsUnix(const __pmSockAddr *);
-extern char *       __pmSockAddrToString(__pmSockAddr *);
+extern char *       __pmSockAddrToString(const __pmSockAddr *);
  extern __pmSockAddr *__pmStringToSockAddr(const char *);
  extern __pmSockAddr *__pmLoopBackAddress(int);
+extern __pmSockAddr *__pmSockAddrFirstSubnetAddr(const __pmSockAddr *, int);
+extern __pmSockAddr *__pmSockAddrNextSubnetAddr(__pmSockAddr *, int);


Hmm, do these really need to be exposed outside libpcp?  (on review, it
would seem not - and there's no exports file change, so they definitely
should not be exposed up here in impl.h, right?)
This function was already here and exposed. I just changed the parameter to be 'const'. That's why there is no exports file change (it's already there). It may not be used externally yet, but I think it's only a matter of time, since it is, IMO, a generally useful function. If we remove it now, I believe that we will only end up re-adding it in the future.

diff --git a/src/libpcp/src/discovery.c b/src/libpcp/src/discovery.c
index d620311..094c91f 100644
--- a/src/libpcp/src/discovery.c
+++ b/src/libpcp/src/discovery.c
@@ -108,7 +118,7 @@ __pmAddDiscoveredService(__pmServiceInfo *info, int numUrls, char ***urls)
        __pmNotifyErr(LOG_ERR,
                      "__pmAddDiscoveredService: Unsupported service: '%s'",
                      info->spec);
-       return EOPNOTSUPP;
+       return -EOPNOTSUPP;
      }
/*


Whoops.  QA test time?  (we must have no/few error path tests, I guess?
would this be difficult to add?  I'm guessing not - via "pmfind -s"?)
There is QA coverage for this in test 724. What happened here is that the (internal) API changed so that the new number of URLs is returned on success (it used to be 0), so we could no longer return a positive error code on failure (it probably should not have been positive anyway). However, this code gets removed by your change to enable additional services, since the protocol now gets passed in.
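The return convention described here (a URL count on success, a negative errno on failure) can be sketched as follows. The helper name add_discovered_url() is hypothetical and is not the libpcp function; it only illustrates why a positive EOPNOTSUPP would be indistinguishable from a URL count.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper: on success it returns the new number of URLs (>= 0);
 * on failure it returns a negative errno value, so callers can tell the
 * two apart with a simple sign test.
 */
static int
add_discovered_url(const char *url, int numUrls, char ***urls)
{
    size_t len;
    char *copy;
    char **grown;

    if (url == NULL || *url == '\0')
        return -EINVAL;
    len = strlen(url) + 1;
    if ((copy = malloc(len)) == NULL)
        return -ENOMEM;
    memcpy(copy, url, len);
    grown = realloc(*urls, (size_t)(numUrls + 1) * sizeof(**urls));
    if (grown == NULL) {
        free(copy);
        return -ENOMEM;
    }
    grown[numUrls] = copy;
    *urls = grown;
    return numUrls + 1;
}
```

A caller then writes "if ((sts = add_discovered_url(url, numUrls, &urls)) < 0) handle the error; else numUrls = sts;". Returning a positive EOPNOTSUPP from the same function would silently become a URL count.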
diff --git a/src/libpcp/src/probe.c b/src/libpcp/src/probe.c
new file mode 100644
index 0000000..8147b2e
--- /dev/null
+++ b/src/libpcp/src/probe.c
+[...]
+static void
+attemptComplete (void)

(space oddity - several in this file)

+
+/* Attempt a connection on the given address and port. Return 0, if successful. */
+static void *
+attemptConnection (void *arg)
+{
+    int                        s;
+    int                        flags;
+    int                        sts;
+    struct timeval     canwait = { 1, 000000 };

Hard-coding - use TIMEOUT_DEFAULT? and/or __pmDefaultRequestTimeout or some
other (new) env var customisable timeout, perhaps?
Yeah. I was experimenting with different timeouts for probing large subnets. I've changed it to use __pmConnectTimeout() for now.
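The body of attemptConnection() is elided in this excerpt. One common way to put a bound on a TCP connect attempt is a non-blocking connect() followed by select() and a read of SO_ERROR. The sketch below uses that technique. It takes the timeout as a struct timeval parameter rather than calling __pmConnectTimeout(), and the function name probe_connect_ipv4() is hypothetical.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical sketch: try a TCP connection to ip:port, giving up after
 * *timeout. Returns 0 if something accepted the connection, otherwise a
 * negative errno. The socket is always closed again, because a probe only
 * needs to know whether the connection could be made.
 */
static int
probe_connect_ipv4(const char *ip, unsigned short port,
                   const struct timeval *timeout)
{
    struct sockaddr_in sin;
    struct timeval wait = *timeout;   /* select() may modify its timeout */
    socklen_t errlen = sizeof(int);
    fd_set wfds;
    int fd, flags, err = 0, sts;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &sin.sin_addr) != 1)
        return -EINVAL;
    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        return -errno;
    if (fd >= FD_SETSIZE) {           /* select() cannot watch this fd */
        close(fd);
        return -EMFILE;
    }
    /* Non-blocking, so connect() returns at once with EINPROGRESS. */
    if ((flags = fcntl(fd, F_GETFL)) < 0 ||
        fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        sts = -errno;
    else if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) == 0)
        sts = 0;                      /* connected immediately */
    else if (errno != EINPROGRESS)
        sts = -errno;                 /* e.g. ECONNREFUSED straight away */
    else {
        /* Writable means the attempt finished; SO_ERROR says how. */
        FD_ZERO(&wfds);
        FD_SET(fd, &wfds);
        sts = select(fd + 1, NULL, &wfds, NULL, &wait);
        if (sts == 0)
            sts = -ETIMEDOUT;
        else if (sts < 0)
            sts = -errno;
        else if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
            sts = -errno;
        else
            sts = err ? -err : 0;
    }
    close(fd);
    return sts;
}
```

The FD_SETSIZE check matters for probing: with many probes in flight at once, each one holds an open socket, and select() can only watch descriptors below FD_SETSIZE.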
+    /* Attempt to connect. If flags comaes back as less than zero, then the

typo: comaes -> comes
Fixed.
+
+/* Dispatch a connection attempt, on its own thread, if supported. */
+static void
+dispatchConnection (

(space oddity)
Fixed.
+    const char *service,
+    const __pmSockAddr *address,
+    int port,
+    int *numUrls,
+    char ***urls
+)
+{
+#if IS_MULTI_THREAD
+    pthread_t thread;
+#endif
+    connectionContext *context;
+    int rc;

"sts" please, you're hacking PCP this week.  ;)
Done.
+    int attempt;
+
+    /* We need a separate connection context for each potential thread. */
+    context = malloc(sizeof(*context));
+    if (context == NULL) {
+       __pmNoMem("__pmProbeDiscoverServices: unable to allocate connection context",
+                 sizeof(*context), PM_FATAL_ERR);
+    }
+    context->service = service;
+    context->port = port;
+    context->numUrls = numUrls;
+    context->urls = urls;
+    context->address = __pmSockAddrDup(address);

This seems to be a second Dup, and we leak address, if I'm not mistaken?
(when allocated from __pmSockAddrFirstSubnetAddr -> probeForServices?)
No leak here. Each thread needs its own copy of the address it is probing and frees it when the thread ends. The subnet iteration API reuses the same __pmSockAddr and frees it when the iteration reaches the end.
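The ownership model described here can be sketched as follows. Each worker receives its own heap copy of the address and frees it when done, while a counting semaphore limits how many workers run at once (the patch's SEM_WAIT(&threadsAvailable)). This standalone version uses plain pthreads and POSIX unnamed semaphores, and it represents addresses as strings rather than __pmSockAddr. The type and function names (probe_ctx, probe_worker, probe_all) are hypothetical.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-thread context; it owns a private copy of the address. */
typedef struct {
    char            *address;   /* heap copy, freed by the worker */
    int             *hits;      /* shared result counter */
    pthread_mutex_t *lock;      /* protects *hits */
    sem_t           *slots;     /* free worker slots (the thread limit) */
} probe_ctx;

static void *
probe_worker(void *arg)
{
    probe_ctx *ctx = arg;
    sem_t *slots = ctx->slots;  /* saved, since ctx is freed below */

    /* A real probe would attempt a connection to ctx->address here. */
    pthread_mutex_lock(ctx->lock);
    (*ctx->hits)++;
    pthread_mutex_unlock(ctx->lock);

    free(ctx->address);         /* the worker owns, and frees, its copy */
    free(ctx);
    sem_post(slots);            /* give the slot back */
    return NULL;
}

/*
 * Probe every address, with at most maxThreads workers running at once.
 * Each worker gets its own copy, so addrs[] may be reused immediately.
 * Returns the number of completed probes, or -1 on setup failure.
 */
static int
probe_all(const char *const *addrs, int n, unsigned maxThreads)
{
    pthread_mutex_t lock;
    pthread_t *tids;
    sem_t slots;
    int hits = 0, started = 0, i;

    if (maxThreads == 0 || (tids = calloc(n > 0 ? n : 1, sizeof(*tids))) == NULL)
        return -1;
    if (sem_init(&slots, 0, maxThreads) != 0) {
        free(tids);
        return -1;
    }
    pthread_mutex_init(&lock, NULL);
    for (i = 0; i < n; i++) {
        probe_ctx *ctx = malloc(sizeof(*ctx));
        size_t len = strlen(addrs[i]) + 1;

        if (ctx == NULL || (ctx->address = malloc(len)) == NULL) {
            free(ctx);
            continue;
        }
        memcpy(ctx->address, addrs[i], len);
        ctx->hits = &hits;
        ctx->lock = &lock;
        ctx->slots = &slots;

        sem_wait(&slots);       /* block while maxThreads are busy */
        if (pthread_create(&tids[started], NULL, probe_worker, ctx) != 0) {
            free(ctx->address);
            free(ctx);
            sem_post(&slots);
            continue;
        }
        started++;
    }
    /* Join (rather than detach) so nothing touches our stack after return. */
    for (i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    pthread_mutex_destroy(&lock);
    sem_destroy(&slots);
    free(tids);
    return hits;
}
```

Unnamed semaphores created with sem_init() are not available everywhere (macOS, for example, does not implement them). This is one reason the configure.in check for sem_t is worth having.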
+    if (context->address == NULL) {
+       char *addrString = __pmSockAddrToString(address);
+       __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: unable to copy socket address %s",
+                     addrString);
+       free(addrString);
+       attemptComplete();
+       return;
+    }
+
+    /*
+     * Attempt the connection. Since we're not passing in attributes for the (possible)
+     * new thread, the only error that can occur is EAGAIN. Sleep for 0.1 seconds
+     * before trying again. We must have a limit in case something goes wrong. Make it
+     * 5 seconds (50 * 100,000 usecs).
+     *
+     * Respect the requested maximum number of threads.
+     */
+    SEM_WAIT(&threadsAvailable);
+    for (attempt = 0; attempt < 50; ++attempt) {

Hard-coded limits.

+       /* Attempt the connection on a new thread. */
+        rc = THREAD_START(&thread, &attr, attemptConnection, context);
+       if (rc != EAGAIN)
+           break;
+
+       /* Wait before trying again. */
+       usleep(100000);
+    }

Hmm, more hard-coding.  usleep is not portable - go for pmtimevalSleep perhaps?

The above hard-coding could be broken via env var / setting for discovery
timeout, and then split that time up into chunks (e.g. divide by 100000usec,
if that's a good time)
Understood regarding usleep. The code now uses __pmtimevalSleep().

Regarding reconfigurability, this is really just a control against infinite looping, in case something goes horribly wrong. I'm not so sure that this is something the user needs/wants to be able to configure. This particular timeout could be made obsolete by the proposed overall timeout or by Frank's proposed alternative algorithm.
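Nathan's suggestion (derive the retry count from an overall timeout split into fixed-size waits) can be sketched as below. The function name start_thread_with_retry() and the stand-in worker echo_task() are hypothetical. Because PCP's internal __pmtimevalSleep() is not available outside libpcp, this standalone version sleeps with POSIX nanosleep() instead. With timeoutMs = 5000 and waitMs = 100 it reproduces the patch's 50 attempts of 0.1 seconds.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <time.h>

/*
 * Hypothetical sketch: start a thread, retrying on EAGAIN until a total
 * budget of timeoutMs has been spent in waitMs-sized naps. Any other
 * pthread_create() result (success or a hard error) is returned at once.
 */
static int
start_thread_with_retry(pthread_t *tid, void *(*fn)(void *), void *arg,
                        long timeoutMs, long waitMs)
{
    struct timespec nap;
    long attempts, attempt;
    int sts = EAGAIN;

    if (waitMs <= 0)
        waitMs = 100;
    attempts = timeoutMs / waitMs;      /* e.g. 5000 ms / 100 ms = 50 tries */
    if (attempts < 1)
        attempts = 1;
    nap.tv_sec = waitMs / 1000;
    nap.tv_nsec = (waitMs % 1000) * 1000000L;

    for (attempt = 0; attempt < attempts; attempt++) {
        sts = pthread_create(tid, NULL, fn, arg);
        if (sts != EAGAIN)
            break;
        if (attempt + 1 < attempts)
            nanosleep(&nap, NULL);      /* back off before the next try */
    }
    return sts;
}

/* Stand-in worker for the usage example: it just returns its argument. */
static void *
echo_task(void *arg)
{
    return arg;
}
```

With this shape, the "hard-coded" 50 falls out of a single timeout value. Such a value could come from an environment variable if that is ever wanted, and the loop still cannot spin forever.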

+}
+
+static int
+probeForServices (

(space oddity)
Fixed.
+    /* The service is either a service name (e.g. pmcd) or a port number. */
+    if (strcmp(service, "pmcd") == 0) {
+       if ((env = getenv("PMCD_PORT")) != NULL) {
+           port = strtol(env, &end, 0);
+           if (*end != '\0' || port < 0) {
+               __pmNotifyErr(LOG_WARNING,
+                             "__pmProbeDiscoverServices: ignored bad PMCD_PORT = '%s'\n",
+                             env);
+               port = SERVER_PORT;
+           }
+       }
+       else
+           port = SERVER_PORT;
+    }
+    else {
+       port = strtol(service, &end, 0);
+       if (*end != '\0') {
+           __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: service '%s; is not valid",
+                         service);
+           return 0;

It worries me a little we only have one service here, tends to hide
bugs that will mean only one will ever work - I'd love to see pmproxy
(PROXY_PORT) and pmwebd (PMWEBD_PORT) added into this mix ASAP to
make sure it always supports >1 service as well as >1 mechanism.

Going back a bit, the probe does a socket connect/close doesn't it?
So, all we'd need here is some other strcmp/port mappings for those
other two - is that right?

(similarly, the Avahi extension would be relatively straightforward,
no?  I think we should just go right ahead and do it, unless there's
something big & onerous I'm missing there?)
Thanks for doing this part.
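The strcmp/port mapping proposed above for pmcd, pmproxy and pmwebd could be table driven, so that adding a service means adding one row. In this sketch, PMCD_PORT and the pmcd port 44321 (SERVER_PORT) come from the patch and PCP. The PMPROXY_PORT and PMWEBD_PORT variable names and the 44322/44323 defaults are assumptions that should be checked against the PCP headers. The names services[], parse_port() and service_port() are hypothetical. Unlike the patch, which only rejects negative values, this version also rejects 0 and anything above 65535.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical service table; pmproxy/pmwebd entries are assumptions. */
static const struct {
    const char *name;
    const char *envVar;
    int         defaultPort;
} services[] = {
    { "pmcd",    "PMCD_PORT",    44321 },
    { "pmproxy", "PMPROXY_PORT", 44322 },
    { "pmwebd",  "PMWEBD_PORT",  44323 },
};

/* Parse a whole string as a TCP port; return -1 if it is not one. */
static int
parse_port(const char *s)
{
    char *end;
    long port = strtol(s, &end, 0);

    if (end == s || *end != '\0' || port <= 0 || port > 65535)
        return -1;
    return (int)port;
}

/*
 * Map a service name, or a numeric port string, to a port number,
 * honouring the per-service environment override. Returns -1 if the
 * service is unknown or the number is invalid.
 */
static int
service_port(const char *service)
{
    size_t i;

    for (i = 0; i < sizeof(services) / sizeof(services[0]); i++) {
        const char *env;
        int port;

        if (strcmp(service, services[i].name) != 0)
            continue;
        if ((env = getenv(services[i].envVar)) == NULL)
            return services[i].defaultPort;
        if ((port = parse_port(env)) < 0) {
            fprintf(stderr, "ignored bad %s = '%s'\n", services[i].envVar, env);
            return services[i].defaultPort;
        }
        return port;
    }
    return parse_port(service);         /* not a known name: maybe a number */
}
```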
+    }
+    __pmSockAddrSetPort(netAddress, port);
+
+    /*
+     * We have a network address, a subnet and a port. Iterate over the addresses in the subnet,

(long comment line - generally 80 cols is used as the de facto standard)
Fixed.
+    addressesProcessed = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+
+    /* Dispatch the connection attempts. */
+    prevNumUrls = numUrls;
+    for (address = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+        address != NULL;
+        address = __pmSockAddrNextSubnetAddr(address, maskBits)) {
+       dispatchConnection (service, address, port, &numUrls, urls);

(extra whitespace, and "address" use here looks questionable wrt leaking
as mentioned above)
Whitespace fixed. No leak here. See the existing comment above this loop.
+/*
+ * Parse the mechanism string for options. The first option will be of the form
+ *
+ *   probe=<net-address>/<maskSize>
+ *
+ * Subesquent options, if any, will be separated by commas. Currently supported:

typo: subesquent -> subsequent.

+ *   maxThreads=<integer>  -- specifies a hard limit on the number of active threads.
+ */
+ static int
+ parseOptions(const char *mechanism)

Hmm - these options are specific to the individual discovery mechanisms
aren't they?  Shouldn't we be calling out to discovery-specific parsing
mechanisms here?
This entire source file is specific to the probing mechanism.
+    /* Parse any remaining options. */
+    option = end;
+    if (*option == '\0')
+       return 0; /* no options */
+
+    sts = 0;

+       /* Examine the option. */
+       if (strncmp(option, "maxThreads=", 11) == 0) {

just "threads"?
It really is a maximum. Fewer threads may end up being used if the subnet is small enough or if the threads finish quickly enough.
+           option += 11;

(change that if so -> either way, sizeof("maxThreads") will be better than
simply 11)
Fixed.
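On the sizeof point: sizeof("maxThreads") is indeed 11, because sizeof on a string literal counts the terminating NUL. Writing sizeof("maxThreads=") - 1, which is also 11, ties the length to the exact prefix being matched. The sketch below parses the documented "probe=<net-address>/<maskSize>[,maxThreads=<n>]" form with that idiom. The PREFIX_LEN macro, the probe_options struct and parse_probe_options() are hypothetical and are not the patch's parseOptions().

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Length of a string literal, excluding its terminating NUL. */
#define PREFIX_LEN(lit) (sizeof(lit) - 1)

/* Hypothetical parse result for "probe=<net-address>/<maskSize>[,maxThreads=<n>]". */
typedef struct {
    char address[64];
    int  maskBits;
    long maxThreads;            /* 0 means "not specified" */
} probe_options;

static int
parse_probe_options(const char *mechanism, probe_options *opts)
{
    const char *p, *slash;
    char *end;
    size_t len;
    long val;

    memset(opts, 0, sizeof(*opts));
    if (strncmp(mechanism, "probe=", PREFIX_LEN("probe=")) != 0)
        return -EINVAL;
    p = mechanism + PREFIX_LEN("probe=");

    /* <net-address> runs up to the '/'. */
    if ((slash = strchr(p, '/')) == NULL || slash == p)
        return -EINVAL;
    len = (size_t)(slash - p);
    if (len >= sizeof(opts->address))
        return -EINVAL;
    memcpy(opts->address, p, len);
    opts->address[len] = '\0';

    /* <maskSize>: decimal, at most 128 (the IPv6 maximum). */
    val = strtol(slash + 1, &end, 10);
    if (end == slash + 1 || val < 0 || val > 128)
        return -EINVAL;
    opts->maskBits = (int)val;

    /* Any remaining options are comma separated. */
    for (p = end; *p != '\0'; p = end) {
        if (*p != ',')
            return -EINVAL;
        p++;
        if (strncmp(p, "maxThreads=", PREFIX_LEN("maxThreads=")) != 0)
            return -EINVAL;             /* unknown option */
        p += PREFIX_LEN("maxThreads=");
        errno = 0;
        val = strtol(p, &end, 10);
        if (end == p || errno != 0 || val <= 0)
            return -EINVAL;
        opts->maxThreads = val;
    }
    return 0;
}
```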
+               option = end;
+               /*
+                * Make sure the value is positive. Large values are ok. They have the
+                * effect of "no fixed limit". However, there is an actual limit to be
+                * observed. sem_init(3) says that it is SEM_VALUE_MAX. However, on f19,
+                * where SEM_VALUE_MAX is 0xffffffff, values higher than 0x7fffffff cause

*cough* - sounds like a glibc bug?
Yeah -- I'll put together a test case and submit a report.
+                * the semaphore to block on the first sem_wait.
+                */
+               if (longVal > 0x7fffffff) {

How about using something like "min(SEM_VALUE_MAX, 0x7fffffff)" here,
to be sure to be sure?

Either way, those numbers are insanely high - maybe just set a crazy but
smaller thread count limit anyway - like 1024?  (ie who would use *that*
many threads, anyway!?)
Yeah, the actual limit will end up being FD_SETSIZE (which is 1024 on my system), since that's the max number of open fds we can manage. I've fixed this code to set the limit to min(SEM_VALUE_MAX, FD_SETSIZE).
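Because the effective ceiling is FD_SETSIZE, the clamp needs the smaller of that and the semaphore maximum. The sketch below reads the semaphore limit at run time through sysconf(_SC_SEM_VALUE_MAX) rather than the compile-time SEM_VALUE_MAX. The function name clamp_max_threads() is hypothetical.

```c
#include <sys/select.h>
#include <unistd.h>

/*
 * Hypothetical sketch: clamp a requested maxThreads value to what the
 * probe can actually use. Each in-flight probe holds one socket that
 * select() must be able to watch, so FD_SETSIZE is the practical ceiling.
 * Returns -1 for non-positive requests.
 */
static long
clamp_max_threads(long requested)
{
    long limit = FD_SETSIZE;
    long semMax = sysconf(_SC_SEM_VALUE_MAX);

    if (requested <= 0)
        return -1;
    if (semMax > 0 && semMax < limit)
        limit = semMax;                 /* take the smaller of the two */
    return requested < limit ? requested : limit;
}
```

The result is safe to pass to sem_init() as the initial count, and it sidesteps the oversized values that made the f19 semaphore block.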

+int
+__pmProbeDiscoverServices(const char *service, const char *mechanism, int numUrls, char ***urls)
+{
+    int        sts;
+
+    /* Interpret the mechanism string. */
+    sts = parseOptions(mechanism);
+    if (sts != 0)
+       return 0;
+
+    /* Everything checks out. Now do the actual probing. */
+    numUrls = probeForServices (service, netAddress, maskBits, numUrls, urls);

(whitespace oddity)
Fixed.
@@ -2252,3 +2252,80 @@ __pmSockAddrToString(__pmSockAddr *addr)
      }
      return buf;
  }
+
+__pmSockAddr *
+__pmSockAddrFirstSubnetAddr(const __pmSockAddr *netAddr, int maskBits)
+{
+    __pmSockAddr       *addr;
+
+    /* Make a copy of the net address for iteration purposes. */
+    addr = __pmSockAddrDup(netAddr);

(this allocation ultimately seems to lead to one or two potential
memory leaks down the track ... but perhaps I'm missing something
subtle there)
No memory leak here -- as discussed in earlier comments.
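The iteration ownership described in this thread works like this: First duplicates the network address, Next advances that one object in place, and the iterator is freed when the end of the subnet is reached. That model can be sketched with an IPv4 analogue that works on host-order uint32_t values. The names subnet_iter, subnet_first() and subnet_next() are hypothetical. The sketch visits every address in the block, including the network and broadcast addresses; this excerpt of the patch does not show whether the real iterator skips those.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical IPv4 analogue of the subnet iteration API (host byte order). */
typedef struct {
    uint32_t addr;      /* address currently being visited */
    uint32_t last;      /* final address in the subnet */
} subnet_iter;

/* Allocate an iterator positioned at the first address of net/maskBits. */
static subnet_iter *
subnet_first(uint32_t net, int maskBits)
{
    subnet_iter *it;
    uint32_t hostMask;

    if (maskBits < 0 || maskBits > 32 || (it = malloc(sizeof(*it))) == NULL)
        return NULL;
    /* Shifting a 32-bit value by 32 is undefined, so /0 is special-cased. */
    hostMask = maskBits == 0 ? 0xffffffffu
                             : ((uint32_t)1 << (32 - maskBits)) - 1;
    it->addr = net & ~hostMask;
    it->last = it->addr | hostMask;
    return it;
}

/*
 * Advance in place. At the end of the subnet the iterator frees itself and
 * returns NULL, so a loop that runs to completion leaks nothing.
 */
static subnet_iter *
subnet_next(subnet_iter *it)
{
    if (it->addr == it->last) {
        free(it);
        return NULL;
    }
    it->addr++;
    return it;
}
```

The one leak this pattern still allows is a caller that breaks out of the loop early. That caller still owns the iterator and must free it.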

Thanks for the review! Let me know what you think about the items I've left open.

Dave
