Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / fake_notifier.py
1 # Copyright 2014 Red Hat, Inc.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import collections
16 import functools
17
18
19 NOTIFICATIONS = []
20
21
22 def reset():
23     del NOTIFICATIONS[:]
24
25
26 FakeMessage = collections.namedtuple('Message',
27                                      ['publisher_id', 'priority',
28                                       'event_type', 'payload'])
29
30
31 class FakeNotifier(object):
32
33     def __init__(self, transport, publisher_id=None,
34                  driver=None, topic=None,
35                  serializer=None, retry=None):
36         self.transport = transport
37         self.publisher_id = publisher_id
38         for priority in ('debug', 'info', 'warn', 'error', 'critical'):
39             setattr(self, priority,
40                     functools.partial(self._notify, priority=priority.upper()))
41
42     def prepare(self, publisher_id=None):
43         if publisher_id is None:
44             publisher_id = self.publisher_id
45         return self.__class__(self.transport, publisher_id)
46
47     def _notify(self, ctxt, event_type, payload, priority):
48         msg = dict(publisher_id=self.publisher_id,
49                    priority=priority,
50                    event_type=event_type,
51                    payload=payload)
52         NOTIFICATIONS.append(msg)