]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add L3 Scheduler Changes for Distributed Routers
authorMurali Birru <murali.birru@hp.com>
Tue, 22 Apr 2014 20:45:31 +0000 (13:45 -0700)
committerarmando-migliaccio <armamig@gmail.com>
Wed, 30 Jul 2014 06:10:17 +0000 (23:10 -0700)
This patch implements the L3 Scheduler changes for the
Distributed Virtual Routers.

Partially-implements: blueprint neutron-ovs-dvr

Change-Id: I407c3d639ebdf885b1418bceac7cfc251e7eba1f
Co-Authored-By: Carl Baldwin <carl.baldwin@hp.com>
Co-Authored-By: Armando Migliaccio <armamig@gmail.com>
17 files changed:
neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py
neutron/db/l3_agentschedulers_db.py
neutron/db/l3_db.py
neutron/db/l3_dvr_db.py
neutron/db/l3_dvrscheduler_db.py [new file with mode: 0644]
neutron/db/l3_gwmode_db.py
neutron/db/l3_rpc_base.py
neutron/db/migration/alembic_migrations/versions/5589aa32bf80_l3_dvr_scheduler.py [new file with mode: 0644]
neutron/db/migration/alembic_migrations/versions/HEAD
neutron/db/migration/models/head.py
neutron/extensions/l3agentscheduler.py
neutron/plugins/ml2/plugin.py
neutron/plugins/nec/nec_router.py
neutron/scheduler/l3_agent_scheduler.py
neutron/services/l3_router/l3_router_plugin.py
neutron/tests/unit/test_extension_ext_gw_mode.py
neutron/tests/unit/test_l3_schedulers.py

index 0515ba40f582eb7dfe5d7c759c76457ff953b93c..d38e69b44408394286a113038d29e322f2625726 100644 (file)
@@ -66,6 +66,32 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy):
                     topic='%s.%s' % (l3_agent.topic, l3_agent.host),
                     version='1.1')
 
+    def _agent_notification_arp(self, context, method, router_id,
+                                operation, data):
+        """Notify arp details to l3 agents hosting router."""
+        if not router_id:
+            return
+        adminContext = (context.is_admin and
+                        context or context.elevated())
+        plugin = manager.NeutronManager.get_service_plugins().get(
+            service_constants.L3_ROUTER_NAT)
+        l3_agents = (plugin.
+                     get_l3_agents_hosting_routers(adminContext,
+                                                   [router_id],
+                                                   admin_state_up=True,
+                                                   active=True))
+        # TODO(murali): replace cast with fanout to avoid performance
+        # issues at greater scale.
+        for l3_agent in l3_agents:
+            topic = '%s.%s' % (l3_agent.topic, l3_agent.host)
+            LOG.debug('Casting message %(method)s with topic %(topic)s',
+                      {'topic': topic, 'method': method})
+            dvr_arptable = {'router_id': router_id,
+                            'arp_table': data}
+            self.cast(context,
+                      self.make_msg(method, payload=dvr_arptable),
+                      topic=topic, version='1.2')
+
     def _notification(self, context, method, router_ids, operation, data):
         """Notify all the agents that are hosting the routers."""
         plugin = manager.NeutronManager.get_service_plugins().get(
@@ -78,7 +104,7 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy):
                 plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
             adminContext = (context.is_admin and
                             context or context.elevated())
-            plugin.schedule_routers(adminContext, router_ids)
+            plugin.schedule_routers(adminContext, router_ids, hints=data)
             self._agent_notification(
                 context, method, router_ids, operation, data)
         else:
@@ -112,6 +138,14 @@ class L3AgentNotifyAPI(n_rpc.RpcProxy):
             self._notification(context, 'routers_updated', router_ids,
                                operation, data)
 
+    def add_arp_entry(self, context, router_id, arp_table, operation=None):
+        self._agent_notification_arp(context, 'add_arp_entry', router_id,
+                                     operation, arp_table)
+
+    def del_arp_entry(self, context, router_id, arp_table, operation=None):
+        self._agent_notification_arp(context, 'del_arp_entry', router_id,
+                                     operation, arp_table)
+
     def router_removed_from_agent(self, context, router_id, host):
         self._notification_host(context, 'router_removed_from_agent',
                                 {'router_id': router_id}, host)
index 4c64b7b86745c8c8ee21dde01dc510167cbc49cd..3d6927218ee3fd4c7dbda3754b5e9f89528e84a1 100644 (file)
@@ -25,7 +25,7 @@ from neutron.db import agents_db
 from neutron.db import agentschedulers_db
 from neutron.db import model_base
 from neutron.extensions import l3agentscheduler
-
+from neutron import manager
 
 L3_AGENTS_SCHEDULER_OPTS = [
     cfg.StrOpt('router_scheduler_driver',
@@ -62,21 +62,40 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
     def add_router_to_l3_agent(self, context, agent_id, router_id):
         """Add a l3 agent to host a router."""
         router = self.get_router(context, router_id)
+        distributed = router.get('distributed')
         with context.session.begin(subtransactions=True):
             agent_db = self._get_agent(context, agent_id)
+            agent_conf = self.get_configuration_dict(agent_db)
+            agent_mode = agent_conf.get('agent_mode', 'legacy')
+            if (not distributed and agent_mode == 'dvr' or
+                distributed and agent_mode == 'legacy'):
+                router_type = ('distributed' if distributed else 'centralized')
+                raise l3agentscheduler.RouterL3AgentMismatch(
+                    router_type=router_type, router_id=router_id,
+                    agent_mode=agent_mode, agent_id=agent_id)
             if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
                 not agent_db['admin_state_up'] or
-                not self.get_l3_agent_candidates(router, [agent_db])):
+                not self.get_l3_agent_candidates(context,
+                                                 router,
+                                                 [agent_db])):
                 raise l3agentscheduler.InvalidL3Agent(id=agent_id)
             query = context.session.query(RouterL3AgentBinding)
-            try:
-                binding = query.filter_by(router_id=router_id).one()
+            if distributed:
+                binding = query.filter_by(router_id=router_id,
+                                          l3_agent_id=agent_id).first()
+                if binding:
+                    raise l3agentscheduler.RouterHostedByL3Agent(
+                        router_id=router_id,
+                        agent_id=binding.l3_agent_id)
+            else:
+                try:
+                    binding = query.filter_by(router_id=router_id).one()
 
-                raise l3agentscheduler.RouterHostedByL3Agent(
-                    router_id=router_id,
-                    agent_id=binding.l3_agent_id)
-            except exc.NoResultFound:
-                pass
+                    raise l3agentscheduler.RouterHostedByL3Agent(
+                        router_id=router_id,
+                        agent_id=binding.l3_agent_id)
+                except exc.NoResultFound:
+                    pass
 
             result = self.auto_schedule_routers(context,
                                                 agent_db.host,
@@ -238,7 +257,57 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
                 if agentschedulers_db.AgentSchedulerDbMixin.is_eligible_agent(
                     active, l3_agent)]
 
