@abc.abstractmethod
def handle_port(self, context, data):
- """handle agent extension for port.
+ """Handle agent extension for port.
+
+ This can be called on either create or update, depending on the
+ code flow. Thus, it's this function's responsibility to check what
+ actually changed.
+
+ :param context - rpc context
+ :param data - port data
+ """
+
+ @abc.abstractmethod
+ def delete_port(self, context, data):
+ """Delete port from agent extension.
:param context - rpc context
:param data - port data
"while handling port update"),
{'name': extension.name}
)
- #TODO(Qos) we are missing how to handle delete. we can pass action
- #type in all the handle methods or add handle_delete_resource methods
+
+ def delete_port(self, context, data):
+ """Notify all agent extensions to delete port."""
+ for extension in self:
+ try:
+ extension.obj.delete_port(context, data)
+ # TODO(QoS) add agent extensions exception and catch them here
+ # instead of AttributeError
+ except AttributeError:
+ LOG.exception(
+ _LE("Agent Extension '%(name)s' failed "
+ "while handling port deletion"),
+ {'name': extension.name}
+ )
context, resources.QOS_POLICY, qos_policy_id)
self.qos_driver.create(port, qos_policy)
+ def delete_port(self, context, port):
+ self._process_reset_port(port)
+
def _process_update_policy(self, qos_policy):
for port_id, port in self.qos_policy_ports[qos_policy.id].items():
# TODO(QoS): for now, just reflush the rules on the port. Later, we
# longer have access to the network
self.sg_agent.remove_devices_filter([port_id])
port = self.int_br.get_vif_port_by_id(port_id)
+ self.ext_manager.delete_port(self.context,
+ {"vif_port": port,
+ "port_id": port_id})
if port:
# don't log errors since there is a chance someone will be
# removing the port from the bridge at the same time
self.manager.handle_port(context, data)
ext = self._get_extension()
ext.handle_port.assert_called_once_with(context, data)
+
+ def test_delete_port(self):
+ context = object()
+ data = object()
+ self.manager.delete_port(context, data)
+ ext = self._get_extension()
+ ext.delete_port.assert_called_once_with(context, data)
#TODO(QoS): handle qos_driver.update call check when
# we do that
+ def test_delete_known_port(self):
+ port = self._create_test_port_dict()
+ port_id = port['port_id']
+ self.qos_ext.handle_port(self.context, port)
+ self.qos_ext.qos_driver.reset_mock()
+ self.qos_ext.delete_port(self.context, port)
+ self.qos_ext.qos_driver.delete.assert_called_with(port, None)
+ self.assertNotIn(port_id, self.qos_ext.known_ports)
+
+ def test_delete_unknown_port(self):
+ port = self._create_test_port_dict()
+ port_id = port['port_id']
+ self.qos_ext.delete_port(self.context, port)
+ self.assertFalse(self.qos_ext.qos_driver.delete.called)
+ self.assertNotIn(port_id, self.qos_ext.known_ports)
+
def test__handle_notification_ignores_all_event_types_except_updated(self):
with mock.patch.object(
self.qos_ext, '_process_update_policy') as update_mock: