pcp
[Top] [All Lists]

pmrep: convert to use pmfg

To: pcp developers <pcp@xxxxxxxxxxx>
Subject: pmrep: convert to use pmfg
From: Marko Myllynen <myllynen@xxxxxxxxxx>
Date: Tue, 21 Jun 2016 14:36:11 +0300
Delivered-to: pcp@xxxxxxxxxxx
Organization: Red Hat
Reply-to: Marko Myllynen <myllynen@xxxxxxxxxx>
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.8.0
Hi,

Below is the first cut at converting pmrep to use pmfg. It's not yet
intended for merging as there are quite a few QA failures; the aim is
to kick off discussion on whether those QA failures point to issues in
pmfg, in pmrep, or somewhere else.

Highlights:

- CPU usage reduced by roughly 80% when reporting a huge number of
metrics
- the convoluted extract() method is gone
- further cleanups possible

Current issues:

- pmfg rejects metrics like pmcd.pmie.configfile which pmval/pmrep/etc
accept
- in QA 1062 (vmstat-like output) idleness was previously reported
correctly (~99), now it gets some insanely high value (115646464) which
gets truncated. Might also be an incorrect scale parameter.
- QA 1068 output is different, haven't investigated yet
- in QA 1069 for sample.seconds we get 1 (integer) unlike with
pmval/pmrep which give typically something like ~0.999-1.001 and for
sample.milliseconds we get ~1000.002 or such where pmval/pmrep report
~1.000.
- in QA 1070 earlier 0.500/0.000 is now 0 (just noticed this, haven't
investigated much)
- in QA 1071 a similar issue as with 1070

So overall things look pretty good, it might well be a one-liner or
two in pmrep and/or pmfg and we'd see QA passing. I'll certainly
continue investigating but if you can think of any explanation for the
above hiccups, please let me know.


---
 src/pmrep/pmrep.py | 233 +++++++++++++++--------------------------------------
 1 file changed, 64 insertions(+), 169 deletions(-)

diff --git a/src/pmrep/pmrep.py b/src/pmrep/pmrep.py
index 548b8f4..38ccf70 100755
--- a/src/pmrep/pmrep.py
+++ b/src/pmrep/pmrep.py
@@ -216,16 +216,14 @@ class PMReporter(object):
 
         # Performance metrics store
         # key - metric name
-        # values - 0:label, 1:instance(s), 2:unit/scale, 3:type, 4:width
+        # values - 0:label, 1:instance(s), 2:unit/scale, 3:type, 4:width, 
5:pmfg item
         self.metrics = OrderedDict()
+        self.pmfg = None
+        self.pmfg_ts = None
 
         # Corresponding config file metric specifiers
         self.metricspec = ('label', 'instance', 'unit', 'type', 'width', 
'formula')
 
-        self.prevvals = None
-        self.currvals = None
-        self.ptstamp = 0
-        self.ctstamp = 0
         self.pmids = []
         self.descs = []
         self.insts = []
@@ -527,9 +525,11 @@ class PMReporter(object):
             self.source = None
         else:
             optctx = pmapi.c_api.PM_CONTEXT_HOST
-        self.context = pmapi.pmContext(optctx, self.source)
+        self.pmfg = pmapi.fetchgroup(optctx, self.source)
+        self.context = self.pmfg.get_context()
         if pmapi.c_api.pmSetContextOptions(self.context.ctx, self.opts.mode, 
self.opts.delta):
             raise pmapi.pmUsageErr()
+        self.pmfg_ts = self.pmfg.extend_timestamp()
 
     def validate_config(self):
         """ Validate configuration options """
@@ -677,7 +677,7 @@ class PMReporter(object):
         # Finalize the metrics set
         for i, metric in enumerate(self.metrics):
             # Fill in all fields for easier checking later
-            for index in range(0, 5):
+            for index in range(0, 6):
                 if len(self.metrics[metric]) <= index:
                     self.metrics[metric].append(None)
 
@@ -690,7 +690,8 @@ class PMReporter(object):
                 self.metrics[metric][0] = name[:-2] + m
 
             # Rawness
-            if self.metrics[metric][3] == 'raw' or self.type == 1:
+            if self.metrics[metric][3] == 'raw' or self.type == 1 or \
+               self.output == OUTPUT_CSV:
                 self.metrics[metric][3] = 1
             else:
                 self.metrics[metric][3] = 0
