Hi Dave,
----- Original Message -----
> Hi,
>
> The commits below have been pushed to my brolley/dev branch in the
> pcpfans repository. They collectively represent an implementation of an
> active probing mode for __pmDiscoverServices() / pmfind.
> ...
> I am soliciting review of the code and the user interface. All comments
> and suggestions are welcome.
>
Looking good!
diff --git a/configure.in b/configure.in
index 7248293..0a32cfd 100644
--- a/configure.in
+++ b/configure.in
@@ -1559,6 +1559,12 @@ __thread int x;], [],
AC_TRY_COMPILE([#include <pthread.h>], [pthread_barrier_t mybarrier;],
AC_DEFINE(HAVE_PTHREAD_BARRIER_T) AC_MSG_RESULT(yes),
AC_MSG_RESULT(no))
+
+ dnl Check if sem_t is defined in semephore.h
+ AC_MSG_CHECKING([for sem_t in semephore.h])
typo (in both lines above): semephore.h -> semaphore.h
+ AC_TRY_COMPILE([#include <semaphore.h>], [sem_t mysem;],
+ AC_DEFINE(HAVE_SEM_T) AC_MSG_RESULT(yes),
+ AC_MSG_RESULT(no))
fi
fi
diff --git a/src/include/pcp/impl.h b/src/include/pcp/impl.h
index edffa31..727390a 100644
--- a/src/include/pcp/impl.h
+++ b/src/include/pcp/impl.h
@@ -628,9 +628,11 @@ extern int __pmSockAddrIsLoopBack(const __pmSockAddr
*);
extern int __pmSockAddrIsInet(const __pmSockAddr *);
extern int __pmSockAddrIsIPv6(const __pmSockAddr *);
extern int __pmSockAddrIsUnix(const __pmSockAddr *);
-extern char * __pmSockAddrToString(__pmSockAddr *);
+extern char * __pmSockAddrToString(const __pmSockAddr *);
extern __pmSockAddr *__pmStringToSockAddr(const char *);
extern __pmSockAddr *__pmLoopBackAddress(int);
+extern __pmSockAddr *__pmSockAddrFirstSubnetAddr(const __pmSockAddr *, int);
+extern __pmSockAddr *__pmSockAddrNextSubnetAddr(__pmSockAddr *, int);
Hmm, do these really need to be exposed outside libpcp? (on review, it
would seem not - and there's no exports file change, so they definitely
should not be exposed up here in impl.h, right?)
diff --git a/src/libpcp/src/discovery.c b/src/libpcp/src/discovery.c
index d620311..094c91f 100644
--- a/src/libpcp/src/discovery.c
+++ b/src/libpcp/src/discovery.c
@@ -108,7 +118,7 @@ __pmAddDiscoveredService(__pmServiceInfo *info, int
numUrls, char ***urls)
__pmNotifyErr(LOG_ERR,
"__pmAddDiscoveredService: Unsupported service: '%s'",
info->spec);
- return EOPNOTSUPP;
+ return -EOPNOTSUPP;
}
/*
Whoops - good catch on the sign of that return value. QA test time? (we
must have no/few error path tests, I guess? Would this be difficult to
add? I'm guessing not - via "pmfind -s"?)
diff --git a/src/libpcp/src/probe.c b/src/libpcp/src/probe.c
new file mode 100644
index 0000000..8147b2e
--- /dev/null
+++ b/src/libpcp/src/probe.c
+[...]
+static void
+attemptComplete (void)
(space oddity: no space between the function name and '(' please -
several in this file)
+
+/* Attempt a connection on the given address and port. Return 0, if
successful. */
+static void *
+attemptConnection (void *arg)
+{
+ int s;
+ int flags;
+ int sts;
+ struct timeval canwait = { 1, 000000 };
Hard-coding - use TIMEOUT_DEFAULT and/or __pmDefaultRequestTimeout, or some
other (new) timeout customisable via an env var, perhaps?
+ /*
+ * Create a socket. There may be a limit on open fds. If we get EAGAIN,
then
+ * wait 0.1 seconds and try again. We must have a limit in case something
goes wrong.
+ * Make it 5 seconds (50 * 100,000 usecs).
+ */
+ for (attempt = 0; attempt < 50; ++attempt) {
+ if (__pmSockAddrIsInet(context->address))
+ s = __pmCreateSocket();
+ else /* address family already checked */
+ s = __pmCreateIPv6Socket();
+ if (s != -EAGAIN)
+ break;
+ usleep(100000);
(hard-code ... more comments on this shortly, below)
+ /* Attempt to connect. If flags comaes back as less than zero, then the
typo: comaes -> comes
+
+/* Dispatch a connection attempt, on its own thread, if supported. */
+static void
+dispatchConnection (
(space oddity)
+ const char *service,
+ const __pmSockAddr *address,
+ int port,
+ int *numUrls,
+ char ***urls
+)
+{
+#if IS_MULTI_THREAD
+ pthread_t thread;
+#endif
+ connectionContext *context;
+ int rc;
"sts" please, you're hacking PCP this week. ;)
+ int attempt;
+
+ /* We need a separate connection context for each potential thread. */
+ context = malloc(sizeof(*context));
+ if (context == NULL) {
+ __pmNoMem("__pmProbeDiscoverServices: unable to allocate connection
context",
+ sizeof(*context), PM_FATAL_ERR);
+ }
+ context->service = service;
+ context->port = port;
+ context->numUrls = numUrls;
+ context->urls = urls;
+ context->address = __pmSockAddrDup(address);
This seems to be a second Dup, and we leak address, if I'm not mistaken?
(when allocated from __pmSockAddrFirstSubnetAddr -> probeForServices?)
+ if (context->address == NULL) {
+ char *addrString = __pmSockAddrToString(address);
+ __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: unable to copy
socket address %s",
+ addrString);
+ free(addrString);
+ attemptComplete();
+ return;
+ }
+
+ /*
+ * Attempt the connection. Since we're not passing in attributes for the
(possible)
+ * new thread, the only error that can occur is EAGAIN. Sleep for 0.1
seconds
+ * before trying again. We must have a limit in case something goes wrong.
Make it
+ * 5 seconds (50 * 100,000 usecs).
+ *
+ * Respect the requested maximum number of threads.
+ */
+ SEM_WAIT(&threadsAvailable);
+ for (attempt = 0; attempt < 50; ++attempt) {
Hard-coded limits.
+ /* Attempt the connection on a new thread. */
+ rc = THREAD_START(&thread, &attr, attemptConnection, context);
+ if (rc != EAGAIN)
+ break;
+
+ /* Wait before trying again. */
+ usleep(100000);
+ }
Hmm, more hard-coding. usleep is not portable - go for pmtimevalSleep perhaps?
The above hard-coding could be replaced by an env var / setting for the
discovery timeout, and then split that time up into chunks (e.g. divide by
100000usec, if that's a good interval).
+ else if (pmDebug & DBG_TRACE_DISCOVERY) {
+ if (attempt > 0) {
+ __pmNotifyErr(LOG_INFO, "Waited for %d attempts to create a
thread\n",
+ attempt);
+ }
(loving the diagnostics!)
+}
+
+static int
+probeForServices (
(space oddity)
+ /* The service is either a service name (e.g. pmcd) or a port number. */
+ if (strcmp(service, "pmcd") == 0) {
+ if ((env = getenv("PMCD_PORT")) != NULL) {
+ port = strtol(env, &end, 0);
+ if (*end != '\0' || port < 0) {
+ __pmNotifyErr(LOG_WARNING,
+ "__pmProbeDiscoverServices: ignored bad PMCD_PORT
= '%s'\n",
+ env);
+ port = SERVER_PORT;
+ }
+ }
+ else
+ port = SERVER_PORT;
+ }
+ else {
+ port = strtol(service, &end, 0);
+ if (*end != '\0') {
+ __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: service '%s; is
not valid",
+ service);
+ return 0;
It worries me a little that we only have one service here; that tends to
hide bugs which would mean only one service ever works. I'd love to see
pmproxy (PROXY_PORT) and pmwebd (PMWEBD_PORT) added into this mix ASAP, to
make sure it always supports >1 service as well as >1 mechanism.
Going back a bit, the probe does a socket connect/close, doesn't it?
So all we'd need here is some other strcmp/port mappings for those
other two - is that right?
(similarly, the Avahi extension would be relatively straightforward,
no? I think we should just go right ahead and do it, unless there's
something big & onerous I'm missing there?)
+ }
+ __pmSockAddrSetPort(netAddress, port);
+
+ /*
+ * We have a network address, a subnet and a port. Iterate over the
addresses in the subnet,
(long comment line - 80 columns is generally used as the de facto standard)
+ * Since the connection attempts may be on separate threads,
+ * we need a counter to be incremented by each thread as they end so that
+ * we know when all of the threads have ended.
+ * IPv6 has 128 bits, so it is possible that the number of addresses could
+ * exceed the range of the native integer types or of a semaphore. However,
+ * a __pmSockAddr can be used for this purpose, since it can represent all
+ * of the addresses to be probed. Use the subnet iteration API to
initialize
+ * and increment this counter. When the last thread has been processed,
this
+ * pointer will be freed by the API and become NULL.
Cunning.
+ addressesProcessed = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+
+ /* Dispatch the connection attempts. */
+ prevNumUrls = numUrls;
+ for (address = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+ address != NULL;
+ address = __pmSockAddrNextSubnetAddr(address, maskBits)) {
+ dispatchConnection (service, address, port, &numUrls, urls);
(extra whitespace, and the "address" use here looks questionable with
respect to leaking, as mentioned above)
+/*
+ * Parse the mechanism string for options. The first option will be of the form
+ *
+ * probe=<net-address>/<maskSize>
+ *
+ * Subesquent options, if any, will be separated by commas. Currently
supported:
typo: subesquent -> subsequent.
+ * maxThreads=<integer> -- specifies a hard limit on the number of active
threads.
+ */
+ static int
+ parseOptions(const char *mechanism)
Hmm - these options are specific to the individual discovery mechanisms
aren't they? Shouldn't we be calling out to discovery-specific parsing
mechanisms here?
+ /* Parse any remaining options. */
+ option = end;
+ if (*option == '\0')
+ return 0; /* no options */
+
+ sts = 0;
+ /* Examine the option. */
+ if (strncmp(option, "maxThreads=", 11) == 0) {
just "threads"?
+ option += 11;
(change that if so - either way, strlen("maxThreads=") or
sizeof("maxThreads=")-1 will be better than a bare 11)
+ option = end;
+ /*
+ * Make sure the value is positive. Large values are ok. They
have the
+ * effect of "no fixed limit". However, there is an actual
limit to be
+ * observed. sem_init(3) says that it is SEM_VALUE_MAX.
However, on f19,
+ * where SEM_VALUE_MAX is 0xffffffff, values higher than
0x7fffffff cause
*cough* - sounds like a glibc bug?
+ * the semaphore to block on the first sem_wait.
+ */
+ if (longVal > 0x7fffffff) {
How about using something like "min(SEM_VALUE_MAX, 0x7fffffff)" here,
to be sure to be sure? (min, not max - we want whichever limit is smaller)
Either way, those numbers are insanely high - maybe just set a crazy but
smaller thread count limit anyway - like 1024? (i.e. who would use *that*
many threads, anyway!?)
+int
+__pmProbeDiscoverServices(const char *service, const char *mechanism, int
numUrls, char ***urls)
+{
+ int sts;
+
+ /* Interpret the mechanism string. */
+ sts = parseOptions(mechanism);
+ if (sts != 0)
+ return 0;
+
+ /* Everything checks out. Now do the actual probing. */
+ numUrls = probeForServices (service, netAddress, maskBits, numUrls, urls);
(whitespace oddity)
@@ -2252,3 +2252,80 @@ __pmSockAddrToString(__pmSockAddr *addr)
}
return buf;
}
+
+__pmSockAddr *
+__pmSockAddrFirstSubnetAddr(const __pmSockAddr *netAddr, int maskBits)
+{
+ __pmSockAddr *addr;
+
+ /* Make a copy of the net address for iteration purposes. */
+ addr = __pmSockAddrDup(netAddr);
(this allocation ultimately seems to lead to one or two potential
memory leaks down the track ... but perhaps I'm missing something
subtle there)
diff --git a/src/pmfind/pmfind.c b/src/pmfind/pmfind.c
index 0c55709..5e59c50 100644
--- a/src/pmfind/pmfind.c
+++ b/src/pmfind/pmfind.c
@@ -22,7 +22,7 @@ static char *mechanism;
static pmLongOptions longopts[] = {
PMAPI_OPTIONS_HEADER("Discovery options"),
PMOPT_DEBUG,
- { "mechanism", 1, 'm', "NAME", "set the discovery method to use
[avahi|...|all]" },
+ { "mechanism", 1, 'm', "NAME", "set the discovery method to use
[avahi|probe=<subnet>|all]" },
{ "service", 1, 's', "NAME", "discover local services [pmcd|...]" },
PMAPI_OPTIONS_HEADER("Reporting options"),
{ "quiet", 0, 'q', 0, "quiet mode, do not write to stdout" },
This is a very neat feature - thanks Dave! (and Frank for the original
suggestion, of course!)
cheers.
--
Nathan
|