Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / unit / agent / l3 / test_router_processing_queue.py
1 # Copyright 2014 Hewlett-Packard Development Company, L.P.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14 #
15
16 import datetime
17
18 from oslo_utils import uuidutils
19
20 from neutron.agent.l3 import router_processing_queue as l3_queue
21 from neutron.tests import base
22
23 _uuid = uuidutils.generate_uuid
24 FAKE_ID = _uuid()
25 FAKE_ID_2 = _uuid()
26
27
28 class TestExclusiveRouterProcessor(base.BaseTestCase):
29     def setUp(self):
30         super(TestExclusiveRouterProcessor, self).setUp()
31
32     def test_i_am_master(self):
33         master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
34         not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
35         master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
36         not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
37
38         self.assertTrue(master._i_am_master())
39         self.assertFalse(not_master._i_am_master())
40         self.assertTrue(master_2._i_am_master())
41         self.assertFalse(not_master_2._i_am_master())
42
43         master.__exit__(None, None, None)
44         master_2.__exit__(None, None, None)
45
46     def test_master(self):
47         master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
48         not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
49         master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
50         not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
51
52         self.assertEqual(master, master._master)
53         self.assertEqual(master, not_master._master)
54         self.assertEqual(master_2, master_2._master)
55         self.assertEqual(master_2, not_master_2._master)
56
57         master.__exit__(None, None, None)
58         master_2.__exit__(None, None, None)
59
60     def test__enter__(self):
61         self.assertNotIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
62         master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
63         master.__enter__()
64         self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
65         master.__exit__(None, None, None)
66
67     def test__exit__(self):
68         master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
69         not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
70         master.__enter__()
71         self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
72         not_master.__enter__()
73         not_master.__exit__(None, None, None)
74         self.assertIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
75         master.__exit__(None, None, None)
76         self.assertNotIn(FAKE_ID, l3_queue.ExclusiveRouterProcessor._masters)
77
78     def test_data_fetched_since(self):
79         master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
80         self.assertEqual(datetime.datetime.min,
81                          master._get_router_data_timestamp())
82
83         ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
84         ts2 = datetime.datetime.utcnow()
85
86         master.fetched_and_processed(ts2)
87         self.assertEqual(ts2, master._get_router_data_timestamp())
88         master.fetched_and_processed(ts1)
89         self.assertEqual(ts2, master._get_router_data_timestamp())
90
91         master.__exit__(None, None, None)
92
93     def test_updates(self):
94         master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
95         not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
96
97         master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0))
98         not_master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0))
99
100         for update in not_master.updates():
101             raise Exception("Only the master should process a router")
102
103         self.assertEqual(2, len([i for i in master.updates()]))