@@ -750,16 +751,16 @@ class PMReporter(object):
             if self.metrics[metric][4] < len(TRUNC):
                 self.metrics[metric][4] = len(TRUNC) # Forced minimum
 
-    # RHBZ#1264147
-    def pmids_to_ctypes(self, pmids):
-        """ Convert a Python list of pmids (numbers) to
-            a ctypes LP_c_uint (a C array of uints).
-        """
-        from ctypes import c_uint
-        pmidA = (c_uint * len(pmids))()
-        for i, p in enumerate(pmids):
-            pmidA[i] = c_uint(p)
-        return pmidA
+            # Add fetchgroup item
+            if self.metrics[metric][2][0] != "util":
+                (scale, mult) = 
self.context.pmParseUnitsStr(self.metrics[metric][2][0])
+                scale = str(scale)
+            else:
+                scale = None
+            ins = 1 if self.insts[i][0][0] == PM_IN_NULL else 
len(self.insts[i][0])
+            self.metrics[metric][5] = []
+            for j in range(ins):
+                self.metrics[metric][5].append(self.pmfg.extend_item(metric, 
None, scale, self.insts[i][1][j]))
 
     def get_current_tz(self):
         """ Figure out the current timezone using the PCP convention """
@@ -858,143 +859,44 @@ class PMReporter(object):
 
         lines = 0
         while self.samples != 0:
+            # Repeat the header if needed
             if self.output == OUTPUT_STDOUT:
                 if lines > 1 and self.repeat_header == lines:
                     self.write_header()
                     lines = 0
                 lines += 1
 
+            # Fetch values
             try:
-                result = self.context.pmFetch(self.pmids_to_ctypes(self.pmids))
+                self.pmfg.fetch()
             except pmapi.pmErr as error:
                 if error.args[0] == PM_ERR_EOL:
                     break
                 raise error
-            self.extract(result)
-            if self.ctstamp == 0:
-                self.ctstamp = copy.copy(result.contents.timestamp)
-            self.ptstamp = self.ctstamp
-            self.ctstamp = copy.copy(result.contents.timestamp)
 
-            if self.context.type == PM_CONTEXT_ARCHIVE:
-                if float(self.ctstamp) < float(self.opts.pmGetOptionOrigin()):
-                    self.context.pmFreeResult(result)
-                    continue
-                if float(self.ctstamp) > float(self.opts.pmGetOptionFinish()):
-                    self.context.pmFreeResult(result)
-                    break
-
-            self.report(self.ctstamp, self.currvals)
-            self.context.pmFreeResult(result)
+            # Report and prepare for the next round
+            self.report(self.pmfg_ts())
             if self.samples and self.samples > 0:
                 self.samples -= 1
             if self.delay and self.interpol and self.samples != 0:
                 self.context.pmtimevalSleep(self.interval)
 
         # Allow modules to flush buffered values / say goodbye
-        self.report(None, None)
-
-    def extract(self, result):
-        """ Extract the metric values from pmResult structure """
-        # Metrics incl. all instance values, must match self.format on return
-        values = []
-
-        for i, metric in enumerate(self.metrics):
-            # Per-metric values incl. all instance values
-            # We use dict to make it easier to deal with gone/unknown instances
-            values.append({})
-
-            # Populate instance fields to have values for unavailable instances
-            # Values are (instance id, instance name, instance value)
-            for inst in self.insts[i][0]:
-                values[i][inst] = (-1, None, NO_VAL)
-
-            # No values available for this metric
-            if result.contents.get_numval(i) == 0:
-                continue
-
-            # Process all fetched instances
-            for j in range(result.contents.get_numval(i)):
-                inst = result.contents.get_inst(i, j)
-
-                # Locate the correct instance and its position
-                if inst >= 0:
-                    if inst not in self.insts[i][0]:
-                        # Ignore newly emerged instances
-                        continue
-                    k = 0
-                    while inst != self.insts[i][0][k]:
-                        k += 1
-
-                # Extract and scale the value
-                try:
-                    # Use native type if no rescaling needed
-                    if self.descs[i].contents.type == PM_TYPE_STRING or \
-                       self.metrics[metric][3] == 1 or \
-                       (self.metrics[metric][2][2] == 1 and \
-                        str(self.descs[i].contents.units) == \
-                        str(self.metrics[metric][2][1])):
-                        rescale = 0
-                        vtype = self.descs[i].contents.type
-                    else:
-                        rescale = 1
-                        vtype = PM_TYPE_DOUBLE
-
-                    atom = self.context.pmExtractValue(
-                        result.contents.get_valfmt(i),
-                        result.contents.get_vlist(i, j),
-                        self.descs[i].contents.type,
-                        vtype)
-
-                    if rescale:
-                        atom = self.context.pmConvScale(
-                            vtype,
-                            atom, self.descs, i,
-                            self.metrics[metric][2][1])
-
-                    val = atom.dref(vtype)
+        self.report(None)
 
