message = _("The resource is inuse")
+class QosPolicyInUse(InUse):
+ message = _("QoS Policy %(policy_id)s is used by "
+ "%(object_type)s %(object_id)s.")
+
+
class NetworkInUse(InUse):
message = _("Unable to complete operation on network %(net_id)s. "
"There are one or more ports still in use on the network.")
super(QosPolicy, self).create()
self._load_rules()
+ def delete(self):
+ models = (
+ ('network', self.network_binding_model),
+ ('port', self.port_binding_model)
+ )
+ with db_api.autonested_transaction(self._context.session):
+ for object_type, model in models:
+ binding_db_obj = db_api.get_object(self._context, model,
+ policy_id=self.id)
+ if binding_db_obj:
+ raise exceptions.QosPolicyInUse(
+ policy_id=self.id,
+ object_type=object_type,
+ object_id=binding_db_obj['%s_id' % object_type])
+
+ super(QosPolicy, self).delete()
+
def attach_network(self, network_id):
qos_db_api.create_policy_network_binding(self._context,
policy_id=self.id,
self._disassociate_port(port['id'])
+ @test.attr(type='smoke')
+ @test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
+ def test_delete_not_allowed_if_policy_in_use_by_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network(
+ 'test network', qos_policy_id=policy['id'])
+ self.assertRaises(
+ exceptions.Conflict,
+ self.admin_client.delete_qos_policy, policy['id'])
+
+ self._disassociate_network(self.admin_client, network['id'])
+ self.admin_client.delete_qos_policy(policy['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
+ def test_delete_not_allowed_if_policy_in_use_by_port(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network('test network')
+ port = self.create_port(network, qos_policy_id=policy['id'])
+ self.assertRaises(
+ exceptions.Conflict,
+ self.admin_client.delete_qos_policy, policy['id'])
+
+ self._disassociate_port(port['id'])
+ self.admin_client.delete_qos_policy(policy['id'])
+
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
@classmethod
self.db_obj.pop('shared')
obj = self._test_class(self.context, **self.db_obj)
self.assertEqual(False, obj.shared)
+
+ def test_delete_not_allowed_if_policy_in_use_by_port(self):
+ obj = self._create_test_policy()
+ obj.attach_port(self._port['id'])
+
+ self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
+
+ obj.detach_port(self._port['id'])
+ obj.delete()
+
+ def test_delete_not_allowed_if_policy_in_use_by_network(self):
+ obj = self._create_test_policy()
+ obj.attach_network(self._network['id'])
+
+ self.assertRaises(n_exc.QosPolicyInUse, obj.delete)
+
+ obj.detach_network(self._network['id'])
+ obj.delete()
self.ctxt, self.policy.id, {'policy': fields})
self._validate_registry_params(events.UPDATED)
- def test_delete_policy(self):
+ @mock.patch('neutron.db.api.get_object', return_value=None)
+ def test_delete_policy(self, *mocks):
self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
self._validate_registry_params(events.DELETED)