1 Description: transformer: Add aggregator transformer
2  This adds a transformer that aggregates counters until a size threshold
3  and/or a retention time is reached, and then flushes them out.
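 .
 A minimal configuration sketch (values are illustrative; the parameter
 names match AggregatorTransformer.__init__ and the 'aggregator' entry
 point registered in setup.cfg below), written in the transformer_cfg
 form used by the tests in this patch:
 .
     # illustrative thresholds: flush after 15 samples or 60 seconds
     transformer_cfg = [
         {
             'name': 'aggregator',
             'parameters': {'size': 15,
                            'retention_time': 60,
                            'user_id': 'first',            # keep first user_id
                            'resource_metadata': 'drop'},  # discard metadata
         },
     ]
 .
 Attributes left without a 'first'/'last'/'drop' policy (project_id and
 user_id by default) become part of the aggregation key, together with the
 sample name and resource_id; resource_metadata always has a policy and
 defaults to 'last'.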
4 Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
5 Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200)
6 X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f
7 Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2
8 Co-Authored-By: Jordan Pittier <jordan.pittier@cloudwatt.com>
9 Origin: upstream, https://review.openstack.org/#/c/87238/
10 Last-Update: 2014-05-21
11
12 diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
13 index db51c64..f4f5a64 100644
14 --- a/ceilometer/tests/pipeline_base.py
15 +++ b/ceilometer/tests/pipeline_base.py
16 @@ -20,6 +20,7 @@
17  import abc
18  import datetime
19  
20 +import mock
21  import six
22  from stevedore import extension
23  
24 @@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase):
25              'except': self.TransformerClassException,
26              'drop': self.TransformerClassDrop,
27              'cache': accumulator.TransformerAccumulator,
28 +            'aggregator': conversions.AggregatorTransformer,
29              'unit_conversion': conversions.ScalingTransformer,
30              'rate_of_change': conversions.RateOfChangeTransformer,
31          }
32 @@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase):
33          meters = ('disk.read.bytes', 'disk.write.requests')
34          units = ('B', 'request')
35          self._do_test_rate_of_change_mapping(pipe, meters, units)
36 +
37 +    def _do_test_aggregator(self, parameters, expected_length):
38 +        transformer_cfg = [
39 +            {
40 +                'name': 'aggregator',
41 +                'parameters': parameters,
42 +            },
43 +        ]
44 +        self._set_pipeline_cfg('transformers', transformer_cfg)
45 +        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
46 +        counters = [
47 +            sample.Sample(
48 +                name='storage.objects.incoming.bytes',
49 +                type=sample.TYPE_DELTA,
50 +                volume=26,
51 +                unit='B',
52 +                user_id='test_user',
53 +                project_id='test_proj',
54 +                resource_id='test_resource',
55 +                timestamp=timeutils.utcnow().isoformat(),
56 +                resource_metadata={'version': '1.0'}
57 +            ),
58 +            sample.Sample(
59 +                name='storage.objects.incoming.bytes',
60 +                type=sample.TYPE_DELTA,
61 +                volume=16,
62 +                unit='B',
63 +                user_id='test_user',
64 +                project_id='test_proj',
65 +                resource_id='test_resource',
66 +                timestamp=timeutils.utcnow().isoformat(),
67 +                resource_metadata={'version': '2.0'}
68 +            ),
69 +            sample.Sample(
70 +                name='storage.objects.incoming.bytes',
71 +                type=sample.TYPE_DELTA,
72 +                volume=53,
73 +                unit='B',
74 +                user_id='test_user_bis',
75 +                project_id='test_proj_bis',
76 +                resource_id='test_resource',
77 +                timestamp=timeutils.utcnow().isoformat(),
78 +                resource_metadata={'version': '1.0'}
79 +            ),
80 +            sample.Sample(
81 +                name='storage.objects.incoming.bytes',
82 +                type=sample.TYPE_DELTA,
83 +                volume=42,
84 +                unit='B',
85 +                user_id='test_user_bis',
86 +                project_id='test_proj_bis',
87 +                resource_id='test_resource',
88 +                timestamp=timeutils.utcnow().isoformat(),
89 +                resource_metadata={'version': '2.0'}
90 +            ),
91 +            sample.Sample(
92 +                name='storage.objects.incoming.bytes',
93 +                type=sample.TYPE_DELTA,
94 +                volume=15,
95 +                unit='B',
96 +                user_id='test_user',
97 +                project_id='test_proj_bis',
98 +                resource_id='test_resource',
99 +                timestamp=timeutils.utcnow().isoformat(),
100 +                resource_metadata={'version': '2.0'}
101 +            ),
102 +            sample.Sample(
103 +                name='storage.objects.incoming.bytes',
104 +                type=sample.TYPE_DELTA,
105 +                volume=2,
106 +                unit='B',
107 +                user_id='test_user_bis',
108 +                project_id='test_proj',
109 +                resource_id='test_resource',
110 +                timestamp=timeutils.utcnow().isoformat(),
111 +                resource_metadata={'version': '3.0'}
112 +            ),
113 +        ]
114 +
115 +        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
116 +                                                    self.transformer_manager)
117 +        pipe = pipeline_manager.pipelines[0]
118 +
119 +        pipe.publish_samples(None, counters)
120 +        pipe.flush(None)
121 +        publisher = pipeline_manager.pipelines[0].publishers[0]
122 +        self.assertEqual(expected_length, len(publisher.samples))
123 +        return sorted(publisher.samples, key=lambda s: s.volume)
124 +
125 +    def test_aggregator_metadata(self):
126 +        for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
127 +            samples = self._do_test_aggregator({
128 +                'resource_metadata': conf,
129 +                'target': {'name': 'aggregated-bytes'}
130 +            }, expected_length=4)
131 +            s = samples[0]
132 +            self.assertEqual('aggregated-bytes', s.name)
133 +            self.assertEqual(2, s.volume)
134 +            self.assertEqual('test_user_bis', s.user_id)
135 +            self.assertEqual('test_proj', s.project_id)
136 +            self.assertEqual({'version': '3.0'},
137 +                             s.resource_metadata)
138 +            s = samples[1]
139 +            self.assertEqual('aggregated-bytes', s.name)
140 +            self.assertEqual(15, s.volume)
141 +            self.assertEqual('test_user', s.user_id)
142 +            self.assertEqual('test_proj_bis', s.project_id)
143 +            self.assertEqual({'version': '2.0'},
144 +                             s.resource_metadata)
145 +            s = samples[2]
146 +            self.assertEqual('aggregated-bytes', s.name)
147 +            self.assertEqual(42, s.volume)
148 +            self.assertEqual('test_user', s.user_id)
149 +            self.assertEqual('test_proj', s.project_id)
150 +            self.assertEqual({'version': expected_version},
151 +                             s.resource_metadata)
152 +            s = samples[3]
153 +            self.assertEqual('aggregated-bytes', s.name)
154 +            self.assertEqual(95, s.volume)
155 +            self.assertEqual('test_user_bis', s.user_id)
156 +            self.assertEqual('test_proj_bis', s.project_id)
157 +            self.assertEqual({'version': expected_version},
158 +                             s.resource_metadata)
159 +
160 +    def test_aggregator_user_last_and_metadata_last(self):
161 +        samples = self._do_test_aggregator({
162 +            'resource_metadata': 'last',
163 +            'user_id': 'last',
164 +            'target': {'name': 'aggregated-bytes'}
165 +        }, expected_length=2)
166 +        s = samples[0]
167 +        self.assertEqual('aggregated-bytes', s.name)
168 +        self.assertEqual(44, s.volume)
169 +        self.assertEqual('test_user_bis', s.user_id)
170 +        self.assertEqual('test_proj', s.project_id)
171 +        self.assertEqual({'version': '3.0'},
172 +                         s.resource_metadata)
173 +        s = samples[1]
174 +        self.assertEqual('aggregated-bytes', s.name)
175 +        self.assertEqual(110, s.volume)
176 +        self.assertEqual('test_user', s.user_id)
177 +        self.assertEqual('test_proj_bis', s.project_id)
178 +        self.assertEqual({'version': '2.0'},
179 +                         s.resource_metadata)
180 +
181 +    def test_aggregator_user_first_and_metadata_last(self):
182 +        samples = self._do_test_aggregator({
183 +            'resource_metadata': 'last',
184 +            'user_id': 'first',
185 +            'target': {'name': 'aggregated-bytes'}
186 +        }, expected_length=2)
187 +        s = samples[0]
188 +        self.assertEqual('aggregated-bytes', s.name)
189 +        self.assertEqual(44, s.volume)
190 +        self.assertEqual('test_user', s.user_id)
191 +        self.assertEqual('test_proj', s.project_id)
192 +        self.assertEqual({'version': '3.0'},
193 +                         s.resource_metadata)
194 +        s = samples[1]
195 +        self.assertEqual('aggregated-bytes', s.name)
196 +        self.assertEqual(110, s.volume)
197 +        self.assertEqual('test_user_bis', s.user_id)
198 +        self.assertEqual('test_proj_bis', s.project_id)
199 +        self.assertEqual({'version': '2.0'},
200 +                         s.resource_metadata)
201 +
202 +    def test_aggregator_all_first(self):
203 +        samples = self._do_test_aggregator({
204 +            'resource_metadata': 'first',
205 +            'user_id': 'first',
206 +            'project_id': 'first',
207 +            'target': {'name': 'aggregated-bytes'}
208 +        }, expected_length=1)
209 +        s = samples[0]
210 +        self.assertEqual('aggregated-bytes', s.name)
211 +        self.assertEqual(154, s.volume)
212 +        self.assertEqual('test_user', s.user_id)
213 +        self.assertEqual('test_proj', s.project_id)
214 +        self.assertEqual({'version': '1.0'},
215 +                         s.resource_metadata)
216 +
217 +    def test_aggregator_all_last(self):
218 +        samples = self._do_test_aggregator({
219 +            'resource_metadata': 'last',
220 +            'user_id': 'last',
221 +            'project_id': 'last',
222 +            'target': {'name': 'aggregated-bytes'}
223 +        }, expected_length=1)
224 +        s = samples[0]
225 +        self.assertEqual('aggregated-bytes', s.name)
226 +        self.assertEqual(154, s.volume)
227 +        self.assertEqual('test_user_bis', s.user_id)
228 +        self.assertEqual('test_proj', s.project_id)
229 +        self.assertEqual({'version': '3.0'},
230 +                         s.resource_metadata)
231 +
232 +    def test_aggregator_all_mixed(self):
233 +        samples = self._do_test_aggregator({
234 +            'resource_metadata': 'drop',
235 +            'user_id': 'first',
236 +            'project_id': 'last',
237 +            'target': {'name': 'aggregated-bytes'}
238 +        }, expected_length=1)
239 +        s = samples[0]
240 +        self.assertEqual('aggregated-bytes', s.name)
241 +        self.assertEqual(154, s.volume)
242 +        self.assertEqual('test_user', s.user_id)
243 +        self.assertEqual('test_proj', s.project_id)
244 +        self.assertEqual({}, s.resource_metadata)
245 +
246 +    def test_aggregator_metadata_default(self):
247 +        samples = self._do_test_aggregator({
248 +            'user_id': 'last',
249 +            'project_id': 'last',
250 +            'target': {'name': 'aggregated-bytes'}
251 +        }, expected_length=1)
252 +        s = samples[0]
253 +        self.assertEqual('aggregated-bytes', s.name)
254 +        self.assertEqual(154, s.volume)
255 +        self.assertEqual('test_user_bis', s.user_id)
256 +        self.assertEqual('test_proj', s.project_id)
257 +        self.assertEqual({'version': '3.0'},
258 +                         s.resource_metadata)
259 +
260 +    @mock.patch('ceilometer.transformer.conversions.LOG')
261 +    def test_aggregator_metadata_invalid(self, mylog):
262 +        samples = self._do_test_aggregator({
263 +            'resource_metadata': 'invalid',
264 +            'user_id': 'last',
265 +            'project_id': 'last',
266 +            'target': {'name': 'aggregated-bytes'}
267 +        }, expected_length=1)
268 +        s = samples[0]
269 +        self.assertTrue(mylog.warn.called)
270 +        self.assertEqual('aggregated-bytes', s.name)
271 +        self.assertEqual(154, s.volume)
272 +        self.assertEqual('test_user_bis', s.user_id)
273 +        self.assertEqual('test_proj', s.project_id)
274 +        self.assertEqual({'version': '3.0'},
275 +                         s.resource_metadata)
276 +
277 +    def test_aggregator_sized_flush(self):
278 +        transformer_cfg = [
279 +            {
280 +                'name': 'aggregator',
281 +                'parameters': {'size': 2},
282 +            },
283 +        ]
284 +        self._set_pipeline_cfg('transformers', transformer_cfg)
285 +        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
286 +        counters = [
287 +            sample.Sample(
288 +                name='storage.objects.incoming.bytes',
289 +                type=sample.TYPE_DELTA,
290 +                volume=26,
291 +                unit='B',
292 +                user_id='test_user',
293 +                project_id='test_proj',
294 +                resource_id='test_resource',
295 +                timestamp=timeutils.utcnow().isoformat(),
296 +                resource_metadata={'version': '1.0'}
297 +            ),
298 +            sample.Sample(
299 +                name='storage.objects.incoming.bytes',
300 +                type=sample.TYPE_DELTA,
301 +                volume=16,
302 +                unit='B',
303 +                user_id='test_user_bis',
304 +                project_id='test_proj_bis',
305 +                resource_id='test_resource',
306 +                timestamp=timeutils.utcnow().isoformat(),
307 +                resource_metadata={'version': '2.0'}
308 +            )
309 +        ]
310 +
311 +        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
312 +                                                    self.transformer_manager)
313 +        pipe = pipeline_manager.pipelines[0]
314 +
315 +        pipe.publish_samples(None, [counters[0]])
316 +        pipe.flush(None)
317 +        publisher = pipe.publishers[0]
318 +        self.assertEqual(0, len(publisher.samples))
319 +
320 +        pipe.publish_samples(None, [counters[1]])
321 +        pipe.flush(None)
322 +        publisher = pipe.publishers[0]
323 +        self.assertEqual(2, len(publisher.samples))
324 +
325 +    def test_aggregator_timed_flush(self):
326 +        timeutils.set_time_override()
327 +        transformer_cfg = [
328 +            {
329 +                'name': 'aggregator',
330 +                'parameters': {'size': 900, 'retention_time': 60},
331 +            },
332 +        ]
333 +        self._set_pipeline_cfg('transformers', transformer_cfg)
334 +        self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
335 +        counters = [
336 +            sample.Sample(
337 +                name='storage.objects.incoming.bytes',
338 +                type=sample.TYPE_DELTA,
339 +                volume=26,
340 +                unit='B',
341 +                user_id='test_user',
342 +                project_id='test_proj',
343 +                resource_id='test_resource',
344 +                timestamp=timeutils.utcnow().isoformat(),
345 +                resource_metadata={'version': '1.0'}
346 +            ),
347 +        ]
348 +
349 +        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
350 +                                                    self.transformer_manager)
351 +        pipe = pipeline_manager.pipelines[0]
352 +
353 +        pipe.publish_samples(None, counters)
354 +        pipe.flush(None)
355 +        publisher = pipeline_manager.pipelines[0].publishers[0]
356 +        self.assertEqual(0, len(publisher.samples))
357 +
358 +        timeutils.advance_time_seconds(120)
359 +        pipe.flush(None)
360 +        publisher = pipeline_manager.pipelines[0].publishers[0]
361 +        self.assertEqual(1, len(publisher.samples))
362 diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
363 index 3a7d079..bfa9f0c 100644
364 --- a/ceilometer/transformer/conversions.py
365 +++ b/ceilometer/transformer/conversions.py
366 @@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer):
367                       (s,))
368              s = None
369          return s
370 +
371 +
372 +class AggregatorTransformer(ScalingTransformer):
373 +    """Transformer that aggregate sample until a threshold or/and a
374 +    retention_time, and then flush them out in the wild.
375 +
376 +    Example:
377 +      To aggregate samples, keeping the resource_metadata of the
378 +      latest received sample:
379 +
380 +        AggregatorTransformer(retention_time=60, resource_metadata='last')
381 +
382 +      To aggregate samples, keeping the user_id of the first received
383 +      sample and dropping the resource_metadata:
384 +
385 +        AggregatorTransformer(size=15, user_id='first',
386 +                              resource_metadata='drop')
387 +
388 +    """
389 +
390 +    def __init__(self, size=1, retention_time=None,
391 +                 project_id=None, user_id=None, resource_metadata="last",
392 +                 **kwargs):
393 +        super(AggregatorTransformer, self).__init__(**kwargs)
394 +        self.samples = {}
395 +        self.size = size
396 +        self.retention_time = retention_time
397 +        self.initial_timestamp = None
398 +        self.aggregated_samples = 0
399 +
400 +        self.key_attributes = []
401 +        self.merged_attribute_policy = {}
402 +
403 +        self._init_attribute('project_id', project_id)
404 +        self._init_attribute('user_id', user_id)
405 +        self._init_attribute('resource_metadata', resource_metadata,
406 +                             is_droppable=True, mandatory=True)
407 +
408 +    def _init_attribute(self, name, value, is_droppable=False,
409 +                        mandatory=False):
410 +        drop = ['drop'] if is_droppable else []
411 +        if value or mandatory:
412 +            if value not in ['last', 'first'] + drop:
413 +                LOG.warn('%s is unknown (%s), using last' % (name, value))
414 +                value = 'last'
415 +            self.merged_attribute_policy[name] = value
416 +        else:
417 +            self.key_attributes.append(name)
418 +
419 +    def _get_unique_key(self, s):
420 +        non_aggregated_keys = "-".join([getattr(s, field)
421 +                                        for field in self.key_attributes])
422 +        # NOTE(sileht): this assumes a meter always has the same unit/type
423 +        return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
424 +
425 +    def handle_sample(self, context, sample):
426 +        if not self.initial_timestamp:
427 +            self.initial_timestamp = timeutils.parse_strtime(
428 +                sample.timestamp)
429 +
430 +        self.aggregated_samples += 1
431 +        key = self._get_unique_key(sample)
432 +        if key not in self.samples:
433 +            self.samples[key] = self._convert(sample)
434 +            if self.merged_attribute_policy[
435 +                    'resource_metadata'] == 'drop':
436 +                self.samples[key].resource_metadata = {}
437 +        else:
438 +            self.samples[key].volume += self._scale(sample)
439 +            for field in self.merged_attribute_policy:
440 +                if self.merged_attribute_policy[field] == 'last':
441 +                    setattr(self.samples[key], field,
442 +                            getattr(sample, field))
443 +
444 +    def flush(self, context):
445 +        expired = self.retention_time and \
446 +            timeutils.is_older_than(self.initial_timestamp,
447 +                                    self.retention_time)
448 +        full = self.aggregated_samples >= self.size
449 +        if full or expired:
450 +            x = self.samples.values()
451 +            self.samples = {}
452 +            self.aggregated_samples = 0
453 +            self.initial_timestamp = None
454 +            return x
455 +        return []
456 diff --git a/setup.cfg b/setup.cfg
457 index ef67325..3f19142 100644
458 --- a/setup.cfg
459 +++ b/setup.cfg
460 @@ -156,6 +156,7 @@ ceilometer.transformer =
461      accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
462      unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
463      rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
464 +    aggregator = ceilometer.transformer.conversions:AggregatorTransformer
465  
466  ceilometer.publisher =
467      test = ceilometer.publisher.test:TestPublisher