Hi,
Below is the first cut at converting pmrep to use pmfg. It's not yet
intended for merging, as there are quite a few QA failures, but rather
to kick off discussion: are we looking at pmfg issues, pmrep issues, or
some other issues behind those QA failures?
Highlights:
- CPU usage reduced by roughly 80% when reporting a huge number of
metrics
- the convoluted extract() method is gone
- further cleanups possible
Current issues:
- pmfg rejects metrics like pmcd.pmie.configfile which pmval/pmrep/etc
accept
- in QA 1062 (vmstat-like output) idleness was previously reported
correctly (~99), now it gets some insanely high value (115646464) which
gets truncated. Might also be an incorrect scale parameter.
- QA 1068 output is different, haven't investigated yet
- in QA 1069 for sample.seconds we get 1 (integer), unlike with
pmval/pmrep which typically give something like ~0.999-1.001, and for
sample.milliseconds we get ~1000.002 or such where pmval/pmrep report
~1.000.
- in QA 1070 earlier 0.500/0.000 is now 0 (just noticed this, haven't
investigated much)
- in QA 1071 a similar issue as with 1070
So overall things look pretty good; it might well be a one-liner or
two in pmrep and/or pmfg and we'd see QA passing. I'll certainly
continue investigating, but if you can think of any explanation for the
above hiccups, please let me know.
---
src/pmrep/pmrep.py | 233 +++++++++++++++--------------------------------------
1 file changed, 64 insertions(+), 169 deletions(-)
diff --git a/src/pmrep/pmrep.py b/src/pmrep/pmrep.py
index 548b8f4..38ccf70 100755
--- a/src/pmrep/pmrep.py
+++ b/src/pmrep/pmrep.py
@@ -216,16 +216,14 @@ class PMReporter(object):
# Performance metrics store
# key - metric name
- # values - 0:label, 1:instance(s), 2:unit/scale, 3:type, 4:width
+ # values - 0:label, 1:instance(s), 2:unit/scale, 3:type, 4:width,
5:pmfg item
self.metrics = OrderedDict()
+ self.pmfg = None
+ self.pmfg_ts = None
# Corresponding config file metric specifiers
self.metricspec = ('label', 'instance', 'unit', 'type', 'width',
'formula')
- self.prevvals = None
- self.currvals = None
- self.ptstamp = 0
- self.ctstamp = 0
self.pmids = []
self.descs = []
self.insts = []
@@ -527,9 +525,11 @@ class PMReporter(object):
self.source = None
else:
optctx = pmapi.c_api.PM_CONTEXT_HOST
- self.context = pmapi.pmContext(optctx, self.source)
+ self.pmfg = pmapi.fetchgroup(optctx, self.source)
+ self.context = self.pmfg.get_context()
if pmapi.c_api.pmSetContextOptions(self.context.ctx, self.opts.mode,
self.opts.delta):
raise pmapi.pmUsageErr()
+ self.pmfg_ts = self.pmfg.extend_timestamp()
def validate_config(self):
""" Validate configuration options """
@@ -677,7 +677,7 @@ class PMReporter(object):
# Finalize the metrics set
for i, metric in enumerate(self.metrics):
# Fill in all fields for easier checking later
- for index in range(0, 5):
+ for index in range(0, 6):
if len(self.metrics[metric]) <= index:
self.metrics[metric].append(None)
@@ -690,7 +690,8 @@ class PMReporter(object):
self.metrics[metric][0] = name[:-2] + m
# Rawness
- if self.metrics[metric][3] == 'raw' or self.type == 1:
+ if self.metrics[metric][3] == 'raw' or self.type == 1 or \
+ self.output == OUTPUT_CSV:
self.metrics[metric][3] = 1
else:
self.metrics[metric][3] = 0
@@ -750,16 +751,16 @@ class PMReporter(object):
if self.metrics[metric][4] < len(TRUNC):
self.metrics[metric][4] = len(TRUNC) # Forced minimum
- # RHBZ#1264147
- def pmids_to_ctypes(self, pmids):
- """ Convert a Python list of pmids (numbers) to
- a ctypes LP_c_uint (a C array of uints).
- """
- from ctypes import c_uint
- pmidA = (c_uint * len(pmids))()
- for i, p in enumerate(pmids):
- pmidA[i] = c_uint(p)
- return pmidA
+ # Add fetchgroup item
+ if self.metrics[metric][2][0] != "util":
+ (scale, mult) =
self.context.pmParseUnitsStr(self.metrics[metric][2][0])
+ scale = str(scale)
+ else:
+ scale = None
+ ins = 1 if self.insts[i][0][0] == PM_IN_NULL else
len(self.insts[i][0])
+ self.metrics[metric][5] = []
+ for j in range(ins):
+ self.metrics[metric][5].append(self.pmfg.extend_item(metric,
None, scale, self.insts[i][1][j]))
def get_current_tz(self):
""" Figure out the current timezone using the PCP convention """
@@ -858,143 +859,44 @@ class PMReporter(object):
lines = 0
while self.samples != 0:
+ # Repeat the header if needed
if self.output == OUTPUT_STDOUT:
if lines > 1 and self.repeat_header == lines:
self.write_header()
lines = 0
lines += 1
+ # Fetch values
try:
- result = self.context.pmFetch(self.pmids_to_ctypes(self.pmids))
+ self.pmfg.fetch()
except pmapi.pmErr as error:
if error.args[0] == PM_ERR_EOL:
break
raise error
- self.extract(result)
- if self.ctstamp == 0:
- self.ctstamp = copy.copy(result.contents.timestamp)
- self.ptstamp = self.ctstamp
- self.ctstamp = copy.copy(result.contents.timestamp)
- if self.context.type == PM_CONTEXT_ARCHIVE:
- if float(self.ctstamp) < float(self.opts.pmGetOptionOrigin()):
- self.context.pmFreeResult(result)
- continue
- if float(self.ctstamp) > float(self.opts.pmGetOptionFinish()):
- self.context.pmFreeResult(result)
- break
-
- self.report(self.ctstamp, self.currvals)
- self.context.pmFreeResult(result)
+ # Report and prepare for the next round
+ self.report(self.pmfg_ts())
if self.samples and self.samples > 0:
self.samples -= 1
if self.delay and self.interpol and self.samples != 0:
self.context.pmtimevalSleep(self.interval)
# Allow modules to flush buffered values / say goodbye
- self.report(None, None)
-
- def extract(self, result):
- """ Extract the metric values from pmResult structure """
- # Metrics incl. all instance values, must match self.format on return
- values = []
-
- for i, metric in enumerate(self.metrics):
- # Per-metric values incl. all instance values
- # We use dict to make it easier to deal with gone/unknown instances
- values.append({})
-
- # Populate instance fields to have values for unavailable instances
- # Values are (instance id, instance name, instance value)
- for inst in self.insts[i][0]:
- values[i][inst] = (-1, None, NO_VAL)
-
- # No values available for this metric
- if result.contents.get_numval(i) == 0:
- continue
-
- # Process all fetched instances
- for j in range(result.contents.get_numval(i)):
- inst = result.contents.get_inst(i, j)
-
- # Locate the correct instance and its position
- if inst >= 0:
- if inst not in self.insts[i][0]:
- # Ignore newly emerged instances
- continue
- k = 0
- while inst != self.insts[i][0][k]:
- k += 1
-
- # Extract and scale the value
- try:
- # Use native type if no rescaling needed
- if self.descs[i].contents.type == PM_TYPE_STRING or \
- self.metrics[metric][3] == 1 or \
- (self.metrics[metric][2][2] == 1 and \
- str(self.descs[i].contents.units) == \
- str(self.metrics[metric][2][1])):
- rescale = 0
- vtype = self.descs[i].contents.type
- else:
- rescale = 1
- vtype = PM_TYPE_DOUBLE
-
- atom = self.context.pmExtractValue(
- result.contents.get_valfmt(i),
- result.contents.get_vlist(i, j),
- self.descs[i].contents.type,
- vtype)
-
- if rescale:
- atom = self.context.pmConvScale(
- vtype,
- atom, self.descs, i,
- self.metrics[metric][2][1])
-
- val = atom.dref(vtype)
+ self.report(None)
- if rescale:
- val *= self.metrics[metric][2][2]
- val = int(val) if val == int(val) else val
-
- if inst >= 0:
- values[i][inst] = (inst, self.insts[i][1][k], val)
- else:
- values[i][PM_IN_NULL] = (-1, None, val)
-
- except pmapi.pmErr as error:
- sys.stderr.write("%s: %s, aborting.\n" % (metric,
str(error)))
- sys.exit(1)
-
- # Convert dicts to lists
- vals = []
- for v in values:
- vals.append(v.values())
- values = vals
-
- # Store current and previous values
- # Output modules need to handle non-existing self.prevvals
- self.prevvals = self.currvals
- self.currvals = values
-
- def report(self, tstamp, values):
+ def report(self, tstamp):
""" Report the metric values """
if tstamp != None:
- ts = self.context.pmLocaltime(tstamp.tv_sec)
- us = int(tstamp.tv_usec)
- dt = datetime(ts.tm_year+1900, ts.tm_mon+1, ts.tm_mday,
- ts.tm_hour, ts.tm_min, ts.tm_sec, us, None)
- tstamp = dt.strftime(self.timefmt)
+ tstamp = tstamp.strftime(self.timefmt)
if self.output == OUTPUT_ARCHIVE:
- self.write_archive(tstamp, values)
+ self.write_archive(tstamp)
if self.output == OUTPUT_CSV:
- self.write_csv(tstamp, values)
+ self.write_csv(tstamp)
if self.output == OUTPUT_STDOUT:
- self.write_stdout(tstamp, values)
+ self.write_stdout(tstamp)
if self.output == OUTPUT_ZABBIX:
- self.write_zabbix(tstamp, values)
+ self.write_zabbix(tstamp)
def prepare_writer(self):
""" Prepare generic stdout writer """
@@ -1153,9 +1055,9 @@ class PMReporter(object):
else:
self.writer.write("...\n(Ctrl-C to stop)\n")
- def write_archive(self, timestamp, values):
+ def write_archive(self, timestamp):
""" Write an archive record """
- if timestamp == None and values == None:
+ if timestamp == None:
# Complete and close
self.pmi.pmiEnd()
self.pmi = None
@@ -1187,25 +1089,28 @@ class PMReporter(object):
for i, metric in enumerate(self.metrics):
ins = 1 if self.insts[i][0][0] == PM_IN_NULL else
len(self.insts[i][0])
for j in range(ins):
- if str(list(values[i])[j][2]) != NO_VAL:
- data = 1
+ try:
+ value = self.metrics[metric][5][j]()
inst = self.insts[i][1][j]
+ data = 1
if self.descs[i].contents.type == PM_TYPE_STRING:
- self.pmi.pmiPutValue(metric, inst,
str(list(values[i])[j][2]))
+ self.pmi.pmiPutValue(metric, inst, value)
elif self.descs[i].contents.type == PM_TYPE_FLOAT or \
self.descs[i].contents.type == PM_TYPE_DOUBLE:
- self.pmi.pmiPutValue(metric, inst, "%f" %
list(values[i])[j][2])
+ self.pmi.pmiPutValue(metric, inst, "%f" % value)
else:
- self.pmi.pmiPutValue(metric, inst, "%d" %
list(values[i])[j][2])
+ self.pmi.pmiPutValue(metric, inst, "%d" % value)
+ except:
+ pass
# Flush
if data:
# pylint: disable=maybe-no-member
- self.pmi.pmiWrite(self.ctstamp.tv_sec, self.ctstamp.tv_usec)
+ self.pmi.pmiWrite(self.pmfg_ts().second,
self.pmfg_ts().microsecond)
- def write_csv(self, timestamp, values):
+ def write_csv(self, timestamp):
""" Write results in CSV format """
- if timestamp == None and values == None:
+ if timestamp == None:
# Silent goodbye
return
@@ -1215,16 +1120,20 @@ class PMReporter(object):
ins = 1 if self.insts[i][0][0] == PM_IN_NULL else
len(self.insts[i][0])
for j in range(ins):
line += self.delimiter
- if type(list(values[i])[j][2]) is float:
+ try:
+ value = self.metrics[metric][5][j]()
+ except:
+ value = NO_VAL
+ if type(value) is float:
fmt = "." + str(self.precision) + "f"
- line += format(list(values[i])[j][2], fmt)
+ line += format(value, fmt)
else:
- line += str(list(values[i])[j][2])
+ line += str(value)
self.writer.write(line + "\n")
- def write_stdout(self, timestamp, values):
+ def write_stdout(self, timestamp):
""" Write a line to stdout """
- if timestamp == None and values == None:
+ if timestamp == None:
# Silent goodbye
return
@@ -1242,29 +1151,13 @@ class PMReporter(object):
for i, metric in enumerate(self.metrics):
l = self.metrics[metric][4]
- for j in range(len(values[i])):
+ for j in range(len(self.metrics[metric][5])):
k += 1
- # Raw or rate
- if self.metrics[metric][3] or \
- self.descs[i].sem != PM_SEM_COUNTER or \
- list(values[i])[j][2] == NO_VAL:
- # Raw
- value = list(values[i])[j][2]
- elif not self.metrics[metric][3] and \
- (self.prevvals == None or list(self.prevvals[i])[j][2] ==
NO_VAL):
- # Rate not yet possible
+ try:
+ value = self.metrics[metric][5][j]()
+ except:
value = NO_VAL
- else:
- # Rate
- scale = 1
- if self.descs[i].contents.units.dimTime != 0:
- if self.descs[i].contents.units.scaleTime >
PM_TIME_SEC:
- scale = pow(60, (PM_TIME_SEC -
self.descs[i].contents.units.scaleTime))
- else:
- scale = pow(1000, (PM_TIME_SEC -
self.descs[i].contents.units.scaleTime))
- delta = scale * (float(self.ctstamp) - float(self.ptstamp))
- value = (list(values[i])[j][2] -
list(self.prevvals[i])[j][2]) / delta if delta else 0
# Make sure the value fits
if type(value) is int or type(value) is long:
@@ -1308,9 +1201,9 @@ class PMReporter(object):
nfmt = nfmt[:-l]
self.writer.write(nfmt.format(*tuple(line)) + "\n")
- def write_zabbix(self, timestamp, values):
+ def write_zabbix(self, timestamp):
""" Write (send) metrics to a Zabbix server """
- if timestamp == None and values == None:
+ if timestamp == None:
# Send any remaining buffered values
if self.zabbix_metrics:
send_to_zabbix(self.zabbix_metrics, self.zabbix_server,
self.zabbix_port)
@@ -1318,7 +1211,7 @@ class PMReporter(object):
return
# Collect the results
- ts = float(self.ctstamp)
+ ts = self.pmfg_ts().timestamp()
if self.zabbix_prevsend == None:
self.zabbix_prevsend = ts
for i, metric in enumerate(self.metrics):
@@ -1327,9 +1220,11 @@ class PMReporter(object):
key = ZBXPRFX + metric
if self.insts[i][1][j]:
key += "[" + str(self.insts[i][1][j]) + "]"
- val = str(list(values[i])[j][2])
- if val != NO_VAL:
+ try:
+ val = str(self.metrics[metric][5][j]())
self.zabbix_metrics.append(ZabbixMetric(self.zabbix_host,
key, val, ts))
+ except:
+ pass
# Send when needed
if self.context.type == PM_CONTEXT_ARCHIVE:
Thanks,
--
Marko Myllynen
|