1 Description: transformer: Add aggregator transformer
2 This adds a transformer that aggregates counters until a threshold and/or a
3 retention time is reached, and then flushes them out.
4 Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
5 Date: Wed, 9 Apr 2014 14:28:54 +0000 (+0200)
6 X-Git-Url: https://review.openstack.org/gitweb?p=openstack%2Fceilometer.git;a=commitdiff_plain;h=6957e7b66aa37685ec0d75fe7ef65cdab981d55f
7 Change-Id: If4a950e585fe5309ddec58ed2c9a92928ac1acb2
8 Co-Authored-By: Jordan Pittier <jordan.pittier@cloudwatt.com>
9 Origin: upstream, https://review.openstack.org/#/c/87238/
10 Last-Update: 2014-05-21
12 diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py
13 index db51c64..f4f5a64 100644
14 --- a/ceilometer/tests/pipeline_base.py
15 +++ b/ceilometer/tests/pipeline_base.py
22 from stevedore import extension
24 @@ -52,6 +53,7 @@ class BasePipelineTestCase(test.BaseTestCase):
25 'except': self.TransformerClassException,
26 'drop': self.TransformerClassDrop,
27 'cache': accumulator.TransformerAccumulator,
28 + 'aggregator': conversions.AggregatorTransformer,
29 'unit_conversion': conversions.ScalingTransformer,
30 'rate_of_change': conversions.RateOfChangeTransformer,
32 @@ -1113,3 +1115,329 @@ class BasePipelineTestCase(test.BaseTestCase):
33 meters = ('disk.read.bytes', 'disk.write.requests')
34 units = ('B', 'request')
35 self._do_test_rate_of_change_mapping(pipe, meters, units)
37 + def _do_test_aggregator(self, parameters, expected_length):
40 + 'name': 'aggregator',
41 + 'parameters': parameters,
44 + self._set_pipeline_cfg('transformers', transformer_cfg)
45 + self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
48 + name='storage.objects.incoming.bytes',
49 + type=sample.TYPE_DELTA,
52 + user_id='test_user',
53 + project_id='test_proj',
54 + resource_id='test_resource',
55 + timestamp=timeutils.utcnow().isoformat(),
56 + resource_metadata={'version': '1.0'}
59 + name='storage.objects.incoming.bytes',
60 + type=sample.TYPE_DELTA,
63 + user_id='test_user',
64 + project_id='test_proj',
65 + resource_id='test_resource',
66 + timestamp=timeutils.utcnow().isoformat(),
67 + resource_metadata={'version': '2.0'}
70 + name='storage.objects.incoming.bytes',
71 + type=sample.TYPE_DELTA,
74 + user_id='test_user_bis',
75 + project_id='test_proj_bis',
76 + resource_id='test_resource',
77 + timestamp=timeutils.utcnow().isoformat(),
78 + resource_metadata={'version': '1.0'}
81 + name='storage.objects.incoming.bytes',
82 + type=sample.TYPE_DELTA,
85 + user_id='test_user_bis',
86 + project_id='test_proj_bis',
87 + resource_id='test_resource',
88 + timestamp=timeutils.utcnow().isoformat(),
89 + resource_metadata={'version': '2.0'}
92 + name='storage.objects.incoming.bytes',
93 + type=sample.TYPE_DELTA,
96 + user_id='test_user',
97 + project_id='test_proj_bis',
98 + resource_id='test_resource',
99 + timestamp=timeutils.utcnow().isoformat(),
100 + resource_metadata={'version': '2.0'}
103 + name='storage.objects.incoming.bytes',
104 + type=sample.TYPE_DELTA,
107 + user_id='test_user_bis',
108 + project_id='test_proj',
109 + resource_id='test_resource',
110 + timestamp=timeutils.utcnow().isoformat(),
111 + resource_metadata={'version': '3.0'}
115 + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
116 + self.transformer_manager)
117 + pipe = pipeline_manager.pipelines[0]
119 + pipe.publish_samples(None, counters)
121 + publisher = pipeline_manager.pipelines[0].publishers[0]
122 + self.assertEqual(expected_length, len(publisher.samples))
123 + return sorted(publisher.samples, key=lambda s: s.volume)
125 + def test_aggregator_metadata(self):
126 + for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
127 + samples = self._do_test_aggregator({
128 + 'resource_metadata': conf,
129 + 'target': {'name': 'aggregated-bytes'}
130 + }, expected_length=4)
132 + self.assertEqual('aggregated-bytes', s.name)
133 + self.assertEqual(2, s.volume)
134 + self.assertEqual('test_user_bis', s.user_id)
135 + self.assertEqual('test_proj', s.project_id)
136 + self.assertEqual({'version': '3.0'},
137 + s.resource_metadata)
139 + self.assertEqual('aggregated-bytes', s.name)
140 + self.assertEqual(15, s.volume)
141 + self.assertEqual('test_user', s.user_id)
142 + self.assertEqual('test_proj_bis', s.project_id)
143 + self.assertEqual({'version': '2.0'},
144 + s.resource_metadata)
146 + self.assertEqual('aggregated-bytes', s.name)
147 + self.assertEqual(42, s.volume)
148 + self.assertEqual('test_user', s.user_id)
149 + self.assertEqual('test_proj', s.project_id)
150 + self.assertEqual({'version': expected_version},
151 + s.resource_metadata)
153 + self.assertEqual('aggregated-bytes', s.name)
154 + self.assertEqual(95, s.volume)
155 + self.assertEqual('test_user_bis', s.user_id)
156 + self.assertEqual('test_proj_bis', s.project_id)
157 + self.assertEqual({'version': expected_version},
158 + s.resource_metadata)
160 + def test_aggregator_user_last_and_metadata_last(self):
161 + samples = self._do_test_aggregator({
162 + 'resource_metadata': 'last',
164 + 'target': {'name': 'aggregated-bytes'}
165 + }, expected_length=2)
167 + self.assertEqual('aggregated-bytes', s.name)
168 + self.assertEqual(44, s.volume)
169 + self.assertEqual('test_user_bis', s.user_id)
170 + self.assertEqual('test_proj', s.project_id)
171 + self.assertEqual({'version': '3.0'},
172 + s.resource_metadata)
174 + self.assertEqual('aggregated-bytes', s.name)
175 + self.assertEqual(110, s.volume)
176 + self.assertEqual('test_user', s.user_id)
177 + self.assertEqual('test_proj_bis', s.project_id)
178 + self.assertEqual({'version': '2.0'},
179 + s.resource_metadata)
181 + def test_aggregator_user_first_and_metadata_last(self):
182 + samples = self._do_test_aggregator({
183 + 'resource_metadata': 'last',
184 + 'user_id': 'first',
185 + 'target': {'name': 'aggregated-bytes'}
186 + }, expected_length=2)
188 + self.assertEqual('aggregated-bytes', s.name)
189 + self.assertEqual(44, s.volume)
190 + self.assertEqual('test_user', s.user_id)
191 + self.assertEqual('test_proj', s.project_id)
192 + self.assertEqual({'version': '3.0'},
193 + s.resource_metadata)
195 + self.assertEqual('aggregated-bytes', s.name)
196 + self.assertEqual(110, s.volume)
197 + self.assertEqual('test_user_bis', s.user_id)
198 + self.assertEqual('test_proj_bis', s.project_id)
199 + self.assertEqual({'version': '2.0'},
200 + s.resource_metadata)
202 + def test_aggregator_all_first(self):
203 + samples = self._do_test_aggregator({
204 + 'resource_metadata': 'first',
205 + 'user_id': 'first',
206 + 'project_id': 'first',
207 + 'target': {'name': 'aggregated-bytes'}
208 + }, expected_length=1)
210 + self.assertEqual('aggregated-bytes', s.name)
211 + self.assertEqual(154, s.volume)
212 + self.assertEqual('test_user', s.user_id)
213 + self.assertEqual('test_proj', s.project_id)
214 + self.assertEqual({'version': '1.0'},
215 + s.resource_metadata)
217 + def test_aggregator_all_last(self):
218 + samples = self._do_test_aggregator({
219 + 'resource_metadata': 'last',
221 + 'project_id': 'last',
222 + 'target': {'name': 'aggregated-bytes'}
223 + }, expected_length=1)
225 + self.assertEqual('aggregated-bytes', s.name)
226 + self.assertEqual(154, s.volume)
227 + self.assertEqual('test_user_bis', s.user_id)
228 + self.assertEqual('test_proj', s.project_id)
229 + self.assertEqual({'version': '3.0'},
230 + s.resource_metadata)
232 + def test_aggregator_all_mixed(self):
233 + samples = self._do_test_aggregator({
234 + 'resource_metadata': 'drop',
235 + 'user_id': 'first',
236 + 'project_id': 'last',
237 + 'target': {'name': 'aggregated-bytes'}
238 + }, expected_length=1)
240 + self.assertEqual('aggregated-bytes', s.name)
241 + self.assertEqual(154, s.volume)
242 + self.assertEqual('test_user', s.user_id)
243 + self.assertEqual('test_proj', s.project_id)
244 + self.assertEqual({}, s.resource_metadata)
246 + def test_aggregator_metadata_default(self):
247 + samples = self._do_test_aggregator({
249 + 'project_id': 'last',
250 + 'target': {'name': 'aggregated-bytes'}
251 + }, expected_length=1)
253 + self.assertEqual('aggregated-bytes', s.name)
254 + self.assertEqual(154, s.volume)
255 + self.assertEqual('test_user_bis', s.user_id)
256 + self.assertEqual('test_proj', s.project_id)
257 + self.assertEqual({'version': '3.0'},
258 + s.resource_metadata)
260 + @mock.patch('ceilometer.transformer.conversions.LOG')
261 + def test_aggregator_metadata_invalid(self, mylog):
262 + samples = self._do_test_aggregator({
263 + 'resource_metadata': 'invalid',
265 + 'project_id': 'last',
266 + 'target': {'name': 'aggregated-bytes'}
267 + }, expected_length=1)
269 + self.assertTrue(mylog.warn.called)
270 + self.assertEqual('aggregated-bytes', s.name)
271 + self.assertEqual(154, s.volume)
272 + self.assertEqual('test_user_bis', s.user_id)
273 + self.assertEqual('test_proj', s.project_id)
274 + self.assertEqual({'version': '3.0'},
275 + s.resource_metadata)
277 + def test_aggregator_sized_flush(self):
278 + transformer_cfg = [
280 + 'name': 'aggregator',
281 + 'parameters': {'size': 2},
284 + self._set_pipeline_cfg('transformers', transformer_cfg)
285 + self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
288 + name='storage.objects.incoming.bytes',
289 + type=sample.TYPE_DELTA,
292 + user_id='test_user',
293 + project_id='test_proj',
294 + resource_id='test_resource',
295 + timestamp=timeutils.utcnow().isoformat(),
296 + resource_metadata={'version': '1.0'}
299 + name='storage.objects.incoming.bytes',
300 + type=sample.TYPE_DELTA,
303 + user_id='test_user_bis',
304 + project_id='test_proj_bis',
305 + resource_id='test_resource',
306 + timestamp=timeutils.utcnow().isoformat(),
307 + resource_metadata={'version': '2.0'}
311 + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
312 + self.transformer_manager)
313 + pipe = pipeline_manager.pipelines[0]
315 + pipe.publish_samples(None, [counters[0]])
317 + publisher = pipe.publishers[0]
318 + self.assertEqual(0, len(publisher.samples))
320 + pipe.publish_samples(None, [counters[1]])
322 + publisher = pipe.publishers[0]
323 + self.assertEqual(2, len(publisher.samples))
325 + def test_aggregator_timed_flush(self):
326 + timeutils.set_time_override()
327 + transformer_cfg = [
329 + 'name': 'aggregator',
330 + 'parameters': {'size': 900, 'retention_time': 60},
333 + self._set_pipeline_cfg('transformers', transformer_cfg)
334 + self._set_pipeline_cfg('counters', ['storage.objects.incoming.bytes'])
337 + name='storage.objects.incoming.bytes',
338 + type=sample.TYPE_DELTA,
341 + user_id='test_user',
342 + project_id='test_proj',
343 + resource_id='test_resource',
344 + timestamp=timeutils.utcnow().isoformat(),
345 + resource_metadata={'version': '1.0'}
349 + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
350 + self.transformer_manager)
351 + pipe = pipeline_manager.pipelines[0]
353 + pipe.publish_samples(None, counters)
355 + publisher = pipeline_manager.pipelines[0].publishers[0]
356 + self.assertEqual(0, len(publisher.samples))
358 + timeutils.advance_time_seconds(120)
360 + publisher = pipeline_manager.pipelines[0].publishers[0]
361 + self.assertEqual(1, len(publisher.samples))
362 diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
363 index 3a7d079..bfa9f0c 100644
364 --- a/ceilometer/transformer/conversions.py
365 +++ b/ceilometer/transformer/conversions.py
366 @@ -163,3 +163,89 @@ class RateOfChangeTransformer(ScalingTransformer):
372 +class AggregatorTransformer(ScalingTransformer):
373 + """Transformer that aggregate sample until a threshold or/and a
374 + retention_time, and then flush them out in the wild.
377 + To aggregate samples by resource_metadata and keep the
378 + resource_metadata of the latest received sample:
380 + AggregatorTransformer(retention_time=60, resource_metadata='last')
382 + To aggregate samples by user_id and resource_metadata, keep the
383 + user_id of the first received sample, and drop the resource_metadata:
385 + AggregatorTransformer(size=15, user_id='first',
386 + resource_metadata='drop')
390 + def __init__(self, size=1, retention_time=None,
391 + project_id=None, user_id=None, resource_metadata="last",
393 + super(AggregatorTransformer, self).__init__(**kwargs)
396 + self.retention_time = retention_time
397 + self.initial_timestamp = None
398 + self.aggregated_samples = 0
400 + self.key_attributes = []
401 + self.merged_attribute_policy = {}
403 + self._init_attribute('project_id', project_id)
404 + self._init_attribute('user_id', user_id)
405 + self._init_attribute('resource_metadata', resource_metadata,
406 + is_droppable=True, mandatory=True)
408 + def _init_attribute(self, name, value, is_droppable=False,
410 + drop = ['drop'] if is_droppable else []
411 + if value or mandatory:
412 + if value not in ['last', 'first'] + drop:
413 + LOG.warn('%s is unknown (%s), using last' % (name, value))
415 + self.merged_attribute_policy[name] = value
417 + self.key_attributes.append(name)
419 + def _get_unique_key(self, s):
420 + non_aggregated_keys = "-".join([getattr(s, field)
421 + for field in self.key_attributes])
422 + #NOTE(sileht): it assumes, a meter always have the same unit/type
423 + return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
425 + def handle_sample(self, context, sample):
426 + if not self.initial_timestamp:
427 + self.initial_timestamp = timeutils.parse_strtime(
430 + self.aggregated_samples += 1
431 + key = self._get_unique_key(sample)
432 + if key not in self.samples:
433 + self.samples[key] = self._convert(sample)
434 + if self.merged_attribute_policy[
435 + 'resource_metadata'] == 'drop':
436 + self.samples[key].resource_metadata = {}
438 + self.samples[key].volume += self._scale(sample)
439 + for field in self.merged_attribute_policy:
440 + if self.merged_attribute_policy[field] == 'last':
441 + setattr(self.samples[key], field,
442 + getattr(sample, field))
444 + def flush(self, context):
445 + expired = self.retention_time and \
446 + timeutils.is_older_than(self.initial_timestamp,
447 + self.retention_time)
448 + full = self.aggregated_samples >= self.size
449 + if full or expired:
450 + x = self.samples.values()
452 + self.aggregated_samples = 0
453 + self.initial_timestamp = None
456 diff --git a/setup.cfg b/setup.cfg
457 index ef67325..3f19142 100644
460 @@ -156,6 +156,7 @@ ceilometer.transformer =
461 accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
462 unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
463 rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
464 + aggregator = ceilometer.transformer.conversions:AggregatorTransformer
466 ceilometer.publisher =
467 test = ceilometer.publisher.test:TestPublisher