1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
2 # not use this file except in compliance with the License. You may obtain
3 # a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
7 # Unless required by applicable law or agreed to in writing, software
8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 # License for the specific language governing permissions and limitations
15 from neutron.common import exceptions as n_exc
16 from neutron.db import api as db_api
17 from neutron.db import models_v2
18 from neutron.objects.qos import policy
19 from neutron.objects.qos import rule
20 from neutron.tests.unit.objects import test_base
21 from neutron.tests.unit import testlib_api
# Test case for policy.QosPolicy built on test_base.BaseObjectIfaceTestCase.
24 class QosPolicyObjectTestCase(test_base.BaseObjectIfaceTestCase):
26 _test_class = policy.QosPolicy
# NOTE(review): the embedded numbering jumps 26 -> 29 and 32 -> 36, so the
# `def setUp(self):` header, the end of the list literal and the start of the
# mapping below are presumably missing from this extract -- confirm against
# the full file before editing.
29 super(QosPolicyObjectTestCase, self).setUp()
30 # qos_policy_ids will be incorrect, but we don't care in this test
31 self.db_qos_bandwidth_rules = [
32 self.get_random_fields(rule.QosBandwidthLimitRule)
# Maps each db model to the fake rows served for it; presumably stored as
# self.model_map, which fake_get_objects()/fake_get_object() read.
36 self._test_class.db_model: self.db_objs,
37 rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules}
def fake_get_objects(self, context, model, **kwargs):
    """Fake for db_api.get_objects: serve the canned rows of *model*.

    ``context`` and any filter ``kwargs`` are accepted but ignored; the
    result is looked up in ``self.model_map``.
    """
    canned_rows = self.model_map[model]
    return canned_rows
def fake_get_object(self, context, model, id):
    """Fake single-row lookup used as a mock side_effect.

    Returns the first canned row of *model* whose ``'id'`` equals *id*;
    raises IndexError when none matches.  ``context`` is ignored.
    """
    matching = []
    for candidate in self.model_map[model]:
        if candidate['id'] == id:
            matching.append(candidate)
    return matching[0]
