--- /dev/null
+Description: transformer: Add aggregator transformer
+ This adds a transformer that aggregates counters until a threshold and/or
+ a retention time is reached, and then flushes them out.
+Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
+Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200)
+X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f
+Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2
+Co-Authored-By: Jordan Pittier <jordan.pittier@cloudwatt.com>
+Origin: upstream, https://review.openstack.org/#/c/87238/
+Last-Update: 2014-05-21
+
+diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
+index db51c64..f4f5a64 100644
+--- a/ceilometer/tests/pipeline_base.py
++++ b/ceilometer/tests/pipeline_base.py
+@@ -20,6 +20,7 @@
+ import abc
+ import datetime
+
++import mock
+ import six
+ from stevedore import extension
+
+@@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase):
+ 'except': self.TransformerClassException,
+ 'drop': self.TransformerClassDrop,
+ 'cache': accumulator.TransformerAccumulator,
++ 'aggregator': conversions.AggregatorTransformer,
+ 'unit_conversion': conversions.ScalingTransformer,
+ 'rate_of_change': conversions.RateOfChangeTransformer,
+ }
+@@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase):
+ meters = ('disk.read.bytes', 'disk.write.requests')
+ units = ('B', 'request')
+ self._do_test_rate_of_change_mapping(pipe, meters, units)
++
++ def _do_test_aggregator(self, parameters, expected_length):
++ transformer_cfg = [
++ {
++ 'name': 'aggregator',
++ 'parameters': parameters,
++ },
++ ]
++ self._set_pipeline_cfg('transformers', transformer_cfg)
++ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
++ counters = [  # six delta samples, same meter and resource_id
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=26,
++ unit='B',
++ user_id='test_user',
++ project_id='test_proj',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '1.0'}
++ ),
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=16,
++ unit='B',
++ user_id='test_user',
++ project_id='test_proj',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '2.0'}
++ ),
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=53,
++ unit='B',
++ user_id='test_user_bis',
++ project_id='test_proj_bis',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '1.0'}
++ ),
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=42,
++ unit='B',
++ user_id='test_user_bis',
++ project_id='test_proj_bis',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '2.0'}
++ ),
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=15,
++ unit='B',
++ user_id='test_user',
++ project_id='test_proj_bis',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '2.0'}
++ ),
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=2,
++ unit='B',
++ user_id='test_user_bis',
++ project_id='test_proj',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '3.0'}
++ ),
++ ]  # volumes add up to 154
++
++ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
++ self.transformer_manager)
++ pipe = pipeline_manager.pipelines[0]
++
++ pipe.publish_samples(None, counters)
++ pipe.flush(None)  # visible callers keep size=1, so this emits
++ publisher = pipeline_manager.pipelines[0].publishers[0]
++ self.assertEqual(expected_length, len(publisher.samples))
++ return sorted(publisher.samples, key=lambda s: s.volume)  # ascending
++
++ def test_aggregator_metadata(self):
++ for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
++ samples = self._do_test_aggregator({
++ 'resource_metadata': conf,
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=4)  # one bucket per (user_id, project_id)
++ s = samples[0]  # single-sample bucket
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(2, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '3.0'},
++ s.resource_metadata)
++ s = samples[1]  # single-sample bucket
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(15, s.volume)
++ self.assertEqual('test_user', s.user_id)
++ self.assertEqual('test_proj_bis', s.project_id)
++ self.assertEqual({'version': '2.0'},
++ s.resource_metadata)
++ s = samples[2]  # 26 + 16, metadata merged per conf
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(42, s.volume)
++ self.assertEqual('test_user', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': expected_version},
++ s.resource_metadata)
++ s = samples[3]  # 53 + 42, metadata merged per conf
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(95, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj_bis', s.project_id)
++ self.assertEqual({'version': expected_version},
++ s.resource_metadata)
++
++ def test_aggregator_user_last_and_metadata_last(self):
++ samples = self._do_test_aggregator({
++ 'resource_metadata': 'last',
++ 'user_id': 'last',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=2)  # no project_id policy: one bucket per project
++ s = samples[0]  # test_proj: 26 + 16 + 2
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(44, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '3.0'},
++ s.resource_metadata)
++ s = samples[1]  # test_proj_bis: 53 + 42 + 15
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(110, s.volume)
++ self.assertEqual('test_user', s.user_id)
++ self.assertEqual('test_proj_bis', s.project_id)
++ self.assertEqual({'version': '2.0'},
++ s.resource_metadata)
++
++ def test_aggregator_user_first_and_metadata_last(self):
++ samples = self._do_test_aggregator({
++ 'resource_metadata': 'last',
++ 'user_id': 'first',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=2)  # no project_id policy: one bucket per project
++ s = samples[0]  # test_proj: 26 + 16 + 2
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(44, s.volume)
++ self.assertEqual('test_user', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '3.0'},
++ s.resource_metadata)
++ s = samples[1]  # test_proj_bis: 53 + 42 + 15
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(110, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj_bis', s.project_id)
++ self.assertEqual({'version': '2.0'},
++ s.resource_metadata)
++
++ def test_aggregator_all_first(self):
++ samples = self._do_test_aggregator({
++ 'resource_metadata': 'first',
++ 'user_id': 'first',
++ 'project_id': 'first',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=1)  # every attribute merged: single bucket
++ s = samples[0]  # attributes of the first sample, total volume
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(154, s.volume)
++ self.assertEqual('test_user', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '1.0'},
++ s.resource_metadata)
++
++ def test_aggregator_all_last(self):
++ samples = self._do_test_aggregator({
++ 'resource_metadata': 'last',
++ 'user_id': 'last',
++ 'project_id': 'last',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=1)  # every attribute merged: single bucket
++ s = samples[0]  # attributes of the last sample, total volume
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(154, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '3.0'},
++ s.resource_metadata)
++
++ def test_aggregator_all_mixed(self):
++ samples = self._do_test_aggregator({
++ 'resource_metadata': 'drop',
++ 'user_id': 'first',
++ 'project_id': 'last',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=1)  # every attribute merged: single bucket
++ s = samples[0]
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(154, s.volume)
++ self.assertEqual('test_user', s.user_id)  # first sample's user
++ self.assertEqual('test_proj', s.project_id)  # last sample's project
++ self.assertEqual({}, s.resource_metadata)  # 'drop' empties it
++
++ def test_aggregator_metadata_default(self):
++ samples = self._do_test_aggregator({
++ 'user_id': 'last',
++ 'project_id': 'last',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=1)  # resource_metadata defaults to 'last'
++ s = samples[0]
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(154, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '3.0'},
++ s.resource_metadata)
++
++ @mock.patch('ceilometer.transformer.conversions.LOG')
++ def test_aggregator_metadata_invalid(self, mylog):
++ samples = self._do_test_aggregator({
++ 'resource_metadata': 'invalid',  # not a known policy
++ 'user_id': 'last',
++ 'project_id': 'last',
++ 'target': {'name': 'aggregated-bytes'}
++ }, expected_length=1)
++ s = samples[0]
++ self.assertTrue(mylog.warn.called)  # warned, then used 'last'
++ self.assertEqual('aggregated-bytes', s.name)
++ self.assertEqual(154, s.volume)
++ self.assertEqual('test_user_bis', s.user_id)
++ self.assertEqual('test_proj', s.project_id)
++ self.assertEqual({'version': '3.0'},
++ s.resource_metadata)
++
++ def test_aggregator_sized_flush(self):
++ transformer_cfg = [
++ {
++ 'name': 'aggregator',
++ 'parameters': {'size': 2},  # emit after 2 samples
++ },
++ ]
++ self._set_pipeline_cfg('transformers', transformer_cfg)
++ self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
++ counters = [
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=26,
++ unit='B',
++ user_id='test_user',
++ project_id='test_proj',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '1.0'}
++ ),
++ sample.Sample(
++ name='storage.objects.incoming.bytes',
++ type=sample.TYPE_DELTA,
++ volume=16,
++ unit='B',
++ user_id='test_user_bis',
++ project_id='test_proj_bis',
++ resource_id='test_resource',
++ timestamp=timeutils.utcnow().isoformat(),
++ resource_metadata={'version': '2.0'}
++ )
++ ]
++
++ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
++ self.transformer_manager)
++ pipe = pipeline_manager.pipelines[0]
++
++ pipe.publish_samples(None, [counters[0]])
++ pipe.flush(None)
++ publisher = pipe.publishers[0]
++ self.assertEqual(0, len(publisher.samples))  # 1 < size: held back
++
++ pipe.publish_samples(None, [counters[1]])
++ pipe.flush(None)
++ publisher = pipe.publishers[0]
++ self.assertEqual(2, len(publisher.samples))  # one per user/project
++
++    def test_aggregator_timed_flush(self):
++        """Samples are held back until retention_time has elapsed."""
++        timeutils.set_time_override()
++        # Undo the override so the frozen clock cannot leak to other tests.
++        self.addCleanup(timeutils.clear_time_override)
++        transformer_cfg = [{
++            'name': 'aggregator',
++            'parameters': {'size': 900, 'retention_time': 60},
++        }]
++        self._set_pipeline_cfg('transformers', transformer_cfg)
++        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
++        counters = [
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=26,
++                unit='B',
++                user_id='test_user',
++                project_id='test_proj',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '1.0'}
++            ),
++        ]
++
++        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
++                                                    self.transformer_manager)
++        pipe = pipeline_manager.pipelines[0]
++
++        pipe.publish_samples(None, counters)
++        pipe.flush(None)
++        publisher = pipe.publishers[0]
++        self.assertEqual(0, len(publisher.samples))
++
++        timeutils.advance_time_seconds(120)
++        pipe.flush(None)
++        self.assertEqual(1, len(publisher.samples))
+diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
+index 3a7d079..bfa9f0c 100644
+--- a/ceilometer/transformer/conversions.py
++++ b/ceilometer/transformer/conversions.py
+@@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer):
+ (s,))
+ s = None
+ return s
++
++
++class AggregatorTransformer(ScalingTransformer):
++    """Aggregate samples until a size threshold and/or a retention time.
++
++    Samples sharing a name, a resource_id and every attribute that has no
++    merge policy have their volumes added up; flush() hands them out.
++
++    Examples:
++      Merge on resource_metadata, keeping that of the latest sample:
++        AggregatorTransformer(retention_time=60, resource_metadata='last')
++      Merge on user_id and resource_metadata, keeping the first user_id
++      and dropping the resource_metadata:
++        AggregatorTransformer(size=15, user_id='first',
++                              resource_metadata='drop')
++    """
++
++    def __init__(self, size=1, retention_time=None, project_id=None,
++                 user_id=None, resource_metadata="last", **kwargs):
++        super(AggregatorTransformer, self).__init__(**kwargs)
++        self.samples = {}
++        self.size = size
++        self.retention_time = retention_time
++        self.initial_timestamp = None
++        self.aggregated_samples = 0
++
++        self.key_attributes = []
++        self.merged_attribute_policy = {}
++        self._init_attribute('project_id', project_id)
++        self._init_attribute('user_id', user_id)
++        self._init_attribute('resource_metadata', resource_metadata,
++                             is_droppable=True, mandatory=True)
++
++    def _init_attribute(self, name, value, is_droppable=False,
++                        mandatory=False):
++        """Record name as a merged attribute, or else as part of the key."""
++        if not (value or mandatory):
++            self.key_attributes.append(name)
++            return
++        allowed = ['last', 'first'] + (['drop'] if is_droppable else [])
++        if value not in allowed:
++            LOG.warn('%s is unknown (%s), using last' % (name, value))
++            value = 'last'
++        self.merged_attribute_policy[name] = value
++
++    def _get_unique_key(self, s):
++        """Return the key of the bucket sample s is aggregated into."""
++        # '%s' formatting lets a None user_id/project_id be part of a key.
++        non_aggregated_keys = "-".join(["%s" % getattr(s, field)
++                                        for field in self.key_attributes])
++        #NOTE(sileht): it assumes, a meter always have the same unit/type
++        return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
++
++    def handle_sample(self, context, sample):
++        """Fold sample into its bucket; nothing is emitted before flush."""
++        if not self.initial_timestamp:
++            self.initial_timestamp = timeutils.parse_strtime(
++                sample.timestamp)
++
++        self.aggregated_samples += 1
++        key = self._get_unique_key(sample)
++        if key not in self.samples:
++            self.samples[key] = self._convert(sample)
++            if self.merged_attribute_policy[
++                    'resource_metadata'] == 'drop':
++                self.samples[key].resource_metadata = {}
++        else:
++            self.samples[key].volume += self._scale(sample)
++            for field in self.merged_attribute_policy:
++                if self.merged_attribute_policy[field] == 'last':
++                    setattr(self.samples[key], field,
++                            getattr(sample, field))
++
++    def flush(self, context):
++        """Return the aggregated samples once full or expired, else []."""
++        # Nothing buffered means there is no timestamp to compare against.
++        expired = (self.retention_time and self.initial_timestamp and
++                   timeutils.is_older_than(self.initial_timestamp,
++                                           self.retention_time))
++        if not (expired or self.aggregated_samples >= self.size):
++            return []
++        x = list(self.samples.values())
++        self.samples = {}
++        self.aggregated_samples = 0
++        self.initial_timestamp = None
++        return x
+diff --git a/setup.cfg b/setup.cfg
+index ef67325..3f19142 100644
+--- a/setup.cfg
++++ b/setup.cfg
+@@ -156,6 +156,7 @@ ceilometer.transformer =
+ accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
+ unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
+ rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
++ aggregator = ceilometer.transformer.conversions:AggregatorTransformer
+
+ ceilometer.publisher =
+ test = ceilometer.publisher.test:TestPublisher