-                    if rescale:
-                        val *= self.metrics[metric][2][2]
-                        val = int(val) if val == int(val) else val
-
-                    if inst >= 0:
-                        values[i][inst] = (inst, self.insts[i][1][k], val)
-                    else:
-                        values[i][PM_IN_NULL] = (-1, None, val)
-
-                except pmapi.pmErr as error:
-                    sys.stderr.write("%s: %s, aborting.\n" % (metric, 
str(error)))
-                    sys.exit(1)
-
-        # Convert dicts to lists
-        vals = []
-        for v in values:
-            vals.append(v.values())
-        values = vals
-
-        # Store current and previous values
-        # Output modules need to handle non-existing self.prevvals
-        self.prevvals = self.currvals
-        self.currvals = values
-
-    def report(self, tstamp, values):
+    def report(self, tstamp):
         """ Report the metric values """
         if tstamp != None:
-            ts = self.context.pmLocaltime(tstamp.tv_sec)
-            us = int(tstamp.tv_usec)
-            dt = datetime(ts.tm_year+1900, ts.tm_mon+1, ts.tm_mday,
-                          ts.tm_hour, ts.tm_min, ts.tm_sec, us, None)
-            tstamp = dt.strftime(self.timefmt)
+            tstamp = tstamp.strftime(self.timefmt)
 
         if self.output == OUTPUT_ARCHIVE:
-            self.write_archive(tstamp, values)
+            self.write_archive(tstamp)
         if self.output == OUTPUT_CSV:
-            self.write_csv(tstamp, values)
+            self.write_csv(tstamp)
         if self.output == OUTPUT_STDOUT:
-            self.write_stdout(tstamp, values)
+            self.write_stdout(tstamp)
         if self.output == OUTPUT_ZABBIX:
-            self.write_zabbix(tstamp, values)
+            self.write_zabbix(tstamp)
 
     def prepare_writer(self):
         """ Prepare generic stdout writer """
@@ -1153,9 +1055,9 @@ class PMReporter(object):
             else:
                 self.writer.write("...\n(Ctrl-C to stop)\n")
 
-    def write_archive(self, timestamp, values):
+    def write_archive(self, timestamp):
         """ Write an archive record """
-        if timestamp == None and values == None:
+        if timestamp == None:
             # Complete and close
             self.pmi.pmiEnd()
             self.pmi = None
@@ -1187,25 +1089,28 @@ class PMReporter(object):
         for i, metric in enumerate(self.metrics):
             ins = 1 if self.insts[i][0][0] == PM_IN_NULL else 
len(self.insts[i][0])
             for j in range(ins):
-                if str(list(values[i])[j][2]) != NO_VAL:
-                    data = 1
+                try:
+                    value = self.metrics[metric][5][j]()
                     inst = self.insts[i][1][j]
+                    data = 1
                     if self.descs[i].contents.type == PM_TYPE_STRING:
-                        self.pmi.pmiPutValue(metric, inst, 
str(list(values[i])[j][2]))
+                        self.pmi.pmiPutValue(metric, inst, value)
                     elif self.descs[i].contents.type == PM_TYPE_FLOAT or \
                          self.descs[i].contents.type == PM_TYPE_DOUBLE:
-                        self.pmi.pmiPutValue(metric, inst, "%f" % 
list(values[i])[j][2])
+                        self.pmi.pmiPutValue(metric, inst, "%f" % value)
                     else:
-                        self.pmi.pmiPutValue(metric, inst, "%d" % 
list(values[i])[j][2])
+                        self.pmi.pmiPutValue(metric, inst, "%d" % value)
+                except:
+                    pass
 
         # Flush
         if data:
             # pylint: disable=maybe-no-member
-            self.pmi.pmiWrite(self.ctstamp.tv_sec, self.ctstamp.tv_usec)
+            self.pmi.pmiWrite(self.pmfg_ts().second, 
self.pmfg_ts().microsecond)
 