-    def get_l3_agent_candidates(self, sync_router, l3_agents):
+    def check_vmexists_on_l3agent(self, context, l3_agent, router_id,
+                                  subnet_id):
+        if not subnet_id:
+            return True
+
+        core_plugin = manager.NeutronManager.get_plugin()
+        filter = {'fixed_ips': {'subnet_id': [subnet_id]}}
+        ports = core_plugin.get_ports(context, filters=filter)
+        for port in ports:
+            if ("compute:" in port['device_owner'] and
+                l3_agent['host'] == port['binding:host_id']):
+                    return True
+
+        return False
+
+    def get_snat_candidates(self, sync_router, l3_agents):
+        """Get the valid snat enabled l3 agents for the distributed router."""
+        candidates = []
+        is_router_distributed = sync_router.get('distributed', False)
+        if not is_router_distributed:
+            return candidates
+        for l3_agent in l3_agents:
+            if not l3_agent.admin_state_up:
+                continue
+
+            agent_conf = self.get_configuration_dict(l3_agent)
+            agent_mode = agent_conf.get('agent_mode', 'legacy')
+            if agent_mode != 'dvr_snat':
+                continue
+
+            router_id = agent_conf.get('router_id', None)
+            use_namespaces = agent_conf.get('use_namespaces', True)
+            if not use_namespaces and router_id != sync_router['id']:
+                continue
+
+            handle_internal_only_routers = agent_conf.get(
+                'handle_internal_only_routers', True)
+            gateway_external_network_id = agent_conf.get(
+                'gateway_external_network_id', None)
+            ex_net_id = (sync_router['external_gateway_info'] or {}).get(
+                'network_id')
+            if ((not ex_net_id and not handle_internal_only_routers) or
+                (ex_net_id and gateway_external_network_id and
+                 ex_net_id != gateway_external_network_id)):
+                continue
+
+            candidates.append(l3_agent)
+        return candidates
+
+    def get_l3_agent_candidates(self, context, sync_router, l3_agents,
+                                subnet_id=None):
         """Get the valid l3 agents for the router from a list of l3_agents."""
         candidates = []
         for l3_agent in l3_agents:
@@ -251,6 +320,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
                 'handle_internal_only_routers', True)
             gateway_external_network_id = agent_conf.get(
                 'gateway_external_network_id', None)
+            agent_mode = agent_conf.get('agent_mode', 'legacy')
             if not use_namespaces and router_id != sync_router['id']:
                 continue
             ex_net_id = (sync_router['external_gateway_info'] or {}).get(
@@ -259,7 +329,13 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
                 (ex_net_id and gateway_external_network_id and
                  ex_net_id != gateway_external_network_id)):
                 continue
-            candidates.append(l3_agent)
+            is_router_distributed = sync_router.get('distributed', False)
+            if not is_router_distributed and agent_mode == 'legacy':
+                candidates.append(l3_agent)
+            elif (agent_mode.startswith('dvr') and
+                self.check_vmexists_on_l3agent(
+                    context, l3_agent, sync_router['id'], subnet_id)):
+                candidates.append(l3_agent)
         return candidates
 
     def auto_schedule_routers(self, context, host, router_ids):
@@ -267,15 +343,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
             return self.router_scheduler.auto_schedule_routers(
                 self, context, host, router_ids)
 
-    def schedule_router(self, context, router, candidates=None):
+    def schedule_router(self, context, router, candidates=None, hints=None):
         if self.router_scheduler:
             return self.router_scheduler.schedule(
-                self, context, router, candidates)
+                self, context, router, candidates=candidates, hints=hints)
 
-    def schedule_routers(self, context, routers):
+    def schedule_routers(self, context, routers, hints=None):
         """Schedule the routers to l3 agents."""
         for router in routers:
-            self.schedule_router(context, router)
+            self.schedule_router(context, router, candidates=None, hints=hints)
 
     def get_l3_agent_with_min_routers(self, context, agent_ids):
         """Return l3 agent with the least number of routers."""
index 80de421445856b1d0c2785cfe1ac0ee512d64aa2..203c998be398297275598c63acd1200ea56d87b0 100644 (file)
@@ -166,17 +166,20 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
         # l3 agent (associated with given external network);
         # do check before update in DB as an exception will be raised
         # in case no proper l3 agent found
-        candidates = None
         if gw_info != attributes.ATTR_NOT_SPECIFIED:
             candidates = self._check_router_needs_rescheduling(
                 context, id, gw_info)
+            payload = {'gw_exists': True}
+        else:
+            candidates = None
+            payload = {'gw_exists': False}
         router_db = self._update_router_db(context, id, r, gw_info)
         if candidates:
             l3_plugin = manager.NeutronManager.get_service_plugins().get(
                 constants.L3_ROUTER_NAT)
             l3_plugin.reschedule_router(context, id, candidates)
-
-        self.l3_rpc_notifier.routers_updated(context, [router_db['id']])
+        self.l3_rpc_notifier.routers_updated(context, [router_db['id']],
+                                             None, payload)
         return self._make_router_dict(router_db)
 
     def _check_router_needs_rescheduling(self, context, router_id, gw_info):
@@ -234,8 +237,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             'id': router_id,
             'external_gateway_info': {'network_id': network_id}
         }
-        candidates = l3_plugin.get_l3_agent_candidates(
-            router, active_agents)
+        candidates = l3_plugin.get_l3_agent_candidates(context,
+                                                       router,
+                                                       active_agents)
         if not candidates:
             msg = (_('No eligible l3 agent associated with external network '
                      '%s found') % network_id)
@@ -468,7 +472,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
     def notify_router_interface_action(
             self, context, router_id, tenant_id, port_id, subnet_id, action):
         l3_method = '%s_router_interface' % action
-        self.l3_rpc_notifier.routers_updated(context, [router_id], l3_method)
+        self.l3_rpc_notifier.routers_updated(context, [router_id],
+            l3_method, {'subnet_id': subnet_id})
 
         mapping = {'add': 'create', 'remove': 'delete'}
         info = {
@@ -784,7 +789,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
         if router_id:
             self.l3_rpc_notifier.routers_updated(
                 context, [router_id],
-                'create_floatingip')
+                'create_floatingip', {})
         return self._make_floatingip_dict(floatingip_db)
 
     def update_floatingip(self, context, id, floatingip):
@@ -806,7 +811,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             router_ids.append(router_id)
         if router_ids:
             self.l3_rpc_notifier.routers_updated(
-                context, router_ids, 'update_floatingip')
+                context, router_ids, 'update_floatingip', {})
         return self._make_floatingip_dict(floatingip_db)
 
     def update_floatingip_status(self, context, floatingip_id, status):
@@ -826,7 +831,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
         if router_id:
             self.l3_rpc_notifier.routers_updated(
                 context, [router_id],
-                'delete_floatingip')
+                'delete_floatingip', {})
 
     def get_floatingip(self, context, id, fields=None):
         floatingip = self._get_floatingip(context, id)
