From 97f59a4724944b5dc4859173d9676703d26a6b8a Mon Sep 17 00:00:00 2001 From: Thomas Goirand Date: Thu, 22 May 2014 00:05:30 +0800 Subject: [PATCH] Adds Add_aggregator_transformer.patch Change-Id: I3c50e6f43c61f4ade574ccc65579b64dc04a3a17 Rewritten-From: 97e0fb41d5066a13dd7a0331670ba840cec0dca1 --- trusty/debian/changelog | 6 + .../patches/Add_aggregator_transformer.patch | 467 ++++++++++++++++++ trusty/debian/patches/series | 1 + 3 files changed, 474 insertions(+) create mode 100644 trusty/debian/patches/Add_aggregator_transformer.patch diff --git a/trusty/debian/changelog b/trusty/debian/changelog index 92f6a3a..b52b570 100644 --- a/trusty/debian/changelog +++ b/trusty/debian/changelog @@ -1,3 +1,9 @@ +ceilometer (2014.1-6) unstable; urgency=medium + + * Adds Add_aggregator_transformer.patch. + + -- Thomas Goirand Thu, 22 May 2014 00:06:15 +0800 + ceilometer (2014.1-5) unstable; urgency=medium * Added Opencontrail_network_statistics_driver.patch. diff --git a/trusty/debian/patches/Add_aggregator_transformer.patch b/trusty/debian/patches/Add_aggregator_transformer.patch new file mode 100644 index 0000000..a5fdad2 --- /dev/null +++ b/trusty/debian/patches/Add_aggregator_transformer.patch @@ -0,0 +1,467 @@ +Description: transformer: Add aggregator transformer + This adds a transformer aggregate counters until a threshold and/or a + retention time and then flushing them out. +Author: Mehdi Abaakouk +Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200) +X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f +Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2 +Co-Authored-By: Jordan Pittier +Origin: upstream, https://review.openstack.org/#/c/87238/ +Last-Update: 2014-05-21 + +diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py +index db51c64..f4f5a64 100644 +--- a/ceilometer/tests/pipeline_base.py ++++ b/ceilometer/tests/pipeline_base.py +@@ -20,6 +20,7 @@ + import abc + import datetime + ++import mock + import six + from stevedore import extension + +@@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase): + 'except': self.TransformerClassException, + 'drop': self.TransformerClassDrop, + 'cache': accumulator.TransformerAccumulator, ++ 'aggregator': conversions.AggregatorTransformer, + 'unit_conversion': conversions.ScalingTransformer, + 'rate_of_change': conversions.RateOfChangeTransformer, + } +@@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase): + meters = ('disk.read.bytes', 'disk.write.requests') + units = ('B', 'request') + self._do_test_rate_of_change_mapping(pipe, meters, units) ++ ++ def _do_test_aggregator(self, parameters, expected_length): ++ transformer_cfg = [ ++ { ++ 'name': 'aggregator', ++ 'parameters': parameters, ++ }, ++ ] ++ self._set_pipeline_cfg('transformers', transformer_cfg) ++ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes']) ++ counters = [ ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=26, ++ unit='B', ++ user_id='test_user', ++ project_id='test_proj', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '1.0'} ++ ), ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=16, ++ unit='B', ++ user_id='test_user', ++ project_id='test_proj', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '2.0'} ++ ), ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=53, ++ unit='B', ++ user_id='test_user_bis', ++ project_id='test_proj_bis', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '1.0'} ++ ), ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=42, ++ unit='B', ++ user_id='test_user_bis', ++ project_id='test_proj_bis', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '2.0'} ++ ), ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=15, ++ unit='B', ++ user_id='test_user', ++ project_id='test_proj_bis', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '2.0'} ++ ), ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=2, ++ unit='B', ++ user_id='test_user_bis', ++ project_id='test_proj', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '3.0'} ++ ), ++ ] ++ ++ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, ++ self.transformer_manager) ++ pipe = pipeline_manager.pipelines[0] ++ ++ pipe.publish_samples(None, counters) ++ pipe.flush(None) ++ publisher = pipeline_manager.pipelines[0].publishers[0] ++ self.assertEqual(expected_length, len(publisher.samples)) ++ return sorted(publisher.samples, key=lambda s: s.volume) ++ ++ def test_aggregator_metadata(self): ++ for conf, expected_version in [('last', '2.0'), ('first', '1.0')]: ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': conf, ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=4) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(2, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '3.0'}, ++ s.resource_metadata) ++ s = samples[1] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(15, s.volume) ++ self.assertEqual('test_user', s.user_id) ++ self.assertEqual('test_proj_bis', s.project_id) ++ self.assertEqual({'version': '2.0'}, ++ s.resource_metadata) ++ s = samples[2] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(42, s.volume) ++ self.assertEqual('test_user', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': expected_version}, ++ s.resource_metadata) ++ s = samples[3] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(95, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj_bis', s.project_id) ++ self.assertEqual({'version': expected_version}, ++ s.resource_metadata) ++ ++ def test_aggregator_user_last_and_metadata_last(self): ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': 'last', ++ 'user_id': 'last', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=2) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(44, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '3.0'}, ++ s.resource_metadata) ++ s = samples[1] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(110, s.volume) ++ self.assertEqual('test_user', s.user_id) ++ self.assertEqual('test_proj_bis', s.project_id) ++ self.assertEqual({'version': '2.0'}, ++ s.resource_metadata) ++ ++ def test_aggregator_user_first_and_metadata_last(self): ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': 'last', ++ 'user_id': 'first', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=2) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(44, s.volume) ++ self.assertEqual('test_user', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '3.0'}, ++ s.resource_metadata) ++ s = samples[1] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(110, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj_bis', s.project_id) ++ self.assertEqual({'version': '2.0'}, ++ s.resource_metadata) ++ ++ def test_aggregator_all_first(self): ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': 'first', ++ 'user_id': 'first', ++ 'project_id': 'first', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=1) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(154, s.volume) ++ self.assertEqual('test_user', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '1.0'}, ++ s.resource_metadata) ++ ++ def test_aggregator_all_last(self): ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': 'last', ++ 'user_id': 'last', ++ 'project_id': 'last', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=1) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(154, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '3.0'}, ++ s.resource_metadata) ++ ++ def test_aggregator_all_mixed(self): ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': 'drop', ++ 'user_id': 'first', ++ 'project_id': 'last', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=1) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(154, s.volume) ++ self.assertEqual('test_user', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({}, s.resource_metadata) ++ ++ def test_aggregator_metadata_default(self): ++ samples = self._do_test_aggregator({ ++ 'user_id': 'last', ++ 'project_id': 'last', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=1) ++ s = samples[0] ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(154, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '3.0'}, ++ s.resource_metadata) ++ ++ @mock.patch('ceilometer.transformer.conversions.LOG') ++ def test_aggregator_metadata_invalid(self, mylog): ++ samples = self._do_test_aggregator({ ++ 'resource_metadata': 'invalid', ++ 'user_id': 'last', ++ 'project_id': 'last', ++ 'target': {'name': 'aggregated-bytes'} ++ }, expected_length=1) ++ s = samples[0] ++ self.assertTrue(mylog.warn.called) ++ self.assertEqual('aggregated-bytes', s.name) ++ self.assertEqual(154, s.volume) ++ self.assertEqual('test_user_bis', s.user_id) ++ self.assertEqual('test_proj', s.project_id) ++ self.assertEqual({'version': '3.0'}, ++ s.resource_metadata) ++ ++ def test_aggregator_sized_flush(self): ++ transformer_cfg = [ ++ { ++ 'name': 'aggregator', ++ 'parameters': {'size': 2}, ++ }, ++ ] ++ self._set_pipeline_cfg('transformers', transformer_cfg) ++ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes']) ++ counters = [ ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=26, ++ unit='B', ++ user_id='test_user', ++ project_id='test_proj', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '1.0'} ++ ), ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=16, ++ unit='B', ++ user_id='test_user_bis', ++ project_id='test_proj_bis', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '2.0'} ++ ) ++ ] ++ ++ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, ++ self.transformer_manager) ++ pipe = pipeline_manager.pipelines[0] ++ ++ pipe.publish_samples(None, [counters[0]]) ++ pipe.flush(None) ++ publisher = pipe.publishers[0] ++ self.assertEqual(0, len(publisher.samples)) ++ ++ pipe.publish_samples(None, [counters[1]]) ++ pipe.flush(None) ++ publisher = pipe.publishers[0] ++ self.assertEqual(2, len(publisher.samples)) ++ ++ def test_aggregator_timed_flush(self): ++ timeutils.set_time_override() ++ transformer_cfg = [ ++ { ++ 'name': 'aggregator', ++ 'parameters': {'size': 900, 'retention_time': 60}, ++ }, ++ ] ++ self._set_pipeline_cfg('transformers', transformer_cfg) ++ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes']) ++ counters = [ ++ sample.Sample( ++ name='storage.objects.incoming.bytes', ++ type=sample.TYPE_DELTA, ++ volume=26, ++ unit='B', ++ user_id='test_user', ++ project_id='test_proj', ++ resource_id='test_resource', ++ timestamp=timeutils.utcnow().isoformat(), ++ resource_metadata={'version': '1.0'} ++ ), ++ ] ++ ++ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, ++ self.transformer_manager) ++ pipe = pipeline_manager.pipelines[0] ++ ++ pipe.publish_samples(None, counters) ++ pipe.flush(None) ++ publisher = pipeline_manager.pipelines[0].publishers[0] ++ self.assertEqual(0, len(publisher.samples)) ++ ++ timeutils.advance_time_seconds(120) ++ pipe.flush(None) ++ publisher = pipeline_manager.pipelines[0].publishers[0] ++ self.assertEqual(1, len(publisher.samples)) +diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py +index 3a7d079..bfa9f0c 100644 +--- a/ceilometer/transformer/conversions.py ++++ b/ceilometer/transformer/conversions.py +@@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer): + (s,)) + s = None + return s ++ ++ ++class AggregatorTransformer(ScalingTransformer): ++ """Transformer that aggregate sample until a threshold or/and a ++ retention_time, and then flush them out in the wild. ++ ++ Example: ++ To aggregate sample by resource_metadata and keep the ++ resource_metadata of the latest received sample; ++ ++ AggregatorTransformer(retention_time=60, resource_metadata='last') ++ ++ To aggregate sample by user_id and resource_metadata and keep the ++ user_id of the first received sample and drop the resource_metadata. ++ ++ AggregatorTransformer(size=15, user_id='first', ++ resource_metadata='drop') ++ ++ """ ++ ++ def __init__(self, size=1, retention_time=None, ++ project_id=None, user_id=None, resource_metadata="last", ++ **kwargs): ++ super(AggregatorTransformer, self).__init__(**kwargs) ++ self.samples = {} ++ self.size = size ++ self.retention_time = retention_time ++ self.initial_timestamp = None ++ self.aggregated_samples = 0 ++ ++ self.key_attributes = [] ++ self.merged_attribute_policy = {} ++ ++ self._init_attribute('project_id', project_id) ++ self._init_attribute('user_id', user_id) ++ self._init_attribute('resource_metadata', resource_metadata, ++ is_droppable=True, mandatory=True) ++ ++ def _init_attribute(self, name, value, is_droppable=False, ++ mandatory=False): ++ drop = ['drop'] if is_droppable else [] ++ if value or mandatory: ++ if value not in ['last', 'first'] + drop: ++ LOG.warn('%s is unknown (%s), using last' % (name, value)) ++ value = 'last' ++ self.merged_attribute_policy[name] = value ++ else: ++ self.key_attributes.append(name) ++ ++ def _get_unique_key(self, s): ++ non_aggregated_keys = "-".join([getattr(s, field) ++ for field in self.key_attributes]) ++ #NOTE(sileht): it assumes, a meter always have the same unit/type ++ return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys) ++ ++ def handle_sample(self, context, sample): ++ if not self.initial_timestamp: ++ self.initial_timestamp = timeutils.parse_strtime( ++ sample.timestamp) ++ ++ self.aggregated_samples += 1 ++ key = self._get_unique_key(sample) ++ if key not in self.samples: ++ self.samples[key] = self._convert(sample) ++ if self.merged_attribute_policy[ ++ 'resource_metadata'] == 'drop': ++ self.samples[key].resource_metadata = {} ++ else: ++ self.samples[key].volume += self._scale(sample) ++ for field in self.merged_attribute_policy: ++ if self.merged_attribute_policy[field] == 'last': ++ setattr(self.samples[key], field, ++ getattr(sample, field)) ++ ++ def flush(self, context): ++ expired = self.retention_time and \ ++ timeutils.is_older_than(self.initial_timestamp, ++ self.retention_time) ++ full = self.aggregated_samples >= self.size ++ if full or expired: ++ x = self.samples.values() ++ self.samples = {} ++ self.aggregated_samples = 0 ++ self.initial_timestamp = None ++ return x ++ return [] +diff --git a/setup.cfg b/setup.cfg +index ef67325..3f19142 100644 +--- a/setup.cfg ++++ b/setup.cfg +@@ -156,6 +156,7 @@ ceilometer.transformer = + accumulator = ceilometer.transformer.accumulator:TransformerAccumulator + unit_conversion = ceilometer.transformer.conversions:ScalingTransformer + rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer ++ aggregator = ceilometer.transformer.conversions:AggregatorTransformer + + ceilometer.publisher = + test = ceilometer.publisher.test:TestPublisher diff --git a/trusty/debian/patches/series b/trusty/debian/patches/series index d8e4612..76ba8c8 100644 --- a/trusty/debian/patches/series +++ b/trusty/debian/patches/series @@ -1,2 +1,3 @@ using-mongodb-by-default.patch Opencontrail_network_statistics_driver.patch +Add_aggregator_transformer.patch -- 2.32.3