# Checks that get_objects(), called with the plain context, issues its
# db_api.get_objects query with the elevated (admin) context and that the
# returned objects validate against self.db_objs.
# NOTE(review): the embedded numbering skips 53 and 57-58, so the targets of
# the second and third mock.patch.object() calls are not visible here
# (presumably db_api 'get_object' and self.context 'elevated') -- confirm
# against the full file.
46 def test_get_objects(self):
47 admin_context = self.context.elevated()
48 with mock.patch.object(
49 db_api, 'get_objects',
50 side_effect=self.fake_get_objects) as get_objects_mock:
52 with mock.patch.object(
54 side_effect=self.fake_get_object):
56 with mock.patch.object(
59 return_value=admin_context) as context_mock:
61 objs = self._test_class.get_objects(self.context)
62 context_mock.assert_called_once_with()
# assert_any_call (not assert_called_once_with): other get_objects calls
# may happen too; only the one for the test class model is pinned.
63 get_objects_mock.assert_any_call(
64 admin_context, self._test_class.db_model)
65 self._validate_objects(self.db_objs, objs)
# Checks get_objects() with self.valid_field_filter passed as keyword
# filters: the db_api.get_objects call must receive the elevated context plus
# the same filters, and the single mocked row must validate.
# NOTE(review): the embedded numbering skips 75-76 and 80, so the target of
# the second mock.patch.object() and the first argument of get_objects() are
# not visible here (presumably self.context 'elevated' and self.context) --
# confirm against the full file.
67 def test_get_objects_valid_fields(self):
68 admin_context = self.context.elevated()
70 with mock.patch.object(
71 db_api, 'get_objects',
72 return_value=[self.db_obj]) as get_objects_mock:
74 with mock.patch.object(
77 return_value=admin_context) as context_mock:
79 objs = self._test_class.get_objects(
81 **self.valid_field_filter)
82 context_mock.assert_called_once_with()
83 get_objects_mock.assert_any_call(
84 admin_context, self._test_class.db_model,
85 **self.valid_field_filter)
86 self._validate_objects([self.db_obj], objs)
# Checks that get_by_id() fetches through db_api.get_object exactly once with
# the elevated context, and returns an instance of the test class whose db
# fields equal self.db_obj.
# NOTE(review): the embedded numbering skips 93, so the attribute patched on
# self.context is not visible here (presumably 'elevated') -- confirm against
# the full file.
88 def test_get_by_id(self):
89 admin_context = self.context.elevated()
90 with mock.patch.object(db_api, 'get_object',
91 return_value=self.db_obj) as get_object_mock:
92 with mock.patch.object(self.context,
94 return_value=admin_context) as context_mock:
95 obj = self._test_class.get_by_id(self.context, id='fake_id')
96 self.assertTrue(self._is_test_class(obj))
97 self.assertEqual(self.db_obj, test_base.get_obj_db_fields(obj))
98 context_mock.assert_called_once_with()
99 get_object_mock.assert_called_once_with(
100 admin_context, self._test_class.db_model, id='fake_id')
# Tests for policy.QosPolicy that mix test_base.BaseDbObjectTestCase with
# testlib_api.SqlTestCase.
103 class QosPolicyDbObjectTestCase(test_base.BaseDbObjectTestCase,
104 testlib_api.SqlTestCase):
106 _test_class = policy.QosPolicy
# NOTE(review): the embedded numbering jumps 106 -> 109; the
# `def setUp(self):` header is presumably missing from this extract.
# Fixture: one network, then one port on that network, stored on
# self._network / self._port for the attach/detach tests.
109 super(QosPolicyDbObjectTestCase, self).setUp()
110 self._create_test_network()
111 self._create_test_port(self._network)
# Helper: build a policy.QosPolicy from the self.db_obj fields.
# NOTE(review): the embedded numbering skips 115-116; callers use this
# method's return value, so the tail (presumably persisting the object and
# returning it) is missing from this extract -- confirm against the full file.
113 def _create_test_policy(self):
114 policy_obj = policy.QosPolicy(self.context, **self.db_obj)
# Helper: obtain a test policy and build a QosBandwidthLimitRule with random
# fields whose qos_policy_id points at it; returns (policy_obj, rule_obj).
# NOTE(review): the embedded numbering skips 126, so one statement between
# constructing rule_obj and the return is not visible (presumably the call
# that persists the rule) -- confirm against the full file.
118 def _create_test_policy_with_rule(self):
119 policy_obj = self._create_test_policy()
121 rule_fields = self.get_random_fields(
122 obj_cls=rule.QosBandwidthLimitRule)
# Overwrite the random policy id so the rule belongs to policy_obj.
123 rule_fields['qos_policy_id'] = policy_obj.id
125 rule_obj = rule.QosBandwidthLimitRule(self.context, **rule_fields)
128 return policy_obj, rule_obj
def _create_test_network(self):
    """Insert the 'test-network1' network and store it on self._network."""
    # TODO(ihrachys): replace with network.create() once we get an object
    # implementation for networks
    network_fields = {'name': 'test-network1'}
    self._network = db_api.create_object(
        self.context, models_v2.Network, network_fields)