@@ -915,15 +920,13 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
         if router_ids:
             self.l3_rpc_notifier.routers_updated(
                 context, list(router_ids),
-                'disassociate_floatingips')
+                'disassociate_floatingips', {})
 
-    def _build_routers_list(self, routers, gw_ports):
-        gw_port_id_gw_port_dict = dict((gw_port['id'], gw_port)
-                                       for gw_port in gw_ports)
+    def _build_routers_list(self, context, routers, gw_ports):
         for router in routers:
             gw_port_id = router['gw_port_id']
             if gw_port_id:
-                router['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
+                router['gw_port'] = gw_ports[gw_port_id]
         return routers
 
     def _get_sync_routers(self, context, router_ids=None, active=None):
@@ -952,8 +955,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                 gw_port_ids.append(gw_port_id)
         gw_ports = []
         if gw_port_ids:
-            gw_ports = self.get_sync_gw_ports(context, gw_port_ids)
-        return self._build_routers_list(router_dicts, gw_ports)
+            gw_ports = dict((gw_port['id'], gw_port)
+                            for gw_port in
+                            self.get_sync_gw_ports(context, gw_port_ids))
+        return self._build_routers_list(context, router_dicts, gw_ports)
 
     def _get_sync_floating_ips(self, context, router_ids):
         """Query floating_ips that relate to list of router_ids."""
index 5856cbbea139f0fbce19db8a87b8d1a30b58ec2c..259ccf77da46b6d2c760e7a10ab18e51b854646c 100644 (file)
@@ -19,7 +19,9 @@ from neutron.common import constants as l3_const
 from neutron.common import exceptions as n_exc
 from neutron.db import l3_attrs_db
 from neutron.db import l3_db
+from neutron.db import l3_dvrscheduler_db as l3_dvrsched_db
 from neutron.db import models_v2
+from neutron.extensions import l3
 from neutron.extensions import portbindings
 from neutron.openstack.common import log as logging
 
@@ -219,6 +221,32 @@ class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
             self._populate_subnet_for_ports(context, interfaces)
         return interfaces
 
+    def _build_routers_list(self, context, routers, gw_ports):
+        # Perform a single query up front for all routers
+        router_ids = [r['id'] for r in routers]
+        snat_binding = l3_dvrsched_db.CentralizedSnatL3AgentBinding
+        query = (context.session.query(snat_binding).
+                 filter(snat_binding.router_id.in_(router_ids))).all()
+        bindings = dict((b.router_id, b) for b in query)
+
+        for rtr in routers:
+            gw_port_id = rtr['gw_port_id']
+            if gw_port_id:
+                rtr['gw_port'] = gw_ports[gw_port_id]
+                if 'enable_snat' in rtr[l3.EXTERNAL_GW_INFO]:
+                    rtr['enable_snat'] = (
+                        rtr[l3.EXTERNAL_GW_INFO]['enable_snat'])
+
+                binding = bindings.get(rtr['id'])
+                if not binding:
+                    rtr['gw_port_host'] = None
+                    LOG.debug('No snat is bound to router %s', rtr['id'])
+                    continue
+
+                rtr['gw_port_host'] = binding.l3_agent.host
+
+        return routers
+
     def _process_routers(self, context, routers):
         routers_dict = {}
         for router in routers:
diff --git a/neutron/db/l3_dvrscheduler_db.py b/neutron/db/l3_dvrscheduler_db.py
new file mode 100644 (file)
index 0000000..276faed
--- /dev/null
@@ -0,0 +1,277 @@
+#    (c) Copyright 2014 Hewlett-Packard Development Company, L.P.
+#    All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import random
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+
+from neutron.common import constants as q_const
+from neutron.db import agents_db
+from neutron.db import l3_agentschedulers_db as l3agent_sch_db
+from neutron.db import model_base
+from neutron.db import models_v2
+from neutron.openstack.common import log as logging
+from neutron.plugins.ml2 import db as ml2_db
+
+LOG = logging.getLogger(__name__)
+
+
+class CentralizedSnatL3AgentBinding(model_base.BASEV2):
+    """Represents binding between Neutron Centralized SNAT and L3 agents."""
+
+    __tablename__ = "csnat_l3_agent_bindings"
+
+    router_id = sa.Column(sa.String(36),
+                          sa.ForeignKey("routers.id", ondelete='CASCADE'),
+                          primary_key=True)
+    l3_agent_id = sa.Column(sa.String(36),
+                            sa.ForeignKey("agents.id", ondelete='CASCADE'),
+                            nullable=False)
+    host_id = sa.Column(sa.String(255))
+    csnat_gw_port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'))
+
+    l3_agent = orm.relationship(agents_db.Agent)
+    csnat_gw_port = orm.relationship(models_v2.Port)
+
+
+class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
+    """Mixin class for L3 DVR scheduler."""
+
+    def dvr_update_router_addvm(self, context, port):
+        ips = port['fixed_ips']
+        for ip in ips:
+            subnet = ip['subnet_id']
+            filter_sub = {'fixed_ips': {'subnet_id': [subnet]},
+                          'device_owner':
+                          [q_const.DEVICE_OWNER_DVR_INTERFACE]}
+            router_id = None
+            ports = self._core_plugin.get_ports(context, filters=filter_sub)
+            for port in ports:
+                router_id = port['device_id']
+                router_dict = self.get_router(context, router_id)
+                if router_dict.get('distributed', False):
+                    payload = {'subnet_id': subnet}
+                    self.l3_rpc_notifier.routers_updated(
+                        context, [router_id], None, payload)
+                    break
+            LOG.debug('DVR: dvr_update_router_addvm %s ', router_id)
+
+    def get_dvr_routers_by_vmportid(self, context, port_id):
+        """Gets the dvr routers on vmport subnets."""
+        router_ids = set()
+        port_dict = self._core_plugin.get_port(context, port_id)
+        fixed_ips = port_dict['fixed_ips']
+        for fixedip in fixed_ips:
+            vm_subnet = fixedip['subnet_id']
+            filter_sub = {'fixed_ips': {'subnet_id': [vm_subnet]},
+                          'device_owner':
+                          [q_const.DEVICE_OWNER_DVR_INTERFACE]}
+            subnet_ports = self._core_plugin.get_ports(
+                context, filters=filter_sub)
+            for subnet_port in subnet_ports:
+                router_ids.add(subnet_port['device_id'])
+        return router_ids
+
+    def get_subnet_ids_on_router(self, context, router_id):
+        """Return subnet IDs for interfaces attached to the given router."""
+        subnet_ids = set()
+        filter_rtr = {'device_id': [router_id]}
+        int_ports = self._core_plugin.get_ports(context, filters=filter_rtr)
+        for int_port in int_ports:
+            int_ips = int_port['fixed_ips']
+            int_subnet = int_ips[0]['subnet_id']
+            subnet_ids.add(int_subnet)
+        return subnet_ids
+
+    def check_vm_exists_on_subnet(self, context, host, port_id, subnet_id):
+        """Check if there is any vm exists on the subnet_id."""
+        filter_sub = {'fixed_ips': {'subnet_id': [subnet_id]}}
+        ports = self._core_plugin.get_ports(context, filters=filter_sub)
+        for port in ports:
+            if ("compute:" in port['device_owner']
+                and port['status'] == 'ACTIVE'
+                and port['binding:host_id'] == host
+                and port['id'] != port_id):
+                LOG.debug('DVR: VM exists for subnet %(subnet_id)s on host '
+                          '%(host)s', {'subnet_id': subnet_id,
+                                       'host': host})
+                return True
+        return False
+
+    def delete_namespace_on_host(self, context, host, router_id):
+        """Delete the given router namespace on the host."""
+        agent = self._get_agent_by_type_and_host(
+            context, q_const.AGENT_TYPE_L3, host)
+        agent_id = str(agent.id)
+        with context.session.begin(subtransactions=True):
+            (context.session.query(l3agent_sch_db.RouterL3AgentBinding).
+             filter_by(router_id=router_id, l3_agent_id=agent_id).
+             delete(synchronize_session=False))
+        LOG.debug('Deleted router %(router_id)s on agent.id %(id)s',
+                  {'router_id': router_id,
+                   'id': agent.id})
+
+    def dvr_deletens_if_no_vm(self, context, port_id):
+        """Delete the DVR namespace if no VM exists."""
+        router_ids = self.get_dvr_routers_by_vmportid(context, port_id)
+        port_host = ml2_db.get_port_binding_host(port_id)
+        if not router_ids:
+            LOG.debug('No namespaces available for this DVR port %(port)s '
+                      'on host %(host)s', {'port': port_id,
+                                           'host': port_host})
+            return []
+        removed_router_info = []
+        for router_id in router_ids:
+            subnet_ids = self.get_subnet_ids_on_router(context, router_id)
+            vm_exists_on_subnet = False
+            for subnet in subnet_ids:
+                if self.check_vm_exists_on_subnet(context,
+                                                  port_host,
+                                                  port_id,
+                                                  subnet):
+                    vm_exists_on_subnet = True
+                    break
+
+            if vm_exists_on_subnet:
+                continue
+            filter_rtr = {'device_id': [router_id],
+                          'device_owner':
+                          [q_const.DEVICE_OWNER_DVR_INTERFACE]}
+            int_ports = self._core_plugin.get_ports(
+                context, filters=filter_rtr)
+            for prt in int_ports:
+                dvr_binding = (ml2_db.
+                               get_dvr_port_binding_by_host(context.session,
+                                                            prt['id'],
+                                                            port_host))
+                if dvr_binding:
+                    # unbind this port from router
+                    dvr_binding['router_id'] = None
+                    dvr_binding.update(dvr_binding)
+            self.delete_namespace_on_host(context, port_host, router_id)
+            info = {'router_id': router_id, 'host': port_host}
+            removed_router_info.append(info)
+            LOG.debug('Deleted router namespace %(router_id)s '
+                      'on host %(host)s', info)
+        return removed_router_info
+
+    def bind_snat_router(self, context, router_id, chosen_agent):
+        """Bind the router to the chosen l3 agent."""
+        with context.session.begin(subtransactions=True):
+            binding = CentralizedSnatL3AgentBinding()
+            binding.l3_agent = chosen_agent
+            binding.router_id = router_id
+            context.session.add(binding)
+            LOG.debug('SNAT Router %(router_id)s is scheduled to L3 agent '
+                      '%(agent_id)s', {'router_id': router_id,
+                                       'agent_id': chosen_agent.id})
+
+    def bind_dvr_router_servicenode(self, context, router_id,
+                                    chosen_snat_agent):
+        """Bind the IR router to service node if not already hosted."""
+        query = (context.session.query(l3agent_sch_db.RouterL3AgentBinding).
+                 filter_by(router_id=router_id))
+        for bind in query:
+            if bind.l3_agent_id == chosen_snat_agent.id:
+                LOG.debug('Distributed Router %(router_id)s already hosted '
+                          'on snat l3_agent %(snat_id)s',
+                          {'router_id': router_id,
+                           'snat_id': chosen_snat_agent.id})
+                return
+        with context.session.begin(subtransactions=True):
+            binding = l3agent_sch_db.RouterL3AgentBinding()
+            binding.l3_agent = chosen_snat_agent
+            binding.router_id = router_id
+            context.session.add(binding)
+            LOG.debug('Binding the distributed router %(router_id)s to '
+                      'the snat agent %(snat_id)s',
+                      {'router_id': router_id,
+                       'snat_id': chosen_snat_agent.id})
+
+    def bind_snat_servicenode(self, context, router_id, snat_candidates):
+        """Bind the snat router to the chosen l3 service agent."""
+        chosen_snat_agent = random.choice(snat_candidates)
+        self.bind_snat_router(context, router_id, chosen_snat_agent)
+
+    def unbind_snat_servicenode(self, context, router_id):
+        """Unbind the snat router to the chosen l3 service agent."""
+        vm_ports = []
+        with context.session.begin(subtransactions=True):
+            query = (context.session.
+                     query(CentralizedSnatL3AgentBinding).
+                     filter_by(router_id=router_id))
+            try:
+                binding = query.one()
+            except exc.NoResultFound:
+                LOG.debug('no snat router binding found for %s', router_id)
+                return
+
+            host = binding.l3_agent.host
+            subnet_ids = self.get_subnet_ids_on_router(context, router_id)
+            for subnet in subnet_ids:
+                vm_ports = (
+                    self.get_compute_ports_on_host_by_subnet(
+                        context, host, subnet))
+                if vm_ports:
+                    LOG.debug('VM exists on the snat enabled l3_agent '
+                              'host %(host)s and router_id %(router_id)s',
+                              {'host': host, 'router_id': router_id})
+                    break
+            agent_id = binding.l3_agent_id
+            LOG.debug('Delete binding of the SNAT router %(router_id)s '
+                      'from agent %(id)s', {'router_id': router_id,
+                                            'id': agent_id})
+            context.session.delete(binding)
+
+            if not vm_ports:
+                query = (context.session.
+                         query(l3agent_sch_db.RouterL3AgentBinding).
+                         filter_by(router_id=router_id,
+                                   l3_agent_id=agent_id).
+                         delete(synchronize_session=False))
+        self.l3_rpc_notifier.router_removed_from_agent(
+            context, router_id, host)
+        LOG.debug('Removed binding for router %(router_id)s and '
+                  'agent %(id)s', {'router_id': router_id, 'id': agent_id})
+
+    def schedule_snat_router(self, context, router_id, gw_exists):
+        """Schedule the snat router on l3 service agent."""
+        if gw_exists:
+            query = (context.session.
+                     query(CentralizedSnatL3AgentBinding).
+                     filter_by(router_id=router_id))
+            for bind in query:
+                agt_id = bind.l3_agent_id
+                LOG.debug('SNAT Router %(router_id)s has already been '
+                          'hosted by L3 agent '
+                          '%(agent_id)s', {'router_id': router_id,
+                                           'agent_id': agt_id})
+                self.bind_dvr_router_servicenode(context,
+                                                 router_id,
+                                                 bind.l3_agent)
+                return
+            active_l3_agents = self.get_l3_agents(context, active=True)
+            if not active_l3_agents:
+                LOG.warn(_('No active L3 agents'))
+                return
+            sync_router = self.get_router(context, router_id)
+            snat_candidates = self.get_snat_candidates(sync_router,
+                                                       active_l3_agents)
+            if snat_candidates:
+                self.bind_snat_servicenode(context, router_id, snat_candidates)
+        else:
+            self.unbind_snat_servicenode(context, router_id)
index 108ecc824b5c78c50c97d6d480de1d9c9b04d902..f4655c6c3a852faf736bdcdcb69d4977c1641d78 100644 (file)
@@ -62,14 +62,11 @@ class L3_NAT_db_mixin(l3_db.L3_NAT_db_mixin):
         # method is overriden in child classes
         return router
 
-    def _build_routers_list(self, routers, gw_ports):
-        gw_port_id_gw_port_dict = {}
-        for gw_port in gw_ports:
-            gw_port_id_gw_port_dict[gw_port['id']] = gw_port
+    def _build_routers_list(self, context, routers, gw_ports):
         for rtr in routers:
             gw_port_id = rtr['gw_port_id']
             if gw_port_id:
-                rtr['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
+                rtr['gw_port'] = gw_ports[gw_port_id]
                 # Add enable_snat key
                 rtr['enable_snat'] = rtr[EXTERNAL_GW_INFO]['enable_snat']
         return routers
index 7de4522622ba52a3003b50fd728ab3745a4970df..d0d8287f15a53853b492c19bb991d08d97930c4e 100644 (file)
@@ -69,9 +69,19 @@ class L3RpcCallbackMixin(object):
         for router in routers:
             LOG.debug(_("Checking router: %(id)s for host: %(host)s"),
                       {'id': router['id'], 'host': host})
-            self._ensure_host_set_on_port(context, plugin, host,
-                                          router.get('gw_port'),
-                                          router['id'])
+            if router.get('gw_port') and router.get('distributed'):
+                self._ensure_host_set_on_port(context, plugin,
+                                              router.get('gw_port_host'),
+                                              router.get('gw_port'),
+                                              router['id'])
+                for p in router.get(constants.SNAT_ROUTER_INTF_KEY, []):
+                    self._ensure_host_set_on_port(context, plugin,
+                                                  router.get('gw_port_host'),
+                                                  p, router['id'])
+            else:
+                self._ensure_host_set_on_port(context, plugin, host,
+                                              router.get('gw_port'),
+                                              router['id'])
             for interface in router.get(constants.INTERFACE_KEY, []):
                 self._ensure_host_set_on_port(context, plugin, host,
                                               interface, router['id'])
diff --git a/neutron/db/migration/alembic_migrations/versions/5589aa32bf80_l3_dvr_scheduler.py b/neutron/db/migration/alembic_migrations/versions/5589aa32bf80_l3_dvr_scheduler.py
new file mode 100644 (file)
index 0000000..a2e6673
--- /dev/null
@@ -0,0 +1,62 @@
+# Copyright 2014 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""L3 scheduler additions to support DVR
+
+Revision ID: 5589aa32bf80
+Revises: 31d7f831a591
+Create Date: 2014-07-7 11:00:43.392912
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '5589aa32bf80'
+down_revision = '31d7f831a591'
+
+migration_for_plugins = [
+    '*'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+from neutron.db import migration
+
+
+def upgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.create_table(
+        'csnat_l3_agent_bindings',
+        sa.Column('router_id', sa.String(length=36), nullable=False),
+        sa.Column('l3_agent_id', sa.String(length=36), nullable=False),
+        sa.Column('host_id', sa.String(length=255), nullable=True),
+        sa.Column('csnat_gw_port_id', sa.String(length=36), nullable=True),
+        sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'],
+                                ondelete='CASCADE'),
+        sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
+                                ondelete='CASCADE'),
+        sa.ForeignKeyConstraint(['csnat_gw_port_id'], ['ports.id'],
+                                ondelete='CASCADE'),
+        sa.PrimaryKeyConstraint('router_id')
+    )
+
+
+def downgrade(active_plugins=None, options=None):
+    if not migration.should_run(active_plugins, migration_for_plugins):
+        return
+
+    op.drop_table('csnat_l3_agent_bindings')
index 2478aa43332d6eec4acb012daf56a45690958b2a..1383ffcb3a1d30bed62e0a5fb27a8ea36787a396 100644 (file)
@@ -1 +1 @@
-31d7f831a591
+5589aa32bf80
index 3eb7fa3de839c3bdb8d3dc62ff694406f87d31a2..1d82bca79b7a0d75013fe0e532b7f13708b7a95e 100644 (file)
@@ -32,6 +32,7 @@ from neutron.db.firewall import firewall_db  # noqa
 from neutron.db import l3_agentschedulers_db  # noqa
 from neutron.db import l3_attrs_db  # noqa
 from neutron.db import l3_db  # noqa
+from neutron.db import l3_dvrscheduler_db  # noqa
 from neutron.db import l3_gwmode_db  # noqa
 from neutron.db.loadbalancer import loadbalancer_db  # noqa
 from neutron.db.metering import metering_db  # noqa
index 8e110e10e9b4b8b35808d30f5c8c1e5cddf496b8..7f66a1a3d7da238804ba20baee30325976aee657 100644 (file)
@@ -177,6 +177,11 @@ class RouterNotHostedByL3Agent(exceptions.Conflict):
                 " by L3 agent %(agent_id)s.")
 
 