-    def write_csv(self, timestamp, values):
+    def write_csv(self, timestamp):
         """ Write results in CSV format """
-        if timestamp == None and values == None:
+        if timestamp == None:
             # Silent goodbye
             return
 
@@ -1215,16 +1120,20 @@ class PMReporter(object):
             ins = 1 if self.insts[i][0][0] == PM_IN_NULL else 
len(self.insts[i][0])
             for j in range(ins):
                 line += self.delimiter
-                if type(list(values[i])[j][2]) is float:
+                try:
+                    value = self.metrics[metric][5][j]()
+                except:
+                    value = NO_VAL
+                if type(value) is float:
                     fmt = "." + str(self.precision) + "f"
-                    line += format(list(values[i])[j][2], fmt)
+                    line += format(value, fmt)
                 else:
-                    line += str(list(values[i])[j][2])
+                    line += str(value)
         self.writer.write(line + "\n")
 
-    def write_stdout(self, timestamp, values):
+    def write_stdout(self, timestamp):
         """ Write a line to stdout """
-        if timestamp == None and values == None:
+        if timestamp == None:
             # Silent goodbye
             return
 
@@ -1242,29 +1151,13 @@ class PMReporter(object):
         for i, metric in enumerate(self.metrics):
             l = self.metrics[metric][4]
 
-            for j in range(len(values[i])):
+            for j in range(len(self.metrics[metric][5])):
                 k += 1
 
-                # Raw or rate
-                if self.metrics[metric][3] or \
-                  self.descs[i].sem != PM_SEM_COUNTER or \
-                  list(values[i])[j][2] == NO_VAL:
-                    # Raw
-                    value = list(values[i])[j][2]
-                elif not self.metrics[metric][3] and \
-                  (self.prevvals == None or list(self.prevvals[i])[j][2] == 
NO_VAL):
-                    # Rate not yet possible
+                try:
+                    value = self.metrics[metric][5][j]()
+                except:
                     value = NO_VAL
-                else:
-                    # Rate
-                    scale = 1
-                    if self.descs[i].contents.units.dimTime != 0:
-                        if self.descs[i].contents.units.scaleTime > 
PM_TIME_SEC:
-                            scale = pow(60, (PM_TIME_SEC - 
self.descs[i].contents.units.scaleTime))
-                        else:
-                            scale = pow(1000, (PM_TIME_SEC - 
self.descs[i].contents.units.scaleTime))
-                    delta = scale * (float(self.ctstamp) - float(self.ptstamp))
-                    value = (list(values[i])[j][2] - 
list(self.prevvals[i])[j][2]) / delta if delta else 0
 
                 # Make sure the value fits
                 if type(value) is int or type(value) is long:
@@ -1308,9 +1201,9 @@ class PMReporter(object):
         nfmt = nfmt[:-l]
         self.writer.write(nfmt.format(*tuple(line)) + "\n")
 
-    def write_zabbix(self, timestamp, values):
+    def write_zabbix(self, timestamp):
         """ Write (send) metrics to a Zabbix server """
-        if timestamp == None and values == None:
+        if timestamp == None:
             # Send any remaining buffered values
             if self.zabbix_metrics:
                 send_to_zabbix(self.zabbix_metrics, self.zabbix_server, 
self.zabbix_port)
@@ -1318,7 +1211,7 @@ class PMReporter(object):
             return
 
         # Collect the results
-        ts = float(self.ctstamp)
+        ts = self.pmfg_ts().timestamp()
         if self.zabbix_prevsend == None:
             self.zabbix_prevsend = ts
         for i, metric in enumerate(self.metrics):
@@ -1327,9 +1220,11 @@ class PMReporter(object):
                 key = ZBXPRFX + metric
                 if self.insts[i][1][j]:
                     key += "[" + str(self.insts[i][1][j]) + "]"
-                val = str(list(values[i])[j][2])
-                if val != NO_VAL:
+                try:
+                    val = str(self.metrics[metric][5][j]())
                     self.zabbix_metrics.append(ZabbixMetric(self.zabbix_host, 
key, val, ts))
+                except:
+                    pass
 
         # Send when needed
         if self.context.type == PM_CONTEXT_ARCHIVE:

Thanks,

-- 
Marko Myllynen

<Prev in Thread] Current Thread [Next in Thread>