from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
-from neutron.db import agentschedulers_db
from neutron import manager
from neutron.plugins.common import constants as service_constants
adminContext = context if context.is_admin else context.elevated()
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
- state = agentschedulers_db.get_admin_state_up_filter()
for router_id in router_ids:
- l3_agents = plugin.get_l3_agents_hosting_routers(
- adminContext, [router_id],
- admin_state_up=state,
- active=True)
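+ # the plugin decides which hosts need the notification; for DVR
+ # routers this includes hosts with dvr serviceable ports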
+ hosts = plugin.get_hosts_to_notify(adminContext, router_id)
if shuffle_agents:
- random.shuffle(l3_agents)
- for l3_agent in l3_agents:
+ random.shuffle(hosts)
+ for host in hosts:
LOG.debug('Notify agent at %(topic)s.%(host)s the message '
'%(method)s',
- {'topic': l3_agent.topic,
- 'host': l3_agent.host,
+ {'topic': topics.L3_AGENT,
+ 'host': host,
'method': method})
- cctxt = self.client.prepare(topic=l3_agent.topic,
- server=l3_agent.host,
+ cctxt = self.client.prepare(topic=topics.L3_AGENT,
+ server=host,
version='1.1')
cctxt.cast(context, method, routers=[router_id])
context or context.elevated())
plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
- state = agentschedulers_db.get_admin_state_up_filter()
- l3_agents = (plugin.
- get_l3_agents_hosting_routers(adminContext,
- [router_id],
- admin_state_up=state,
- active=True))
+ hosts = plugin.get_hosts_to_notify(adminContext, router_id)
# TODO(murali): replace cast with fanout to avoid performance
# issues at greater scale.
- for l3_agent in l3_agents:
- log_topic = '%s.%s' % (l3_agent.topic, l3_agent.host)
+ for host in hosts:
+ log_topic = '%s.%s' % (topics.L3_AGENT, host)
LOG.debug('Casting message %(method)s with topic %(topic)s',
{'topic': log_topic, 'method': method})
dvr_arptable = {'router_id': router_id,
'arp_table': data}
- cctxt = self.client.prepare(topic=l3_agent.topic,
- server=l3_agent.host,
+ cctxt = self.client.prepare(topic=topics.L3_AGENT,
+ server=host,
version='1.2')
cctxt.cast(context, method, payload=dvr_arptable)
res = query.filter(agents_db.Agent.id.in_(agent_ids)).first()
return res[0]
+ def get_hosts_to_notify(self, context, router_id):
+ """Return the hosts that should be notified about an update of the router"""
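+ # only agents that are alive and pass the admin_state_up filter
+ # are taken into account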
+ state = agentschedulers_db.get_admin_state_up_filter()
+ agents = self.get_l3_agents_hosting_routers(
+ context, [router_id], admin_state_up=state, active=True)
+ return [a.host for a in agents]
+
class AZL3AgentSchedulerDbMixin(L3AgentSchedulerDbMixin,
router_az.RouterAvailabilityZonePluginBase):
from oslo_db import exception as db_exc
from oslo_log import log as logging
import sqlalchemy as sa
+from sqlalchemy import or_
from sqlalchemy import orm
from sqlalchemy.orm import joinedload
from neutron.common import constants as n_const
from neutron.common import utils as n_utils
from neutron.db import agents_db
+from neutron.db import agentschedulers_db
from neutron.db import l3_agentschedulers_db as l3agent_sch_db
from neutron.db import model_base
from neutron.db import models_v2
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import db as ml2_db
+from neutron.plugins.ml2 import models as ml2_models
LOG = logging.getLogger(__name__)
ips = port['fixed_ips']
router_ids = self.get_dvr_routers_by_portid(context, port['id'], ips)
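+ # no DVR routers have interfaces on this port's subnets;
+ # nothing to schedule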
+ if not router_ids:
+ return
+
for router_id in router_ids:
if not self.check_l3_agent_router_binding(
context, router_id, l3_agent_on_host['id']):
self).remove_router_from_l3_agent(
context, agent_id, router_id)
+ def get_hosts_to_notify(self, context, router_id):
+ """Return the hosts that should be notified about an update of the router"""
+ hosts = super(L3_DVRsch_db_mixin, self).get_hosts_to_notify(
+ context, router_id)
+ router = self.get_router(context, router_id)
+ if router.get('distributed', False):
+ dvr_hosts = self._get_dvr_hosts_for_router(context, router_id)
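+ # exclude hosts that are already covered by the l3 agents
+ # bound to the router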
+ dvr_hosts = set(dvr_hosts) - set(hosts)
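+ # notify the remaining hosts only if an l3 agent is running there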
+ state = agentschedulers_db.get_admin_state_up_filter()
+ agents = self.get_l3_agents(context, active=state,
+ filters={'host': dvr_hosts})
+ hosts += [a.host for a in agents]
+
+ return hosts
+
+ def _get_dvr_hosts_for_router(self, context, router_id):
+ """Get a list of hosts on which the specified DVR router should be hosted
+
+ It first collects the IDs of all subnets connected to the router and
+ then returns the set of hosts where the DVR serviceable ports on those
+ subnets are bound
+ """
+ subnet_ids = self.get_subnet_ids_on_router(context, router_id)
+ Binding = ml2_models.PortBinding
+ Port = models_v2.Port
+ IPAllocation = models_v2.IPAllocation
+
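+ # distinct binding hosts of ports that have fixed IPs on the
+ # router's subnets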
+ query = context.session.query(Binding.host).distinct()
+ query = query.join(Binding.port)
+ query = query.join(Port.fixed_ips)
+ query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
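+ # keep only DVR serviceable ports: compute (VM) ports plus the
+ # other serviced device owners such as DHCP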
+ owner_filter = or_(
+ Port.device_owner.startswith(n_const.DEVICE_OWNER_COMPUTE_PREFIX),
+ Port.device_owner.in_(
+ n_utils.get_other_dvr_serviced_device_owners()))
+ query = query.filter(owner_filter)
+ hosts = [item[0] for item in query]
+ LOG.debug('Hosts for router %s: %s', router_id, hosts)
+ return hosts
+
def _notify_l3_agent_new_port(resource, event, trigger, **kwargs):
LOG.debug('Received %(resource)s %(event)s', {
from neutron.api.v2 import attributes
from neutron.common import constants
+from neutron.common import topics
from neutron import context
from neutron.extensions import external_net
from neutron.extensions import portbindings
def test_admin_router_remove_from_agent_on_vm_port_deletion(self):
self._test_router_remove_from_agent_on_vm_port_deletion(
non_admin_port=True)
+
+ def test_dvr_router_notifications(self):
+ """Check that notifications go to the right hosts under different
+ conditions
+ """
+ # register l3 agents in dvr mode in addition to the existing dvr_snat agent
+ HOST1, HOST2, HOST3 = 'host1', 'host2', 'host3'
+ for host in [HOST1, HOST2, HOST3]:
+ helpers.register_l3_agent(
+ host=host, agent_mode=constants.L3_AGENT_MODE_DVR)
+
+ router = self._create_router()
+ arg_list = (portbindings.HOST_ID,)
+ with self.subnet() as ext_subnet,\
+ self.subnet(cidr='20.0.0.0/24') as subnet1,\
+ self.subnet(cidr='30.0.0.0/24') as subnet2,\
+ self.subnet(cidr='40.0.0.0/24') as subnet3,\
+ self.port(subnet=subnet1,
+ device_owner=DEVICE_OWNER_COMPUTE,
+ arg_list=arg_list,
+ **{portbindings.HOST_ID: HOST1}),\
+ self.port(subnet=subnet2,
+ device_owner=constants.DEVICE_OWNER_DHCP,
+ arg_list=arg_list,
+ **{portbindings.HOST_ID: HOST2}),\
+ self.port(subnet=subnet3,
+ device_owner=constants.DEVICE_OWNER_NEUTRON_PREFIX,
+ arg_list=arg_list,
+ **{portbindings.HOST_ID: HOST3}):
+ # make net external
+ ext_net_id = ext_subnet['subnet']['network_id']
+ self._update('networks', ext_net_id,
+ {'network': {external_net.EXTERNAL: True}})
+
+ with mock.patch.object(self.l3_plugin.l3_rpc_notifier.client,
+ 'prepare') as mock_prepare:
+ # add external gateway to router
+ self.l3_plugin.update_router(
+ self.context, router['id'],
+ {'router': {
+ 'external_gateway_info': {'network_id': ext_net_id}}})
+ # the router has no interfaces yet, so the notification goes
+ # only to the dvr_snat agent
+ mock_prepare.assert_called_once_with(
+ server=self.l3_agent['host'],
+ topic=topics.L3_AGENT,
+ version='1.1')
+
+ mock_prepare.reset_mock()
+ self.l3_plugin.add_router_interface(
+ self.context, router['id'],
+ {'subnet_id': subnet1['subnet']['id']})
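+ # subnet1 now has a compute port bound to HOST1, so the dvr agent
+ # on HOST1 is notified in addition to the dvr_snat agent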
+ self.assertEqual(2, mock_prepare.call_count)
+ expected = [mock.call(server=self.l3_agent['host'],
+ topic=topics.L3_AGENT,
+ version='1.1'),
+ mock.call(server=HOST1,
+ topic=topics.L3_AGENT,
+ version='1.1')]
+ mock_prepare.assert_has_calls(expected, any_order=True)
+
+ mock_prepare.reset_mock()
+ self.l3_plugin.add_router_interface(
+ self.context, router['id'],
+ {'subnet_id': subnet2['subnet']['id']})
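+ # the DHCP port on subnet2 is dvr serviceable, so HOST2 is now
+ # notified as well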
+ self.assertEqual(3, mock_prepare.call_count)
+ expected = [mock.call(server=self.l3_agent['host'],
+ topic=topics.L3_AGENT,
+ version='1.1'),
+ mock.call(server=HOST1,
+ topic=topics.L3_AGENT,
+ version='1.1'),
+ mock.call(server=HOST2,
+ topic=topics.L3_AGENT,
+ version='1.1')]
+ mock_prepare.assert_has_calls(expected, any_order=True)
+
+ mock_prepare.reset_mock()
+ self.l3_plugin.add_router_interface(
+ self.context, router['id'],
+ {'subnet_id': subnet3['subnet']['id']})
+ # there are no dvr serviceable ports on HOST3, so notifications
+ # go to the same hosts as before
+ self.assertEqual(3, mock_prepare.call_count)
+ expected = [mock.call(server=self.l3_agent['host'],
+ topic=topics.L3_AGENT,
+ version='1.1'),
+ mock.call(server=HOST1,
+ topic=topics.L3_AGENT,
+ version='1.1'),
+ mock.call(server=HOST2,
+ topic=topics.L3_AGENT,
+ version='1.1')]
+ mock_prepare.assert_has_calls(expected, any_order=True)