+class RouterL3AgentMismatch(exceptions.Conflict):
+    message = _("Cannot host %(router_type)s router %(router_id)s "
+                "on %(agent_mode)s L3 agent %(agent_id)s.")
+
+
 class L3AgentSchedulerPluginBase(object):
     """REST API to operate the l3 agent scheduler.
 
index 1c94b830c6098e694a7788efec531088866da960..4bc167810516ab389293a104ea51a276eb65f240 100644 (file)
@@ -28,6 +28,7 @@ from neutron.common import constants as const
 from neutron.common import exceptions as exc
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
+from neutron.common import utils
 from neutron.db import agents_db
 from neutron.db import agentschedulers_db
 from neutron.db import allowedaddresspairs_db as addr_pair_db
@@ -214,7 +215,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         # TODO(rkukura): Implement filtering.
         return nets
 
-    def _process_port_binding(self, mech_context, attrs):
+    def _process_port_binding(self, mech_context, context, attrs):
         binding = mech_context._binding
         port = mech_context.current
         changes = False
@@ -224,6 +225,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             binding.host != host):
             binding.host = host
             changes = True
+            if "compute:" in port['device_owner']:
+                l3plugin = manager.NeutronManager.get_service_plugins().get(
+                    service_constants.L3_ROUTER_NAT)
+                if (utils.is_extension_supported(
+                    l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
+                    l3plugin.dvr_update_router_addvm(context, port)
 
         vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
         if (attributes.is_attr_set(vnic_type) and
@@ -796,7 +803,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             binding = db.add_port_binding(session, result['id'])
             mech_context = driver_context.PortContext(self, context, result,
                                                       network, binding)
-            self._process_port_binding(mech_context, attrs)
+            self._process_port_binding(mech_context, context, attrs)
+
             result[addr_pair.ADDRESS_PAIRS] = (
                 self._process_create_allowed_address_pairs(
                     context, result,
@@ -858,7 +866,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 self, context, updated_port, network, binding,
                 original_port=original_port)
             need_port_update_notify |= self._process_port_binding(
-                mech_context, attrs)
+                mech_context, context, attrs)
             self.mechanism_manager.update_port_precommit(mech_context)
 
         # TODO(apech) - handle errors raised by update_port, potentially
@@ -959,8 +967,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
 
     def delete_port(self, context, id, l3_port_check=True):
         LOG.debug(_("Deleting port %s"), id)
+        removed_routers = []
         l3plugin = manager.NeutronManager.get_service_plugins().get(
             service_constants.L3_ROUTER_NAT)
+        is_dvr_enabled = utils.is_extension_supported(
+            l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
         if l3plugin and l3_port_check:
             l3plugin.prevent_l3_port_deletion(context, id)
 
@@ -990,11 +1001,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             else:
                 mech_context = driver_context.PortContext(self, context, port,
                                                           network, binding)
+                if "compute:" in port['device_owner'] and is_dvr_enabled:
+                    router_info = l3plugin.dvr_deletens_if_no_vm(context, id)
+                    removed_routers += router_info
                 self.mechanism_manager.delete_port_precommit(mech_context)
                 self._delete_port_security_group_bindings(context, id)
             if l3plugin:
                 router_ids = l3plugin.disassociate_floatingips(
                     context, id, do_notify=False)
+                if is_dvr_enabled:
+                    l3plugin.dvr_vmarp_table_update(context, id, "del")
 
             LOG.debug("Calling delete_port for %(port_id)s owned by %(owner)s"
                       % {"port_id": id, "owner": port['device_owner']})
@@ -1003,6 +1019,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         # now that we've left db transaction, we are safe to notify
         if l3plugin:
             l3plugin.notify_routers_updated(context, router_ids)
+            for router in removed_routers:
+                l3plugin.remove_router_from_l3_agent(
+                    context, router['host'], router['router_id'])
 
         try:
             # for both normal and DVR Interface ports, only one invocation of
index 18fb53909bc3d68037f838f9216c3a6b3228830e..99f407667344d92c576094dffbf428a0baf6a5e2 100644 (file)
@@ -271,11 +271,11 @@ class L3AgentSchedulerDbMixin(l3_agentschedulers_db.L3AgentSchedulerDbMixin):
         return super(L3AgentSchedulerDbMixin, self).auto_schedule_routers(
             context, host, router_ids)
 
-    def schedule_router(self, context, router):
+    def schedule_router(self, context, router, candidates=None, hints=None):
         if (self._get_provider_by_router_id(context, router) ==
             nconst.ROUTER_PROVIDER_L3AGENT):
             return super(L3AgentSchedulerDbMixin, self).schedule_router(
-                context, router)
+                context, router, candidates=candidates, hints=hints)
 
     def add_router_to_l3_agent(self, context, id, router_id):
         provider = self._get_provider_by_router_id(context, router_id)
index d6b4dc7d4ddb93e5f5c6fd841188e30a77f710cf..a85015d2af0bea59ca22286add6775394bc78283 100644 (file)
@@ -35,13 +35,23 @@ LOG = logging.getLogger(__name__)
 class L3Scheduler(object):
 
     @abc.abstractmethod
-    def schedule(self, plugin, context, router_id, candidates=None):
+    def schedule(self, plugin, context, router_id,
+                 candidates=None, hints=None):
         """Schedule the router to an active L3 agent.
 
         Schedule the router only if it is not already scheduled.
         """
         pass
 
+    def dvr_has_binding(self, context, router_id, l3_agent_id):
+        router_binding_model = l3_agentschedulers_db.RouterL3AgentBinding
+
+        query = context.session.query(router_binding_model)
+        query = query.filter(router_binding_model.router_id == router_id,
+                             router_binding_model.l3_agent_id == l3_agent_id)
+
+        return query.count() > 0
+
     def auto_schedule_routers(self, plugin, context, host, router_ids):
         """Schedule non-hosted routers to L3 Agent running on host.
 
