diff --git a/src/include/pcp/pmda.h b/src/include/pcp/pmda.h
index 20968f1..654269b 100644
--- a/src/include/pcp/pmda.h
+++ b/src/include/pcp/pmda.h
@@ -592,6 +592,9 @@ extern pmnsNode * pmdaNodeLookup(pmnsNode *, const char *);
  *
  * pmdaCachePurge
  *     cull inactive entries
+ *
+ * pmdaCacheResize
+ *     set the maximum instance identifier
  */
 extern int pmdaCacheStore(pmInDom, int, const char *, void *);
 extern int pmdaCacheStoreKey(pmInDom, int, const char *, int, const void *, void *);
@@ -600,6 +603,7 @@ extern int pmdaCacheLookupName(pmInDom, const char *, int *, void **);
 extern int pmdaCacheLookupKey(pmInDom, const char *, int, const void *, char **, int *, void **);
 extern int pmdaCacheOp(pmInDom, int);
 extern int pmdaCachePurge(pmInDom, time_t);
+extern int pmdaCacheResize(pmInDom, int);
 
 #define PMDA_CACHE_LOAD 1
 #define PMDA_CACHE_ADD 2
diff --git a/src/libpcp_pmda/src/cache.c b/src/libpcp_pmda/src/cache.c
index dd74b3f..37feda6 100644
--- a/src/libpcp_pmda/src/cache.c
+++ b/src/libpcp_pmda/src/cache.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Red Hat.
+ * Copyright (c) 2013,2015 Red Hat.
  * Copyright (c) 2005 Silicon Graphics, Inc. All Rights Reserved.
  *
  * This library is free software; you can redistribute it and/or modify it
@@ -39,7 +39,9 @@ typedef struct entry {
     time_t stamp;
 } entry_t;
 
-#define VERSION 1 /* version of external file format */
+#define CACHE_VERSION1 1
+#define CACHE_VERSION2 2
+#define CACHE_VERSION CACHE_VERSION2 /* version of external file format */
 #define MAX_HASH_TRY 10
 
 /*
@@ -59,8 +61,11 @@ typedef struct hdr {
     int ins_mode; /* see insert_cache() */
     int hstate; /* dirty/clean/string state */
     int keyhash_cnt[MAX_HASH_TRY];
+    int maxinst; /* maximum inst */
 } hdr_t;
 
+#define DEFAULT_MAXINST 0x7fffffff
+
 /* bitfields for hstate */
 #define DIRTY_INSTANCE 0x1
 #define DIRTY_STAMP 0x2
@@ -189,6 +194,7 @@ find_cache(pmInDom indom, int *sts)
     h->hstate = 0;
     for (i = 0; i < MAX_HASH_TRY; i++)
         h->keyhash_cnt[i] = 0;
+    h->maxinst = DEFAULT_MAXINST;
     return h;
 }
 
