+++ /dev/null
-Description: transformer: Add aggregator transformer
- This adds a transformer aggregate counters until a threshold and/or a
- retention time and then flushing them out.
-Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
-Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200)
-X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f
-Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2
-Co-Authored-By: Jordan Pittier <jordan.pittier@cloudwatt.com>
-Origin: upstream, https://review.openstack.org/#/c/87238/
-Last-Update: 2014-05-21
-
-diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
-index db51c64..f4f5a64 100644
---- a/ceilometer/tests/pipeline_base.py
-+++ b/ceilometer/tests/pipeline_base.py
-@@ -20,6 +20,7 @@
- import abc
- import datetime
-
-+import mock
- import six
- from stevedore import extension
-
-@@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase):
- 'except': self.TransformerClassException,
- 'drop': self.TransformerClassDrop,
- 'cache': accumulator.TransformerAccumulator,
-+ 'aggregator': conversions.AggregatorTransformer,
- 'unit_conversion': conversions.ScalingTransformer,
- 'rate_of_change': conversions.RateOfChangeTransformer,
- }
-@@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase):
- meters = ('disk.read.bytes', 'disk.write.requests')
- units = ('B', 'request')
- self._do_test_rate_of_change_mapping(pipe, meters, units)
-+
-+ def _do_test_aggregator(self, parameters, expected_length):
-+ transformer_cfg = [
-+ {
-+ 'name': 'aggregator',
-+ 'parameters': parameters,
-+ },
-+ ]
-+ self._set_pipeline_cfg('transformers', transformer_cfg)
-+ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
-+ counters = [
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=26,
-+ unit='B',
-+ user_id='test_user',
-+ project_id='test_proj',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '1.0'}
-+ ),
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=16,
-+ unit='B',
-+ user_id='test_user',
-+ project_id='test_proj',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '2.0'}
-+ ),
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=53,
-+ unit='B',
-+ user_id='test_user_bis',
-+ project_id='test_proj_bis',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '1.0'}
-+ ),
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=42,
-+ unit='B',
-+ user_id='test_user_bis',
-+ project_id='test_proj_bis',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '2.0'}
-+ ),
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=15,
-+ unit='B',
-+ user_id='test_user',
-+ project_id='test_proj_bis',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '2.0'}
-+ ),
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=2,
-+ unit='B',
-+ user_id='test_user_bis',
-+ project_id='test_proj',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '3.0'}
-+ ),
-+ ]
-+
-+ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
-+ self.transformer_manager)
-+ pipe = pipeline_manager.pipelines[0]
-+
-+ pipe.publish_samples(None, counters)
-+ pipe.flush(None)
-+ publisher = pipeline_manager.pipelines[0].publishers[0]
-+ self.assertEqual(expected_length, len(publisher.samples))
-+ return sorted(publisher.samples, key=lambda s: s.volume)
-+
-+ def test_aggregator_metadata(self):
-+ for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': conf,
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=4)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(2, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '3.0'},
-+ s.resource_metadata)
-+ s = samples[1]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(15, s.volume)
-+ self.assertEqual('test_user', s.user_id)
-+ self.assertEqual('test_proj_bis', s.project_id)
-+ self.assertEqual({'version': '2.0'},
-+ s.resource_metadata)
-+ s = samples[2]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(42, s.volume)
-+ self.assertEqual('test_user', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': expected_version},
-+ s.resource_metadata)
-+ s = samples[3]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(95, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj_bis', s.project_id)
-+ self.assertEqual({'version': expected_version},
-+ s.resource_metadata)
-+
-+ def test_aggregator_user_last_and_metadata_last(self):
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': 'last',
-+ 'user_id': 'last',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=2)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(44, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '3.0'},
-+ s.resource_metadata)
-+ s = samples[1]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(110, s.volume)
-+ self.assertEqual('test_user', s.user_id)
-+ self.assertEqual('test_proj_bis', s.project_id)
-+ self.assertEqual({'version': '2.0'},
-+ s.resource_metadata)
-+
-+ def test_aggregator_user_first_and_metadata_last(self):
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': 'last',
-+ 'user_id': 'first',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=2)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(44, s.volume)
-+ self.assertEqual('test_user', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '3.0'},
-+ s.resource_metadata)
-+ s = samples[1]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(110, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj_bis', s.project_id)
-+ self.assertEqual({'version': '2.0'},
-+ s.resource_metadata)
-+
-+ def test_aggregator_all_first(self):
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': 'first',
-+ 'user_id': 'first',
-+ 'project_id': 'first',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=1)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(154, s.volume)
-+ self.assertEqual('test_user', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '1.0'},
-+ s.resource_metadata)
-+
-+ def test_aggregator_all_last(self):
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': 'last',
-+ 'user_id': 'last',
-+ 'project_id': 'last',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=1)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(154, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '3.0'},
-+ s.resource_metadata)
-+
-+ def test_aggregator_all_mixed(self):
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': 'drop',
-+ 'user_id': 'first',
-+ 'project_id': 'last',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=1)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(154, s.volume)
-+ self.assertEqual('test_user', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({}, s.resource_metadata)
-+
-+ def test_aggregator_metadata_default(self):
-+ samples = self._do_test_aggregator({
-+ 'user_id': 'last',
-+ 'project_id': 'last',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=1)
-+ s = samples[0]
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(154, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '3.0'},
-+ s.resource_metadata)
-+
-+ @mock.patch('ceilometer.transformer.conversions.LOG')
-+ def test_aggregator_metadata_invalid(self, mylog):
-+ samples = self._do_test_aggregator({
-+ 'resource_metadata': 'invalid',
-+ 'user_id': 'last',
-+ 'project_id': 'last',
-+ 'target': {'name': 'aggregated-bytes'}
-+ }, expected_length=1)
-+ s = samples[0]
-+ self.assertTrue(mylog.warn.called)
-+ self.assertEqual('aggregated-bytes', s.name)
-+ self.assertEqual(154, s.volume)
-+ self.assertEqual('test_user_bis', s.user_id)
-+ self.assertEqual('test_proj', s.project_id)
-+ self.assertEqual({'version': '3.0'},
-+ s.resource_metadata)
-+
-+ def test_aggregator_sized_flush(self):
-+ transformer_cfg = [
-+ {
-+ 'name': 'aggregator',
-+ 'parameters': {'size': 2},
-+ },
-+ ]
-+ self._set_pipeline_cfg('transformers', transformer_cfg)
-+ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
-+ counters = [
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=26,
-+ unit='B',
-+ user_id='test_user',
-+ project_id='test_proj',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '1.0'}
-+ ),
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=16,
-+ unit='B',
-+ user_id='test_user_bis',
-+ project_id='test_proj_bis',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '2.0'}
-+ )
-+ ]
-+
-+ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
-+ self.transformer_manager)
-+ pipe = pipeline_manager.pipelines[0]
-+
-+ pipe.publish_samples(None, [counters[0]])
-+ pipe.flush(None)
-+ publisher = pipe.publishers[0]
-+ self.assertEqual(0, len(publisher.samples))
-+
-+ pipe.publish_samples(None, [counters[1]])
-+ pipe.flush(None)
-+ publisher = pipe.publishers[0]
-+ self.assertEqual(2, len(publisher.samples))
-+
-+ def test_aggregator_timed_flush(self):
-+ timeutils.set_time_override()
-+ transformer_cfg = [
-+ {
-+ 'name': 'aggregator',
-+ 'parameters': {'size': 900, 'retention_time': 60},
-+ },
-+ ]
-+ self._set_pipeline_cfg('transformers', transformer_cfg)
-+ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
-+ counters = [
-+ sample.Sample(
-+ name='storage.objects.incoming.bytes',
-+ type=sample.TYPE_DELTA,
-+ volume=26,
-+ unit='B',
-+ user_id='test_user',
-+ project_id='test_proj',
-+ resource_id='test_resource',
-+ timestamp=timeutils.utcnow().isoformat(),
-+ resource_metadata={'version': '1.0'}
-+ ),
-+ ]
-+
-+ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
-+ self.transformer_manager)
-+ pipe = pipeline_manager.pipelines[0]
-+
-+ pipe.publish_samples(None, counters)
-+ pipe.flush(None)
-+ publisher = pipeline_manager.pipelines[0].publishers[0]
-+ self.assertEqual(0, len(publisher.samples))
-+
-+ timeutils.advance_time_seconds(120)
-+ pipe.flush(None)
-+ publisher = pipeline_manager.pipelines[0].publishers[0]
-+ self.assertEqual(1, len(publisher.samples))
-diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
-index 3a7d079..bfa9f0c 100644
---- a/ceilometer/transformer/conversions.py
-+++ b/ceilometer/transformer/conversions.py
-@@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer):
- (s,))
- s = None
- return s
-+
-+
-+class AggregatorTransformer(ScalingTransformer):
-+ """Transformer that aggregate sample until a threshold or/and a
-+ retention_time, and then flush them out in the wild.
-+
-+ Example:
-+ To aggregate sample by resource_metadata and keep the
-+ resource_metadata of the latest received sample;
-+
-+ AggregatorTransformer(retention_time=60, resource_metadata='last')
-+
-+ To aggregate sample by user_id and resource_metadata and keep the
-+ user_id of the first received sample and drop the resource_metadata.
-+
-+ AggregatorTransformer(size=15, user_id='first',
-+ resource_metadata='drop')
-+
-+ """
-+
-+ def __init__(self, size=1, retention_time=None,
-+ project_id=None, user_id=None, resource_metadata="last",
-+ **kwargs):
-+ super(AggregatorTransformer, self).__init__(**kwargs)
-+ self.samples = {}
-+ self.size = size
-+ self.retention_time = retention_time
-+ self.initial_timestamp = None
-+ self.aggregated_samples = 0
-+
-+ self.key_attributes = []
-+ self.merged_attribute_policy = {}
-+
-+ self._init_attribute('project_id', project_id)
-+ self._init_attribute('user_id', user_id)
-+ self._init_attribute('resource_metadata', resource_metadata,
-+ is_droppable=True, mandatory=True)
-+
-+ def _init_attribute(self, name, value, is_droppable=False,
-+ mandatory=False):
-+ drop = ['drop'] if is_droppable else []
-+ if value or mandatory:
-+ if value not in ['last', 'first'] + drop:
-+ LOG.warn('%s is unknown (%s), using last' % (name, value))
-+ value = 'last'
-+ self.merged_attribute_policy[name] = value
-+ else:
-+ self.key_attributes.append(name)
-+
-+ def _get_unique_key(self, s):
-+ non_aggregated_keys = "-".join([getattr(s, field)
-+ for field in self.key_attributes])
-+ #NOTE(sileht): it assumes, a meter always have the same unit/type
-+ return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
-+
-+ def handle_sample(self, context, sample):
-+ if not self.initial_timestamp:
-+ self.initial_timestamp = timeutils.parse_strtime(
-+ sample.timestamp)
-+
-+ self.aggregated_samples += 1
-+ key = self._get_unique_key(sample)
-+ if key not in self.samples:
-+ self.samples[key] = self._convert(sample)
-+ if self.merged_attribute_policy[
-+ 'resource_metadata'] == 'drop':
-+ self.samples[key].resource_metadata = {}
-+ else:
-+ self.samples[key].volume += self._scale(sample)
-+ for field in self.merged_attribute_policy:
-+ if self.merged_attribute_policy[field] == 'last':
-+ setattr(self.samples[key], field,
-+ getattr(sample, field))
-+
-+ def flush(self, context):
-+ expired = self.retention_time and \
-+ timeutils.is_older_than(self.initial_timestamp,
-+ self.retention_time)
-+ full = self.aggregated_samples >= self.size
-+ if full or expired:
-+ x = self.samples.values()
-+ self.samples = {}
-+ self.aggregated_samples = 0
-+ self.initial_timestamp = None
-+ return x
-+ return []
-diff --git a/setup.cfg b/setup.cfg
-index ef67325..3f19142 100644
---- a/setup.cfg
-+++ b/setup.cfg
-@@ -156,6 +156,7 @@ ceilometer.transformer =
- accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
- unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
- rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
-+ aggregator = ceilometer.transformer.conversions:AggregatorTransformer
-
- ceilometer.publisher =
- test = ceilometer.publisher.test:TestPublisher
+++ /dev/null
-From: Sylvain Afchain <sylvain.afchain@enovance.com>
-Date: Sat, 5 Apr 2014 05:43:55 +0000 (+0200)
-Subject: Opencontrail network statistics driver
-X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=891819736dcbd04b9ca81245419f87dadb237b97
-
-Opencontrail network statistics driver
-
-This patch introduces a network statistics driver
-for Opencontrail. Only port statistics are currently
-returned by the driver.
-
-Implements: blueprint meter-from-opencontrail
-Co-Authored-By: Edouard Thuleau <edouard.thuleau@cloudwatt.com>
-Change-Id: Ic0afc478362fb4170903ee4e3723b82cd6c723fa
-(cherry picked from commit 6e0f4d9bd9c7f3b957adc6f73bf1a48c8c120e1b)
----
-
-diff --git a/ceilometer/network/statistics/opencontrail/__init__.py b/ceilometer/network/statistics/opencontrail/__init__.py
-new file mode 100644
-index 0000000..e69de29
-diff --git a/ceilometer/network/statistics/opencontrail/client.py b/ceilometer/network/statistics/opencontrail/client.py
-new file mode 100644
-index 0000000..51f786d
---- /dev/null
-+++ b/ceilometer/network/statistics/opencontrail/client.py
-@@ -0,0 +1,165 @@
-+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-+#
-+# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
-+#
-+# Licensed under the Apache License, Version 2.0 (the "License"); you may
-+# not use this file except in compliance with the License. You may obtain
-+# a copy of the License at
-+#
-+# http://www.apache.org/licenses/LICENSE-2.0
-+#
-+# Unless required by applicable law or agreed to in writing, software
-+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-+# License for the specific language governing permissions and limitations
-+# under the License.
-+
-+from oslo.config import cfg
-+import requests
-+import six
-+from six.moves.urllib import parse as url_parse
-+
-+from ceilometer.openstack.common.gettextutils import _ # noqa
-+from ceilometer.openstack.common import log
-+
-+
-+CONF = cfg.CONF
-+
-+
-+LOG = log.getLogger(__name__)
-+
-+
-+class OpencontrailAPIFailed(Exception):
-+ pass
-+
-+
-+class AnalyticsAPIBaseClient(object):
-+ """Opencontrail Base Statistics REST API Client."""
-+
-+ def __init__(self, endpoint, username, password, domain, verify_ssl=True):
-+ self.endpoint = endpoint
-+ self.username = username
-+ self.password = password
-+ self.domain = domain
-+ self.verify_ssl = verify_ssl
-+ self.sid = None
-+
-+ def authenticate(self):
-+ path = '/authenticate'
-+ data = {'username': self.username,
-+ 'password': self.password,
-+ 'domain': self.domain}
-+
-+ req_params = self._get_req_params(data=data)
-+ url = url_parse.urljoin(self.endpoint, path)
-+ resp = requests.post(url, **req_params)
-+ if resp.status_code != 302:
-+ raise OpencontrailAPIFailed(
-+ _('Opencontrail API returned %(status)s %(reason)s') %
-+ {'status': resp.status_code, 'reason': resp.reason})
-+ self.sid = resp.cookies['connect.sid']
-+
-+ def request(self, path, fqdn_uuid, data, retry=True):
-+ if not self.sid:
-+ self.authenticate()
-+
-+ if not data:
-+ data = {'fqnUUID': fqdn_uuid}
-+ else:
-+ data['fqnUUID'] = fqdn_uuid
-+
-+ req_params = self._get_req_params(data=data,
-+ cookies={'connect.sid': self.sid})
-+
-+ url = url_parse.urljoin(self.endpoint, path)
-+ self._log_req(url, req_params)
-+ resp = requests.get(url, **req_params)
-+ self._log_res(resp)
-+
-+ # it seems that the sid token has to be renewed
-+ if resp.status_code == 302:
-+ self.sid = 0
-+ if retry:
-+ return self.request(path, fqdn_uuid, data,
-+ retry=False)
-+
-+ if resp.status_code != 200:
-+ raise OpencontrailAPIFailed(
-+ _('Opencontrail API returned %(status)s %(reason)s') %
-+ {'status': resp.status_code, 'reason': resp.reason})
-+
-+ return resp
-+
-+ def _get_req_params(self, params=None, data=None, cookies=None):
-+ req_params = {
-+ 'headers': {
-+ 'Accept': 'application/json'
-+ },
-+ 'data': data,
-+ 'verify': self.verify_ssl,
-+ 'allow_redirects': False,
-+ 'cookies': cookies
-+ }
-+
-+ return req_params
-+
-+ @staticmethod
-+ def _log_req(url, req_params):
-+ if not CONF.debug:
-+ return
-+
-+ curl_command = ['REQ: curl -i -X GET ']
-+
-+ params = []
-+ for name, value in six.iteritems(req_params['data']):
-+ params.append("%s=%s" % (name, value))
-+
-+ curl_command.append('"%s?%s" ' % (url, '&'.join(params)))
-+
-+ for name, value in six.iteritems(req_params['headers']):
-+ curl_command.append('-H "%s: %s" ' % (name, value))
-+
-+ LOG.debug(''.join(curl_command))
-+
-+ @staticmethod
-+ def _log_res(resp):
-+ if not CONF.debug:
-+ return
-+
-+ dump = ['RES: \n']
-+ dump.append('HTTP %.1f %s %s\n' % (resp.raw.version,
-+ resp.status_code,
-+ resp.reason))
-+ dump.extend(['%s: %s\n' % (k, v)
-+ for k, v in six.iteritems(resp.headers)])
-+ dump.append('\n')
-+ if resp.content:
-+ dump.extend([resp.content, '\n'])
-+
-+ LOG.debug(''.join(dump))
-+
-+
-+class NetworksAPIClient(AnalyticsAPIBaseClient):
-+ """Opencontrail Statistics REST API Client."""
-+
-+ def get_port_statistics(self, fqdn_uuid):
-+ """Get port statistics of a network
-+
-+ URL:
-+ /tenant/networking/virtual-machines/details
-+ PARAMS:
-+ fqdnUUID=fqdn_uuid
-+ type=vn
-+ """
-+
-+ path = '/api/tenant/networking/virtual-machines/details'
-+ resp = self.request(path, fqdn_uuid, {'type': 'vn'})
-+
-+ return resp.json()
-+
-+
-+class Client(object):
-+
-+ def __init__(self, endpoint, username, password, domain, verify_ssl=True):
-+ self.networks = NetworksAPIClient(endpoint, username, password,
-+ domain, verify_ssl)
-diff --git a/ceilometer/network/statistics/opencontrail/driver.py b/ceilometer/network/statistics/opencontrail/driver.py
-new file mode 100644
-index 0000000..a54de7d
---- /dev/null
-+++ b/ceilometer/network/statistics/opencontrail/driver.py
-@@ -0,0 +1,149 @@
-+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-+#
-+# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
-+#
-+# Licensed under the Apache License, Version 2.0 (the "License"); you may
-+# not use this file except in compliance with the License. You may obtain
-+# a copy of the License at
-+#
-+# http://www.apache.org/licenses/LICENSE-2.0
-+#
-+# Unless required by applicable law or agreed to in writing, software
-+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-+# License for the specific language governing permissions and limitations
-+# under the License.
-+
-+from six.moves.urllib import parse as url_parse
-+
-+from ceilometer.network.statistics import driver
-+from ceilometer.network.statistics.opencontrail import client
-+from ceilometer import neutron_client
-+from ceilometer.openstack.common import timeutils
-+
-+
-+class OpencontrailDriver(driver.Driver):
-+ """Driver of network analytics of Opencontrail.
-+
-+ This driver uses resources in "pipeline.yaml".
-+ Resource requires below conditions:
-+ * resource is url
-+ * scheme is "opencontrail"
-+
-+ This driver can be configured via query parameters.
-+ Supported parameters:
-+ * scheme:
-+ The scheme of request url to Opencontrail Analytics endpoint.
-+ (default http)
-+ * username:
-+ This is username used by Opencontrail Analytics.(default None)
-+ * password:
-+ This is password used by Opencontrail Analytics.(default None)
-+ * domain
-+ This is domain used by Opencontrail Analytics.(default None)
-+ * verify_ssl
-+ Specify if the certificate will be checked for https request.
-+ (default false)
-+
-+ e.g.
-+ opencontrail://localhost:8143/?username=admin&password=admin&
-+ scheme=https&domain=&verify_ssl=true
-+ """
-+ @staticmethod
-+ def _prepare_cache(endpoint, params, cache):
-+
-+ if 'network.statistics.opencontrail' in cache:
-+ return cache['network.statistics.opencontrail']
-+
-+ data = {
-+ 'o_client': client.Client(endpoint,
-+ params['username'],
-+ params['password'],
-+ params.get('domain'),
-+ params.get('verify_ssl') == 'true'),
-+ 'n_client': neutron_client.Client()
-+ }
-+
-+ cache['network.statistics.opencontrail'] = data
-+
-+ return data
-+
-+ def get_sample_data(self, meter_name, parse_url, params, cache):
-+
-+ parts = url_parse.ParseResult(params.get('scheme', ['http'])[0],
-+ parse_url.netloc,
-+ parse_url.path,
-+ None,
-+ None,
-+ None)
-+ endpoint = url_parse.urlunparse(parts)
-+
-+ iter = self._get_iter(meter_name)
-+ if iter is None:
-+ # The extractor for this meter is not implemented or the API
-+ # doesn't have method to get this meter.
-+ return
-+
-+ extractor = self._get_extractor(meter_name)
-+ if extractor is None:
-+ # The extractor for this meter is not implemented or the API
-+ # doesn't have method to get this meter.
-+ return
-+
-+ data = self._prepare_cache(endpoint, params, cache)
-+
-+ ports = data['n_client'].port_get_all()
-+ ports_map = dict((port['id'], port['tenant_id']) for port in ports)
-+
-+ networks = data['n_client'].network_get_all()
-+
-+ for network in networks:
-+ net_id = network['id']
-+
-+ timestamp = timeutils.utcnow().isoformat()
-+ statistics = data['o_client'].networks.get_port_statistics(net_id)
-+ if not statistics:
-+ continue
-+
-+ for value in statistics['value']:
-+ for sample in iter(extractor, value, ports_map):
-+ if sample is not None:
-+ sample[2]['network_id'] = net_id
-+ yield sample + (timestamp, )
-+
-+ def _get_iter(self, meter_name):
-+ if meter_name.startswith('switch.port'):
-+ return self._iter_port
-+
-+ def _get_extractor(self, meter_name):
-+ method_name = '_' + meter_name.replace('.', '_')
-+ return getattr(self, method_name, None)
-+
-+ @staticmethod
-+ def _iter_port(extractor, value, ports_map):
-+ ifstats = value['value']['UveVirtualMachineAgent']['if_stats_list']
-+ for ifstat in ifstats:
-+ name = ifstat['name']
-+ device_owner_id, port_id = name.split(':')
-+
-+ tenant_id = ports_map.get(port_id)
-+
-+ resource_meta = {'device_owner_id': device_owner_id,
-+ 'tenant_id': tenant_id}
-+ yield extractor(ifstat, port_id, resource_meta)
-+
-+ @staticmethod
-+ def _switch_port_receive_packets(statistic, resource_id, resource_meta):
-+ return (int(statistic['in_pkts']), resource_id, resource_meta)
-+
-+ @staticmethod
-+ def _switch_port_transmit_packets(statistic, resource_id, resource_meta):
-+ return (int(statistic['out_pkts']), resource_id, resource_meta)
-+
-+ @staticmethod
-+ def _switch_port_receive_bytes(statistic, resource_id, resource_meta):
-+ return (int(statistic['in_bytes']), resource_id, resource_meta)
-+
-+ @staticmethod
-+ def _switch_port_transmit_bytes(statistic, resource_id, resource_meta):
-+ return (int(statistic['out_bytes']), resource_id, resource_meta)
-diff --git a/ceilometer/neutron_client.py b/ceilometer/neutron_client.py
-new file mode 100644
-index 0000000..993df58
---- /dev/null
-+++ b/ceilometer/neutron_client.py
-@@ -0,0 +1,73 @@
-+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-+#
-+# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
-+#
-+# Licensed under the Apache License, Version 2.0 (the "License"); you may
-+# not use this file except in compliance with the License. You may obtain
-+# a copy of the License at
-+#
-+# http://www.apache.org/licenses/LICENSE-2.0
-+#
-+# Unless required by applicable law or agreed to in writing, software
-+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-+# License for the specific language governing permissions and limitations
-+# under the License.
-+
-+import functools
-+
-+from neutronclient.v2_0 import client as clientv20
-+from oslo.config import cfg
-+
-+from ceilometer.openstack.common import log
-+
-+cfg.CONF.import_group('service_credentials', 'ceilometer.service')
-+
-+LOG = log.getLogger(__name__)
-+
-+
-+def logged(func):
-+
-+ @functools.wraps(func)
-+ def with_logging(*args, **kwargs):
-+ try:
-+ return func(*args, **kwargs)
-+ except Exception as e:
-+ LOG.exception(e)
-+ raise
-+
-+ return with_logging
-+
-+
-+class Client(object):
-+ """A client which gets information via python-neutronclient."""
-+
-+ def __init__(self):
-+ conf = cfg.CONF.service_credentials
-+ params = {
-+ 'insecure': conf.insecure,
-+ 'ca_cert': conf.os_cacert,
-+ 'username': conf.os_username,
-+ 'password': conf.os_password,
-+ 'auth_url': conf.os_auth_url,
-+ 'region_name': conf.os_region_name,
-+ 'endpoint_type': conf.os_endpoint_type
-+ }
-+
-+ if conf.os_tenant_id:
-+ params['tenant_id'] = conf.os_tenant_id
-+ else:
-+ params['tenant_name'] = conf.os_tenant_name
-+
-+ self.client = clientv20.Client(**params)
-+
-+ @logged
-+ def network_get_all(self):
-+ """Returns all networks."""
-+ resp = self.client.list_networks()
-+ return resp.get('networks')
-+
-+ @logged
-+ def port_get_all(self):
-+ resp = self.client.list_ports()
-+ return resp.get('ports')
-diff --git a/ceilometer/tests/network/statistics/opencontrail/__init__.py b/ceilometer/tests/network/statistics/opencontrail/__init__.py
-new file mode 100644
-index 0000000..e69de29
-diff --git a/ceilometer/tests/network/statistics/opencontrail/test_client.py b/ceilometer/tests/network/statistics/opencontrail/test_client.py
-new file mode 100644
-index 0000000..1817a32
---- /dev/null
-+++ b/ceilometer/tests/network/statistics/opencontrail/test_client.py
-@@ -0,0 +1,76 @@
-+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-+#
-+# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
-+#
-+# Licensed under the Apache License, Version 2.0 (the "License"); you may
-+# not use this file except in compliance with the License. You may obtain
-+# a copy of the License at
-+#
-+# http://www.apache.org/licenses/LICENSE-2.0
-+#
-+# Unless required by applicable law or agreed to in writing, software
-+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-+# License for the specific language governing permissions and limitations
-+# under the License.
-+
-+import mock
-+
-+from ceilometer.network.statistics.opencontrail import client
-+from ceilometer.openstack.common import test
-+
-+
-+class TestOpencontrailClient(test.BaseTestCase):
-+
-+ def setUp(self):
-+ super(TestOpencontrailClient, self).setUp()
-+ self.client = client.Client('http://127.0.0.1:8143',
-+ 'admin', 'admin', None, False)
-+
-+ self.post_resp = mock.MagicMock()
-+ self.post = mock.patch('requests.post',
-+ return_value=self.post_resp).start()
-+
-+ self.post_resp.raw.version = 1.1
-+ self.post_resp.status_code = 302
-+ self.post_resp.reason = 'Moved'
-+ self.post_resp.headers = {}
-+ self.post_resp.cookies = {'connect.sid': 'aaa'}
-+ self.post_resp.content = 'dummy'
-+
-+ self.get_resp = mock.MagicMock()
-+ self.get = mock.patch('requests.get',
-+ return_value=self.get_resp).start()
-+ self.get_resp.raw_version = 1.1
-+ self.get_resp.status_code = 200
-+ self.post_resp.content = 'dqs'
-+
-+ def test_port_statistics(self):
-+ uuid = 'bbb'
-+ self.client.networks.get_port_statistics(uuid)
-+
-+ call_args = self.post.call_args_list[0][0]
-+ call_kwargs = self.post.call_args_list[0][1]
-+
-+ expected_url = 'http://127.0.0.1:8143/authenticate'
-+ self.assertEqual(expected_url, call_args[0])
-+
-+ data = call_kwargs.get('data')
-+ expected_data = {'domain': None, 'password': 'admin',
-+ 'username': 'admin'}
-+ self.assertEqual(expected_data, data)
-+
-+ call_args = self.get.call_args_list[0][0]
-+ call_kwargs = self.get.call_args_list[0][1]
-+
-+ expected_url = ('http://127.0.0.1:8143/api/tenant/'
-+ 'networking/virtual-machines/details')
-+ self.assertEqual(expected_url, call_args[0])
-+
-+ data = call_kwargs.get('data')
-+ cookies = call_kwargs.get('cookies')
-+
-+ expected_data = {'fqnUUID': 'bbb', 'type': 'vn'}
-+ expected_cookies = {'connect.sid': 'aaa'}
-+ self.assertEqual(expected_data, data)
-+ self.assertEqual(expected_cookies, cookies)
-diff --git a/ceilometer/tests/network/statistics/opencontrail/test_driver.py b/ceilometer/tests/network/statistics/opencontrail/test_driver.py
-new file mode 100644
-index 0000000..940e998
---- /dev/null
-+++ b/ceilometer/tests/network/statistics/opencontrail/test_driver.py
-@@ -0,0 +1,145 @@
-+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-+#
-+# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
-+#
-+# Licensed under the Apache License, Version 2.0 (the "License"); you may
-+# not use this file except in compliance with the License. You may obtain
-+# a copy of the License at
-+#
-+# http://www.apache.org/licenses/LICENSE-2.0
-+#
-+# Unless required by applicable law or agreed to in writing, software
-+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-+# License for the specific language governing permissions and limitations
-+# under the License.
-+
-+import mock
-+from six.moves.urllib import parse as url_parse
-+
-+from ceilometer.network.statistics.opencontrail import driver
-+from ceilometer.openstack.common import test
-+
-+
-+class TestOpencontrailDriver(test.BaseTestCase):
-+
-+ def setUp(self):
-+ super(TestOpencontrailDriver, self).setUp()
-+
-+ self.nc_ports = mock.patch('ceilometer.neutron_client'
-+ '.Client.port_get_all',
-+ return_value=self.fake_ports())
-+ self.nc_ports.start()
-+
-+ self.nc_networks = mock.patch('ceilometer.neutron_client'
-+ '.Client.network_get_all',
-+ return_value=self.fake_networks())
-+ self.nc_networks.start()
-+
-+ self.driver = driver.OpencontrailDriver()
-+ self.parse_url = url_parse.ParseResult('opencontrail',
-+ '127.0.0.1:8143',
-+ '/', None, None, None)
-+ self.params = {'password': ['admin'],
-+ 'scheme': ['http'],
-+ 'username': ['admin'],
-+ 'verify_ssl': ['false']}
-+
-+ @staticmethod
-+ def fake_ports():
-+ return [{'admin_state_up': True,
-+ 'device_owner': 'compute:None',
-+ 'device_id': '674e553b-8df9-4321-87d9-93ba05b93558',
-+ 'extra_dhcp_opts': [],
-+ 'id': '96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ 'mac_address': 'fa:16:3e:c5:35:93',
-+ 'name': '',
-+ 'network_id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'status': 'ACTIVE',
-+ 'tenant_id': '89271fa581ab4380bf172f868c3615f9'}]
-+
-+ @staticmethod
-+ def fake_networks():
-+ return [{'admin_state_up': True,
-+ 'id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'name': 'public',
-+ 'provider:network_type': 'gre',
-+ 'provider:physical_network': None,
-+ 'provider:segmentation_id': 2,
-+ 'router:external': True,
-+ 'shared': False,
-+ 'status': 'ACTIVE',
-+ 'subnets': [u'c4b6f5b8-3508-4896-b238-a441f25fb492'],
-+ 'tenant_id': '62d6f08bbd3a44f6ad6f00ca15cce4e5'}]
-+
-+ @staticmethod
-+ def fake_port_stats():
-+ return {"value": [{
-+ "name": "c588ebb7-ae52-485a-9f0c-b2791c5da196",
-+ "value": {
-+ "UveVirtualMachineAgent": {
-+ "if_stats_list": [{
-+ "out_bytes": 22,
-+ "in_bandwidth_usage": 0,
-+ "in_bytes": 23,
-+ "out_bandwidth_usage": 0,
-+ "out_pkts": 5,
-+ "in_pkts": 6,
-+ "name": ("674e553b-8df9-4321-87d9-93ba05b93558:"
-+ "96d49cc3-4e01-40ce-9cac-c0e32642a442")
-+ }]}}}]}
-+
-+ def _test_meter(self, meter_name, expected):
-+ with mock.patch('ceilometer.network.'
-+ 'statistics.opencontrail.'
-+ 'client.NetworksAPIClient.'
-+ 'get_port_statistics',
-+ return_value=self.fake_port_stats()) as port_stats:
-+
-+ samples = self.driver.get_sample_data(meter_name, self.parse_url,
-+ self.params, {})
-+
-+ self.assertEqual(expected, [s for s in samples])
-+
-+ net_id = '298a3088-a446-4d5a-bad8-f92ecacd786b'
-+ port_stats.assert_called_with(net_id)
-+
-+ def test_switch_port_receive_packets(self):
-+ expected = [
-+ (6,
-+ '96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ {'device_owner_id': '674e553b-8df9-4321-87d9-93ba05b93558',
-+ 'network_id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'tenant_id': '89271fa581ab4380bf172f868c3615f9'},
-+ mock.ANY)]
-+ self._test_meter('switch.port.receive.packets', expected)
-+
-+ def test_switch_port_transmit_packets(self):
-+ expected = [
-+ (5,
-+ '96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ {'device_owner_id': '674e553b-8df9-4321-87d9-93ba05b93558',
-+ 'network_id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'tenant_id': '89271fa581ab4380bf172f868c3615f9'},
-+ mock.ANY)]
-+ self._test_meter('switch.port.transmit.packets', expected)
-+
-+ def test_switch_port_receive_bytes(self):
-+ expected = [
-+ (23,
-+ '96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ {'device_owner_id': '674e553b-8df9-4321-87d9-93ba05b93558',
-+ 'network_id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'tenant_id': '89271fa581ab4380bf172f868c3615f9'},
-+ mock.ANY)]
-+ self._test_meter('switch.port.receive.bytes', expected)
-+
-+ def test_switch_port_transmit_bytes(self):
-+ expected = [
-+ (22,
-+ '96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ {'device_owner_id': '674e553b-8df9-4321-87d9-93ba05b93558',
-+ 'network_id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'tenant_id': '89271fa581ab4380bf172f868c3615f9'},
-+ mock.ANY)]
-+ self._test_meter('switch.port.transmit.bytes', expected)
-diff --git a/ceilometer/tests/test_neutronclient.py b/ceilometer/tests/test_neutronclient.py
-new file mode 100644
-index 0000000..17d39b6
---- /dev/null
-+++ b/ceilometer/tests/test_neutronclient.py
-@@ -0,0 +1,74 @@
-+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-+#
-+# Author: Sylvain Afchain <sylvain.afchain@enovance.com>
-+#
-+# Licensed under the Apache License, Version 2.0 (the "License"); you may
-+# not use this file except in compliance with the License. You may obtain
-+# a copy of the License at
-+#
-+# http://www.apache.org/licenses/LICENSE-2.0
-+#
-+# Unless required by applicable law or agreed to in writing, software
-+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-+# License for the specific language governing permissions and limitations
-+# under the License.
-+
-+from mock import patch
-+
-+from ceilometer import neutron_client
-+from ceilometer.openstack.common import test
-+
-+
-+class TestNeutronClient(test.BaseTestCase):
-+
-+ def setUp(self):
-+ super(TestNeutronClient, self).setUp()
-+ self.nc = neutron_client.Client()
-+
-+ @staticmethod
-+ def fake_ports_list():
-+ return {'ports':
-+ [{'admin_state_up': True,
-+ 'device_id': '674e553b-8df9-4321-87d9-93ba05b93558',
-+ 'device_owner': 'network:router_gateway',
-+ 'extra_dhcp_opts': [],
-+ 'id': '96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ 'mac_address': 'fa:16:3e:c5:35:93',
-+ 'name': '',
-+ 'network_id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'status': 'ACTIVE',
-+ 'tenant_id': '89271fa581ab4380bf172f868c3615f9'}]}
-+
-+ def test_port_get_all(self):
-+ with patch.object(self.nc.client, 'list_ports',
-+ side_effect=self.fake_ports_list):
-+ ports = self.nc.port_get_all()
-+
-+ self.assertEqual(1, len(ports))
-+ self.assertEqual('96d49cc3-4e01-40ce-9cac-c0e32642a442',
-+ ports[0]['id'])
-+
-+ @staticmethod
-+ def fake_networks_list():
-+ return {'networks':
-+ [{'admin_state_up': True,
-+ 'id': '298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ 'name': 'public',
-+ 'provider:network_type': 'gre',
-+ 'provider:physical_network': None,
-+ 'provider:segmentation_id': 2,
-+ 'router:external': True,
-+ 'shared': False,
-+ 'status': 'ACTIVE',
-+ 'subnets': [u'c4b6f5b8-3508-4896-b238-a441f25fb492'],
-+ 'tenant_id': '62d6f08bbd3a44f6ad6f00ca15cce4e5'}]}
-+
-+ def test_network_get_all(self):
-+ with patch.object(self.nc.client, 'list_networks',
-+ side_effect=self.fake_networks_list):
-+ networks = self.nc.network_get_all()
-+
-+ self.assertEqual(1, len(networks))
-+ self.assertEqual('298a3088-a446-4d5a-bad8-f92ecacd786b',
-+ networks[0]['id'])
-diff --git a/requirements.txt b/requirements.txt
-index 654f568..d325ea8 100644
---- a/requirements.txt
-+++ b/requirements.txt
-@@ -23,6 +23,7 @@ python-glanceclient>=0.9.0
- python-keystoneclient>=0.7.0
- python-novaclient>=2.17.0
- python-swiftclient>=1.6
-+python-neutronclient>=2.3.4,<3
- pytz>=2010h
- PyYAML>=3.1.0
- requests>=1.1
-diff --git a/setup.cfg b/setup.cfg
-index 8cd49ed..b7ce034 100644
---- a/setup.cfg
-+++ b/setup.cfg
-@@ -200,6 +200,7 @@ ceilometer.dispatcher =
-
- network.statistics.drivers =
- opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver
-+ opencontrail = ceilometer.network.statistics.opencontrail.driver:OpencontrailDriver
-
-
- [build_sphinx]