Adds Add_aggregator_transformer.patch
author    Thomas Goirand <thomas@goirand.fr>
          Wed, 21 May 2014 16:05:30 +0000 (00:05 +0800)
committer Thomas Goirand <thomas@goirand.fr>
          Wed, 21 May 2014 16:06:37 +0000 (00:06 +0800)
Change-Id: I3c50e6f43c61f4ade574ccc65579b64dc04a3a17

Rewritten-From: 97e0fb41d5066a13dd7a0331670ba840cec0dca1

trusty/debian/changelog
trusty/debian/patches/Add_aggregator_transformer.patch [new file with mode: 0644]
trusty/debian/patches/series

diff --git a/trusty/debian/changelog b/trusty/debian/changelog
index 92f6a3a22ae20b37977ba5c228fdeeeb457466e7..b52b5702c4d9b4c91c21660127595880d6a39dd7 100644
--- a/trusty/debian/changelog
+++ b/trusty/debian/changelog
@@ -1,3 +1,9 @@
+ceilometer (2014.1-6) unstable; urgency=medium
+
+  * Adds Add_aggregator_transformer.patch.
+
+ -- Thomas Goirand <zigo@debian.org>  Thu, 22 May 2014 00:06:15 +0800
+
 ceilometer (2014.1-5) unstable; urgency=medium
 
   * Added Opencontrail_network_statistics_driver.patch.
diff --git a/trusty/debian/patches/Add_aggregator_transformer.patch b/trusty/debian/patches/Add_aggregator_transformer.patch
new file mode 100644
index 0000000..a5fdad2
--- /dev/null
+++ b/trusty/debian/patches/Add_aggregator_transformer.patch
@@ -0,0 +1,467 @@
+Description: transformer: Add aggregator transformer
+ This adds a transformer that aggregates counters until a size threshold
+ and/or a retention time is reached, and then flushes them out.
+Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
+Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200)
+X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f
+Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2
+Co-Authored-By: Jordan Pittier <jordan.pittier@cloudwatt.com>
+Origin: upstream, https://review.openstack.org/#/c/87238/
+Last-Update: 2014-05-21
+
+diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
+index db51c64..f4f5a64 100644
+--- a/ceilometer/tests/pipeline_base.py
++++ b/ceilometer/tests/pipeline_base.py
+@@ -20,6 +20,7 @@
+ import abc
+ import datetime
++import mock
+ import six
+ from stevedore import extension
+@@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase):
+             'except': self.TransformerClassException,
+             'drop': self.TransformerClassDrop,
+             'cache': accumulator.TransformerAccumulator,
++            'aggregator': conversions.AggregatorTransformer,
+             'unit_conversion': conversions.ScalingTransformer,
+             'rate_of_change': conversions.RateOfChangeTransformer,
+         }
+@@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase):
+         meters = ('disk.read.bytes', 'disk.write.requests')
+         units = ('B', 'request')
+         self._do_test_rate_of_change_mapping(pipe, meters, units)
++
++    def _do_test_aggregator(self, parameters, expected_length):
++        transformer_cfg = [
++            {
++                'name': 'aggregator',
++                'parameters': parameters,
++            },
++        ]
++        self._set_pipeline_cfg('transformers', transformer_cfg)
++        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
++        counters = [
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=26,
++                unit='B',
++                user_id='test_user',
++                project_id='test_proj',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '1.0'}
++            ),
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=16,
++                unit='B',
++                user_id='test_user',
++                project_id='test_proj',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '2.0'}
++            ),
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=53,
++                unit='B',
++                user_id='test_user_bis',
++                project_id='test_proj_bis',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '1.0'}
++            ),
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=42,
++                unit='B',
++                user_id='test_user_bis',
++                project_id='test_proj_bis',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '2.0'}
++            ),
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=15,
++                unit='B',
++                user_id='test_user',
++                project_id='test_proj_bis',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '2.0'}
++            ),
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=2,
++                unit='B',
++                user_id='test_user_bis',
++                project_id='test_proj',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '3.0'}
++            ),
++        ]
++
++        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
++                                                    self.transformer_manager)
++        pipe = pipeline_manager.pipelines[0]
++
++        pipe.publish_samples(None, counters)
++        pipe.flush(None)
++        publisher = pipeline_manager.pipelines[0].publishers[0]
++        self.assertEqual(expected_length, len(publisher.samples))
++        return sorted(publisher.samples, key=lambda s: s.volume)
++
++    def test_aggregator_metadata(self):
++        for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
++            samples = self._do_test_aggregator({
++                'resource_metadata': conf,
++                'target': {'name': 'aggregated-bytes'}
++            }, expected_length=4)
++            s = samples[0]
++            self.assertEqual('aggregated-bytes', s.name)
++            self.assertEqual(2, s.volume)
++            self.assertEqual('test_user_bis', s.user_id)
++            self.assertEqual('test_proj', s.project_id)
++            self.assertEqual({'version': '3.0'},
++                             s.resource_metadata)
++            s = samples[1]
++            self.assertEqual('aggregated-bytes', s.name)
++            self.assertEqual(15, s.volume)
++            self.assertEqual('test_user', s.user_id)
++            self.assertEqual('test_proj_bis', s.project_id)
++            self.assertEqual({'version': '2.0'},
++                             s.resource_metadata)
++            s = samples[2]
++            self.assertEqual('aggregated-bytes', s.name)
++            self.assertEqual(42, s.volume)
++            self.assertEqual('test_user', s.user_id)
++            self.assertEqual('test_proj', s.project_id)
++            self.assertEqual({'version': expected_version},
++                             s.resource_metadata)
++            s = samples[3]
++            self.assertEqual('aggregated-bytes', s.name)
++            self.assertEqual(95, s.volume)
++            self.assertEqual('test_user_bis', s.user_id)
++            self.assertEqual('test_proj_bis', s.project_id)
++            self.assertEqual({'version': expected_version},
++                             s.resource_metadata)
++
++    def test_aggregator_user_last_and_metadata_last(self):
++        samples = self._do_test_aggregator({
++            'resource_metadata': 'last',
++            'user_id': 'last',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=2)
++        s = samples[0]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(44, s.volume)
++        self.assertEqual('test_user_bis', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({'version': '3.0'},
++                         s.resource_metadata)
++        s = samples[1]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(110, s.volume)
++        self.assertEqual('test_user', s.user_id)
++        self.assertEqual('test_proj_bis', s.project_id)
++        self.assertEqual({'version': '2.0'},
++                         s.resource_metadata)
++
++    def test_aggregator_user_first_and_metadata_last(self):
++        samples = self._do_test_aggregator({
++            'resource_metadata': 'last',
++            'user_id': 'first',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=2)
++        s = samples[0]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(44, s.volume)
++        self.assertEqual('test_user', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({'version': '3.0'},
++                         s.resource_metadata)
++        s = samples[1]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(110, s.volume)
++        self.assertEqual('test_user_bis', s.user_id)
++        self.assertEqual('test_proj_bis', s.project_id)
++        self.assertEqual({'version': '2.0'},
++                         s.resource_metadata)
++
++    def test_aggregator_all_first(self):
++        samples = self._do_test_aggregator({
++            'resource_metadata': 'first',
++            'user_id': 'first',
++            'project_id': 'first',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=1)
++        s = samples[0]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(154, s.volume)
++        self.assertEqual('test_user', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({'version': '1.0'},
++                         s.resource_metadata)
++
++    def test_aggregator_all_last(self):
++        samples = self._do_test_aggregator({
++            'resource_metadata': 'last',
++            'user_id': 'last',
++            'project_id': 'last',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=1)
++        s = samples[0]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(154, s.volume)
++        self.assertEqual('test_user_bis', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({'version': '3.0'},
++                         s.resource_metadata)
++
++    def test_aggregator_all_mixed(self):
++        samples = self._do_test_aggregator({
++            'resource_metadata': 'drop',
++            'user_id': 'first',
++            'project_id': 'last',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=1)
++        s = samples[0]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(154, s.volume)
++        self.assertEqual('test_user', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({}, s.resource_metadata)
++
++    def test_aggregator_metadata_default(self):
++        samples = self._do_test_aggregator({
++            'user_id': 'last',
++            'project_id': 'last',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=1)
++        s = samples[0]
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(154, s.volume)
++        self.assertEqual('test_user_bis', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({'version': '3.0'},
++                         s.resource_metadata)
++
++    @mock.patch('ceilometer.transformer.conversions.LOG')
++    def test_aggregator_metadata_invalid(self, mylog):
++        samples = self._do_test_aggregator({
++            'resource_metadata': 'invalid',
++            'user_id': 'last',
++            'project_id': 'last',
++            'target': {'name': 'aggregated-bytes'}
++        }, expected_length=1)
++        s = samples[0]
++        self.assertTrue(mylog.warn.called)
++        self.assertEqual('aggregated-bytes', s.name)
++        self.assertEqual(154, s.volume)
++        self.assertEqual('test_user_bis', s.user_id)
++        self.assertEqual('test_proj', s.project_id)
++        self.assertEqual({'version': '3.0'},
++                         s.resource_metadata)
++
++    def test_aggregator_sized_flush(self):
++        transformer_cfg = [
++            {
++                'name': 'aggregator',
++                'parameters': {'size': 2},
++            },
++        ]
++        self._set_pipeline_cfg('transformers', transformer_cfg)
++        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
++        counters = [
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=26,
++                unit='B',
++                user_id='test_user',
++                project_id='test_proj',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '1.0'}
++            ),
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=16,
++                unit='B',
++                user_id='test_user_bis',
++                project_id='test_proj_bis',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '2.0'}
++            )
++        ]
++
++        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
++                                                    self.transformer_manager)
++        pipe = pipeline_manager.pipelines[0]
++
++        pipe.publish_samples(None, [counters[0]])
++        pipe.flush(None)
++        publisher = pipe.publishers[0]
++        self.assertEqual(0, len(publisher.samples))
++
++        pipe.publish_samples(None, [counters[1]])
++        pipe.flush(None)
++        publisher = pipe.publishers[0]
++        self.assertEqual(2, len(publisher.samples))
++
++    def test_aggregator_timed_flush(self):
++        timeutils.set_time_override()
++        transformer_cfg = [
++            {
++                'name': 'aggregator',
++                'parameters': {'size': 900, 'retention_time': 60},
++            },
++        ]
++        self._set_pipeline_cfg('transformers', transformer_cfg)
++        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
++        counters = [
++            sample.Sample(
++                name='storage.objects.incoming.bytes',
++                type=sample.TYPE_DELTA,
++                volume=26,
++                unit='B',
++                user_id='test_user',
++                project_id='test_proj',
++                resource_id='test_resource',
++                timestamp=timeutils.utcnow().isoformat(),
++                resource_metadata={'version': '1.0'}
++            ),
++        ]
++
++        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
++                                                    self.transformer_manager)
++        pipe = pipeline_manager.pipelines[0]
++
++        pipe.publish_samples(None, counters)
++        pipe.flush(None)
++        publisher = pipeline_manager.pipelines[0].publishers[0]
++        self.assertEqual(0, len(publisher.samples))
++
++        timeutils.advance_time_seconds(120)
++        pipe.flush(None)
++        publisher = pipeline_manager.pipelines[0].publishers[0]
++        self.assertEqual(1, len(publisher.samples))
+diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
+index 3a7d079..bfa9f0c 100644
+--- a/ceilometer/transformer/conversions.py
++++ b/ceilometer/transformer/conversions.py
+@@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer):
+                      (s,))
+             s = None
+         return s
++
++
++class AggregatorTransformer(ScalingTransformer):
++    """Transformer that aggregate sample until a threshold or/and a
++    retention_time, and then flush them out in the wild.
++
++    Example:
++      To aggregate sample by resource_metadata and keep the
++      resource_metadata of the latest received sample;
++
++        AggregatorTransformer(retention_time=60, resource_metadata='last')
++
++      To aggregate sample by user_id and resource_metadata and keep the
++      user_id of the first received sample and drop the resource_metadata.
++
++        AggregatorTransformer(size=15, user_id='first',
++                              resource_metadata='drop')
++
++    """
++
++    def __init__(self, size=1, retention_time=None,
++                 project_id=None, user_id=None, resource_metadata="last",
++                 **kwargs):
++        super(AggregatorTransformer, self).__init__(**kwargs)
++        self.samples = {}
++        self.size = size
++        self.retention_time = retention_time
++        self.initial_timestamp = None
++        self.aggregated_samples = 0
++
++        self.key_attributes = []
++        self.merged_attribute_policy = {}
++
++        self._init_attribute('project_id', project_id)
++        self._init_attribute('user_id', user_id)
++        self._init_attribute('resource_metadata', resource_metadata,
++                             is_droppable=True, mandatory=True)
++
++    def _init_attribute(self, name, value, is_droppable=False,
++                        mandatory=False):
++        drop = ['drop'] if is_droppable else []
++        if value or mandatory:
++            if value not in ['last', 'first'] + drop:
++                LOG.warn('%s is unknown (%s), using last' % (name, value))
++                value = 'last'
++            self.merged_attribute_policy[name] = value
++        else:
++            self.key_attributes.append(name)
++
++    def _get_unique_key(self, s):
++        non_aggregated_keys = "-".join([getattr(s, field)
++                                        for field in self.key_attributes])
++        # NOTE(sileht): this assumes a meter always has the same unit/type
++        return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
++
++    def handle_sample(self, context, sample):
++        if not self.initial_timestamp:
++            self.initial_timestamp = timeutils.parse_strtime(
++                sample.timestamp)
++
++        self.aggregated_samples += 1
++        key = self._get_unique_key(sample)
++        if key not in self.samples:
++            self.samples[key] = self._convert(sample)
++            if self.merged_attribute_policy[
++                    'resource_metadata'] == 'drop':
++                self.samples[key].resource_metadata = {}
++        else:
++            self.samples[key].volume += self._scale(sample)
++            for field in self.merged_attribute_policy:
++                if self.merged_attribute_policy[field] == 'last':
++                    setattr(self.samples[key], field,
++                            getattr(sample, field))
++
++    def flush(self, context):
++        expired = self.retention_time and \
++            timeutils.is_older_than(self.initial_timestamp,
++                                    self.retention_time)
++        full = self.aggregated_samples >= self.size
++        if full or expired:
++            x = self.samples.values()
++            self.samples = {}
++            self.aggregated_samples = 0
++            self.initial_timestamp = None
++            return x
++        return []
+diff --git a/setup.cfg b/setup.cfg
+index ef67325..3f19142 100644
+--- a/setup.cfg
++++ b/setup.cfg
+@@ -156,6 +156,7 @@ ceilometer.transformer =
+     accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
+     unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
+     rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
++    aggregator = ceilometer.transformer.conversions:AggregatorTransformer
+ ceilometer.publisher =
+     test = ceilometer.publisher.test:TestPublisher
diff --git a/trusty/debian/patches/series b/trusty/debian/patches/series
index d8e4612f5b55fd051b50f6ab72443ea46a84d3a9..76ba8c82752ad6688988762f9ca58f1ebcc4f5e9 100644
--- a/trusty/debian/patches/series
+++ b/trusty/debian/patches/series
@@ -1,2 +1,3 @@
 using-mongodb-by-default.patch
 Opencontrail_network_statistics_driver.patch
+Add_aggregator_transformer.patch
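
Two short sketches follow for reference; they are not part of the commit above. First, the transformer stanza a pipeline would use to enable the aggregator, mirroring the dict the tests feed to _set_pipeline_cfg(); the parameter values here are illustrative:

    # As in the tests above: flush after 900 samples or 60 seconds,
    # whichever comes first; keep the last seen resource_metadata and
    # rename the output sample.
    transformer_cfg = [
        {
            'name': 'aggregator',
            'parameters': {
                'size': 900,
                'retention_time': 60,
                'resource_metadata': 'last',
                'target': {'name': 'aggregated-bytes'},
            },
        },
    ]

Any of project_id/user_id left unset becomes part of the grouping key built by _get_unique_key(), while attributes given a 'first'/'last'/'drop' policy are merged across the aggregated samples instead.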
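
Second, a minimal end-to-end sketch of the size-based flush. This is an illustration under assumptions, not part of the patch: the import paths (ceilometer.sample, ceilometer.transformer.conversions, and the oslo-incubator timeutils under ceilometer.openstack.common) are assumed from the 2014.1 tree this patch targets:

    from ceilometer import sample
    from ceilometer.openstack.common import timeutils  # assumed path
    from ceilometer.transformer import conversions

    # Flush after two samples; keep the first user_id, drop the metadata.
    agg = conversions.AggregatorTransformer(size=2, user_id='first',
                                            resource_metadata='drop')
    for volume in (26, 16):
        agg.handle_sample(None, sample.Sample(
            name='storage.objects.incoming.bytes',
            type=sample.TYPE_DELTA,
            volume=volume,
            unit='B',
            user_id='test_user',
            project_id='test_proj',
            resource_id='test_resource',
            timestamp=timeutils.utcnow().isoformat(),
            resource_metadata={'version': '1.0'}))

    # Both samples share the same (name, resource_id, project_id) grouping
    # key, so once the size threshold is hit flush() returns one sample
    # with volume 42, user_id 'test_user', and empty resource_metadata.
    print(agg.flush(None))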