diff --git a/build/rpm/pcp.spec.in b/build/rpm/pcp.spec.in
index f5f7d77..30aebe9 100755
--- a/build/rpm/pcp.spec.in
+++ b/build/rpm/pcp.spec.in
@@ -520,6 +520,27 @@ Requires: python-pcp = @package_version@
 %description export-pcp2graphite
 Performance Co-Pilot (PCP) front-end tools for exporting metric values
 to graphite (http://graphite.readthedocs.org).
+
+#
+# pcp-export-pcp2influxdb
+#
+%package export-pcp2influxdb
+License: GPLv2+
+Group: Applications/System
+Summary: Performance Co-Pilot tools for exporting PCP metrics to InfluxDB
+URL: http://www.pcp.io
+Requires: pcp-libs >= %{version}-%{release}
+%if "@enable_python3@" == "true"
+Requires: python3-pcp = @package_version@
+Requires: python3-requests
+%else
+Requires: python-pcp = @package_version@
+Requires: python-requests
+%endif
+
+%description export-pcp2influxdb
+Performance Co-Pilot (PCP) front-end tools for exporting metric values
+to InfluxDB (https://influxdata.com/time-series-platform/influxdb).
 %endif
 
 %if "@enable_python2@" == "true"
@@ -1737,6 +1758,7 @@ awk '{print $NF}' $DIST_MANIFEST | egrep 'ganglia2pcp' >import_ganglia2pcp_files
 awk '{print $NF}' $DIST_MANIFEST | egrep 'collectl2pcp' >import_collectl2pcp_files
 %if "@have_python@" == "true"
 awk '{print $NF}' $DIST_MANIFEST | egrep 'pcp2graphite' >export_pcp2graphite_files
+awk '{print $NF}' $DIST_MANIFEST | egrep 'pcp2influxdb' >export_pcp2influxdb_files
 %endif
 awk '{print $NF}' $DIST_MANIFEST | egrep 'zabbix|zbxpcp' >export_zabbix_agent_files
 %if "@pmda_infiniband@" == "true"
@@ -1944,6 +1966,7 @@ BEGIN {
 while( getline < "pmda_unbound_files") pmda_unbound[$0]=1;
 while( getline < "pmda_mic_files") pmda_mic[$0]=1;
 while( getline < "export_pcp2graphite_files") export_pcp2graphite[$0]=1;
+while( getline < "export_pcp2influxdb_files") export_pcp2influxdb[$0]=1;
 %endif
 while( getline < "export_zabbix_agent_files") export_zabbix_agent[$0]=1;
 %if "@pmda_json@" == "true"
@@ -2041,6 +2064,7 @@ BEGIN {
 else if (pmda_unbound[$NF]) f="pmda_unbound_files.rpm";
 else if (pmda_mic[$NF]) f="pmda_mic_files.rpm";
 else if (export_pcp2graphite[$NF]) f="export_pcp2graphite_files.rpm";
+else if (export_pcp2influxdb[$NF]) f="export_pcp2influxdb_files.rpm";
 %endif
 else if (export_zabbix_agent[$NF]) f="export_zabbix_agent_files.rpm";
 %if "@pmda_json@" == "true"
@@ -2668,6 +2692,8 @@ cd
 %files pmda-mic -f pmda_mic_files.rpm
 
 %files export-pcp2graphite -f export_pcp2graphite_files.rpm
+
+%files export-pcp2influxdb -f export_pcp2influxdb_files.rpm
 %endif
 
 %files export-zabbix-agent -f export_zabbix_agent_files.rpm
diff --git a/src/GNUmakefile b/src/GNUmakefile
index 5ae0f92..7eaa8f2 100644
--- a/src/GNUmakefile
+++ b/src/GNUmakefile
@@ -43,6 +43,7 @@ OTHER_SUBDIRS = \
 	newhelp \
 	pcp \
 	pcp2graphite \
+	pcp2influxdb \
 	pmafm \
 	pmfind \
 	pmcpp \
diff --git a/src/pcp2influxdb/GNUmakefile b/src/pcp2influxdb/GNUmakefile
new file mode 100644
index 0000000..4a98999
--- /dev/null
+++ b/src/pcp2influxdb/GNUmakefile
@@ -0,0 +1,37 @@
+#
+# Copyright (c) 2014-2015 Red Hat, Inc. All Rights Reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+
+TOPDIR = ../..
+include $(TOPDIR)/src/include/builddefs
+
+TARGET = pcp2influxdb
+MAN_SECTION = 1
+MAN_PAGES = $(TARGET).$(MAN_SECTION)
+MAN_DEST = $(PCP_MAN_DIR)/man$(MAN_SECTION)
+
+default: $(TARGET).py $(MAN_PAGES)
+
+default:
+
+include $(BUILDRULES)
+
+install: default
+ifeq "$(HAVE_PYTHON)" "true"
+	$(INSTALL) -m 755 $(TARGET).py $(PCP_BIN_DIR)/$(TARGET)
+	@$(INSTALL_MAN)
+endif
+
+default_pcp: default
+
+install_pcp: install
diff --git a/src/pcp2influxdb/pcp2influxdb.1 b/src/pcp2influxdb/pcp2influxdb.1
new file mode 100644
index 0000000..43eb500
--- /dev/null
+++ b/src/pcp2influxdb/pcp2influxdb.1
@@ -0,0 +1,114 @@
+.TH PCP2INFLUXDB 1 "PCP" "Performance Co-Pilot"
+.SH NAME
+.B pcp2influxdb
+\- pcp-to-influxdb metrics exporter
+.SH SYNOPSIS
+.B pcp2influxdb
+[\f3\-h\f1 \f2hostspec\f1]
+[\f3\-a\f1 \f2archive\f1]
+[\f3\-L\f1]
+[\f3\-O\f1 \f2origin\f1]
+[\f3\-T\f1 \f2finish\f1]
+[\f3\-s\f1 \f2samples\f1]
+[\f3\-t\f1 \f2delta\f1]
+[\f3\-i\f1 \f2influxdbaddress\f1]
+[\f3\-u\f1 \f2units\f1]
+[\f3\-d\f1 \f2database\f1]
+[\f3\-U\f1 \f2username\f1]
+[\f3\-P\f1 \f2password\f1]
+[\f3\-I\f1 \f2tagstring\f1]
+\f2metricname\f1 ...
+.SH DESCRIPTION
+.B pcp2influxdb
+is a PCP metric exporter.
+It relays metrics from the
+.BR pmns (5)
+to a designated InfluxDB database by periodically polling, then
+copying/converting relevant numerical metrics via the InfluxDB
+HTTP(S) API.
+Metric values are optionally scaled to a given unit/scale, but are
+not rate-converted.
+Metric names are somewhat canonicalized (mangled) to fit the
+InfluxDB namespace rules.
+.PP
+The data transfer will continue on a best-effort basis until the
+program is interrupted.
+Temporary errors are reported but otherwise ignored.
+Multiple copies of this program may be run in parallel, to populate
+different InfluxDB databases or subtrees, with different scaling or
+sampling intervals.
+.PP
+.SH OPTIONS
+.TP
+.B \-h \f2hostspec\f1
+Extract metrics from the specified
+.BR pmcd (1)
+daemon, using a host specification as described on
+.BR PCPIntro (1).
+The default is \f3local:\f1.
+.TP
+.B \-a \f2archive\f1
+Extract metrics from the given set of archives, which is a
+comma-separated list of names, each of which may be the base name of
+an archive or the name of a directory containing one or more archives.
+.TP
+.B \-L
+Extract metrics from the local context PCP PMDAs.
+.TP
+.B \-t \f2delta\f1
+Specify the polling interval in seconds.
+When live, polling is aligned to natural multiples of this interval.
+The default is \f360\f1 seconds.
+.TP
+.B \-O \f2origin\f1
+Set the time origin for archive scanning.
+The default is the start of the set of archives.
+.TP
+.B \-T \f2finish\f1
+Set the time finish for archive scanning.
+The default is the end of the set of archives.
+.TP
+.B \-s \f2samples\f1
+Set the maximum number of samples to relay.
+The default is unlimited.
+.TP
+.B \-i \f2influxdbaddress\f1
+Specify the InfluxDB HTTP(S) address to which the data is relayed.
+The default is \f3http://localhost:8086\f1.
+.TP
+.B \-u \f2units\f1
+Rescale the metrics to the given UNITS string, as parsed by the
+.IR pmParseUnitsStr (3)
+function.
+This must be dimensionally consistent with the underlying PCP metric.
+The default is to perform no scaling.
+.TP
+.B \-d \f2database\f1
+The name of the database to write to.
+It must already exist; it will not be created.
+The default is \f3pcp\f1.
+.TP
+.B \-U \f2username\f1
+The username used to authenticate to InfluxDB.
+It must have write access to the database that the metrics are written to.
+.TP
+.B \-P \f2password\f1
+The password, used together with the given username, to authenticate
+to InfluxDB.
+.TP
+.B \-I \f2tagstring\f1
+A string of tags, in InfluxDB's format (e.g. host=www.example.com,gpu=tesla).
+Not required, but recommended to enable richer querying and dashboard
+creation.
+.TP
+.I metricname
+Relay all numeric metrics nested at or below the given name in the PMNS.
+This option may be repeated.
+This is a compulsory option, having no default.
+.SH "SEE ALSO"
+.BR PCPIntro (1),
+.BR pmcd (1),
+.BR pmParseUnitsStr (3),
+.BR https://influxdata.com/time-series-platform/influxdb/,
+.BR pmmgr (1),
+.BR pmval (1),
+.BR PMAPI (3),
+.BR pcp.conf (5)
+and
+.BR pmns (5).
diff --git a/src/pcp2influxdb/pcp2influxdb.py b/src/pcp2influxdb/pcp2influxdb.py
new file mode 100644
index 0000000..db52231
--- /dev/null
+++ b/src/pcp2influxdb/pcp2influxdb.py
@@ -0,0 +1,375 @@
+#!/usr/bin/env pmpython
+#
+# Copyright (C) 2014-2015 Red Hat.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+""" Relay PCP metrics to an InfluxDB server """
+
+import re
+import sys
+import time
+
+from pcp import pmapi
+import cpmapi as c_api
+
+
+class WriteBody(object):
+    """ Create a request to POST to /write on an InfluxDB server
+
+        name will be used for the measurement name after it has been
+        transformed to be allowable. Characters like '-' and '.' will be
+        replaced with '_', and multiple underscores in a row will be
+        replaced with a single underscore.
+
+        value will be put into the measurement with a field key of 'value'.
+        It should be a numeric type, but it will _not_ be checked, just
+        cast directly to a string.
+
+        timestamp should be an integer that is unix time from epoch in
+        seconds.
+
+        tag_str should be a string of tags in InfluxDB's line-protocol
+        format (e.g. host=www.example.com,gpu=tesla), or None for no tags.
+    """
+
+    def __init__(self, name, value, timestamp, tag_str=None):
+        self.body = self.sanitize_name(name)
+
+        if tag_str:
+            self.body += ',' + tag_str
+
+        self.body += ' '
+
+        self.body += 'value=' + str(value)
+
+        self.body += ' '
+
+        # converting to nanoseconds for influxdb
+        self.body += str(timestamp) + '000000000'
+
+    def sanitize_name(self, name):
+        tmp = name
+
+        for c in ['.', '-']:
+            tmp = tmp.replace(c, '_')
+
+        while '__' in tmp:
+            tmp = tmp.replace('__', '_')
+
+        return tmp
+
+
+class Relay(object):
+    """ Sends a periodic report to InfluxDB about all instances of named
+        metrics. Knows about some of the default PCP arguments.
+    """
+
+    def describe_source(self):
+        """ Return a string describing our context; approximate inverse of
+            pmapi.fromOptions
+        """
+
+        ctxtype = self.context.type
+        if ctxtype == c_api.PM_CONTEXT_ARCHIVE:
+            return "archive " + ", ".join(self.opts.pmGetOptionArchives())
+        elif ctxtype == c_api.PM_CONTEXT_HOST:
+            hosts = self.opts.pmGetOptionHosts()
+
+            # pmapi.py idiosyncrasy; it has already defaulted to this
+            if hosts is None:
+                hosts = ["local:"]
+
+            return "host " + ", ".join(hosts)
+        elif ctxtype == c_api.PM_CONTEXT_LOCAL:
+            hosts = ["local:"]
+
+            return "host " + ", ".join(hosts)
+        else:
+            raise pmapi.pmUsageErr
+
+    def __init__(self):
+        """ Construct object, parse command line """
+        self.context = None
+        self.database = 'pcp'
+        self.influxdb_tags = ''
+        self.influxdb_address = 'http://127.0.0.1:8086'
+        self.influxdb_user = None
+        self.influxdb_pass = None
+        self.sample_count = 0
+        self.unitsstr = None
+        self.units = None
+        self.units_mult = None
+
+        # option setup; short options must match the custom options below
+        self.opts = pmapi.pmOptions()
+        self.opts.pmSetShortOptions("a:O:s:T:i:u:d:U:P:I:t:h:D:LV?")
+        self.opts.pmSetShortUsage("[options] metricname ...")
+        self.opts.pmSetLongOptionText("""
+Description: Periodically relay raw values of all instances of a given
+hierarchy of PCP metrics to an InfluxDB server on the network.""")
+        self.opts.pmSetLongOptionHeader("Options")
+        self.opts.pmSetOptionCallback(self.option)
+
+        # common options
+        self.opts.pmSetLongOptionVersion()
+        self.opts.pmSetLongOptionArchive()
+        self.opts.pmSetLongOptionOrigin()
+        self.opts.pmSetLongOptionSamples()
+        self.opts.pmSetLongOptionFinish()
+        self.opts.pmSetLongOptionDebug()
+        self.opts.pmSetLongOptionHost()
+        self.opts.pmSetLongOptionLocalPMDA()
+        self.opts.pmSetLongOptionInterval()
+
+        # custom options
+        self.opts.pmSetLongOption("influxdb-address", 1, 'i', '',
+                                  "InfluxDB HTTP/HTTPS address " +
+                                  "(default \"" + self.influxdb_address +
+                                  "\")")
+        self.opts.pmSetLongOption("units", 1, 'u', '',
+                                  "rescale units " +
\"MB\", will omit incompatible units)") + self.opts.pmSetLongOption("database", 1, 'd', '', + "database for metric (default \"pcp\")") + self.opts.pmSetLongOption("db-user", 1, 'U', '', + "username for InfluxDB database") + self.opts.pmSetLongOption("db-password", 1, 'P', '', + "password for InfluxDB database") + self.opts.pmSetLongOption("tag-string", 1, 'I', '', + "string of tags to add to the metrics") + self.opts.pmSetLongOptionHelp() + + # parse options + self.context = pmapi.pmContext.fromOptions(self.opts, sys.argv) + self.interval = self.opts.pmGetOptionInterval() or pmapi.timeval(60, 0) + if self.unitsstr is not None: + units = self.context.pmParseUnitsStr(self.unitsstr) + (self.units, self.units_mult) = units + self.metrics = [] + self.pmids = [] + self.descs = [] + metrics = self.opts.pmGetNonOptionsFromList(sys.argv) + if metrics: + for m in metrics: + try: + self.context.pmTraversePMNS(m, + self.handle_candidate_metric) + except pmapi.pmErr as error: + sys.stderr.write("Excluding metric %s (%s)\n" % + (m, str(error))) + + sys.stderr.flush() + + if len(self.metrics) == 0: + sys.stderr.write("No acceptable metrics specified.\n") + raise pmapi.pmUsageErr() + + # Report what we're about to do + print("Relaying %d %smetric(s) to database %s with tags %s from %s " + "to %s every %f s" % + (len(self.metrics), + "rescaled " if self.units else "", + self.database, + self.influxdb_tags, + self.describe_source(), + self.influxdb_address, + self.interval)) + + sys.stdout.flush() + + def option(self, opt, optarg, index): + # need only handle the non-common options + if opt == 'i': + self.influxdb_address = optarg + elif opt == 'u': + self.unitsstr = optarg + elif opt == 'd': + self.database = optarg + elif opt == 'U': + self.influxdb_user = optarg + elif opt == 'P': + self.influxdb_pass = optarg + elif opt == 'I': + self.influxdb_tags = optarg + else: + raise pmapi.pmUsageErr() + + # Check the given metric name (a leaf in the PMNS) for + # acceptability for graphite: it needs to be numeric, and + # convertable to the given unit (if specified). + # + # Print an error message here if needed; can't throw an exception + # through the pmapi pmTraversePMNS wrapper. + def handle_candidate_metric(self, name): + try: + pmid = self.context.pmLookupName(name)[0] + desc = self.context.pmLookupDescs(pmid)[0] + except pmapi.pmErr as err: + sys.stderr.write("Excluding metric %s (%s)\n" % (name, str(err))) + return + + # reject non-numeric types (future pmExtractValue failure) + types = desc.contents.type + if not ((types == c_api.PM_TYPE_32) or + (types == c_api.PM_TYPE_U32) or + (types == c_api.PM_TYPE_64) or + (types == c_api.PM_TYPE_U64) or + (types == c_api.PM_TYPE_FLOAT) or + (types == c_api.PM_TYPE_DOUBLE)): + sys.stderr.write("Excluding metric %s (%s)\n" % + (name, "need numeric type")) + return + + # reject dimensionally incompatible (future pmConvScale failure) + if self.units is not None: + units = desc.contents.units + if (units.dimSpace != self.units.dimSpace or + units.dimTime != self.units.dimTime or + units.dimCount != self.units.dimCount): + sys.stderr.write("Excluding metric %s (%s)\n" % + (name, "incompatible dimensions")) + return + + self.metrics.append(name) + self.pmids.append(pmid) + self.descs.append(desc) + + + # Convert a python list of pmids (numbers) to a ctypes LP_c_uint + # (a C array of uints). 
+    def convert_pmids_to_ctypes(self, pmids):
+        from ctypes import c_uint
+        pmidA = (c_uint * len(pmids))()
+        for i, p in enumerate(pmids):
+            pmidA[i] = c_uint(p)
+        return pmidA
+
+    def send(self, timestamp, miv_tuples):
+        import requests
+
+        body_parts = []
+
+        for (metric, value) in miv_tuples:
+            part = WriteBody(name=metric, value=value, timestamp=timestamp,
+                             tag_str=self.influxdb_tags)
+
+            body_parts.append(part.body)
+
+        url = self.influxdb_address + '/write'
+        params = {'db': self.database}
+        body = "\n".join(body_parts)
+        auth = None
+
+        if self.influxdb_user and self.influxdb_pass:
+            auth = requests.auth.HTTPBasicAuth(self.influxdb_user,
+                                               self.influxdb_pass)
+
+        res = requests.post(url, params=params, data=body, auth=auth)
+
+        if res.status_code != 204:
+            print("could not send metrics to InfluxDB")
+
+            if res.status_code == 200:
+                print("influx could not complete the request")
+            else:
+                print("request to " + res.url + " failed with code " +
+                      str(res.status_code))
+
+            print("body of request is:")
+            print(body)
+
+    def sanitize_nameindom(self, string):
+        """ Quote the given instance-domain string for proper digestion
+            by InfluxDB. """
+        return "_" + re.sub('[^a-zA-Z_0-9-]', '_', string)
+
+    def execute(self):
+        """ Using a PMAPI context (could be either host or archive),
+            fetch the given set of metric values and relay them to InfluxDB.
+        """
+
+        # align poll interval to host clock
+        ctype = self.context.type
+        if ctype == c_api.PM_CONTEXT_HOST or ctype == c_api.PM_CONTEXT_LOCAL:
+            align = float(self.interval) - (time.time() % float(self.interval))
+            time.sleep(align)
+
+        # We would like to do: result = self.context.pmFetch(self.pmids)
+        # But pmFetch takes ctypes array-of-uint's and not a python list;
+        # ideally, pmFetch would become polymorphic to improve this code.
+        result = self.context.pmFetch(self.convert_pmids_to_ctypes(self.pmids))
+        sample_time = result.contents.timestamp.tv_sec
+        # + (result.contents.timestamp.tv_usec/1000000.0)
+
+        if ctype == c_api.PM_CONTEXT_ARCHIVE:
+            endtime = self.opts.pmGetOptionFinish()
+            if endtime is not None:
+                if float(sample_time) > float(endtime.tv_sec):
+                    raise SystemExit
+
+        miv_tuples = []
+
+        for i, name in enumerate(self.metrics):
+            for j in range(0, result.contents.get_numval(i)):
+                # a fetch or other error will just omit that data value
+                # from the InfluxDB-bound set
+                try:
+                    inst = result.contents.get_vlist(i, j).inst
+
+                    atom = self.context.pmExtractValue(
+                        result.contents.get_valfmt(i),
+                        result.contents.get_vlist(i, j),
+                        self.descs[i].contents.type, c_api.PM_TYPE_FLOAT)
+
+                    if inst is None or inst < 0:
+                        suffix = ""
+                    else:
+                        indom = self.context.pmNameInDom(self.descs[i], inst)
+                        suffix = "." + self.sanitize_nameindom(indom)
+
+                    # Rescale if desired
+                    if self.units is not None:
+                        atom = self.context.pmConvScale(c_api.PM_TYPE_FLOAT,
+                                                        atom,
+                                                        self.descs, i,
+                                                        self.units)
+
+                    if self.units_mult is not None:
+                        atom.f = atom.f * self.units_mult
+
+                    miv_tuples.append((name+suffix, atom.f))
+
+                except pmapi.pmErr as error:
+                    sys.stderr.write("%s[%d]: %s, continuing\n" %
+                                     (name, inst, str(error)))
+
+        self.send(sample_time, miv_tuples)
+        self.context.pmFreeResult(result)
+
+        self.sample_count += 1
+        max_samples = self.opts.pmGetOptionSamples()
+        if max_samples is not None and self.sample_count >= max_samples:
+            raise SystemExit
+
+
+if __name__ == '__main__':
+    try:
+        relay = Relay()
+        while True:
+            relay.execute()
+
+    except pmapi.pmErr as error:
+        if error.args[0] != c_api.PM_ERR_EOL:
+            sys.stderr.write('%s: %s\n' % (error.progname(), error.message()))
+    except pmapi.pmUsageErr as usage:
+        usage.message()
+    except KeyboardInterrupt:
+        pass
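
Note (not part of the patch): for reviewers unfamiliar with InfluxDB's line
protocol, the record that the WriteBody class above assembles for each sample
looks like the sketch below. The metric name, tag string, value and timestamp
are invented for illustration; the name mangling mirrors
WriteBody.sanitize_name.

    # Standalone Python sketch of one line-protocol record as pcp2influxdb
    # builds it; all values here are made up for the example.
    name = "kernel.all.load"
    for c in ['.', '-']:          # same mangling as WriteBody.sanitize_name
        name = name.replace(c, '_')
    record = "%s,%s value=%s %d" % (name, "host=www.example.com",
                                    1.25, 1450000000 * 10**9)
    print(record)
    # kernel_all_load,host=www.example.com value=1.25 1450000000000000000

One such line is built per metric instance per fetch; send() joins them with
newlines and POSTs the batch to /write with the database name passed as the
db query parameter.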
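
Note (not part of the patch): as the man page states, the target database must
already exist; pcp2influxdb will not create it. A hypothetical one-time setup
using the same python-requests dependency might look like the sketch below,
assuming an InfluxDB server with a 1.x-style /query endpoint at the tool's
default address; the URL, database name and credentials are placeholders.

    # Hypothetical setup helper, not installed by this patch: create the
    # "pcp" database once before running pcp2influxdb.
    import requests

    res = requests.post("http://localhost:8086/query",
                        params={"q": 'CREATE DATABASE "pcp"'})
    print(res.status_code, res.text)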