Description: transformer: Add aggregator transformer This adds a transformer aggregate counters until a threshold and/or a retention time and then flushing them out. Author: Mehdi Abaakouk Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200) X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2 Co-Authored-By: Jordan Pittier Origin: upstream, https://review.openstack.org/#/c/87238/ Last-Update: 2014-05-21 diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index db51c64..f4f5a64 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -20,6 +20,7 @@ import abc import datetime +import mock import six from stevedore import extension @@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase): 'except': self.TransformerClassException, 'drop': self.TransformerClassDrop, 'cache': accumulator.TransformerAccumulator, + 'aggregator': conversions.AggregatorTransformer, 'unit_conversion': conversions.ScalingTransformer, 'rate_of_change': conversions.RateOfChangeTransformer, } @@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase): meters = ('disk.read.bytes', 'disk.write.requests') units = ('B', 'request') self._do_test_rate_of_change_mapping(pipe, meters, units) + + def _do_test_aggregator(self, parameters, expected_length): + transformer_cfg = [ + { + 'name': 'aggregator', + 'parameters': parameters, + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes']) + counters = [ + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=26, + unit='B', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=16, + unit='B', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '2.0'} + ), + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=53, + unit='B', + user_id='test_user_bis', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=42, + unit='B', + user_id='test_user_bis', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '2.0'} + ), + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=15, + unit='B', + user_id='test_user', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '2.0'} + ), + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=2, + unit='B', + user_id='test_user_bis', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '3.0'} + ), + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_samples(None, counters) + pipe.flush(None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(expected_length, len(publisher.samples)) + return sorted(publisher.samples, key=lambda s: s.volume) + + def test_aggregator_metadata(self): + for conf, expected_version in [('last', '2.0'), ('first', '1.0')]: + samples = self._do_test_aggregator({ + 'resource_metadata': conf, + 'target': {'name': 'aggregated-bytes'} + }, expected_length=4) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(2, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '3.0'}, + s.resource_metadata) + s = samples[1] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(15, s.volume) + self.assertEqual('test_user', s.user_id) + self.assertEqual('test_proj_bis', s.project_id) + self.assertEqual({'version': '2.0'}, + s.resource_metadata) + s = samples[2] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(42, s.volume) + self.assertEqual('test_user', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': expected_version}, + s.resource_metadata) + s = samples[3] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(95, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj_bis', s.project_id) + self.assertEqual({'version': expected_version}, + s.resource_metadata) + + def test_aggregator_user_last_and_metadata_last(self): + samples = self._do_test_aggregator({ + 'resource_metadata': 'last', + 'user_id': 'last', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=2) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(44, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '3.0'}, + s.resource_metadata) + s = samples[1] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(110, s.volume) + self.assertEqual('test_user', s.user_id) + self.assertEqual('test_proj_bis', s.project_id) + self.assertEqual({'version': '2.0'}, + s.resource_metadata) + + def test_aggregator_user_first_and_metadata_last(self): + samples = self._do_test_aggregator({ + 'resource_metadata': 'last', + 'user_id': 'first', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=2) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(44, s.volume) + self.assertEqual('test_user', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '3.0'}, + s.resource_metadata) + s = samples[1] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(110, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj_bis', s.project_id) + self.assertEqual({'version': '2.0'}, + s.resource_metadata) + + def test_aggregator_all_first(self): + samples = self._do_test_aggregator({ + 'resource_metadata': 'first', + 'user_id': 'first', + 'project_id': 'first', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=1) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(154, s.volume) + self.assertEqual('test_user', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '1.0'}, + s.resource_metadata) + + def test_aggregator_all_last(self): + samples = self._do_test_aggregator({ + 'resource_metadata': 'last', + 'user_id': 'last', + 'project_id': 'last', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=1) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(154, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '3.0'}, + s.resource_metadata) + + def test_aggregator_all_mixed(self): + samples = self._do_test_aggregator({ + 'resource_metadata': 'drop', + 'user_id': 'first', + 'project_id': 'last', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=1) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(154, s.volume) + self.assertEqual('test_user', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({}, s.resource_metadata) + + def test_aggregator_metadata_default(self): + samples = self._do_test_aggregator({ + 'user_id': 'last', + 'project_id': 'last', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=1) + s = samples[0] + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(154, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '3.0'}, + s.resource_metadata) + + @mock.patch('ceilometer.transformer.conversions.LOG') + def test_aggregator_metadata_invalid(self, mylog): + samples = self._do_test_aggregator({ + 'resource_metadata': 'invalid', + 'user_id': 'last', + 'project_id': 'last', + 'target': {'name': 'aggregated-bytes'} + }, expected_length=1) + s = samples[0] + self.assertTrue(mylog.warn.called) + self.assertEqual('aggregated-bytes', s.name) + self.assertEqual(154, s.volume) + self.assertEqual('test_user_bis', s.user_id) + self.assertEqual('test_proj', s.project_id) + self.assertEqual({'version': '3.0'}, + s.resource_metadata) + + def test_aggregator_sized_flush(self): + transformer_cfg = [ + { + 'name': 'aggregator', + 'parameters': {'size': 2}, + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes']) + counters = [ + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=26, + unit='B', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=16, + unit='B', + user_id='test_user_bis', + project_id='test_proj_bis', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '2.0'} + ) + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_samples(None, [counters[0]]) + pipe.flush(None) + publisher = pipe.publishers[0] + self.assertEqual(0, len(publisher.samples)) + + pipe.publish_samples(None, [counters[1]]) + pipe.flush(None) + publisher = pipe.publishers[0] + self.assertEqual(2, len(publisher.samples)) + + def test_aggregator_timed_flush(self): + timeutils.set_time_override() + transformer_cfg = [ + { + 'name': 'aggregator', + 'parameters': {'size': 900, 'retention_time': 60}, + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes']) + counters = [ + sample.Sample( + name='storage.objects.incoming.bytes', + type=sample.TYPE_DELTA, + volume=26, + unit='B', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=timeutils.utcnow().isoformat(), + resource_metadata={'version': '1.0'} + ), + ] + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + pipe.publish_samples(None, counters) + pipe.flush(None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(0, len(publisher.samples)) + + timeutils.advance_time_seconds(120) + pipe.flush(None) + publisher = pipeline_manager.pipelines[0].publishers[0] + self.assertEqual(1, len(publisher.samples)) diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 3a7d079..bfa9f0c 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer): (s,)) s = None return s + + +class AggregatorTransformer(ScalingTransformer): + """Transformer that aggregate sample until a threshold or/and a + retention_time, and then flush them out in the wild. + + Example: + To aggregate sample by resource_metadata and keep the + resource_metadata of the latest received sample; + + AggregatorTransformer(retention_time=60, resource_metadata='last') + + To aggregate sample by user_id and resource_metadata and keep the + user_id of the first received sample and drop the resource_metadata. + + AggregatorTransformer(size=15, user_id='first', + resource_metadata='drop') + + """ + + def __init__(self, size=1, retention_time=None, + project_id=None, user_id=None, resource_metadata="last", + **kwargs): + super(AggregatorTransformer, self).__init__(**kwargs) + self.samples = {} + self.size = size + self.retention_time = retention_time + self.initial_timestamp = None + self.aggregated_samples = 0 + + self.key_attributes = [] + self.merged_attribute_policy = {} + + self._init_attribute('project_id', project_id) + self._init_attribute('user_id', user_id) + self._init_attribute('resource_metadata', resource_metadata, + is_droppable=True, mandatory=True) + + def _init_attribute(self, name, value, is_droppable=False, + mandatory=False): + drop = ['drop'] if is_droppable else [] + if value or mandatory: + if value not in ['last', 'first'] + drop: + LOG.warn('%s is unknown (%s), using last' % (name, value)) + value = 'last' + self.merged_attribute_policy[name] = value + else: + self.key_attributes.append(name) + + def _get_unique_key(self, s): + non_aggregated_keys = "-".join([getattr(s, field) + for field in self.key_attributes]) + #NOTE(sileht): it assumes, a meter always have the same unit/type + return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys) + + def handle_sample(self, context, sample): + if not self.initial_timestamp: + self.initial_timestamp = timeutils.parse_strtime( + sample.timestamp) + + self.aggregated_samples += 1 + key = self._get_unique_key(sample) + if key not in self.samples: + self.samples[key] = self._convert(sample) + if self.merged_attribute_policy[ + 'resource_metadata'] == 'drop': + self.samples[key].resource_metadata = {} + else: + self.samples[key].volume += self._scale(sample) + for field in self.merged_attribute_policy: + if self.merged_attribute_policy[field] == 'last': + setattr(self.samples[key], field, + getattr(sample, field)) + + def flush(self, context): + expired = self.retention_time and \ + timeutils.is_older_than(self.initial_timestamp, + self.retention_time) + full = self.aggregated_samples >= self.size + if full or expired: + x = self.samples.values() + self.samples = {} + self.aggregated_samples = 0 + self.initial_timestamp = None + return x + return [] diff --git a/setup.cfg b/setup.cfg index ef67325..3f19142 100644 --- a/setup.cfg +++ b/setup.cfg @@ -156,6 +156,7 @@ ceilometer.transformer = accumulator = ceilometer.transformer.accumulator:TransformerAccumulator unit_conversion = ceilometer.transformer.conversions:ScalingTransformer rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer + aggregator = ceilometer.transformer.conversions:AggregatorTransformer ceilometer.publisher = test = ceilometer.publisher.test:TestPublisher