Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / unit / objects / qos / test_policy.py
1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
2 #    not use this file except in compliance with the License. You may obtain
3 #    a copy of the License at
4 #
5 #         http://www.apache.org/licenses/LICENSE-2.0
6 #
7 #    Unless required by applicable law or agreed to in writing, software
8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 #    License for the specific language governing permissions and limitations
11 #    under the License.
12
13 import mock
14
15 from neutron.common import exceptions as n_exc
16 from neutron.db import api as db_api
17 from neutron.db import models_v2
18 from neutron.objects.qos import policy
19 from neutron.objects.qos import rule
20 from neutron.tests.unit.objects import test_base
21 from neutron.tests.unit import testlib_api
22
23
24 class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
25
26     _test_class = policy.QosPolicy
27
28     def setUp(self):
29         super(QosPolicyObjectTestCase, self).setUp()
30         # qos_policy_ids will be incorrect, but we don't care in this test
31         self.db_qos_bandwidth_rules = [
32             self.get_random_fields(rule.QosBandwidthLimitRule)
33             for _ in range(3)]
34
35         self.model_map = {
36             self._test_class.db_model: self.db_objs,
37             rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules}
38
39     def fake_get_objects(self, context, model, **kwargs):
40         return self.model_map[model]
41
42     def fake_get_object(self, context, model, id):
43         objects = self.model_map[model]
44         return [obj for obj in objects if obj['id'] == id][0]
45
46     def test_get_objects(self):
47         admin_context = self.context.elevated()
48         with mock.patch.object(
49             db_api, 'get_objects',
50             side_effect=self.fake_get_objects) as get_objects_mock:
51
52             with mock.patch.object(
53                 db_api, 'get_object',
54                 side_effect=self.fake_get_object):
55
56                 with mock.patch.object(
57                     self.context,
58                     'elevated',
59                     return_value=admin_context) as context_mock:
60
61                     objs = self._test_class.get_objects(self.context)
62                     context_mock.assert_called_once_with()
63             get_objects_mock.assert_any_call(
64                 admin_context, self._test_class.db_model)
65         self._validate_objects(self.db_objs, objs)
66
67     def test_get_objects_valid_fields(self):
68         admin_context = self.context.elevated()
69
70         with mock.patch.object(
71             db_api, 'get_objects',
72             return_value=[self.db_obj]) as get_objects_mock:
73
74             with mock.patch.object(
75                 self.context,
76                 'elevated',
77                 return_value=admin_context) as context_mock:
78
79                 objs = self._test_class.get_objects(
80                     self.context,
81                     **self.valid_field_filter)
82                 context_mock.assert_called_once_with()
83             get_objects_mock.assert_any_call(
84                 admin_context, self._test_class.db_model,
85                 **self.valid_field_filter)
86         self._validate_objects([self.db_obj], objs)
87
88     def test_get_by_id(self):
89         admin_context = self.context.elevated()
90         with mock.patch.object(db_api, 'get_object',
91                                return_value=self.db_obj) as get_object_mock:
92             with mock.patch.object(self.context,
93                                    'elevated',
94                                    return_value=admin_context) as context_mock:
95                 obj = self._test_class.get_by_id(self.context, id='fake_id')
96                 self.assertTrue(self._is_test_class(obj))
97                 self.assertEqual(self.db_obj, test_base.get_obj_db_fields(obj))
98                 context_mock.assert_called_once_with()
99                 get_object_mock.assert_called_once_with(
100                     admin_context, self._test_class.db_model, id='fake_id')
101
102
103 class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
104                                 testlib_api.SqlTestCase):
105
106     _test_class = policy.QosPolicy
107
108     def setUp(self):
109         super(QosPolicyDbObjectTestCase, self).setUp()
110         self._create_test_network()
111         self._create_test_port(self._network)
112
113     def _create_test_policy(self):
114         policy_obj = policy.QosPolicy(self.context, **self.db_obj)
115         policy_obj.create()
116         return policy_obj
117
118     def _create_test_policy_with_rule(self):
119         policy_obj = self._create_test_policy()
120
121         rule_fields = self.get_random_fields(
122             obj_cls=rule.QosBandwidthLimitRule)
123         rule_fields['qos_policy_id'] = policy_obj.id
124
125         rule_obj = rule.QosBandwidthLimitRule(self.context, **rule_fields)
126         rule_obj.create()
127
128         return policy_obj, rule_obj
129
130     def _create_test_network(self):
131         # TODO(ihrachys): replace with network.create() once we get an object
132         # implementation for networks
133         self._network = db_api.create_object(self.context, models_v2.Network,
134                                              {'name': 'test-network1'})
135
136     def _create_test_port(self, network):
137         # TODO(ihrachys): replace with port.create() once we get an object
138         # implementation for ports
139         self._port = db_api.create_object(self.context, models_v2.Port,
140                                           {'name': 'test-port1',
141                                            'network_id': network['id'],
142                                            'mac_address': 'fake_mac',
143                                            'admin_state_up': True,
144                                            'status': 'ACTIVE',
145                                            'device_id': 'fake_device',
146                                            'device_owner': 'fake_owner'})
147
148     def test_attach_network_get_network_policy(self):
149
150         obj = self._create_test_policy()
151
152         policy_obj = policy.QosPolicy.get_network_policy(self.context,
153                                                          self._network['id'])
154         self.assertIsNone(policy_obj)
155
156         # Now attach policy and repeat
157         obj.attach_network(self._network['id'])
158
159         policy_obj = policy.QosPolicy.get_network_policy(self.context,
160                                                          self._network['id'])
161         self.assertEqual(obj, policy_obj)
162
163     def test_attach_network_nonexistent_network(self):
164
165         obj = self._create_test_policy()
166         self.assertRaises(n_exc.NetworkQosBindingNotFound,
167                           obj.attach_network, 'non-existent-network')
168
169     def test_attach_port_nonexistent_port(self):
170
171         obj = self._create_test_policy()
172         self.assertRaises(n_exc.PortQosBindingNotFound,
173                           obj.attach_port, 'non-existent-port')
174
175     def test_attach_network_nonexistent_policy(self):
176
177         policy_obj = policy.QosPolicy(self.context, **self.db_obj)
178         self.assertRaises(n_exc.NetworkQosBindingNotFound,
179                           policy_obj.attach_network, self._network['id'])
180
181     def test_attach_port_nonexistent_policy(self):
182
183         policy_obj = policy.QosPolicy(self.context, **self.db_obj)
184         self.assertRaises(n_exc.PortQosBindingNotFound,
185                           policy_obj.attach_port, self._port['id'])
186
187     def test_attach_port_get_port_policy(self):
188
189         obj = self._create_test_policy()
190
191         policy_obj = policy.QosPolicy.get_network_policy(self.context,
192                                                          self._network['id'])
193
194         self.assertIsNone(policy_obj)
195
196         # Now attach policy and repeat
197         obj.attach_port(self._port['id'])
198
199         policy_obj = policy.QosPolicy.get_port_policy(self.context,
200                                                       self._port['id'])
201         self.assertEqual(obj, policy_obj)
202
203     def test_detach_port(self):
204         obj = self._create_test_policy()
205         obj.attach_port(self._port['id'])
206         obj.detach_port(self._port['id'])
207
208         policy_obj = policy.QosPolicy.get_port_policy(self.context,
209                                                       self._port['id'])
210         self.assertIsNone(policy_obj)
211
212     def test_detach_network(self):
213         obj = self._create_test_policy()
214         obj.attach_network(self._network['id'])
215         obj.detach_network(self._network['id'])
216
217         policy_obj = policy.QosPolicy.get_network_policy(self.context,
218                                                          self._network['id'])
219         self.assertIsNone(policy_obj)
220
221     def test_detach_port_nonexistent_port(self):
222         obj = self._create_test_policy()
223         self.assertRaises(n_exc.PortQosBindingNotFound,
224                           obj.detach_port, 'non-existent-port')
225
226     def test_detach_network_nonexistent_network(self):
227         obj = self._create_test_policy()
228         self.assertRaises(n_exc.NetworkQosBindingNotFound,
229                           obj.detach_network, 'non-existent-port')
230
231     def test_detach_port_nonexistent_policy(self):
232         policy_obj = policy.QosPolicy(self.context, **self.db_obj)
233         self.assertRaises(n_exc.PortQosBindingNotFound,
234                           policy_obj.detach_port, self._port['id'])
235
236     def test_detach_network_nonexistent_policy(self):
237         policy_obj = policy.QosPolicy(self.context, **self.db_obj)
238         self.assertRaises(n_exc.NetworkQosBindingNotFound,
239                           policy_obj.detach_network, self._network['id'])
240
241     def test_synthetic_rule_fields(self):
242         policy_obj, rule_obj = self._create_test_policy_with_rule()
243         policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
244         self.assertEqual([rule_obj], policy_obj.rules)
245
246     def test_get_by_id_fetches_rules_non_lazily(self):
247         policy_obj, rule_obj = self._create_test_policy_with_rule()
248         policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
249
250         primitive = policy_obj.obj_to_primitive()
251         self.assertNotEqual([], (primitive['versioned_object.data']['rules']))
252
253     def test_to_dict_returns_rules_as_dicts(self):
254         policy_obj, rule_obj = self._create_test_policy_with_rule()
255         policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
256
257         obj_dict = policy_obj.to_dict()
258         rule_dict = rule_obj.to_dict()
259
260         # first make sure that to_dict() is still sane and does not return
261         # objects
262         for obj in (rule_dict, obj_dict):
263             self.assertIsInstance(obj, dict)
264
265         self.assertEqual(rule_dict, obj_dict['rules'][0])
266
267     def test_shared_default(self):
268         self.db_obj.pop('shared')
269         obj = self._test_class(self.context, **self.db_obj)
270         self.assertFalse(obj.shared)
271
272     def test_delete_not_allowed_if_policy_in_use_by_port(self):
273         obj = self._create_test_policy()
274         obj.attach_port(self._port['id'])
275
276         self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
277
278         obj.detach_port(self._port['id'])
279         obj.delete()
280
281     def test_delete_not_allowed_if_policy_in_use_by_network(self):
282         obj = self._create_test_policy()
283         obj.attach_network(self._network['id'])
284
285         self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
286
287         obj.detach_network(self._network['id'])
288         obj.delete()
289
290     def test_reload_rules_reloads_rules(self):
291         policy_obj, rule_obj = self._create_test_policy_with_rule()
292         self.assertEqual([], policy_obj.rules)
293
294         policy_obj.reload_rules()
295         self.assertEqual([rule_obj], policy_obj.rules)