@@ -69,18 +79,20 @@ class L3Scheduler(object):
                 LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
             # check if each of the specified routers is hosted
             if router_ids:
-                unscheduled_router_ids = []
-                for router_id in router_ids:
+                routers = plugin.get_routers(
+                    context, filters={'id': router_ids})
+                unscheduled_routers = []
+                for router in routers:
                     l3_agents = plugin.get_l3_agents_hosting_routers(
-                        context, [router_id], admin_state_up=True)
-                    if l3_agents:
+                        context, [router['id']], admin_state_up=True)
+                    if l3_agents and not router.get('distributed', False):
                         LOG.debug(_('Router %(router_id)s has already been'
                                     ' hosted by L3 agent %(agent_id)s'),
-                                  {'router_id': router_id,
+                                  {'router_id': router['id'],
                                    'agent_id': l3_agents[0]['id']})
                     else:
-                        unscheduled_router_ids.append(router_id)
-                if not unscheduled_router_ids:
+                        unscheduled_routers.append(router)
+                if not unscheduled_routers:
                     # all (specified) routers are already scheduled
                     return False
             else:
@@ -95,27 +107,36 @@ class L3Scheduler(object):
                 if not unscheduled_router_ids:
                     LOG.debug(_('No non-hosted routers'))
                     return False
+                unscheduled_routers = plugin.get_routers(
+                    context, filters={'id': unscheduled_router_ids})
 
             # check if the configuration of l3 agent is compatible
             # with the router
-            routers = plugin.get_routers(
-                context, filters={'id': unscheduled_router_ids})
-            to_removed_ids = []
-            for router in routers:
-                candidates = plugin.get_l3_agent_candidates(router, [l3_agent])
+            to_removed_ids = set()
+            for router in unscheduled_routers:
+                candidates = plugin.get_l3_agent_candidates(context,
+                                                            router,
+                                                            [l3_agent])
                 if not candidates:
-                    to_removed_ids.append(router['id'])
-            router_ids = set([r['id'] for r in routers]) - set(to_removed_ids)
-            if not router_ids:
+                    to_removed_ids.add(router['id'])
+
+            target_routers = [r for r in unscheduled_routers
+                              if r['id'] not in to_removed_ids]
+            if not target_routers:
                 LOG.warn(_('No routers compatible with L3 agent configuration'
                            ' on host %s'), host)
                 return False
 
-            for router_id in router_ids:
-                self.bind_router(context, router_id, l3_agent)
+            for router_dict in target_routers:
+                if (router_dict.get('distributed', False)
+                    and self.dvr_has_binding(context,
+                                             router_dict['id'],
+                                             l3_agent.id)):
+                    continue
+                self.bind_router(context, router_dict['id'], l3_agent)
         return True
 
-    def get_candidates(self, plugin, context, sync_router):
+    def get_candidates(self, plugin, context, sync_router, subnet_id):
         """Return L3 agents where a router could be scheduled."""
         with context.session.begin(subtransactions=True):
             # allow one router is hosted by just
@@ -124,7 +145,7 @@ class L3Scheduler(object):
             # active any time
             l3_agents = plugin.get_l3_agents_hosting_routers(
                 context, [sync_router['id']], admin_state_up=True)
-            if l3_agents:
+            if l3_agents and not sync_router.get('distributed', False):
                 LOG.debug(_('Router %(router_id)s has already been hosted'
                             ' by L3 agent %(agent_id)s'),
                           {'router_id': sync_router['id'],
@@ -135,8 +156,16 @@ class L3Scheduler(object):
             if not active_l3_agents:
                 LOG.warn(_('No active L3 agents'))
                 return
-            candidates = plugin.get_l3_agent_candidates(sync_router,
-                                                        active_l3_agents)
+            new_l3agents = plugin.get_l3_agent_candidates(context,
+                                                          sync_router,
+                                                          active_l3_agents,
+                                                          subnet_id)
+            old_l3agentset = set(l3_agents)
+            if sync_router.get('distributed', False):
+                new_l3agentset = set(new_l3agents)
+                candidates = list(new_l3agentset - old_l3agentset)
+            else:
+                candidates = new_l3agents
             if not candidates:
                 LOG.warn(_('No L3 agents can host the router %s'),
                          sync_router['id'])
@@ -163,38 +192,56 @@ class L3Scheduler(object):
                   '%(agent_id)s', {'router_id': router_id,
                                    'agent_id': chosen_agent.id})
 
+    def _schedule_router(self, plugin, context, router_id,
+                         candidates=None, hints=None):
+        sync_router = plugin.get_router(context, router_id)
+        subnet_id = hints.get('subnet_id') if hints else None
+        candidates = candidates or self.get_candidates(
+            plugin, context, sync_router, subnet_id)
+        if (hints and 'gw_exists' in hints
+            and sync_router.get('distributed', False)):
+            plugin.schedule_snat_router(context, router_id, sync_router)
+        if not candidates:
+            return
+        if sync_router.get('distributed', False):
+            for chosen_agent in candidates:
+                self.bind_router(context, router_id, chosen_agent)
+        else:
+            chosen_agent = self._choose_router_agent(
+                plugin, context, candidates)
+            self.bind_router(context, router_id, chosen_agent)
+        return chosen_agent
+
+    @abc.abstractmethod
+    def _choose_router_agent(self, plugin, context, candidates):
+        """Choose an agent from candidates based on a specific policy."""
+        pass
+
 
 class ChanceScheduler(L3Scheduler):
     """Randomly allocate an L3 agent for a router."""
 
-    def schedule(self, plugin, context, router_id, candidates=None):
+    def schedule(self, plugin, context, router_id,
+                 candidates=None, hints=None):
         with context.session.begin(subtransactions=True):
-            sync_router = plugin.get_router(context, router_id)
-            candidates = candidates or self.get_candidates(
-                plugin, context, sync_router)
-            if not candidates:
-                return
+            return self._schedule_router(
+                plugin, context, router_id, candidates=candidates, hints=hints)
 
-            chosen_agent = random.choice(candidates)
-            self.bind_router(context, router_id, chosen_agent)
-            return chosen_agent
+    def _choose_router_agent(self, plugin, context, candidates):
+        return random.choice(candidates)
 
 
 class LeastRoutersScheduler(L3Scheduler):
     """Allocate to an L3 agent with the least number of routers bound."""
 
-    def schedule(self, plugin, context, router_id, candidates=None):
+    def schedule(self, plugin, context, router_id,
+                 candidates=None, hints=None):
         with context.session.begin(subtransactions=True):
-            sync_router = plugin.get_router(context, router_id)
-            candidates = candidates or self.get_candidates(
-                plugin, context, sync_router)
-            if not candidates:
-                return
-
-            candidate_ids = [candidate['id'] for candidate in candidates]
-            chosen_agent = plugin.get_l3_agent_with_min_routers(
-                context, candidate_ids)
-
-            self.bind_router(context, router_id, chosen_agent)
-
-            return chosen_agent
+            return self._schedule_router(
+                plugin, context, router_id, candidates=candidates, hints=hints)
+
+    def _choose_router_agent(self, plugin, context, candidates):
+        candidate_ids = [candidate['id'] for candidate in candidates]
+        chosen_agent = plugin.get_l3_agent_with_min_routers(
+            context, candidate_ids)
+        return chosen_agent
index 75be80db5187eb8a7c5b15e0e92246aa76e9e9af..61614d684a06d1f8445575effabafc6fb1c15354 100644 (file)
@@ -24,8 +24,8 @@ from neutron.common import topics
 from neutron.db import api as qdbapi
 from neutron.db import common_db_mixin
 from neutron.db import extraroute_db
-from neutron.db import l3_agentschedulers_db
 from neutron.db import l3_dvr_db
+from neutron.db import l3_dvrscheduler_db
 from neutron.db import l3_gwmode_db
 from neutron.db import l3_rpc_base
 from neutron.db import model_base
@@ -45,7 +45,7 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
                      extraroute_db.ExtraRoute_db_mixin,
                      l3_dvr_db.L3_NAT_with_dvr_db_mixin,
                      l3_gwmode_db.L3_NAT_db_mixin,
-                     l3_agentschedulers_db.L3AgentSchedulerDbMixin):
+                     l3_dvrscheduler_db.L3_DVRsch_db_mixin):
 
     """Implementation of the Neutron L3 Router Service Plugin.
 
index cabb1428e286c4a0834b709a90784597ffd341f9..48c5aec635acb31879e2ac93ee5ff8f891f5df02 100644 (file)
@@ -200,6 +200,10 @@ class TestL3GwModeMixin(base.BaseTestCase):
         self.fip_request = {'port_id': FAKE_FIP_INT_PORT_ID,
                             'tenant_id': self.tenant_id}
 
+    def _get_gwports_dict(self, gw_ports):
+        return dict((gw_port['id'], gw_port)
+                    for gw_port in gw_ports)
+
     def _reset_ext_gw(self):
         # Reset external gateway
         self.router.gw_port_id = None
@@ -253,7 +257,9 @@ class TestL3GwModeMixin(base.BaseTestCase):
     def test_build_routers_list_no_ext_gw(self):
         self._reset_ext_gw()
         router_dict = self.target_object._make_router_dict(self.router)
-        routers = self.target_object._build_routers_list([router_dict], [])
+        routers = self.target_object._build_routers_list(self.context,
+                                                         [router_dict],
+                                                         [])
         self.assertEqual(1, len(routers))
         router = routers[0]
         self.assertIsNone(router.get('gw_port'))
@@ -262,7 +268,8 @@ class TestL3GwModeMixin(base.BaseTestCase):
     def test_build_routers_list_with_ext_gw(self):
         router_dict = self.target_object._make_router_dict(self.router)
         routers = self.target_object._build_routers_list(
-            [router_dict], [self.router.gw_port])
+            self.context, [router_dict],
+            self._get_gwports_dict([self.router.gw_port]))
         self.assertEqual(1, len(routers))
         router = routers[0]
         self.assertIsNotNone(router.get('gw_port'))
@@ -273,7 +280,8 @@ class TestL3GwModeMixin(base.BaseTestCase):
         self.router.enable_snat = False
         router_dict = self.target_object._make_router_dict(self.router)
         routers = self.target_object._build_routers_list(
-            [router_dict], [self.router.gw_port])
+            self.context, [router_dict],
+            self._get_gwports_dict([self.router.gw_port]))
         self.assertEqual(1, len(routers))
         router = routers[0]
         self.assertIsNotNone(router.get('gw_port'))
index 5e619db8d8632ec5cc3b6605b4b1a4c4e01fea40..b794163b64d1f6e5803dc98bbec2425d830789ec 100644 (file)
@@ -28,10 +28,13 @@ from neutron.common import topics
 from neutron import context as q_context
 from neutron.db import agents_db
 from neutron.db import l3_agentschedulers_db
+from neutron.db import l3_db
+from neutron.db import l3_dvrscheduler_db
 from neutron.extensions import l3 as ext_l3
 from neutron import manager
 from neutron.openstack.common import timeutils
 from neutron.scheduler import l3_agent_scheduler
+from neutron.tests import base
 from neutron.tests.unit import test_db_plugin
 from neutron.tests.unit import test_l3_plugin
 
@@ -239,3 +242,142 @@ class L3AgentLeastRoutersSchedulerTestCase(L3SchedulerTestCase):
                         agent_id3 = agents[0]['id']
 
                         self.assertNotEqual(agent_id1, agent_id3)
+
+
+class L3DvrScheduler(l3_db.L3_NAT_db_mixin,
+                     l3_dvrscheduler_db.L3_DVRsch_db_mixin):
+    pass
+
+
+class L3DvrSchedulerTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        plugin = 'neutron.plugins.ml2.plugin.Ml2Plugin'
+        self.setup_coreplugin(plugin)
+        super(L3DvrSchedulerTestCase, self).setUp()
+        self.adminContext = q_context.get_admin_context()
+        self.dut = L3DvrScheduler()
+
+    def test_dvr_update_router_addvm(self):
+        port = {
+                'device_id': 'abcd',
+                'device_owner': 'compute:nova',
+                'fixed_ips': [
+                    {
+                        'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
+                        'ip_address': '10.10.10.3'
+                    }
+                ]
+        }
+        dvr_port = {
+                'id': 'dvr_port1',
+                'device_id': 'r1',
+                'device_owner': 'network:router_interface_distributed',
+                'fixed_ips': [
+                    {
+                        'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
+                        'ip_address': '10.10.10.1'
+                    }
+                ]
+        }
+        r1 = {
+              'id': 'r1',
+              'distributed': True,
+        }
+
+        with contextlib.nested(
+            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+                       '.get_ports', return_value=[dvr_port]),
+            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
+                       return_value=mock.Mock()),
+            mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.get_router',
+                       return_value=r1),
+            mock.patch('neutron.api.rpc.agentnotifiers.l3_rpc_agent_api'
+                       '.L3AgentNotifyAPI')):
+            self.dut.dvr_update_router_addvm(self.adminContext, port)
+
+    def test_get_dvr_routers_by_vmportid(self):
+        dvr_port = {
+                'id': 'dvr_port1',
+                'device_id': 'r1',
+                'device_owner': 'network:router_interface_distributed',
+                'fixed_ips': [
+                    {
+                        'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
+                        'ip_address': '10.10.10.1'
+                    }
+                ]
+        }
+        r1 = {
+              'id': 'r1',
+              'distributed': True,
+        }
+
+        with contextlib.nested(
+            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+                       '.get_port', return_value=dvr_port),
+            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+                       '.get_ports', return_value=[dvr_port])):
+            router_id = self.dut.get_dvr_routers_by_vmportid(self.adminContext,
+                                                             dvr_port['id'])
+            self.assertEqual(router_id.pop(), r1['id'])
+
+    def test_get_subnet_ids_on_router(self):
+        dvr_port = {
+                'id': 'dvr_port1',
+                'device_id': 'r1',
+                'device_owner': 'network:router_interface_distributed',
+                'fixed_ips': [
+                    {
+                        'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
+                        'ip_address': '10.10.10.1'
+                    }
+                ]
+        }
+        r1 = {
+              'id': 'r1',
+              'distributed': True,
+        }
+
+        with contextlib.nested(
+            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+                       '.get_ports', return_value=[dvr_port])):
+            sub_ids = self.dut.get_subnet_ids_on_router(self.adminContext,
+                                                        r1['id'])
+            self.assertEqual(sub_ids.pop(),
+                            dvr_port.get('fixed_ips').pop(0).get('subnet_id'))
+
+    def test_check_vm_exists_on_subnet(self):
+        dvr_port = {
+                'id': 'dvr_port1',
+                'device_id': 'r1',
+                'status': 'ACTIVE',
+                'binding:host_id': 'thisHost',
+                'device_owner': 'compute:nova',
+                'fixed_ips': [
+                    {
+                        'subnet_id': '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
+                        'ip_address': '10.10.10.1'
+                    }
+                ]
+        }
+        r1 = {
+              'id': 'r1',
+              'distributed': True,
+        }
+        with contextlib.nested(
+            mock.patch('neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+                       '.get_ports', return_value=[dvr_port]),
+            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
+                       return_value=mock.Mock()),
+            mock.patch('neutron.db.l3_db.L3_NAT_db_mixin.get_router',
+                       return_value=r1),
+            mock.patch('neutron.api.rpc.agentnotifiers.l3_rpc_agent_api'
+                       '.L3AgentNotifyAPI')):
+            sub_ids = self.dut.get_subnet_ids_on_router(self.adminContext,
+                                                        r1['id'])
+            result = self.dut.check_vm_exists_on_subnet(
+                                                    self.adminContext,
+                                                    'thisHost', 'dvr_port1',
+                                                    sub_ids)
+            self.assertFalse(result)