from sqlalchemy.orm import exc
from sqlalchemy.orm import joinedload
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants as q_const
from neutron.common import utils as n_utils
from neutron.db import agents_db
from neutron.db import model_base
from neutron.db import models_v2
from neutron.i18n import _LI, _LW
+from neutron import manager
+from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import db as ml2_db
LOG = logging.getLogger(__name__)
self.bind_dvr_router_servicenode(
context, router_id, chosen_agent)
return chosen_agent
+
+
+def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
+ LOG.debug('Received %s %s' % (resource, event))
+ port = kwargs['port']
+ if not port:
+ return
+
+ l3plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ mac_address_updated = kwargs.get('mac_address_updated')
+ update_device_up = kwargs.get('update_device_up')
+ context = kwargs['context']
+ if mac_address_updated or update_device_up:
+ l3plugin.dvr_vmarp_table_update(context, port, "add")
+ if n_utils.is_dvr_serviced(port['device_owner']):
+ l3plugin.dvr_update_router_addvm(context, port)
+
+
+def subscribe():
+ registry.subscribe(
+ _notify_l3_agent_new_port, resources.PORT, events.AFTER_UPDATE)
+ registry.subscribe(
+ _notify_l3_agent_new_port, resources.PORT, events.AFTER_CREATE)
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.common import ipv6_utils
if self.type_manager.network_matches_filters(network, filters)
]
- def _notify_l3_agent_new_port(self, context, port):
- if not port:
- return
-
- # Whenever a DVR serviceable port comes up on a
- # node, it has to be communicated to the L3 Plugin
- # and agent for creating the respective namespaces.
- if (utils.is_dvr_serviced(port['device_owner'])):
- l3plugin = manager.NeutronManager.get_service_plugins().get(
- service_constants.L3_ROUTER_NAT)
- if (utils.is_extension_supported(
- l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
- l3plugin.dvr_update_router_addvm(context, port)
-
def _get_host_port_if_changed(self, mech_context, attrs):
binding = mech_context._binding
host = attrs and attrs.get(portbindings.HOST_ID)
attrs = port[attributes.PORT]
result, mech_context = self._create_port_db(context, port)
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
- self._notify_l3_agent_new_port(context, new_host_port)
+ # notify any plugin that is interested in port create events
+ kwargs = {'context': context, 'port': new_host_port}
+ registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
try:
self.mechanism_manager.create_port_postcommit(mech_context)
if attrs and attrs.get(portbindings.HOST_ID):
new_host_port = self._get_host_port_if_changed(
obj['mech_context'], attrs)
- self._notify_l3_agent_new_port(context, new_host_port)
+ kwargs = {'context': context, 'port': new_host_port}
+ registry.notify(
+ resources.PORT, events.AFTER_CREATE, self, **kwargs)
try:
for obj in objects:
def update_port(self, context, id, port):
attrs = port[attributes.PORT]
need_port_update_notify = False
- l3plugin = manager.NeutronManager.get_service_plugins().get(
- service_constants.L3_ROUTER_NAT)
- is_dvr_enabled = utils.is_extension_supported(
- l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
-
session = context.session
# REVISIT: Serialize this operation with a semaphore to
self.mechanism_manager.update_port_precommit(mech_context)
# Notifications must be sent after the above transaction is complete
- if mac_address_updated and l3plugin and is_dvr_enabled:
- # NOTE: "add" actually does a 'replace' operation
- l3plugin.dvr_vmarp_table_update(context, updated_port, "add")
- self._notify_l3_agent_new_port(context, new_host_port)
+ kwargs = {
+ 'context': context,
+ 'port': new_host_port,
+ 'mac_address_updated': mac_address_updated,
+ }
+ registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
# TODO(apech) - handle errors raised by update_port, potentially
# by re-calling update_port with the previous attributes. For
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants as q_const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
-from neutron.common import utils
from neutron.extensions import portbindings
from neutron.i18n import _LW
from neutron import manager
-from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
port_id = plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_ACTIVE,
host)
- l3plugin = manager.NeutronManager.get_service_plugins().get(
- service_constants.L3_ROUTER_NAT)
- if (l3plugin and
- utils.is_extension_supported(l3plugin,
- q_const.L3_DISTRIBUTED_EXT_ALIAS)):
- try:
- port = plugin._get_port(rpc_context, port_id)
- l3plugin.dvr_vmarp_table_update(rpc_context, port, "add")
- except exceptions.PortNotFound:
- LOG.debug('Port %s not found during ARP update', port_id)
+ try:
+ # NOTE(armax): it's best to remove all objects from the
+ # session, before we try to retrieve the new port object
+ rpc_context.session.expunge_all()
+ port = plugin._get_port(rpc_context, port_id)
+ except exceptions.PortNotFound:
+ LOG.debug('Port %s not found during update', port_id)
+ else:
+ kwargs = {
+ 'context': rpc_context,
+ 'port': port,
+ 'update_device_up': True
+ }
+ registry.notify(
+ resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
cfg.CONF.router_scheduler_driver)
self.start_periodic_l3_agent_status_check()
super(L3RouterPlugin, self).__init__()
+ if 'dvr' in self.supported_extension_aliases:
+ l3_dvrscheduler_db.subscribe()
def setup_rpc(self):
# RPC support
def setUp(self):
super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
self.context = mock.MagicMock()
+ self.notify_p = mock.patch('neutron.callbacks.registry.notify')
+ self.notify = self.notify_p.start()
def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True)
return_value=new_host_port)
plugin._check_mac_update_allowed = mock.Mock(return_value=True)
- plugin._notify_l3_agent_new_port = mock.Mock()
- plugin._notify_l3_agent_new_port.side_effect = (
- lambda c, p: self._ensure_transaction_is_closed())
+ self.notify.side_effect = (
+ lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
return plugin
plugin.create_port(self.context, mock.MagicMock())
- plugin._notify_l3_agent_new_port.assert_called_once_with(
- self.context, new_host_port)
+ kwargs = {'context': self.context, 'port': new_host_port}
+ self.notify.assert_called_once_with('port', 'after_create',
+ plugin, **kwargs)
def test_update_port_rpc_outside_transaction(self):
with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
- mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
- ) as (init, super_update_port, get_service_plugins):
+ ) as (init, super_update_port):
init.return_value = None
- l3plugin = mock.Mock()
- l3plugin.supported_extension_aliases = [
- constants.L3_DISTRIBUTED_EXT_ALIAS,
- ]
- get_service_plugins.return_value = {
- service_constants.L3_ROUTER_NAT: l3plugin,
- }
-
new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin.update_port(self.context, 'fake_id', mock.MagicMock())
- plugin._notify_l3_agent_new_port.assert_called_once_with(
- self.context, new_host_port)
- l3plugin.dvr_vmarp_table_update.assert_called_once_with(
- self.context, mock.ANY, "add")
+ kwargs = {
+ 'context': self.context,
+ 'port': new_host_port,
+ 'mac_address_updated': True,
+ }
+ self.notify.assert_called_once_with('port', 'after_update',
+ plugin, **kwargs)
def test_vmarp_table_update_outside_of_delete_transaction(self):
l3plugin = mock.Mock()
self.type_manager)
self.manager = mock.patch.object(
plugin_rpc.manager, 'NeutronManager').start()
- self.l3plugin = mock.Mock()
- self.manager.get_service_plugins.return_value = {
- 'L3_ROUTER_NAT': self.l3plugin
- }
self.plugin = self.manager.get_plugin()
- def _test_update_device_up(self, extensions, kwargs):
- with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
- '._device_to_port_id'):
- type(self.l3plugin).supported_extension_aliases = (
- mock.PropertyMock(return_value=extensions))
- self.callbacks.update_device_up(mock.ANY, **kwargs)
-
- def test_update_device_up_without_dvr(self):
+ def _test_update_device_up(self):
kwargs = {
'agent_id': 'foo_agent',
'device': 'foo_device'
}
- self._test_update_device_up(['router'], kwargs)
- self.assertFalse(self.l3plugin.dvr_vmarp_table_update.call_count)
+ with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
+ '._device_to_port_id'):
+ with mock.patch('neutron.callbacks.registry.notify') as notify:
+ self.callbacks.update_device_up(mock.Mock(), **kwargs)
+ return notify
- def test_update_device_up_with_dvr(self):
+ def test_update_device_up_notify(self):
+ notify = self._test_update_device_up()
kwargs = {
- 'agent_id': 'foo_agent',
- 'device': 'foo_device'
+ 'context': mock.ANY, 'port': mock.ANY, 'update_device_up': True
}
- self._test_update_device_up(['router', 'dvr'], kwargs)
- self.l3plugin.dvr_vmarp_table_update.assert_called_once_with(
- mock.ANY, mock.ANY, 'add')
+ notify.assert_called_once_with(
+ 'port', 'after_update', self.plugin, **kwargs)
- def test_update_device_up_with_dvr_when_port_not_found(self):
- kwargs = {
- 'agent_id': 'foo_agent',
- 'device': 'foo_device'
- }
- self.l3plugin.dvr_vmarp_table_update.side_effect = (
+ def test_update_device_up_notify_not_sent_with_port_not_found(self):
+ self.plugin._get_port.side_effect = (
exceptions.PortNotFound(port_id='foo_port_id'))
- self._test_update_device_up(['router', 'dvr'], kwargs)
- self.assertTrue(self.l3plugin.dvr_vmarp_table_update.call_count)
+ notify = self._test_update_device_up()
+ self.assertFalse(notify.call_count)
def test_get_device_details_without_port_context(self):
self.plugin.get_bound_port_context.return_value = None