
Re: [pcp] PCP Updates: Active Probing for __pmDiscoverServices() / pmfind

To: Dave Brolley <brolley@xxxxxxxxxx>
Subject: Re: [pcp] PCP Updates: Active Probing for __pmDiscoverServices() / pmfind
From: Nathan Scott <nathans@xxxxxxxxxx>
Date: Mon, 19 May 2014 04:16:33 -0400 (EDT)
Cc: PCP Mailing List <pcp@xxxxxxxxxxx>
Delivered-to: pcp@xxxxxxxxxxx
In-reply-to: <5373D0D2.5090902@xxxxxxxxxx>
References: <5373D0D2.5090902@xxxxxxxxxx>
Reply-to: Nathan Scott <nathans@xxxxxxxxxx>
Hi Dave,

----- Original Message -----
> Hi,
> 
> The commits below have been pushed to my brolley/dev branch in the
> pcpfans repository. They collectively represent an implementation of an
> active probing mode for __pmDiscoverServices() / pmfind.
> ...
> I am soliciting review of the code and the user interface. All comments
> and suggestions are welcome.
> 

Looking good!

diff --git a/configure.in b/configure.in
index 7248293..0a32cfd 100644
--- a/configure.in
+++ b/configure.in
@@ -1559,6 +1559,12 @@ __thread int x;], [],
        AC_TRY_COMPILE([#include <pthread.h>], [pthread_barrier_t mybarrier;],
            AC_DEFINE(HAVE_PTHREAD_BARRIER_T) AC_MSG_RESULT(yes),
            AC_MSG_RESULT(no))
+
+       dnl Check if sem_t is defined in semephore.h
+       AC_MSG_CHECKING([for sem_t in semephore.h])

typo: semephore.h -> semaphore.h

+       AC_TRY_COMPILE([#include <semaphore.h>], [sem_t mysem;],
+           AC_DEFINE(HAVE_SEM_T) AC_MSG_RESULT(yes),
+           AC_MSG_RESULT(no))
     fi
 fi
 
diff --git a/src/include/pcp/impl.h b/src/include/pcp/impl.h
index edffa31..727390a 100644
--- a/src/include/pcp/impl.h
+++ b/src/include/pcp/impl.h
@@ -628,9 +628,11 @@ extern int      __pmSockAddrIsLoopBack(const __pmSockAddr *);
 extern int          __pmSockAddrIsInet(const __pmSockAddr *);
 extern int          __pmSockAddrIsIPv6(const __pmSockAddr *);
 extern int          __pmSockAddrIsUnix(const __pmSockAddr *);
-extern char *       __pmSockAddrToString(__pmSockAddr *);
+extern char *       __pmSockAddrToString(const __pmSockAddr *);
 extern __pmSockAddr *__pmStringToSockAddr(const char *);
 extern __pmSockAddr *__pmLoopBackAddress(int);
+extern __pmSockAddr *__pmSockAddrFirstSubnetAddr(const __pmSockAddr *, int);
+extern __pmSockAddr *__pmSockAddrNextSubnetAddr(__pmSockAddr *, int);


Hmm, do these really need to be exposed outside libpcp?  (on review, it
would seem not - and there's no exports file change, so they definitely
should not be exposed up here in impl.h, right?)

 
diff --git a/src/libpcp/src/discovery.c b/src/libpcp/src/discovery.c
index d620311..094c91f 100644
--- a/src/libpcp/src/discovery.c
+++ b/src/libpcp/src/discovery.c
@@ -108,7 +118,7 @@ __pmAddDiscoveredService(__pmServiceInfo *info, int numUrls, char ***urls)
        __pmNotifyErr(LOG_ERR,
                      "__pmAddDiscoveredService: Unsupported service: '%s'",
                      info->spec);
-       return EOPNOTSUPP;
+       return -EOPNOTSUPP;
     }
 
     /*


Whoops.  QA test time?  (we must have no/few error path tests, I guess?
would this be difficult to add?  I'm guessing not - via "pmfind -s"?)


diff --git a/src/libpcp/src/probe.c b/src/libpcp/src/probe.c
new file mode 100644
index 0000000..8147b2e
--- /dev/null
+++ b/src/libpcp/src/probe.c
+[...]
+static void
+attemptComplete (void)

(space oddity - several in this file)

+
+/* Attempt a connection on the given address and port. Return 0, if successful. */
+static void *
+attemptConnection (void *arg)
+{
+    int                        s;
+    int                        flags;
+    int                        sts;
+    struct timeval     canwait = { 1, 000000 };

Hard-coding - use TIMEOUT_DEFAULT? &| __pmDefaultRequestTimeout or some
other (new) env var customisable timeout, perhaps?
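
Something along these lines might do: a minimal sketch, assuming a new
environment variable. The name PMFIND_TIMEOUT and both helper functions are
invented here for illustration; they are not existing PCP names, and the 1
second fallback just mirrors the hard-coded value in the patch.

```c
#include <stdlib.h>
#include <sys/time.h>

/*
 * Hypothetical helper: parse a timeout in seconds from text.  Anything
 * that is not a positive number of at most one day yields the fallback.
 */
static double
parse_timeout(const char *text, double fallback)
{
    char *end;
    double seconds;

    if (text == NULL || *text == '\0')
        return fallback;
    seconds = strtod(text, &end);
    /* !(x > 0) also rejects NaN; the upper bound rejects "inf" */
    if (end == text || *end != '\0' || !(seconds > 0.0) || seconds > 86400.0)
        return fallback;
    return seconds;
}

/* Hypothetical: PMFIND_TIMEOUT is an invented name, not a PCP variable. */
static struct timeval
probe_timeout(void)
{
    double seconds = parse_timeout(getenv("PMFIND_TIMEOUT"), 1.0);
    struct timeval tv;

    tv.tv_sec = (long)seconds;
    tv.tv_usec = (long)((seconds - (double)tv.tv_sec) * 1000000.0);
    return tv;
}
```

The connect code would then start from "struct timeval canwait =
probe_timeout();" rather than a literal.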

+    /*
+     * Create a socket. There may be a limit on open fds. If we get EAGAIN, then
+     * wait 0.1 seconds and try again.  We must have a limit in case something goes wrong.
+     * Make it 5 seconds (50 * 100,000 usecs).
+     */
+    for (attempt = 0; attempt < 50; ++attempt) {
+       if (__pmSockAddrIsInet(context->address))
+           s = __pmCreateSocket();
+       else /* address family already checked */
+           s = __pmCreateIPv6Socket();
+       if (s != -EAGAIN)
+           break;
+       usleep(100000);

(hard-code ... more comments on this shortly, below)

+    /* Attempt to connect. If flags comaes back as less than zero, then the

typo: comaes -> comes

+
+/* Dispatch a connection attempt, on its own thread, if supported. */
+static void
+dispatchConnection (

(space oddity)

+    const char *service,
+    const __pmSockAddr *address,
+    int port,
+    int *numUrls,
+    char ***urls
+)
+{
+#if IS_MULTI_THREAD
+    pthread_t thread;
+#endif
+    connectionContext *context;
+    int rc;

"sts" please, you're hacking PCP this week.  ;)

+    int attempt;
+
+    /* We need a separate connection context for each potential thread. */
+    context = malloc(sizeof(*context));
+    if (context == NULL) {
+       __pmNoMem("__pmProbeDiscoverServices: unable to allocate connection context",
+                 sizeof(*context), PM_FATAL_ERR);
+    }
+    context->service = service;
+    context->port = port;
+    context->numUrls = numUrls;
+    context->urls = urls;
+    context->address = __pmSockAddrDup(address);

This seems to be a second Dup, and we leak address, if I'm not mistaken?
(when allocated from __pmSockAddrFirstSubnetAddr -> probeForServices?)

+    if (context->address == NULL) {
+       char *addrString = __pmSockAddrToString(address);
+       __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: unable to copy socket address %s",
+                     addrString);
+       free(addrString);
+       attemptComplete();
+       return;
+    }
+
+    /*
+     * Attempt the connection. Since we're not passing in attributes for the (possible)
+     * new thread, the only error that can occur is EAGAIN. Sleep for 0.1 seconds
+     * before trying again. We must have a limit in case something goes wrong. Make it
+     * 5 seconds (50 * 100,000 usecs).
+     *
+     * Respect the requested maximum number of threads.
+     */
+    SEM_WAIT(&threadsAvailable);
+    for (attempt = 0; attempt < 50; ++attempt) {

Hard-coded limits.

+       /* Attempt the connection on a new thread. */
+        rc = THREAD_START(&thread, &attr, attemptConnection, context);
+       if (rc != EAGAIN)
+           break;
+
+       /* Wait before trying again. */
+       usleep(100000);
+    }

Hmm, more hard-coding.  usleep is not portable - go for pmtimevalSleep perhaps?

The above hard-coding could be broken via env var / setting for discovery
timeout, and then split that time up into chunks (e.g. divide by 100000usec,
if that's a good time)
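
Roughly what I have in mind, as a sketch only: the overall budget and the
retry interval are passed in, the attempt count falls out of the division,
and the sleep is a callback so the portable libpcp sleep routine can be
plugged in. All names below are hypothetical, and the two demo functions at
the end exist only to exercise the loop.

```c
#include <errno.h>

typedef int (*try_fn)(void *arg);             /* 0, or a negative errno */
typedef void (*pause_fn)(unsigned long usec); /* portable sleep, caller-supplied */

/* Number of attempts that fit into the budget; always at least one. */
static unsigned long
retry_attempts(unsigned long budget_usec, unsigned long interval_usec)
{
    unsigned long n;

    if (interval_usec == 0)
        return 1;
    n = budget_usec / interval_usec;
    return n > 0 ? n : 1;
}

/* Retry while the attempt reports -EAGAIN, pausing between tries. */
static int
retry_while_eagain(try_fn attempt, void *arg, pause_fn pause,
                   unsigned long budget_usec, unsigned long interval_usec)
{
    unsigned long i, limit = retry_attempts(budget_usec, interval_usec);
    int sts = -EAGAIN;

    for (i = 0; i < limit; i++) {
        sts = attempt(arg);
        if (sts != -EAGAIN)
            break;
        if (i + 1 < limit)
            pause(interval_usec);
    }
    return sts;
}

/* Demo stand-ins: fail with -EAGAIN until the counter runs out. */
static int
flaky(void *arg)
{
    int *left = arg;

    return (*left)-- > 0 ? -EAGAIN : 0;
}

static void
no_pause(unsigned long usec)
{
    (void)usec;
}
```

With a 5 second budget and a 100000 usec interval this gives the same 50
attempts as the patch, but both numbers become tunable in one place.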

+    else if (pmDebug & DBG_TRACE_DISCOVERY) {
+       if (attempt > 0) {
+           __pmNotifyErr(LOG_INFO, "Waited for %d attempts to create a thread\n",
+                         attempt);
+       }

(loving the diagnostics!)

+}
+
+static int
+probeForServices (

(space oddity)

+    /* The service is either a service name (e.g. pmcd) or a port number. */
+    if (strcmp(service, "pmcd") == 0) {
+       if ((env = getenv("PMCD_PORT")) != NULL) {
+           port = strtol(env, &end, 0);
+           if (*end != '\0' || port < 0) {
+               __pmNotifyErr(LOG_WARNING,
+                             "__pmProbeDiscoverServices: ignored bad PMCD_PORT = '%s'\n",
+                             env);
+               port = SERVER_PORT;
+           }
+       }
+       else
+           port = SERVER_PORT;
+    }
+    else {
+       port = strtol(service, &end, 0);
+       if (*end != '\0') {
+           __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: service '%s; is not valid",
+                         service);
+           return 0;

It worries me a little that we only have one service here; that tends to
hide bugs that will mean only one will ever work - I'd love to see pmproxy
(PROXY_PORT) and pmwebd (PMWEBD_PORT) added into this mix ASAP to
make sure it always supports >1 service as well as >1 mechanism.

Going back a bit, the probe does a socket connect/close doesn't it?
So, all we'd need here is some other strcmp/port mappings for those
other two - is that right?

(similarly, the Avahi extension would be relatively straightforward,
no?  I think we should just go right ahead and do it, unless there's
something big & onerous I'm missing there?)
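
The strcmp/port mapping could become a small table, sketched below. This is
an illustration under assumptions: the port numbers and environment variable
names are the usual defaults as I recall them and should be checked against
the headers, and parse_port() / service_to_port() are hypothetical helpers,
not libpcp functions.

```c
#include <stdlib.h>
#include <string.h>

struct service_port {
    const char *name;     /* service name as passed to pmfind -s */
    const char *env;      /* environment variable overriding the port */
    int         fallback; /* default port when the override is unset or bad */
};

/* Assumed defaults; verify against SERVER_PORT, PROXY_PORT, PMWEBD_PORT. */
static const struct service_port services[] = {
    { "pmcd",    "PMCD_PORT",    44321 },
    { "pmproxy", "PMPROXY_PORT", 44322 },
    { "pmwebd",  "PMWEBD_PORT",  44323 },
};

/* Returns the port, or -1 unless text is a whole number in 1..65535. */
static int
parse_port(const char *text)
{
    char *end;
    long port;

    if (text == NULL || *text == '\0')
        return -1;
    port = strtol(text, &end, 10);
    if (*end != '\0' || port < 1 || port > 65535)
        return -1;
    return (int)port;
}

/* Accepts a known service name or a literal port number; -1 otherwise. */
static int
service_to_port(const char *service)
{
    size_t i;

    for (i = 0; i < sizeof(services) / sizeof(services[0]); i++) {
        if (strcmp(service, services[i].name) == 0) {
            int port = parse_port(getenv(services[i].env));

            return port > 0 ? port : services[i].fallback;
        }
    }
    return parse_port(service);
}
```

Adding a further service is then a one-line table change rather than another
strcmp branch.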

+    }
+    __pmSockAddrSetPort(netAddress, port);
+
+    /*
+     * We have a network address, a subnet and a port. Iterate over the addresses in the subnet,

(long comment line - generally 80 cols is used as the de facto standard)

+     * Since the connection attempts may be on separate threads,
+     * we need a counter to be incremented by each thread as they end so that
+     * we know when all of the threads have ended.
+     * IPv6 has 128 bits, so it is possible that the number of addresses could
+     * exceed the range of the native integer types or of a semaphore. However,
+     * a __pmSockAddr can be used for this purpose, since it can represent all
+     * of the addresses to be probed. Use the subnet iteration API to initialize
+     * and increment this counter. When the last thread has been processed, this
+     * pointer will be freed by the API and become NULL.

Cunning.

+    addressesProcessed = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+
+    /* Dispatch the connection attempts. */
+    prevNumUrls = numUrls;
+    for (address = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+        address != NULL;
+        address = __pmSockAddrNextSubnetAddr(address, maskBits)) {
+       dispatchConnection (service, address, port, &numUrls, urls);

(extra whitespace, and "address" use here looks questionable wrt leaking
as mentioned above)
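
To make the ownership question concrete, here is a self-contained sketch of
the same first/next pattern for IPv4 only. It is not the libpcp code:
struct probe_addr stands in for __pmSockAddr and every function name is
hypothetical. The iterator owns its allocation and frees it on exhaustion,
and the per-address callee receives a value, so there is nothing left over
for anyone else to free. Note that this sketch visits every address in the
subnet, including the network and broadcast addresses.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for __pmSockAddr: one IPv4 address, host order. */
struct probe_addr {
    uint32_t ip;
};

/* Mask covering the host part for a prefix of mask_bits (0..32). */
static uint32_t
host_mask(int mask_bits)
{
    if (mask_bits <= 0)
        return 0xffffffffu;
    if (mask_bits >= 32)
        return 0;
    return 0xffffffffu >> mask_bits;
}

/* Allocate the iterator, positioned at the first address of the subnet. */
static struct probe_addr *
first_subnet_addr(uint32_t net, int mask_bits)
{
    struct probe_addr *addr = malloc(sizeof(*addr));

    if (addr != NULL)
        addr->ip = net & ~host_mask(mask_bits);
    return addr;
}

/* Step in place; past the last address, free the iterator, return NULL. */
static struct probe_addr *
next_subnet_addr(struct probe_addr *addr, int mask_bits)
{
    uint32_t host = host_mask(mask_bits);

    if ((addr->ip & host) == host) {
        free(addr);
        return NULL;
    }
    addr->ip++;
    return addr;
}

/* Visit each address once and return how many were visited. */
static unsigned long
visit_subnet(uint32_t net, int mask_bits, void (*visit)(uint32_t ip))
{
    struct probe_addr *addr;
    unsigned long count = 0;

    for (addr = first_subnet_addr(net, mask_bits);
         addr != NULL;
         addr = next_subnet_addr(addr, mask_bits)) {
        if (visit != NULL)
            visit(addr->ip);
        count++;
    }
    return count;
}
```

If the callee does need its own heap copy (as with a per-thread context),
then whoever makes that copy has to free it, and the loop must free the
iterator itself on any early exit.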

+/*
+ * Parse the mechanism string for options. The first option will be of the form
+ *
+ *   probe=<net-address>/<maskSize>
+ *
+ * Subesquent options, if any, will be separated by commas. Currently supported:

typo: subesquent -> subsequent.

+ *   maxThreads=<integer>  -- specifies a hard limit on the number of active threads.
+ */
+ static int
+ parseOptions(const char *mechanism)

Hmm - these options are specific to the individual discovery mechanisms
aren't they?  Shouldn't we be calling out to discovery-specific parsing
mechanisms here?
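
One possible shape for that, as a sketch with made-up names: the generic
code only identifies the mechanism and hands the rest of the string to a
per-mechanism parser. The two parsers below are placeholders that barely
validate anything; the real ones would live alongside each mechanism.

```c
#include <errno.h>
#include <stddef.h>
#include <string.h>

typedef int (*option_parser)(const char *options);

/* Placeholder: this sketch's avahi mechanism accepts no options. */
static int
parse_avahi_options(const char *options)
{
    return *options == '\0' ? 0 : -EINVAL;
}

/* Placeholder: probe needs at least <net-address>/<maskSize>. */
static int
parse_probe_options(const char *options)
{
    return strchr(options, '/') != NULL ? 0 : -EINVAL;
}

static const struct {
    const char    *name;
    option_parser  parse;
} mechanisms[] = {
    { "avahi", parse_avahi_options },
    { "probe", parse_probe_options },
};

/* Split off the mechanism name and call its own option parser. */
static int
parse_mechanism(const char *mechanism)
{
    size_t i, len = strcspn(mechanism, "=,");

    for (i = 0; i < sizeof(mechanisms) / sizeof(mechanisms[0]); i++) {
        if (strlen(mechanisms[i].name) == len &&
            strncmp(mechanism, mechanisms[i].name, len) == 0) {
            const char *rest = mechanism + len;

            if (*rest != '\0')
                rest++;         /* skip the '=' or ',' separator */
            return mechanisms[i].parse(rest);
        }
    }
    return -EOPNOTSUPP;
}
```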

+    /* Parse any remaining options. */
+    option = end;
+    if (*option == '\0')
+       return 0; /* no options */
+
+    sts = 0;

+       /* Examine the option. */
+       if (strncmp(option, "maxThreads=", 11) == 0) {

just "threads"?

+           option += 11;

(change that if so -> either way, sizeof("maxThreads=") - 1 will be better
than simply 11)
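
For example (a sketch; the macro and function names are invented), the
length can be derived from the literal itself so the two cannot drift apart
if the option gets renamed:

```c
#include <stddef.h>
#include <string.h>

#define OPT_THREADS     "maxThreads="
#define OPT_THREADS_LEN (sizeof(OPT_THREADS) - 1)  /* exclude the NUL */

/* Returns a pointer to the value after "maxThreads=", or NULL if no match. */
static const char *
threads_option_value(const char *option)
{
    if (strncmp(option, OPT_THREADS, OPT_THREADS_LEN) == 0)
        return option + OPT_THREADS_LEN;
    return NULL;
}
```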

+               option = end;
+               /*
+                * Make sure the value is positive. Large values are ok. They have the
+                * effect of "no fixed limit". However, there is an actual limit to be
+                * observed. sem_init(3) says that it is SEM_VALUE_MAX. However, on f19,
+                * where SEM_VALUE_MAX is 0xffffffff, values higher than 0x7fffffff cause

*cough* - sounds like a glibc bug?

+                * the semaphore to block on the first sem_wait.
+                */
+               if (longVal > 0x7fffffff) {

How about using something like "min(SEM_VALUE_MAX, 0x7fffffff)" here,
to be sure to be sure?

Either way, those numbers are insanely high - maybe just set a crazy but
smaller thread count limit anyway - like 1024?  (ie who would use *that*
many threads, anyway!?)
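
A sketch of the simpler clamp, assuming a cap of 1024 (the macro and
function names are hypothetical). Since POSIX guarantees SEM_VALUE_MAX is at
least 32767, a cap this small sidesteps the 0x7fffffff question entirely.

```c
#include <errno.h>
#include <stdlib.h>

#define PROBE_MAX_THREADS 1024  /* hypothetical cap, well below SEM_VALUE_MAX */

/*
 * Parse a thread limit.  Returns the limit clamped to PROBE_MAX_THREADS,
 * or -EINVAL if text is not a positive whole number.  Oversized values
 * (including ones that overflow strtol) are treated as "as many as allowed".
 */
static long
parse_thread_limit(const char *text)
{
    char *end;
    long value;

    value = strtol(text, &end, 0);
    if (end == text || *end != '\0' || value <= 0)
        return -EINVAL;
    return value > PROBE_MAX_THREADS ? PROBE_MAX_THREADS : value;
}
```

The result can be passed straight to sem_init() as the initial count.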

+int
+__pmProbeDiscoverServices(const char *service, const char *mechanism, int numUrls, char ***urls)
+{
+    int        sts;
+
+    /* Interpret the mechanism string. */
+    sts = parseOptions(mechanism);
+    if (sts != 0)
+       return 0;
+
+    /* Everything checks out. Now do the actual probing. */
+    numUrls = probeForServices (service, netAddress, maskBits, numUrls, urls);

(whitespace oddity)

@@ -2252,3 +2252,80 @@ __pmSockAddrToString(__pmSockAddr *addr)
     }
     return buf;
 }
+
+__pmSockAddr *
+__pmSockAddrFirstSubnetAddr(const __pmSockAddr *netAddr, int maskBits)
+{
+    __pmSockAddr       *addr;
+    
+    /* Make a copy of the net address for iteration purposes. */
+    addr = __pmSockAddrDup(netAddr);

(this allocation ultimately seems to lead to one or two potential
memory leaks down the track ... but perhaps I'm missing something
subtle there)

diff --git a/src/pmfind/pmfind.c b/src/pmfind/pmfind.c
index 0c55709..5e59c50 100644
--- a/src/pmfind/pmfind.c
+++ b/src/pmfind/pmfind.c
@@ -22,7 +22,7 @@ static char   *mechanism;
 static pmLongOptions longopts[] = {
     PMAPI_OPTIONS_HEADER("Discovery options"),
     PMOPT_DEBUG,
-    { "mechanism", 1, 'm', "NAME", "set the discovery method to use [avahi|...|all]" },
+    { "mechanism", 1, 'm', "NAME", "set the discovery method to use [avahi|probe=<subnet>|all]" },
     { "service", 1, 's', "NAME", "discover local services [pmcd|...]" },
     PMAPI_OPTIONS_HEADER("Reporting options"),
     { "quiet", 0, 'q', 0, "quiet mode, do not write to stdout" },


This is a very neat feature - thanks Dave!  (and Frank for the original
suggestion, of course!)

cheers.

--
Nathan