@@ -617,7 +623,7 @@ insert_cache(hdr_t *h, const char *name, int inst, int *sts)
         if (last_e == NULL)
             inst = 0;
         else {
-            if (last_e->inst == 0x7fffffff) {
+            if (last_e->inst == h->maxinst) {
                 /*
                  * overflowed inst identifier, need to shift to
                  * ins_mode == 1
@@ -635,7 +641,7 @@ retry:
         for (e = h->first; e != NULL; e = e->next) {
             if (inst < e->inst)
                 break;
-            if (inst == 0x7fffffff) {
+            if (inst == h->maxinst) {
                 /*
                  * 2^32-1 is the maximum number of instances we can have
                  */
@@ -742,10 +748,34 @@ load_cache(hdr_t *h)
         fclose(fp);
         return PM_ERR_GENERIC;
     }
-    s = sscanf(buf, "%d %d", &x, &h->ins_mode);
-    if (s != 2 || x != 1 || h->ins_mode < 0 || h->ins_mode > 1) {
+    /* First grab the file version. */
+    s = sscanf(buf, "%d ", &x);
+    if (s != 1 || x <= 0 || x > CACHE_VERSION) {
         __pmNotifyErr(LOG_ERR,
-             "pmdaCacheOp: %s: illegal first record: %s",
+             "pmdaCacheOp: %s: illegal cache header record: %s",
+             filename, buf);
+        fclose(fp);
+        return PM_ERR_GENERIC;
+    }
+
+    /* Based on the file version, grab the entire line. */
+    switch (x) {
+    case CACHE_VERSION1:
+        h->maxinst = DEFAULT_MAXINST;
+        s = sscanf(buf, "%d %d", &x, &h->ins_mode);
+        if (s != 2)
+            s = 0;
+        break;
+    default:
+        s = sscanf(buf, "%d %d %d", &x, &h->ins_mode, &h->maxinst);
+        if (s != 3)
+            s = 0;
+        break;
+    }
+    if (s == 0 || h->ins_mode < 0 || h->ins_mode > 1 || h->maxinst < 0
+        || h->maxinst > DEFAULT_MAXINST) {
+        __pmNotifyErr(LOG_ERR,
+             "pmdaCacheOp: %s: illegal cache header record: %s",
              filename, buf);
         fclose(fp);
         return PM_ERR_GENERIC;
@@ -873,7 +903,7 @@ save_cache(hdr_t *h, int hstate)
                 vdp, sep, sep, sep, pmInDomStr_r(h->indom, strbuf, sizeof(strbuf)));
     if ((fp = fopen(filename, "w")) == NULL)
         return -oserror();
-    fprintf(fp, "%d %d\n", VERSION, h->ins_mode);
+    fprintf(fp, "%d %d %d\n", CACHE_VERSION, h->ins_mode, h->maxinst);
 
     now = time(NULL);
     cnt = 0;
@@ -1411,6 +1441,44 @@ int pmdaCachePurge(pmInDom indom, time_t recent)
     return cnt;
 }
 
+/*
+ * Set the largest instance identifier that insert_cache() may use
+ * for indom.
+ *
+ * Returns 0 on success, PM_ERR_INDOM when indom is PM_INDOM_NULL,
+ * PM_ERR_SIGN when maximum is negative, PM_ERR_TOOBIG when an entry
+ * already in the cache has an instance identifier above maximum, or
+ * the status reported by find_cache() when no header is available.
+ */
+int pmdaCacheResize(pmInDom indom, int maximum)
+{
+    hdr_t *h;
+    int sts;
+    entry_t *e;
+
+    /* Reject bad arguments before find_cache() is asked for a header. */
+    if (indom == PM_INDOM_NULL)
+        return PM_ERR_INDOM;
+    if (maximum < 0)
+        return PM_ERR_SIGN;
+
+    if ((h = find_cache(indom, &sts)) == NULL)
+        return sts;
+
+    /* The new limit must not be below any instance already in use. */
+    for (e = h->first; e != NULL; e = e->next) {
+        if (maximum < e->inst)
+            return PM_ERR_TOOBIG;
+    }
+
+    /*
+     * maxinst travels with the header: save_cache() writes it into
+     * the header record and load_cache() reads it back.
+     */
+    h->maxinst = maximum;
+    return 0;
+}
+
 /*
 --------------------------------------------------------------------
 lookup2.c, by Bob Jenkins, December 1996, Public Domain.
diff --git a/src/libpcp_pmda/src/exports b/src/libpcp_pmda/src/exports index 323d8f6..f5fc5c2 100644 --- a/src/libpcp_pmda/src/exports +++ b/src/libpcp_pmda/src/exports @@ -119,6 +119,8 @@ PCP_PMDA_3.4 { pmdaInterfaceMoved; pmdaNodeLookup; + pmdaCacheResize; + pmdaRootConnect; pmdaRootShutdown; pmdaRootContainerHostName; diff --git a/src/pmdas/json/pmdajson.python b/src/pmdas/json/pmdajson.python index a9b59a3..959fcff 100644 --- a/src/pmdas/json/pmdajson.python +++ b/src/pmdas/json/pmdajson.python @@ -9,7 +9,7 @@ from collections import OrderedDict from pcp.pmda import PMDA, pmdaMetric, pmdaIndom import cpmapi as c_api from pcp.pmapi import pmUnits, pmContext, pmErr -from ctypes import c_int, POINTER, cast +from ctypes import c_int import os, stat, pwd import re import traceback @@ -17,7 +17,8 @@ import subprocess import shlex MAX_CLUSTER = 0xfff # 12 bits, see pcp/impl.h -MAX_ITEM = 0x3ff # 10 bits, see pcp/impl.h +MAX_METRIC = 0x3ff # 10 bits, see pcp/impl.h +MAX_INDOM = 0x7fffffff NOBODY_UID = -1 NOBODY_GID = -1 @@ -33,12 +34,12 @@ def preexec(): class Metric(object): ''' Metric information class ''' __name_re = re.compile(r'^[a-zA-Z][\w_\.]+$') - def __init__(self, name_prefix, cluster, idx, pmda): + def __init__(self, name_prefix, cluster, pmda): self.__name = '' self.name_prefix = name_prefix self.full_name = '' self.cluster = cluster - self.idx = idx + self.idx = -1 self.__pmda = pmda self.desc = '' self.type = c_api.PM_TYPE_UNKNOWN @@ -46,7 +47,7 @@ class Metric(object): self.pointer = None self.pmid = None self.obj = None - self.indom = None + self.indom_cache = None self.index_pointer = None self.__units_val = pmUnits(0, 0, 0, 0, 0, 0) self.__units_str = '' @@ -109,8 +110,8 @@ class Metric(object): raise RuntimeError("Invalid metric") self.pmid = self.__pmda.pmid(self.cluster, self.idx) - if self.indom != None: - self.obj = pmdaMetric(self.pmid, self.type, self.indom.obj, + if self.indom_cache != None: + self.obj = pmdaMetric(self.pmid, self.type, 
self.indom_cache.indom, self.sem, self.__units_val) else: self.obj = pmdaMetric(self.pmid, self.type, c_api.PM_INDOM_NULL, @@ -119,45 +120,125 @@ class Metric(object): # Note that you can't delete individual metrics. The # pmda.{clear,reset}_metrics() functions clear out *all* metrics. -class Indom(object): - ''' Indom (instance domain) information class ''' - def __init__(self, idx, pmda): +class IndomCache(pmdaIndom): + ''' Indom (instance domain) cache information class ''' + def __init__(self, serial, max_value, pmda): self.__pmda = pmda - self.idx = idx - self.obj = pmda.indom(self.idx) - self.values = {} - self.__pmda.add_indom(pmdaIndom(self.obj, self.values)) + self.serial = serial + + # In IndomCache.add_value, we're using 'value' as the inst + # value. However, the pmdaCache routines treat the passed in + # value as the 'private' field and generates its own inst + # value. However, this 'private' field isn't saved and + # restored, so it isn't very useful for our purposes. + # + # To get around this, we'll use an OrderedDict so that the + # dictionary order should match up with the inst order. + # (Another way to fix this problem would be to go ahead and + # call pmdaCacheStore() in IndomCache.add_value(), but that + # fix would require more api calls.) + self.__values = OrderedDict() + # '__names_by_values' is the inversion of '__values'. + self.__names_by_values = {} + pmdaIndom.__init__(self, pmda.indom(self.serial), self.__values) + try: + self.__pmda.add_indom(self) + except KeyError: + # If we've seen this indom before, it will already be + # present in the pmda, so replace it. + self.__pmda.replace_indom(self, self.__values) + self.__maxval = max_value + self.cache_resize(max_value) + self.__nextval = 0 + + @property + def indom(self): + ''' Get cache's indom. 
''' + return self.it_indom def log(self, string): ''' Log an informational message ''' return self.__pmda.log(string) - def add_value(self, name, value): + def add_value(self, name, value=c_api.PM_IN_NULL): ''' Add a value to the indom ''' # PMDA.replace_indom() wants a dictionary, indexed by # indom string value. PMDA.replace_indom() doesn't really # care what is stored at that string value. We're storing the - # array index there. - self.values[name] = c_int(value) + # instance there. + if value == c_api.PM_IN_NULL: + value = self.next_value() + if self.__pmda.debug: + self.log("Adding ('%s', %d) to the cache" % (name, value)) + self.__values[name] = c_int(value) + if value >= self.__nextval: + self.__nextval = value + 1 + self.__names_by_values[value] = name + + def lookup_name(self, name): + ''' + Lookup name in an indom cache and return its associated value. + ''' + if name not in self.__values: + raise KeyError(name) + valueobj = self.__values[name] + return valueobj.value - def lookup_inst(self, inst): - ''' Lookup an array index based on the instance ID ''' - voidp = self.__pmda.inst_lookup(self.obj, inst) - if voidp == None: - return None - valuep = cast(voidp, POINTER(c_int)) - return valuep.contents.value + def lookup_value(self, value): + ''' + Lookup a value in an indom cache and return its associated name. + ''' + # We could call an api function here (pmda.inst_lookup() which + # calls pmdaCacheLookup()), but we can handle this in python + # by using the inverted dictionary. + if value not in self.__names_by_values: + raise KeyError(value) + return self.__names_by_values[value] + + def refresh(self): + ''' Update and save the indom cache. ''' + self.__pmda.replace_indom(self, self.__values) + # Note that set_dict_instances() saves the cache to disk. + self.set_dict_instances(self.it_indom, self.__values) + + def load(self): + ''' Load indom cache values. ''' + if self.__pmda.debug: + self.log("Loading cache %d..." 
% self.serial) + pmdaIndom.load(self) + self.cache_active() + for (inst, name) in self: + self.add_value(name, inst) + + def next_value(self): + ''' Return next value to be allocated. ''' + if self.__nextval > self.__maxval: + raise ValueError("Indom cache reached max value.") + value = self.__nextval + self.__nextval += 1 + return value + + def len(self): + ''' Return cache size. ''' + return len(self.__values) class JsonSource(object): ''' JSON Source class. Contains all metrics and data needed by a single JSON source. ''' - def __init__(self, path, cluster, pmda, trusted): + def __init__(self, path, pmda, trusted): self.__path = path - self.cluster = cluster self.__pmda = pmda + # cluster, metric_cache_idx, and indom_cache_idx get filled in later. + self.__cluster = -1 + self.__metric_cache_idx = -1 + self.__indom_cache_idx = -1 + self.__metric_cache = None + self.__indom_cache = None + self.__array_indexes = {} + # Note that this is the default root name. It can be # overridden with the metadata 'prefix' attribute. self.__root_name = os.path.basename(path) @@ -184,14 +265,58 @@ class JsonSource(object): self.__json_data = {} self.__metrics = {} self.__metrics_by_name = {} - self.__metric_idx = 0 - self.__indoms = {} self.__lastfetch = 0 + # Here we need to load the metadata and preparse it, in case + # it changes the source name. + self.__load_json_metadata() + self.__preparse_metadata() + def log(self, string): ''' Log an informational message ''' return self.__pmda.log(string) + @property + def name(self): + ''' + Get JSON source name value. This defaults to the base name of + the directory where the JSON source was found, but can be + overridden by the metadata file. + ''' + return self.__root_name + + @property + def cluster(self): + ''' Returns the source's cluster id. ''' + return self.__cluster + + @cluster.setter + def cluster(self, cluster): + ''' Sets the source's cluster id. 
''' + self.__cluster = cluster + # Note that the cache indexes are laid out like this: + # 0: cluster cache (cluster 0 is for the static metrics, + # cluster 1 is for the first JSON source) + # 1: metric cache for JSON source #1 + # 2: indom cache for JSON source #1 + # 3: metric cache for JSON source #2 + # 4: indom cache for JSON source #2 + # 5: metric cache for JSON source #3 + # 6: indom cache for JSON source #3 + # ... + + self.__metric_cache_idx = (self.__cluster * 2) - 1 + self.__indom_cache_idx = self.__metric_cache_idx + 1 + # Now that we know the cluster id, try to load the metric + # cache and indom cache. Note that if they aren't present, + # that's OK - this must be a new JSON source. + self.__metric_cache = IndomCache(self.__metric_cache_idx, + MAX_METRIC, self.__pmda) + self.__metric_cache.load() + self.__indom_cache = IndomCache(self.__indom_cache_idx, + MAX_INDOM, self.__pmda) + self.__indom_cache.load() + def __load_json_metadata(self): ''' Load the JSON metadata file for this JSON source. ''' self.__metadata = {} @@ -255,7 +380,8 @@ class JsonSource(object): % self.__data_exec) self.log("%s" % traceback.format_exc()) else: - self.log("Found data-path %s" % self.__data_path) + if self.__pmda.debug: + self.log("Found data-path %s" % self.__data_path) try: fobj = open(self.__data_path) except IOError: @@ -278,8 +404,8 @@ class JsonSource(object): ''' if self.__pmda.debug: self.log("Loading JSON source %s" % self.__root_name) - self.__load_json_metadata() - self.__preparse_metadata() + # Note that we're loading the metadata in the init function, + # so we can update the source name. self.__load_json_data() # If either loading the metadata or data failed, quit. @@ -314,25 +440,36 @@ class JsonSource(object): def __refresh_indoms(self): ''' Refresh the list of indoms. ''' # Notice we never delete indoms, we just keep adding. 
- for array_name in self.__indoms.keys(): - index = 0 + self.__array_indexes.clear() + for (dummy, metric_info) in self.__metrics.iteritems(): + # Skip non-arrays. + if metric_info.index_pointer == None: + continue + try: - metric_info = self.__metrics_by_name[array_name] metrics_array = metric_info.pointer.resolve(self.__json_data) - # Loop through all the array items, updating the indom - # list with the array's index pointer values. + # list with any new values. Also remember the array + # index where we found a particular indom, to make + # retrieval easy. + index = 0 for item in metrics_array: indom_value = metric_info.index_pointer.resolve(item) - self.__indoms[array_name].add_value(indom_value, index) + full_name = "%s.%s" % (metric_info.name, indom_value) + self.__array_indexes[full_name] = index index += 1 + try: + dummy = self.__indom_cache.lookup_name(indom_value) + except KeyError: + # This indom value wasn't found in the indom + # cache. Add it. + self.__indom_cache.add_value(indom_value) except KeyError: self.log("Error while refreshing indom for array %s" - % array_name) + % metric_info.name) self.log("%s" % traceback.format_exc()) continue - self.__pmda.replace_indom(self.__indoms[array_name].obj, - self.__indoms[array_name].values) + self.__indom_cache.refresh() def __add_metric(self, metric_info): ''' Create and add a metric to the pmda. ''' @@ -346,26 +483,11 @@ class JsonSource(object): def __parse_array_metadata(self, array_name, metrics_array): ''' Parse a JSON array metadata. ''' - if array_name not in self.__indoms: - # Note that we're creating an indom here, but we don't - # know any values for it yet. We'll get those on a data - # read. - self.__indoms[array_name] = Indom(self.__pmda.indom_idx, - self.__pmda) - self.__pmda.indom_idx += 1 - # Process the array's metrics array. 
metric_prefix = "%s.%s" % (self.__pmda.pmda_name, self.__root_name) for item in metrics_array: - if self.__metric_idx > MAX_ITEM: - self.log("Skipping metrics in '%s' - max metric reached" - % metric_prefix) - break - metric_info = Metric(metric_prefix, self.cluster, - self.__metric_idx, self.__pmda) - self.__metric_idx += 1 - metric_info.indom = self.__indoms[array_name] - + metric_info = Metric(metric_prefix, self.cluster, self.__pmda) + metric_info.indom_cache = self.__indom_cache error_seen = 0 for (key, value) in item.iteritems(): # 'name' (required): Sanity check it and save it. @@ -380,7 +502,7 @@ class JsonSource(object): # this metric self.log("Skipping metric with invalid name '%s.%s'" % (array_name, value)) - break + continue # 'type' (required): Sanity check it and save it. elif key == 'type': if not isinstance(value, unicode): @@ -427,6 +549,23 @@ class JsonSource(object): self.log("Ignoring unknown attribute in metadata '%s'" % (key)) + # Try looking up the metric name in the metric cache. If + # it is there, reuse the metric index. + try: + metric_info.idx \ + = self.__metric_cache.lookup_name(metric_info.name) + except KeyError: + try: + # We couldn't find the metric name, so just grab + # the next metric index value. + metric_info.idx = self.__metric_cache.next_value() + self.__metric_cache.add_value(metric_info.name, + metric_info.idx) + except ValueError: + self.log("Skipping metrics in '%s' - max metric reached" + % metric_prefix) + break + # Make sure we have everything we need. If not (or we've # seen an error), just skip this metric. if error_seen or not metric_info.valid() \ @@ -435,9 +574,12 @@ class JsonSource(object): self.log("Metadata doesn't have required" " information for the following entry: %s" % metric_info.name) + del metric_info continue # We have all the required information. Add the metric. 
+ if self.__pmda.debug: + self.log("Adding metric '%s'" % metric_info.name) self.__add_metric(metric_info) def __preparse_metadata(self): @@ -481,6 +623,11 @@ class JsonSource(object): Go through the meta, looking for information we can use to create the pcp representation of the metadata. ''' + # Make sure we've got a real cluster idx at this point. + if self.__cluster < 0: + raise TypeError("Cluster index must be set before parsing" + " metadata to create metrics.") + # Look for the "metrics" array. metrics_array = None for (key, value) in self.__metadata.items(): @@ -508,14 +655,7 @@ class JsonSource(object): # Process the metrics array. metric_prefix = "%s.%s" % (self.__pmda.pmda_name, self.__root_name) for item in metrics_array: - if self.__metric_idx > MAX_ITEM: - self.log("Skipping metrics in '%s' - max metric reached" - % metric_prefix) - break - metric_info = Metric(metric_prefix, self.cluster, - self.__metric_idx, self.__pmda) - self.__metric_idx += 1 - + metric_info = Metric(metric_prefix, self.__cluster, self.__pmda) error_seen = 0 for (key, value) in item.iteritems(): # 'name' (required): Sanity check it and save it. @@ -533,7 +673,7 @@ class JsonSource(object): self.log("Skipping metric with invalid name '%s'" % value) error_seen = 1 - break + continue # 'type' (required): Sanity check it and save it. elif key == 'type': if not isinstance(value, unicode): @@ -615,6 +755,23 @@ class JsonSource(object): self.log("Ignoring unknown attribute in metadata '%s'" % (key)) + # Try looking up the metric name in the metric cache. If + # it is there, reuse the metric index. + try: + metric_info.idx \ + = self.__metric_cache.lookup_name(metric_info.name) + except KeyError: + try: + # We couldn't find the metric name, so just grab + # the next metric index value. 
+ metric_info.idx = self.__metric_cache.next_value() + self.__metric_cache.add_value(metric_info.name, + metric_info.idx) + except ValueError: + self.log("Skipping metrics in '%s' - max metric reached" + % metric_prefix) + break + # Make sure we have everything we need. If not (or we've # seen an error), just skip this metric. if error_seen or not metric_info.valid() \ @@ -629,6 +786,8 @@ class JsonSource(object): if self.__pmda.debug: self.log("Adding metric '%s'" % metric_info.name) self.__add_metric(metric_info) + # Now that all the metrics are created, save the metric cache. + self.__metric_cache.refresh() def fetch(self, item, inst): ''' Fetch value for this item and instance. ''' @@ -643,14 +802,7 @@ class JsonSource(object): metric_info = self.__metrics[item] # Handle array metrics. - if metric_info.indom != None: - # Get the array index from the indom. - array_index = metric_info.indom.lookup_inst(inst) - if array_index == None: - self.log("JSON source %s has no indom %d" - % (self.__root_name, inst)) - return [c_api.PM_ERR_INST, 0] - + if metric_info.indom_cache != None: # Split the full name into the array name and metric (array, dummy) = metric_info.name.split('.', 2) if array not in self.__metrics_by_name: @@ -663,9 +815,18 @@ class JsonSource(object): # Get the entire array. metrics_array = array_info.pointer.resolve(self.__json_data) - # Get the appropriate item from within the desired array entry. - return [metric_info.pointer.resolve(metrics_array[array_index]), - 1] + # Turn the instance id into a name. + name = self.__indom_cache.lookup_value(inst) + + # Using that name, lookup the array index where we + # found it. 
+ full_name = "%s.%s" % (array_info.name, name) + try: + index = self.__array_indexes[full_name] + except KeyError: + self.log("array index for '%s' cannot be found" % full_name) + return [c_api.PM_ERR_INDOM, 0] + return [metric_info.pointer.resolve(metrics_array[index]), 1] except (KeyError, TypeError): self.log("Error while fetching metrics for array %s" % array_info.name) @@ -695,18 +856,31 @@ class JsonPMDA(PMDA): self.pmda_name = pmda_name PMDA.__init__(self, self.pmda_name, domain) self.connect_pmcd() - self.indom_idx = 0 + self.__cluster_indom = None self.numfetch = 0 self.metadata_name = 'metadata.json' - # cluster 0 is reserved for the static metrics - self.cluster_idx = 1 + # cache_idx 0 is reserved for the cluster cache. cluster 0 is + # reserved for the static metrics, so clusters that get added + # to the cache start with 1. + self.__cluster_cache = IndomCache(0, MAX_CLUSTER, self) - self.__metrics = {} # FIXME: python doesn't have a __pmParseDebug() wrapper. So, # if PCP_PYTHON_DEBUG has any value, turn debugging on. self.debug = ('PCP_PYTHON_DEBUG' in os.environ) + + # Try loading old cluster cache values. + self.__cluster_cache.load() + if self.__cluster_cache.len() == 0: + # If there weren't any old cluster cache values, we've got + # a bit of a problem. The indom cache only allocates + # consecutive values starting at 0. We want to start at 1, + # since cluster 0 is reserved for the static metrics. So, + # let's add a fake entry. + self.__cluster_cache.add_value('__internal__', 0) + + self.__metrics = {} self.__add_static_metrics() # Set up defaults for config variables. @@ -767,22 +941,24 @@ class JsonPMDA(PMDA): Create all the static metrics (not from a JSON source). ''' # Create our 'nsources' metric. 
- metric_info = Metric(self.pmda_name, 0, 0, self) + metric_info = Metric(self.pmda_name, 0, self) metric_info.name = 'nsources' metric_info.type = c_api.PM_TYPE_64 metric_info.sem = c_api.PM_SEM_COUNTER metric_info.desc = 'Number of JSON sources' + metric_info.idx = 0 metric_info.create() self.add_metric(metric_info.full_name, metric_info.obj, metric_info.desc) self.__metrics[metric_info.idx] = metric_info # Create our 'debug' metric. - metric_info = Metric(self.pmda_name, 0, 1, self) + metric_info = Metric(self.pmda_name, 0, self) metric_info.name = 'debug' metric_info.type = c_api.PM_TYPE_64 metric_info.sem = c_api.PM_SEM_INSTANT metric_info.desc = 'Debug logging state' + metric_info.idx = 1 metric_info.create() self.add_metric(metric_info.full_name, metric_info.obj, metric_info.desc) @@ -828,6 +1004,7 @@ class JsonPMDA(PMDA): dir_list = self.__trusted_directory_list else: dir_list = self.__directory_list + new_source_seen = False for directory in dir_list: for root, dummy, files in os.walk(directory): # Make sure we have the metadata file. @@ -845,16 +1022,39 @@ class JsonPMDA(PMDA): "file")): continue - # Add the new source. - if self.cluster_idx > MAX_CLUSTER: - self.log("Skipping source '%s' -" - " max cluster reached" % root) - continue - source = JsonSource(root, self.cluster_idx, - self, trusted) + # Create the new JsonSource. After the + # JsonSource has been initialized, it has + # parsed the metadata enough to optionally + # change the name. + source = JsonSource(root, self, trusted) + + # Try looking up the source name in the + # cluster cache. 
+ try: + cluster_idx \ + = self.__cluster_cache.lookup_name(source.name) + if self.debug: + self.log("Found %s in cluster cache: %d" % + (source.name, cluster_idx)) + except KeyError: + try: + cluster_idx = self.__cluster_cache.next_value() + if self.debug: + self.log("allocating new cluster idx" + " %d for source %s" % + (cluster_idx, source.name)) + except ValueError: + self.log("Skipping source '%s' -" + " max cluster reached" % root) + continue + + if self.debug: + self.log("Adding source '%s', cluster_idx %d" + % (source.name, cluster_idx)) + self.__cluster_cache.add_value(source.name, cluster_idx) + source.cluster = cluster_idx self.sources_by_root[root] = source - self.sources_by_cluster[self.cluster_idx] = source - self.cluster_idx += 1 + self.sources_by_cluster[cluster_idx] = source # Notice we're going ahead and loading the # JSON data for new sources. If we're @@ -862,7 +1062,10 @@ class JsonPMDA(PMDA): # new data source, but we need the metadata # (for metrics) and data (for indoms). 
self.sources_by_root[root].load() + new_source_seen = True sources_seen[root] = 1 + if new_source_seen: + self.__cluster_cache.refresh() def __load_all_json(self): ''' @@ -957,4 +1160,5 @@ if __name__ == '__main__': #os.environ["PCP_PYTHON_DEBUG"] = "ALL" #os.environ["PCP_PYTHON_DEBUG"] = "APPL0|LIBPMDA" + #os.environ["PCP_PYTHON_DEBUG"] = "INDOM" JsonPMDA('json', 137).run() diff --git a/src/python/pcp/pmda.py b/src/python/pcp/pmda.py index 49203e0..26835f0 100644 --- a/src/python/pcp/pmda.py +++ b/src/python/pcp/pmda.py @@ -25,7 +25,7 @@ import os import cpmapi import cpmda from pcp.pmapi import pmContext as PCP -from pcp.pmapi import pmInDom, pmDesc, pmUnits +from pcp.pmapi import pmInDom, pmDesc, pmUnits, pmErr from ctypes.util import find_library from ctypes import CDLL, c_int, c_long, c_char_p, c_void_p, cast, byref @@ -55,6 +55,8 @@ LIBPCP_PMDA.pmdaCacheLookupKey.argtypes = [ POINTER(c_int), POINTER(c_void_p)] LIBPCP_PMDA.pmdaCacheOp.restype = c_int LIBPCP_PMDA.pmdaCacheOp.argtypes = [pmInDom, c_long] +LIBPCP_PMDA.pmdaCacheResize.restype = c_int +LIBPCP_PMDA.pmdaCacheResize.argtypes = [pmInDom, c_int] ## @@ -104,6 +106,39 @@ class pmdaIndom(Structure): self.it_indom = indom self.set_instances(indom, insts) + def __iter__(self): + # Generates an iterator for the cache. 
+ if self.it_numinst < 0: + LIBPCP_PMDA.pmdaCacheOp(self.it_indom, + cpmda.PMDA_CACHE_WALK_REWIND) + while 1: + inst = LIBPCP_PMDA.pmdaCacheOp(self.it_indom, + cpmda.PMDA_CACHE_WALK_NEXT) + if inst < 0: + break + name = self.inst_name_lookup(inst) + if name: + yield (inst, name) + else: + for i in range(self.it_numinst): + inst = self.it_set[i].i_inst + name = self.inst_name_lookup(inst) + if name: + yield (inst, name) + + def inst_name_lookup(self, instance): + if self.it_numinst < 0: + name = (c_char_p)() + sts = LIBPCP_PMDA.pmdaCacheLookup(self.it_indom, instance, + byref(name), None) + if (sts == cpmda.PMDA_CACHE_ACTIVE): + return str(name.value.decode()) + elif self.it_numinst > 0: + for inst in self.it_set: + if (inst.i_inst == instance): + return str(inst.i_name.decode()) + return None + def set_list_instances(self, insts): instance_count = len(insts) if (instance_count == 0): @@ -113,6 +148,7 @@ class pmdaIndom(Structure): instance_array[i].i_inst = insts[i].i_inst instance_array[i].i_name = insts[i].i_name self.it_set = instance_array + self.it_numinst = instance_count def set_dict_instances(self, indom, insts): LIBPCP_PMDA.pmdaCacheOp(indom, cpmda.PMDA_CACHE_INACTIVE) @@ -134,6 +170,34 @@ class pmdaIndom(Structure): def __str__(self): return "pmdaIndom@%#lx indom=%#lx num=%d" % (addressof(self), self.it_indom, self.it_numinst) + def load(self): + if self.it_numinst <= 0: + sts = LIBPCP_PMDA.pmdaCacheOp(self.it_indom, cpmda.PMDA_CACHE_LOAD) + if sts < 0: + raise pmErr(sts) + else: + raise pmErr(cpmapi.PM_ERR_NYI) + + def cache_active(self): + if self.it_numinst <= 0: + LIBPCP_PMDA.pmdaCacheOp(self.it_indom, cpmda.PMDA_CACHE_ACTIVE) + else: + raise pmErr(cpmapi.PM_ERR_NYI) + + def cache_inactive(self): + if self.it_numinst <= 0: + LIBPCP_PMDA.pmdaCacheOp(self.it_indom, cpmda.PMDA_CACHE_INACTIVE) + else: + raise pmErr(cpmapi.PM_ERR_NYI) + + def cache_resize(self, maximum): + if self.it_numinst <= 0: + sts = LIBPCP_PMDA.pmdaCacheResize(self.it_indom, maximum) 
+ if sts < 0: + raise pmErr(sts) + else: + raise pmErr(cpmapi.PM_ERR_NYI) + class pmdaUnits(pmUnits): """ Wrapper class for PMDAs defining their metrics (avoids pmapi import) """ def __init__(self, dimS, dimT, dimC, scaleS, scaleT, scaleC): @@ -240,13 +304,21 @@ class MetricDispatch(object): self._indom_helptext[indomid] = text def replace_indom(self, indom, insts): - replacement = pmdaIndom(indom, insts) + # Note that this function can take a numeric indom or a + # pmdaIndom. + if isinstance(indom, pmdaIndom): + it_indom = indom.it_indom + replacement = indom + else: + it_indom = indom + replacement = pmdaIndom(it_indom, insts) # list indoms need to keep the table up-to-date for libpcp_pmda if (isinstance(insts, list)): for entry in self._indomtable: - if (entry.it_indom == indom): + if (entry.it_indom == it_indom): entry = replacement - self._indoms[indom] = replacement + break + self._indoms[it_indom] = replacement def inst_lookup(self, indom, instance): """ @@ -268,17 +340,7 @@ class MetricDispatch(object): a specific instance domain. """ entry = self._indoms[indom] - if (entry.it_numinst < 0): - name = (c_char_p)() - sts = LIBPCP_PMDA.pmdaCacheLookup(indom, instance, byref(name), None) - if (sts == cpmda.PMDA_CACHE_ACTIVE): - return str(name.value.decode()) - elif (entry.it_numinst > 0 and entry.it_indom == indom): - for inst in entry.it_set: - if (inst.i_inst == instance): - return str(inst.i_name.decode()) - return None - + return entry.inst_name_lookup(instance) class PMDA(MetricDispatch): """ Defines a PCP performance metrics domain agent