# Helper: insert a models_v2.Port row attached to `network` through
# db_api.create_object and store the result on self._port.
# NOTE(review): the embedded numbering skips 144, so one entry of the field
# dict is not visible in this extract -- confirm against the full file.
136 def _create_test_port(self, network):
137 # TODO(ihrachys): replace with port.create() once we get an object
138 # implementation for ports
139 self._port = db_api.create_object(self.context, models_v2.Port,
140 {'name': 'test-port1',
141 'network_id': network['id'],
142 'mac_address': 'fake_mac',
143 'admin_state_up': True,
145 'device_id': 'fake_device',
146 'device_owner': 'fake_owner'})
# get_network_policy() yields None before any attachment and the attached
# policy once attach_network() has been called for the test network.
# NOTE(review): the embedded numbering skips 153 and 160, so the second
# argument of both get_network_policy() calls is not visible here (presumably
# self._network['id']) -- confirm against the full file.
148 def test_attach_network_get_network_policy(self):
150 obj = self._create_test_policy()
152 policy_obj = policy.QosPolicy.get_network_policy(self.context,
154 self.assertIsNone(policy_obj)
156 # Now attach policy and repeat
157 obj.attach_network(self._network['id'])
159 policy_obj = policy.QosPolicy.get_network_policy(self.context,
161 self.assertEqual(obj, policy_obj)
def test_attach_network_nonexistent_network(self):
    """attach_network() with an unknown network id must fail."""
    qos_policy = self._create_test_policy()
    expected_error = n_exc.NetworkQosBindingNotFound
    # Callable form on purpose: the method must be invoked by assertRaises.
    self.assertRaises(expected_error,
                      qos_policy.attach_network, 'non-existent-network')
def test_attach_port_nonexistent_port(self):
    """attach_port() with an unknown port id must fail."""
    qos_policy = self._create_test_policy()
    expected_error = n_exc.PortQosBindingNotFound
    # Callable form on purpose: the method must be invoked by assertRaises.
    self.assertRaises(expected_error,
                      qos_policy.attach_port, 'non-existent-port')
def test_attach_network_nonexistent_policy(self):
    """attach_network() fails for a policy built without the create helper.

    The policy object is constructed directly from self.db_obj instead of
    going through _create_test_policy().
    """
    bare_policy = policy.QosPolicy(self.context, **self.db_obj)
    network_id = self._network['id']
    self.assertRaises(n_exc.NetworkQosBindingNotFound,
                      bare_policy.attach_network, network_id)
def test_attach_port_nonexistent_policy(self):
    """attach_port() fails for a policy built without the create helper.

    The policy object is constructed directly from self.db_obj instead of
    going through _create_test_policy().
    """
    bare_policy = policy.QosPolicy(self.context, **self.db_obj)
    port_id = self._port['id']
    self.assertRaises(n_exc.PortQosBindingNotFound,
                      bare_policy.attach_port, port_id)
def test_attach_port_get_port_policy(self):
    """get_port_policy() is None before attach_port() and the policy after."""
    obj = self._create_test_policy()
    port_id = self._port['id']

    # Query the *port* binding before attaching.  The earlier version asked
    # get_network_policy() here, which does not exercise the port binding
    # this test is about, so it never showed that the port starts unbound.
    policy_obj = policy.QosPolicy.get_port_policy(self.context, port_id)
    self.assertIsNone(policy_obj)

    # Now attach policy and repeat
    obj.attach_port(port_id)

    policy_obj = policy.QosPolicy.get_port_policy(self.context, port_id)
    self.assertEqual(obj, policy_obj)
# After attach_port() followed by detach_port() on the test port,
# get_port_policy() must yield None again.
# NOTE(review): the embedded numbering skips 209, so the second argument of
# get_port_policy() is not visible here (presumably self._port['id']) --
# confirm against the full file.
203 def test_detach_port(self):
204 obj = self._create_test_policy()
205 obj.attach_port(self._port['id'])
206 obj.detach_port(self._port['id'])
208 policy_obj = policy.QosPolicy.get_port_policy(self.context,
210 self.assertIsNone(policy_obj)
# After attach_network() followed by detach_network() on the test network,
# get_network_policy() must yield None again.
# NOTE(review): the embedded numbering skips 218, so the second argument of
# get_network_policy() is not visible here (presumably self._network['id'])
# -- confirm against the full file.
212 def test_detach_network(self):
213 obj = self._create_test_policy()
214 obj.attach_network(self._network['id'])
215 obj.detach_network(self._network['id'])
217 policy_obj = policy.QosPolicy.get_network_policy(self.context,
219 self.assertIsNone(policy_obj)
def test_detach_port_nonexistent_port(self):
    """detach_port() with an unknown port id must fail."""
    qos_policy = self._create_test_policy()
    expected_error = n_exc.PortQosBindingNotFound
    # Callable form on purpose: the method must be invoked by assertRaises.
    self.assertRaises(expected_error,
                      qos_policy.detach_port, 'non-existent-port')
def test_detach_network_nonexistent_network(self):
    """detach_network() with an unknown network id must fail."""
    obj = self._create_test_policy()
    # Use a network-flavoured placeholder id (was 'non-existent-port', a
    # copy/paste leftover) so the fixture matches what is being tested and
    # mirrors test_attach_network_nonexistent_network.
    self.assertRaises(n_exc.NetworkQosBindingNotFound,
                      obj.detach_network, 'non-existent-network')
def test_detach_port_nonexistent_policy(self):
    """detach_port() fails for a policy built without the create helper.

    The policy object is constructed directly from self.db_obj instead of
    going through _create_test_policy().
    """
    bare_policy = policy.QosPolicy(self.context, **self.db_obj)
    port_id = self._port['id']
    self.assertRaises(n_exc.PortQosBindingNotFound,
                      bare_policy.detach_port, port_id)
def test_detach_network_nonexistent_policy(self):
    """detach_network() fails for a policy built without the create helper.

    The policy object is constructed directly from self.db_obj instead of
    going through _create_test_policy().
    """
    bare_policy = policy.QosPolicy(self.context, **self.db_obj)
    network_id = self._network['id']
    self.assertRaises(n_exc.NetworkQosBindingNotFound,
                      bare_policy.detach_network, network_id)
def test_synthetic_rule_fields(self):
    """A policy fetched by id exposes its rule through ``rules``."""
    created_policy, created_rule = self._create_test_policy_with_rule()
    fetched_policy = policy.QosPolicy.get_by_id(
        self.context, created_policy.id)
    self.assertEqual([created_rule], fetched_policy.rules)
def test_get_by_id_fetches_rules_non_lazily(self):
    """The primitive of a fetched policy already carries its rules."""
    created_policy, _unused_rule = self._create_test_policy_with_rule()
    fetched_policy = policy.QosPolicy.get_by_id(
        self.context, created_policy.id)

    # Serialize without touching ``rules`` first: the list must be
    # non-empty in the primitive itself.
    primitive = fetched_policy.obj_to_primitive()
    serialized_rules = primitive['versioned_object.data']['rules']
    self.assertNotEqual([], serialized_rules)
def test_to_dict_returns_rules_as_dicts(self):
    """to_dict() on a policy renders each rule as the rule's own dict."""
    created_policy, rule_obj = self._create_test_policy_with_rule()
    fetched_policy = policy.QosPolicy.get_by_id(
        self.context, created_policy.id)

    obj_dict = fetched_policy.to_dict()
    rule_dict = rule_obj.to_dict()

    # Sanity check first: to_dict() must hand back plain dicts for both
    # the rule and the policy.
    for rendered in (rule_dict, obj_dict):
        self.assertIsInstance(rendered, dict)

    first_rendered_rule = obj_dict['rules'][0]
    self.assertEqual(rule_dict, first_rendered_rule)
def test_shared_default(self):
    """``shared`` is falsy when left out of the constructor fields."""
    fields = self.db_obj
    # Drop the key in place (KeyError if absent), as the fixture is not
    # reused afterwards within this test.
    fields.pop('shared')
    policy_without_shared = self._test_class(self.context, **fields)
    self.assertFalse(policy_without_shared.shared)
# delete() must raise QosPolicyInUse while the policy is attached to the
# test port; the port is detached afterwards.
# NOTE(review): the embedded numbering skips 279 after the detach, so a
# trailing statement is not visible here (presumably a delete that now
# succeeds) -- confirm against the full file.
272 def test_delete_not_allowed_if_policy_in_use_by_port(self):
273 obj = self._create_test_policy()
274 obj.attach_port(self._port['id'])
276 self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
278 obj.detach_port(self._port['id'])
# delete() must raise QosPolicyInUse while the policy is attached to the
# test network; the network is detached afterwards.
# NOTE(review): the embedded numbering skips 288 after the detach, so a
# trailing statement is not visible here (presumably a delete that now
# succeeds) -- confirm against the full file.
281 def test_delete_not_allowed_if_policy_in_use_by_network(self):
282 obj = self._create_test_policy()
283 obj.attach_network(self._network['id'])
285 self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
287 obj.detach_network(self._network['id'])
def test_reload_rules_reloads_rules(self):
    """``rules`` starts empty on the helper's policy; reload_rules() fills it."""
    qos_policy, attached_rule = self._create_test_policy_with_rule()
    # The policy object predates the rule, so its list is still empty.
    self.assertEqual([], qos_policy.rules)

    qos_policy.reload_rules()
    self.assertEqual([attached_rule], qos_policy.rules)