On 05/19/2014 04:16 AM, Nathan Scott wrote:
diff --git a/configure.in b/configure.in
index 7248293..0a32cfd 100644
--- a/configure.in
+++ b/configure.in
@@ -1559,6 +1559,12 @@ __thread int x;], [],
AC_TRY_COMPILE([#include <pthread.h>], [pthread_barrier_t mybarrier;],
AC_DEFINE(HAVE_PTHREAD_BARRIER_T) AC_MSG_RESULT(yes),
AC_MSG_RESULT(no))
+
+ dnl Check if sem_t is defined in semephore.h
+ AC_MSG_CHECKING([for sem_t in semephore.h])
typo: semephore.h -> semaphore.h
Fixed.
diff --git a/src/include/pcp/impl.h b/src/include/pcp/impl.h
index edffa31..727390a 100644
--- a/src/include/pcp/impl.h
+++ b/src/include/pcp/impl.h
@@ -628,9 +628,11 @@ extern int __pmSockAddrIsLoopBack(const __pmSockAddr
*);
extern int __pmSockAddrIsInet(const __pmSockAddr *);
extern int __pmSockAddrIsIPv6(const __pmSockAddr *);
extern int __pmSockAddrIsUnix(const __pmSockAddr *);
-extern char * __pmSockAddrToString(__pmSockAddr *);
+extern char * __pmSockAddrToString(const __pmSockAddr *);
extern __pmSockAddr *__pmStringToSockAddr(const char *);
extern __pmSockAddr *__pmLoopBackAddress(int);
+extern __pmSockAddr *__pmSockAddrFirstSubnetAddr(const __pmSockAddr *, int);
+extern __pmSockAddr *__pmSockAddrNextSubnetAddr(__pmSockAddr *, int);
Hmm, do these really need to be exposed outside libpcp? (on review, it
would seem not - and there's no exports file change, so they definitely
should not be exposed up here in impl.h, right?)
This function was already here and exposed. I just changed the parameter
to be 'const'. That's why there is no exports file change (it's already
there). It may not be used externally yet, but I think it's only a
matter of time, since it is, IMO, a generally useful function. If we
remove it now, I believe that we will only end up re-adding it in the
future.
diff --git a/src/libpcp/src/discovery.c b/src/libpcp/src/discovery.c
index d620311..094c91f 100644
--- a/src/libpcp/src/discovery.c
+++ b/src/libpcp/src/discovery.c
@@ -108,7 +118,7 @@ __pmAddDiscoveredService(__pmServiceInfo *info, int
numUrls, char ***urls)
__pmNotifyErr(LOG_ERR,
"__pmAddDiscoveredService: Unsupported service: '%s'",
info->spec);
- return EOPNOTSUPP;
+ return -EOPNOTSUPP;
}
/*
Whoops. QA test time? (we must have no/few error path tests, I guess?
would this be difficult to add? I'm guessing not - via "pmfind -s"?)
There is QA coverage for this in test 724. What happened here is that
the API (internal) changed so that the new number of urls is returned on
success (was 0), so we could no longer return a positive error code on
failure (it probably should not have been positive anyway). However, this
code gets removed by your change to enable additional services, since the
protocol now gets passed in.
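To make the convention concrete, here is a minimal sketch of "count on
success, negative errno on failure". The function name and the service
check are made up for illustration; this is not the libpcp code.

```c
#include <errno.h>
#include <string.h>

/*
 * Hypothetical helper: returns the new number of URLs on success,
 * or a negative errno value on failure. Because a success value can
 * be any count >= 0, errors must be negative to stay distinguishable.
 */
int add_discovered(const char *spec, int num_urls)
{
    if (strcmp(spec, "pmcd") != 0)
        return -EOPNOTSUPP;     /* unsupported service */
    return num_urls + 1;        /* one more URL recorded */
}
```

A caller can then test "sts < 0" for failure and otherwise treat the
result as the updated count.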
diff --git a/src/libpcp/src/probe.c b/src/libpcp/src/probe.c
new file mode 100644
index 0000000..8147b2e
--- /dev/null
+++ b/src/libpcp/src/probe.c
+[...]
+static void
+attemptComplete (void)
(space oddity - several in this file)
+
+/* Attempt a connection on the given address and port. Return 0, if
successful. */
+static void *
+attemptConnection (void *arg)
+{
+ int s;
+ int flags;
+ int sts;
+ struct timeval canwait = { 1, 000000 };
Hard-coding - use TIMEOUT_DEFAULT? &| __pmDefaultRequestTimeout or some
other (new) env var customisable timeout, perhaps?
Yeah. I was experimenting with different timeouts for probing large
subnets. I've changed it to use __pmConnectTimeout() for now.
+ /* Attempt to connect. If flags comaes back as less than zero, then the
typo: comaes -> comes
Fixed.
+
+/* Dispatch a connection attempt, on its own thread, if supported. */
+static void
+dispatchConnection (
(space oddity)
Fixed.
+ const char *service,
+ const __pmSockAddr *address,
+ int port,
+ int *numUrls,
+ char ***urls
+)
+{
+#if IS_MULTI_THREAD
+ pthread_t thread;
+#endif
+ connectionContext *context;
+ int rc;
"sts" please, you're hacking PCP this week. ;)
Done.
+ int attempt;
+
+ /* We need a separate connection context for each potential thread. */
+ context = malloc(sizeof(*context));
+ if (context == NULL) {
+ __pmNoMem("__pmProbeDiscoverServices: unable to allocate connection
context",
+ sizeof(*context), PM_FATAL_ERR);
+ }
+ context->service = service;
+ context->port = port;
+ context->numUrls = numUrls;
+ context->urls = urls;
+ context->address = __pmSockAddrDup(address);
This seems to be a second Dup, and we leak address, if I'm not mistaken?
(when allocated from __pmSockAddrFirstSubnetAddr -> probeForServices?)
No leak here. Each thread needs its own copy of the address it is
probing and frees it when the thread ends. The subnet iteration API
reuses the same __pmSockAddr and frees it when the iteration reaches the
end.
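As an illustration of that ownership rule, here is a minimal sketch of
an iterator that allocates once, advances in place, and frees itself
when it runs off the end. It is IPv4-only, all names are hypothetical,
and skipping the network and broadcast addresses is an assumption of
the sketch rather than a statement about the libpcp code.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical iterator state: current IPv4 address, host byte order. */
typedef struct {
    uint32_t addr;
} subnet_iter;

/* Mask covering the host part for a prefix length in the range 0..32. */
static uint32_t host_mask(int mask_bits)
{
    if (mask_bits <= 0)
        return (uint32_t)0xffffffffu;
    if (mask_bits >= 32)
        return 0;
    return (uint32_t)0xffffffffu >> mask_bits;
}

/*
 * Allocate the iterator and position it on the first host address.
 * Returns NULL on allocation failure or if the subnet has no host
 * addresses between the network and broadcast addresses.
 */
subnet_iter *subnet_first(uint32_t net_addr, int mask_bits)
{
    uint32_t hosts = host_mask(mask_bits);
    subnet_iter *it;

    if (hosts < 2)
        return NULL;
    it = malloc(sizeof(*it));
    if (it == NULL)
        return NULL;
    it->addr = (net_addr & ~hosts) + 1;
    return it;
}

/*
 * Advance in place. When the next address would be the broadcast
 * address, free the iterator and return NULL, so the caller never
 * has to free anything after a complete iteration.
 */
subnet_iter *subnet_next(subnet_iter *it, int mask_bits)
{
    uint32_t hosts = host_mask(mask_bits);

    if ((it->addr & hosts) + 1 >= hosts) {
        free(it);
        return NULL;
    }
    it->addr++;
    return it;
}

/* Walk the whole subnet and count the addresses visited. */
int count_subnet_hosts(uint32_t net_addr, int mask_bits)
{
    subnet_iter *it;
    int n = 0;

    for (it = subnet_first(net_addr, mask_bits);
         it != NULL;
         it = subnet_next(it, mask_bits))
        n++;
    return n;
}
```

Anything that must outlive one loop step (such as a per-thread probe
context) has to copy the current address, which is why a separate copy
is made for each thread. A caller that breaks out of the loop early
would have to free the iterator itself.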
+ if (context->address == NULL) {
+ char *addrString = __pmSockAddrToString(address);
+ __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: unable to copy socket
address %s",
+ addrString);
+ free(addrString);
+ attemptComplete();
+ return;
+ }
+
+ /*
+ * Attempt the connection. Since we're not passing in attributes for the
(possible)
+ * new thread, the only error that can occur is EAGAIN. Sleep for 0.1
seconds
+ * before trying again. We must have a limit in case something goes wrong.
Make it
+ * 5 seconds (50 * 100,000 usecs).
+ *
+ * Respect the requested maximum number of threads.
+ */
+ SEM_WAIT(&threadsAvailable);
+ for (attempt = 0; attempt < 50; ++attempt) {
Hard-coded limits.
+ /* Attempt the connection on a new thread. */
+ rc = THREAD_START(&thread, &attr, attemptConnection, context);
+ if (rc != EAGAIN)
+ break;
+
+ /* Wait before trying again. */
+ usleep(100000);
+ }
Hmm, more hard-coding. usleep is not portable - go for pmtimevalSleep perhaps?
The above hard-coding could be broken via env var / setting for discovery
timeout, and then split that time up into chunks (e.g. divide by 100000usec,
if that's a good time)
Understood regarding usleep. The code now uses __pmtimevalSleep().
Regarding reconfigurability, this is really just a control against
infinite looping, in case something goes horribly wrong. I'm not so sure
that this is something the user needs/wants to be able to configure.
This particular timeout could be made obsolete by the proposed overall
timeout or by Frank's proposed alternative algorithm.
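For clarity, the guard amounts to a bounded retry on EAGAIN. Here is a
minimal standalone sketch. It uses nanosleep() rather than the PCP
sleep helper, and the callback type, function names and the stub are
all hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <time.h>

/* Hypothetical callback type: returns 0 on success or an errno value. */
typedef int (*start_fn)(void *arg);

/*
 * Call start() until it returns something other than EAGAIN, sleeping
 * delay_ms milliseconds after each EAGAIN and giving up once
 * max_attempts calls have been made. Returns the last status seen.
 */
int start_with_retry(start_fn start, void *arg, int max_attempts,
                     long delay_ms)
{
    struct timespec delay;
    int sts = EAGAIN;
    int attempt;

    delay.tv_sec = delay_ms / 1000;
    delay.tv_nsec = (delay_ms % 1000) * 1000000L;

    for (attempt = 0; attempt < max_attempts; ++attempt) {
        sts = start(arg);
        if (sts != EAGAIN)
            break;
        nanosleep(&delay, NULL);    /* wait before trying again */
    }
    return sts;
}

/* Stub for demonstration: returns EAGAIN until *remaining hits zero. */
int flaky_start(void *arg)
{
    int *remaining = arg;

    if (*remaining > 0) {
        --*remaining;
        return EAGAIN;
    }
    return 0;
}
```

With max_attempts = 50 and delay_ms = 100 this gives the 5 second
ceiling described in the patch comment. The caller still has to check
for a final EAGAIN and clean up.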
+}
+
+static int
+probeForServices (
(space oddity)
Fixed.
+ /* The service is either a service name (e.g. pmcd) or a port number. */
+ if (strcmp(service, "pmcd") == 0) {
+ if ((env = getenv("PMCD_PORT")) != NULL) {
+ port = strtol(env, &end, 0);
+ if (*end != '\0' || port < 0) {
+ __pmNotifyErr(LOG_WARNING,
+ "__pmProbeDiscoverServices: ignored bad PMCD_PORT =
'%s'\n",
+ env);
+ port = SERVER_PORT;
+ }
+ }
+ else
+ port = SERVER_PORT;
+ }
+ else {
+ port = strtol(service, &end, 0);
+ if (*end != '\0') {
+ __pmNotifyErr(LOG_ERR, "__pmProbeDiscoverServices: service '%s; is not
valid",
+ service);
+ return 0;
It worries me a little we only have one service here, tends to hide
bugs that will mean only one will ever work - I'd love to see pmproxy
(PROXY_PORT) and pmwebd (PMWEBD_PORT) added into this mix ASAP to
make sure it always supports >1 service as well as >1 mechanism.
Going back a bit, the probe does a socket connect/close doesn't it?
So, all we'd need here is some other strcmp/port mappings for those
other two - is that right?
(similarly, the Avahi extension would be relatively straightforward,
no? I think we should just go right ahead and do it, unless there's
something big & onerous I'm missing there?)
Thanks for doing this part.
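For reference, the kind of name-to-port mapping being discussed could
be table-driven, along these lines. This is a sketch only, not the code
that was committed: the function and table names are hypothetical, the
port numbers are the default values as I understand them (an
assumption), and the environment variable overrides are left out.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical table entry mapping a service name to a default port. */
struct service_port {
    const char *name;
    int port;
};

/* Assumed defaults for SERVER_PORT, PROXY_PORT and PMWEBD_PORT. */
static const struct service_port known_services[] = {
    { "pmcd",    44321 },
    { "pmproxy", 44322 },
    { "pmwebd",  44323 },
};

/*
 * Resolve a service spec to a port: either a known service name or a
 * plain port number. Returns -1 if the spec is neither.
 */
int service_to_port(const char *service)
{
    size_t i;
    char *end;
    long port;

    for (i = 0; i < sizeof(known_services) / sizeof(known_services[0]); i++) {
        if (strcmp(service, known_services[i].name) == 0)
            return known_services[i].port;
    }

    port = strtol(service, &end, 0);
    if (end == service || *end != '\0' || port < 0 || port > 65535)
        return -1;
    return (int)port;
}
```

Adding another service is then a one-line table change rather than
another strcmp branch.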
+ }
+ __pmSockAddrSetPort(netAddress, port);
+
+ /*
+ * We have a network address, a subnet and a port. Iterate over the
addresses in the subnet,
(long comment line - generally 80 cols is used as the de facto standard)
Fixed.
+ addressesProcessed = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+
+ /* Dispatch the connection attempts. */
+ prevNumUrls = numUrls;
+ for (address = __pmSockAddrFirstSubnetAddr(netAddress, maskBits);
+ address != NULL;
+ address = __pmSockAddrNextSubnetAddr(address, maskBits)) {
+ dispatchConnection (service, address, port, &numUrls, urls);
(extra whitespace, and "address" use here looks questionable wrt leaking
as mentioned above)
Whitespace fixed. No leak here. See the existing comment above this loop.
+/*
+ * Parse the mechanism string for options. The first option will be of the form
+ *
+ * probe=<net-address>/<maskSize>
+ *
+ * Subesquent options, if any, will be separated by commas. Currently
supported:
typo: subesquent -> subsequent.
+ * maxThreads=<integer> -- specifies a hard limit on the number of active
threads.
+ */
+ static int
+ parseOptions(const char *mechanism)
Hmm - these options are specific to the individual discovery mechanisms
aren't they? Shouldn't we be calling out to discovery-specific parsing
mechanisms here?
This entire source file is specific to the probing mechanism.
+ /* Parse any remaining options. */
+ option = end;
+ if (*option == '\0')
+ return 0; /* no options */
+
+ sts = 0;
+ /* Examine the option. */
+ if (strncmp(option, "maxThreads=", 11) == 0) {
just "threads"?
It really is a maximum. Fewer threads may end up being used if the
subnet is small enough or if the threads finish quickly enough.
+ option += 11;
(change that if so -> either way, sizeof("maxThreads") will be better than
simply 11)
Fixed.
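One detail worth spelling out: sizeof on a string literal counts the
terminating NUL, so sizeof("maxThreads") is 11 only because the NUL
stands in for the '='. Keeping the '=' in the literal and subtracting
one is less surprising. A minimal sketch, with a hypothetical macro and
function name, handling a single option with nothing after the value:

```c
#include <stdlib.h>
#include <string.h>

#define MAX_THREADS_OPT "maxThreads="

/*
 * Hypothetical parser: returns 0 and stores the value on success,
 * -1 if the prefix is missing or the number is malformed or not
 * positive.
 */
int parse_max_threads(const char *option, long *value)
{
    /* sizeof includes the terminating NUL, so subtract one. */
    const size_t prefix_len = sizeof(MAX_THREADS_OPT) - 1;
    char *end;
    long v;

    if (strncmp(option, MAX_THREADS_OPT, prefix_len) != 0)
        return -1;
    option += prefix_len;

    v = strtol(option, &end, 0);
    if (end == option || *end != '\0' || v <= 0)
        return -1;

    *value = v;
    return 0;
}
```

If the option name ever changes, the length follows the literal
automatically.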
+ option = end;
+ /*
+ * Make sure the value is positive. Large values are ok. They
have the
+ * effect of "no fixed limit". However, there is an actual
limit to be
+ * observed. sem_init(3) says that it is SEM_VALUE_MAX.
However, on f19,
+ * where SEM_VALUE_MAX is 0xffffffff, values higher than
0x7fffffff cause
*cough* - sounds like a glibc bug?
Yeah -- I'll put together a test case and submit a report.
+ * the semaphore to block on the first sem_wait.
+ */
+ if (longVal > 0x7fffffff) {
How about using something like "max(SEM_VALUE_MAX, 0x7fffffff)" here,
to be sure to be sure?
Either way, those numbers are insanely high - maybe just set a crazy but
smaller thread count limit anyway - like 1024? (ie who would use *that*
many threads, anyway!?)
Yeah, the actual limit will end up being FD_SETSIZE (which is 1024 on my
system), since that's the max number of open fds we can manage. I've
fixed this code to set the limit to max(SEM_VALUE_MAX, FD_SETSIZE).
+int
+__pmProbeDiscoverServices(const char *service, const char *mechanism, int
numUrls, char ***urls)
+{
+ int sts;
+
+ /* Interpret the mechanism string. */
+ sts = parseOptions(mechanism);
+ if (sts != 0)
+ return 0;
+
+ /* Everything checks out. Now do the actual probing. */
+ numUrls = probeForServices (service, netAddress, maskBits, numUrls, urls);
(whitespace oddity)
Fixed.
@@ -2252,3 +2252,80 @@ __pmSockAddrToString(__pmSockAddr *addr)
}
return buf;
}
+
+__pmSockAddr *
+__pmSockAddrFirstSubnetAddr(const __pmSockAddr *netAddr, int maskBits)
+{
+ __pmSockAddr *addr;
+
+ /* Make a copy of the net address for iteration purposes. */
+ addr = __pmSockAddrDup(netAddr);
(this allocation ultimately seems to lead to one or two potential
memory leaks down the track ... but perhaps I'm missing something
subtle there)
No memory leak here -- as discussed in earlier comments.
Thanks for the review! Let me know what you think about the items I've
left open.
Dave