]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
l3 agent rpc
authorgongysh <gongysh@cn.ibm.com>
Mon, 12 Nov 2012 12:28:16 +0000 (20:28 +0800)
committergongysh <gongysh@cn.ibm.com>
Tue, 4 Dec 2012 08:46:44 +0000 (16:46 +0800)
On one hand, we sync router data (including routers,
their gw ports, interfaces and floatingips) from l3_agent
to quantum server periodically if needed.

On the other hand, we notify l3 agent from quantum server when
we delete or update a router's stuff, such as floating IP,
interface and gwport and router itself.

blueprint rpc-for-l3-agent
bug #1080286

Change-Id: I60f3081975fc7164b22f9e9fa941e702a3f4c663

20 files changed:
etc/l3_agent.ini
openstack-common.conf
quantum/agent/l3_agent.py
quantum/api/v2/base.py
quantum/common/config.py
quantum/common/constants.py
quantum/common/exceptions.py
quantum/common/topics.py
quantum/common/utils.py
quantum/db/l3_db.py
quantum/db/l3_rpc_agent_api.py [new file with mode: 0644]
quantum/db/l3_rpc_base.py [new file with mode: 0644]
quantum/manager.py
quantum/openstack/common/periodic_task.py [new file with mode: 0644]
quantum/plugins/linuxbridge/lb_quantum_plugin.py
quantum/plugins/openvswitch/ovs_quantum_plugin.py
quantum/service.py
quantum/tests/unit/test_db_plugin.py
quantum/tests/unit/test_l3_agent.py
quantum/tests/unit/test_l3_plugin.py

index 7be4853b42eb7d39edad6e6edac79136f85a5e1b..84149cd6a6a001c4cc68df707cec889e0a33eb3b 100644 (file)
@@ -10,13 +10,6 @@ interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver
 # LinuxBridge
 #interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
 
-# The Quantum user information for accessing the Quantum API.
-auth_url = http://localhost:35357/v2.0
-auth_region = RegionOne
-admin_tenant_name = %SERVICE_TENANT_NAME%
-admin_user = %SERVICE_USER%
-admin_password = %SERVICE_PASSWORD%
-
 # Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
 # root filter facility.
 # Change to "sudo" to skip the filtering and just run the comand directly
@@ -54,9 +47,13 @@ root_helper = sudo
 # TCP Port used by Quantum metadata server
 # metadata_port = 9697
 
-# The time in seconds between state poll requests
-# polling_interval = 3
-
 # Send this many gratuitous ARPs for HA setup. Set it below or equal to 0
 # to disable this feature.
 # send_arp_for_ha = 3
+
+# seconds between re-sync routers' data if needed
+# periodic_interval = 40
+
+# seconds to start to sync routers' data after
+# starting agent
+# periodic_fuzzy_delay = 5
index 7bb3a365629d97b7050e65c9c7f58c3a13389314..00364b7bde3b1107e21e050ee577a5d6a744824e 100644 (file)
@@ -1,5 +1,5 @@
 [DEFAULT]
 # The list of modules to copy from openstack-common
-modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
+modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
 # The base module to hold the copy of openstack.common
 base=quantum
index 1eb32a80387e893f7d141399a3b1a91ff4b72076..4a54d569c142515eeed03da9d03840f9f7ff4656 100644 (file)
@@ -20,8 +20,9 @@
 """
 
 import sys
-import time
 
+import eventlet
+from eventlet import semaphore
 import netaddr
 
 from quantum.agent.common import config
@@ -30,11 +31,19 @@ from quantum.agent.linux import interface
 from quantum.agent.linux import ip_lib
 from quantum.agent.linux import iptables_manager
 from quantum.agent.linux import utils
-from quantum.db import l3_db
+from quantum.common import constants as l3_constants
+from quantum.common import topics
+from quantum import context
+from quantum import manager
 from quantum.openstack.common import cfg
 from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
-from quantumclient.v2_0 import client
+from quantum.openstack.common import periodic_task
+from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common.rpc import proxy
+from quantum.openstack.common import service
+from quantum import service as quantum_service
+
 
 LOG = logging.getLogger(__name__)
 NS_PREFIX = 'qrouter-'
@@ -42,16 +51,53 @@ INTERNAL_DEV_PREFIX = 'qr-'
 EXTERNAL_DEV_PREFIX = 'qg-'
 
 
+class L3PluginApi(proxy.RpcProxy):
+    """Agent side of the l3 agent RPC API.
+
+    API version history:
+        1.0 - Initial version.
+
+    """
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic, host):
+        super(L3PluginApi, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+        self.host = host
+
+    def get_routers(self, context, fullsync=True, router_id=None):
+        """Make a remote process call to retrieve the sync data for routers."""
+        router_ids = [router_id] if router_id else None
+        return self.call(context,
+                         self.make_msg('sync_routers', host=self.host,
+                                       fullsync=fullsync,
+                                       router_ids=router_ids),
+                         topic=self.topic)
+
+    def get_external_network_id(self, context):
+        """Make a remote process call to retrieve the external network id.
+
+        @raise common.RemoteError: with TooManyExternalNetworks
+                                   as exc_type if there are
+                                   more than one external network
+        """
+        return self.call(context,
+                         self.make_msg('get_external_network_id',
+                                       host=self.host),
+                         topic=self.topic)
+
+
 class RouterInfo(object):
 
-    def __init__(self, router_id, root_helper, use_namespaces):
+    def __init__(self, router_id, root_helper, use_namespaces, router=None):
         self.router_id = router_id
         self.ex_gw_port = None
         self.internal_ports = []
         self.floating_ips = []
         self.root_helper = root_helper
         self.use_namespaces = use_namespaces
-
+        self.router = router
         self.iptables_manager = iptables_manager.IptablesManager(
             root_helper=root_helper,
             #FIXME(danwent): use_ipv6=True,
@@ -62,23 +108,14 @@ class RouterInfo(object):
             return NS_PREFIX + self.router_id
 
 
-class L3NATAgent(object):
+class L3NATAgent(manager.Manager):
 
     OPTS = [
-        cfg.StrOpt('admin_user'),
-        cfg.StrOpt('admin_password'),
-        cfg.StrOpt('admin_tenant_name'),
-        cfg.StrOpt('auth_url'),
-        cfg.StrOpt('auth_strategy', default='keystone'),
-        cfg.StrOpt('auth_region'),
         cfg.StrOpt('root_helper', default='sudo'),
         cfg.StrOpt('external_network_bridge', default='br-ex',
                    help="Name of bridge used for external network traffic."),
         cfg.StrOpt('interface_driver',
                    help="The driver used to manage the virtual interface."),
-        cfg.IntOpt('polling_interval',
-                   default=3,
-                   help="The time in seconds between state poll requests."),
         cfg.IntOpt('metadata_port',
                    default=9697,
                    help="TCP Port used by Quantum metadata namespace proxy."),
@@ -97,36 +134,33 @@ class L3NATAgent(object):
         cfg.StrOpt('gateway_external_network_id', default='',
                    help="UUID of external network for routers implemented "
                         "by the agents."),
+        cfg.StrOpt('l3_agent_manager',
+                   default='quantum.agent.l3_agent.L3NATAgent'),
     ]
 
-    def __init__(self, conf):
-        self.conf = conf
+    def __init__(self, host, conf=None):
+        if conf:
+            self.conf = conf
+        else:
+            self.conf = cfg.CONF
         self.router_info = {}
 
-        if not conf.interface_driver:
-            LOG.error(_('You must specify an interface driver'))
+        if not self.conf.interface_driver:
+            LOG.error(_('An interface driver must be specified'))
             sys.exit(1)
         try:
-            self.driver = importutils.import_object(conf.interface_driver,
-                                                    conf)
+            self.driver = importutils.import_object(self.conf.interface_driver,
+                                                    self.conf)
         except:
-            LOG.exception(_("Error importing interface driver '%s'"),
-                          conf.interface_driver)
+            LOG.exception(_("Error importing interface driver '%s'"
+                            % self.conf.interface_driver))
             sys.exit(1)
-
-        self.polling_interval = conf.polling_interval
-
-        self.qclient = client.Client(
-            username=self.conf.admin_user,
-            password=self.conf.admin_password,
-            tenant_name=self.conf.admin_tenant_name,
-            auth_url=self.conf.auth_url,
-            auth_strategy=self.conf.auth_strategy,
-            region_name=self.conf.auth_region
-        )
-
+        self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
+        self.fullsync = True
+        self.sync_sem = semaphore.Semaphore(1)
         if self.conf.use_namespaces:
             self._destroy_all_router_namespaces()
+        super(L3NATAgent, self).__init__(host=self.conf.host)
 
     def _destroy_all_router_namespaces(self):
         """Destroy all router namespaces on the host to eliminate
@@ -138,7 +172,7 @@ class L3NATAgent(object):
                 try:
                     self._destroy_router_namespace(ns)
                 except:
-                    LOG.exception(_("Couldn't delete namespace '%s'"), ns)
+                    LOG.exception(_("Failed deleting namespace '%s'") % ns)
 
     def _destroy_router_namespace(self, namespace):
         ns_ip = ip_lib.IPWrapper(self.conf.root_helper,
@@ -160,81 +194,25 @@ class L3NATAgent(object):
             ip_wrapper = ip_wrapper_root.ensure_namespace(ri.ns_name())
             ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1'])
 
-    def daemon_loop(self):
-        #TODO(danwent): this simple diff logic does not handle if
-        # details of a router port (e.g., IP, mac) are changed behind
-        # our back.  Will fix this properly with update notifications.
-
-        while True:
-            try:
-                self.do_single_loop()
-            except:
-                LOG.exception(_("Error running l3_nat daemon_loop"))
-
-            time.sleep(self.polling_interval)
-
     def _fetch_external_net_id(self):
         """Find UUID of single external network for this agent"""
         if self.conf.gateway_external_network_id:
             return self.conf.gateway_external_network_id
-
-        params = {'router:external': True}
-        ex_nets = self.qclient.list_networks(**params)['networks']
-        if len(ex_nets) > 1:
-            raise Exception(_("Must configure 'gateway_external_network_id' "
-                              "if Quantum has more than one external "
-                              "network."))
-        if len(ex_nets) == 0:
-            return None
-        return ex_nets[0]['id']
-
-    def do_single_loop(self):
-
-        if (self.conf.external_network_bridge and
-            not ip_lib.device_exists(self.conf.external_network_bridge)):
-            LOG.error(_("External network bridge '%s' does not exist"),
-                      self.conf.external_network_bridge)
-            return
-
-        prev_router_ids = set(self.router_info)
-        cur_router_ids = set()
-
-        target_ex_net_id = self._fetch_external_net_id()
-
-        # identify and update new or modified routers
-        for r in self.qclient.list_routers()['routers']:
-            if not r['admin_state_up']:
-                continue
-
-            ex_net_id = (r['external_gateway_info'] and
-                         r['external_gateway_info'].get('network_id'))
-            if not ex_net_id and not self.conf.handle_internal_only_routers:
-                continue
-
-            if ex_net_id and ex_net_id != target_ex_net_id:
-                continue
-
-            # If namespaces are disabled, only process the router associated
-            # with the configured agent id.
-            if (self.conf.use_namespaces or
-                r['id'] == self.conf.router_id):
-                cur_router_ids.add(r['id'])
+        try:
+            return self.plugin_rpc.get_external_network_id(
+                context.get_admin_context())
+        except rpc_common.RemoteError as e:
+            if e.exc_type == 'TooManyExternalNetworks':
+                msg = _(
+                    "The 'gateway_external_network_id' must be configured"
+                    " if Quantum has more than one external network.")
+                raise Exception(msg)
             else:
-                continue
-            if r['id'] not in self.router_info:
-                self._router_added(r['id'])
-
-            ri = self.router_info[r['id']]
-            self.process_router(ri)
+                raise
 
-        # identify and remove routers that no longer exist
-        for router_id in prev_router_ids - cur_router_ids:
-            self._router_removed(router_id)
-        prev_router_ids = cur_router_ids
-
-    def _router_added(self, router_id):
+    def _router_added(self, router_id, router=None):
         ri = RouterInfo(router_id, self.conf.root_helper,
-                        self.conf.use_namespaces)
+                        self.conf.use_namespaces, router)
         self.router_info[router_id] = ri
         if self.conf.use_namespaces:
             self._create_router_namespace(ri)
@@ -283,20 +261,15 @@ class L3NATAgent(object):
         if not ips:
             raise Exception(_("Router port %s has no IP address") % port['id'])
         if len(ips) > 1:
-            LOG.error(_("Ignoring multiple IPs on router port %s"), port['id'])
-        port['subnet'] = self.qclient.show_subnet(
-            ips[0]['subnet_id'])['subnet']
+            LOG.error(_("Ignoring multiple IPs on router port %s") %
+                      port['id'])
         prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen
         port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
 
     def process_router(self, ri):
 
         ex_gw_port = self._get_ex_gw_port(ri)
-
-        internal_ports = self.qclient.list_ports(
-            device_id=ri.router_id,
-            device_owner=l3_db.DEVICE_OWNER_ROUTER_INTF)['ports']
-
+        internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
         existing_port_ids = set([p['id'] for p in ri.internal_ports])
         current_port_ids = set([p['id'] for p in internal_ports
                                 if p['admin_state_up']])
@@ -333,8 +306,7 @@ class L3NATAgent(object):
         ri.ex_gw_port = ex_gw_port
 
     def process_router_floating_ips(self, ri, ex_gw_port):
-        floating_ips = self.qclient.list_floatingips(
-            router_id=ri.router_id)['floatingips']
+        floating_ips = ri.router.get(l3_constants.FLOATINGIP_KEY, [])
         existing_floating_ip_ids = set([fip['id'] for fip in ri.floating_ips])
         cur_floating_ip_ids = set([fip['id'] for fip in floating_ips])
 
@@ -375,16 +347,7 @@ class L3NATAgent(object):
                     ri.floating_ips.append(new_fip)
 
     def _get_ex_gw_port(self, ri):
-        ports = self.qclient.list_ports(
-            device_id=ri.router_id,
-            device_owner=l3_db.DEVICE_OWNER_ROUTER_GW)['ports']
-        if not ports:
-            return None
-        elif len(ports) == 1:
-            return ports[0]
-        else:
-            LOG.error(_("Ignoring multiple gateway ports for router %s"),
-                      ri.router_id)
+        return ri.router.get('gw_port')
 
     def _send_gratuitous_arp_packet(self, ri, interface_name, ip_address):
         if self.conf.send_arp_for_ha > 0:
@@ -562,14 +525,94 @@ class L3NATAgent(object):
                 ('float-snat', '-s %s -j SNAT --to %s' %
                  (fixed_ip, floating_ip))]
 
+    def router_deleted(self, context, router_id):
+        """Deal with router deletion RPC message."""
+        with self.sync_sem:
+            if router_id in self.router_info:
+                try:
+                    self._router_removed(router_id)
+                except Exception:
+                    msg = _("Failed dealing with router "
+                            "'%s' deletion RPC message")
+                    LOG.debug(msg, router_id)
+                    self.fullsync = True
+
+    def routers_updated(self, context, routers):
+        """Deal with routers modification and creation RPC message."""
+        if not routers:
+            return
+        with self.sync_sem:
+            try:
+                self._process_routers(routers)
+            except Exception:
+                msg = _("Failed dealing with routers update RPC message")
+                LOG.debug(msg)
+                self.fullsync = True
+
+    def _process_routers(self, routers):
+        if (self.conf.external_network_bridge and
+            not ip_lib.device_exists(self.conf.external_network_bridge)):
+            LOG.error(_("The external network bridge '%s' does not exist")
+                      % self.conf.external_network_bridge)
+            return
+
+        target_ex_net_id = self._fetch_external_net_id()
+
+        for r in routers:
+            if not r['admin_state_up']:
+                continue
+
+            # If namespaces are disabled, only process the router associated
+            # with the configured agent id.
+            if (not self.conf.use_namespaces and
+                r['id'] != self.conf.router_id):
+                continue
+
+            ex_net_id = (r['external_gateway_info'] or {}).get('network_id')
+            if not ex_net_id and not self.conf.handle_internal_only_routers:
+                continue
+
+            if ex_net_id and ex_net_id != target_ex_net_id:
+                continue
+
+            if r['id'] not in self.router_info:
+                self._router_added(r['id'])
+
+            ri = self.router_info[r['id']]
+            ri.router = r
+            self.process_router(ri)
+
+    @periodic_task.periodic_task
+    def _sync_routers_task(self, context):
+        # we need to sync with router deletion RPC message
+        with self.sync_sem:
+            if self.fullsync:
+                try:
+                    if not self.conf.use_namespaces:
+                        router_id = self.conf.router_id
+                    else:
+                        router_id = None
+                    routers = self.plugin_rpc.get_routers(
+                        context, router_id)
+                    self.router_info = {}
+                    self._process_routers(routers)
+                    self.fullsync = False
+                except Exception:
+                    LOG.exception(_("Failed synchronizing routers"))
+                    self.fullsync = True
+
+    def after_start(self):
+        LOG.info(_("L3 agent started"))
+
 
 def main():
-    conf = config.setup_conf()
+    eventlet.monkey_patch()
+    conf = cfg.CONF
     conf.register_opts(L3NATAgent.OPTS)
     conf.register_opts(interface.OPTS)
     conf.register_opts(external_process.OPTS)
     conf(sys.argv)
     config.setup_logging(conf)
-
-    mgr = L3NATAgent(conf)
-    mgr.daemon_loop()
+    server = quantum_service.Service.create(binary='quantum-l3-agent',
+                                            topic=topics.L3_AGENT)
+    service.launch(server).wait()
index 1076e48afd60329359a9ccea8596a50130a63163..02b2e935b1201271fd62911de901e145f5417dad 100644 (file)
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import socket
-
 import netaddr
 import webob.exc
 
@@ -54,14 +52,6 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
 QUOTAS = quota.QUOTAS
 
 
-def _get_hostname():
-    return socket.gethostname()
-
-
-# Register the configuration options
-cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
-
-
 def _fields(request):
     """
     Extracts the list of fields to return
index ee5c514e36a7276bde58d7e04186c4e398b6ebd0..709471e66538e322d34aec5c4dc96a5609e30f29 100644 (file)
@@ -25,6 +25,7 @@ import sys
 from paste import deploy
 
 from quantum.api.v2 import attributes
+from quantum.common import utils
 from quantum.openstack.common import cfg
 from quantum.openstack.common import log as logging
 from quantum.version import version_info as quantum_version
@@ -53,7 +54,8 @@ core_opts = [
     cfg.BoolOpt('allow_overlapping_ips', default=False),
     cfg.StrOpt('control_exchange',
                default='quantum',
-               help='AMQP exchange to connect to if using RabbitMQ or Qpid')
+               help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+    cfg.StrOpt('host', default=utils.get_hostname()),
 
 ]
 
index a803cdf589313845f05e3eb59fbaf368d145b343..2f4efb64db7231e5fea681438b19df0b0e88dd16 100644 (file)
@@ -22,3 +22,9 @@ PORT_STATUS_ACTIVE = 'ACTIVE'
 PORT_STATUS_BUILD = 'BUILD'
 PORT_STATUS_DOWN = 'DOWN'
 PORT_STATUS_ERROR = 'ERROR'
+
+DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
+DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
+DEVICE_OWNER_FLOATINGIP = "network:floatingip"
+FLOATINGIP_KEY = '_floatingips'
+INTERFACE_KEY = '_interfaces'
index 365687bad32ebd95c651b43e477a5b7c817111cc..4d1013c57451edb18ac3d8133c6ecee89123740c 100644 (file)
@@ -235,3 +235,7 @@ class InvalidSharedSetting(QuantumException):
 
 class InvalidExtenstionEnv(QuantumException):
     message = _("Invalid extension environment: %(reason)s")
+
+
+class TooManyExternalNetworks(QuantumException):
+    message = _("More than one external network exists")
index d46769b6de59cef3da1c002b06a6d6edd67fc1d0..15621ef59d00e8e1593ac5055fa697e02c726937 100644 (file)
@@ -25,6 +25,8 @@ AGENT = 'q-agent-notifier'
 PLUGIN = 'q-plugin'
 DHCP = 'q-dhcp-notifer'
 
+L3_AGENT = 'l3_agent'
+
 
 def get_topic_name(prefix, table, operation):
     """Create a topic name.
index ed8e6d772eb419e62196b81262d69891a569b183..aeca0484e9b11bc302bc7424d9231f56b5d98fe8 100644 (file)
@@ -23,6 +23,7 @@
 
 import os
 import signal
+import socket
 
 from eventlet.green import subprocess
 
@@ -145,3 +146,7 @@ def parse_mappings(mapping_list, unique_values=True):
                              (value, mapping))
         mappings[key] = value
     return mappings
+
+
+def get_hostname():
+    return socket.getfqdn()
index 19ae37d1072b60f8709fabc3862198d1067f3a9f..78ac1cf5ee614f68028d243cfda21404684c809d 100644 (file)
@@ -25,12 +25,13 @@ from sqlalchemy.sql import expression as expr
 import webob.exc as w_exc
 
 from quantum.api.v2 import attributes
+from quantum.common import constants as l3_constants
 from quantum.common import exceptions as q_exc
 from quantum.db import db_base_plugin_v2
+from quantum.db import l3_rpc_agent_api
 from quantum.db import model_base
 from quantum.db import models_v2
 from quantum.extensions import l3
-from quantum.openstack.common import cfg
 from quantum.openstack.common import log as logging
 from quantum.openstack.common import uuidutils
 from quantum import policy
@@ -38,17 +39,10 @@ from quantum import policy
 
 LOG = logging.getLogger(__name__)
 
-l3_opts = [
-    cfg.StrOpt('metadata_ip_address', default='127.0.0.1'),
-    cfg.IntOpt('metadata_port', default=8775)
-]
 
-# Register the configuration options
-cfg.CONF.register_opts(l3_opts)
-
-DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
-DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
-DEVICE_OWNER_FLOATINGIP = "network:floatingip"
+DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF
+DEVICE_OWNER_ROUTER_GW = l3_constants.DEVICE_OWNER_ROUTER_GW
+DEVICE_OWNER_FLOATINGIP = l3_constants.DEVICE_OWNER_FLOATINGIP
 
 
 class Router(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@@ -164,6 +158,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             # Ensure we actually have something to update
             if r.keys():
                 router_db.update(r)
+        routers = self.get_sync_data(context.elevated(),
+                                     [router_db['id']])
+        l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
         return self._make_router_dict(router_db)
 
     def _update_router_gw_info(self, context, router_id, info):
@@ -250,6 +247,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                 self._delete_port(context.elevated(), ports[0]['id'])
 
             context.session.delete(router)
+        l3_rpc_agent_api.L3AgentNofity.router_deleted(context, id)
 
     def get_router(self, context, id, fields=None):
         router = self._get_router(context, id)
@@ -324,9 +322,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             self._check_for_dup_router_subnet(context, router_id,
                                               port['network_id'],
                                               fixed_ips[0]['subnet_id'])
-            with context.session.begin(subtransactions=True):
-                port.update({'device_id': router_id,
-                             'device_owner': DEVICE_OWNER_ROUTER_INTF})
+            port.update({'device_id': router_id,
+                         'device_owner': DEVICE_OWNER_ROUTER_INTF})
         elif 'subnet_id' in interface_info:
             subnet_id = interface_info['subnet_id']
             subnet = self._get_subnet(context, subnet_id)
@@ -348,6 +345,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                  'device_id': router_id,
                  'device_owner': DEVICE_OWNER_ROUTER_INTF,
                  'name': ''}})
+
+        routers = self.get_sync_data(context.elevated(), [router_id])
+        l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
         return {'port_id': port['id'],
                 'subnet_id': port['fixed_ips'][0]['subnet_id']}
 
@@ -423,6 +423,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                 raise w_exc.HTTPNotFound("Router %(router_id)s has no "
                                          "interface on subnet %(subnet_id)s"
                                          % locals())
+        routers = self.get_sync_data(context.elevated(), [router_id])
+        l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
 
     def _get_floatingip(self, context, id):
         try:
@@ -621,7 +623,10 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
         except Exception:
             LOG.exception(_("Floating IP association failed"))
             raise
-
+        router_id = floatingip_db['router_id']
+        if router_id:
+            routers = self.get_sync_data(context.elevated(), [router_id])
+            l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
         return self._make_floatingip_dict(floatingip_db)
 
     def update_floatingip(self, context, id, floatingip):
@@ -631,18 +636,32 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             fip['tenant_id'] = floatingip_db['tenant_id']
             fip['id'] = id
             fip_port_id = floatingip_db['floating_port_id']
+            before_router_id = floatingip_db['router_id']
             self._update_fip_assoc(context, fip, floatingip_db,
                                    self.get_port(context.elevated(),
                                                  fip_port_id))
+        router_ids = []
+        if before_router_id:
+            router_ids.append(before_router_id)
+        router_id = floatingip_db['router_id']
+        if router_id and router_id != before_router_id:
+            router_ids.append(router_id)
+        if router_ids:
+            routers = self.get_sync_data(context.elevated(), router_ids)
+            l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
         return self._make_floatingip_dict(floatingip_db)
 
     def delete_floatingip(self, context, id):
         floatingip = self._get_floatingip(context, id)
+        router_id = floatingip['router_id']
         with context.session.begin(subtransactions=True):
             context.session.delete(floatingip)
             self.delete_port(context.elevated(),
                              floatingip['floating_port_id'],
                              l3_port_check=False)
+        if router_id:
+            routers = self.get_sync_data(context.elevated(), [router_id])
+            l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
 
     def get_floatingip(self, context, id, fields=None):
         floatingip = self._get_floatingip(context, id)
@@ -677,6 +696,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             try:
                 fip_qry = context.session.query(FloatingIP)
                 floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one()
+                router_id = floating_ip['router_id']
                 floating_ip.update({'fixed_port_id': None,
                                     'fixed_ip_address': None,
                                     'router_id': None})
@@ -686,6 +706,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
                 # should never happen
                 raise Exception('Multiple floating IPs found for port %s'
                                 % port_id)
+        if router_id:
+            routers = self.get_sync_data(context.elevated(), [router_id])
+            l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
 
     def _check_l3_view_auth(self, context, network):
         return policy.check(context,
@@ -761,3 +784,137 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             return [n for n in nets if n['id'] in ext_nets]
         else:
             return [n for n in nets if n['id'] not in ext_nets]
+
+    def _get_sync_routers(self, context, router_ids=None):
+        """Query routers and their gw ports for l3 agent.
+
+        Query routers with the router_ids. The gateway ports, if any,
+        will be queried too.
+        l3 agent has an option to deal with only one router id. In addition,
+        when we need to notify the agent the data about only one router
+        (when modification of router, its interfaces, gw_port and floatingips),
+        we will have router_ids.
+        @param router_ids: the list of router ids which we want to query.
+                           if it is None, all of routers will be queried.
+        @return: a list of dicted routers with dicted gw_port populated if any
+        """
+        router_query = context.session.query(Router)
+        if router_ids:
+            router_query = router_query.filter(Router.id.in_(router_ids))
+        routers = router_query.all()
+        gw_port_ids = []
+        if not routers:
+            return []
+        for router in routers:
+            gw_port_id = router.gw_port_id
+            if gw_port_id:
+                gw_port_ids.append(gw_port_id)
+        gw_ports = []
+        if gw_port_ids:
+            gw_ports = self._get_sync_gw_ports(context, gw_port_ids)
+        gw_port_id_gw_port_dict = {}
+        for gw_port in gw_ports:
+            gw_port_id_gw_port_dict[gw_port['id']] = gw_port
+        router_id_gw_port_id_dict = {}
+        for router in routers:
+            router_id_gw_port_id_dict[router.id] = router.gw_port_id
+        routers_list = [self._make_router_dict(c, None) for c in routers]
+        for router in routers_list:
+            gw_port_id = router_id_gw_port_id_dict[router['id']]
+            if gw_port_id:
+                router['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
+        return routers_list
+
+    def _get_sync_floating_ips(self, context, router_ids):
+        """Query floating_ips that relate to list of router_ids."""
+        if not router_ids:
+            return []
+        return self.get_floatingips(context, {'router_id': router_ids})
+
+    def _get_sync_gw_ports(self, context, gw_port_ids):
+        if not gw_port_ids:
+            return []
+        filters = {'id': gw_port_ids}
+        gw_ports = self.get_ports(context, filters)
+        if gw_ports:
+            self._populate_subnet_for_ports(context, gw_ports)
+        return gw_ports
+
+    def _get_sync_interfaces(self, context, router_ids):
+        """Query router interfaces that relate to list of router_ids."""
+        if not router_ids:
+            return []
+        filters = {'device_id': router_ids,
+                   'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
+        interfaces = self.get_ports(context, filters)
+        if interfaces:
+            self._populate_subnet_for_ports(context, interfaces)
+        return interfaces
+
+    def _populate_subnet_for_ports(self, context, ports):
+        """Populate ports with subnet.
+
+        These ports already have fixed_ips populated.
+        """
+        if not ports:
+            return
+        subnet_id_ports_dict = {}
+        for port in ports:
+            fixed_ips = port.get('fixed_ips', [])
+            if len(fixed_ips) > 1:
+                LOG.error(_("Ignoring multiple IPs on router port %s") %
+                          port['id'])
+                ports.remove(port)
+                continue
+            # Empty fixed_ips should not happen
+            fixed_ip = fixed_ips[0]
+            my_ports = subnet_id_ports_dict.get(fixed_ip['subnet_id'], [])
+            my_ports.append(port)
+            subnet_id_ports_dict[fixed_ip['subnet_id']] = my_ports
+        filters = {'id': subnet_id_ports_dict.keys()}
+        fields = ['id', 'cidr', 'gateway_ip']
+        subnet_dicts = self.get_subnets(context, filters, fields)
+        for subnet_dict in subnet_dicts:
+            ports = subnet_id_ports_dict.get(subnet_dict['id'], [])
+            for port in ports:
+                # TODO(gongysh) stash the subnet into fixed_ips
+                # to make the payload smaller.
+                port['subnet'] = {'id': subnet_dict['id'],
+                                  'cidr': subnet_dict['cidr'],
+                                  'gateway_ip': subnet_dict['gateway_ip']}
+
+    def _process_sync_data(self, routers, interfaces, floating_ips):
+        routers_dict = {}
+        for router in routers:
+            routers_dict[router['id']] = router
+        for floating_ip in floating_ips:
+            router = routers_dict.get(floating_ip['router_id'])
+            if router:
+                router_floatingips = router.get(l3_constants.FLOATINGIP_KEY,
+                                                [])
+                router_floatingips.append(floating_ip)
+                router[l3_constants.FLOATINGIP_KEY] = router_floatingips
+        for interface in interfaces:
+            router = routers_dict.get(interface['device_id'])
+            if router:
+                router_interfaces = router.get(l3_constants.INTERFACE_KEY, [])
+                router_interfaces.append(interface)
+                router[l3_constants.INTERFACE_KEY] = router_interfaces
+        return routers_dict.values()
+
+    def get_sync_data(self, context, router_ids=None):
+        """Query routers and their related floating_ips, interfaces."""
+        with context.session.begin(subtransactions=True):
+            routers = self._get_sync_routers(context,
+                                             router_ids)
+            router_ids = [router['id'] for router in routers]
+            floating_ips = self._get_sync_floating_ips(context, router_ids)
+            interfaces = self._get_sync_interfaces(context, router_ids)
+        return self._process_sync_data(routers, interfaces, floating_ips)
+
+    def get_external_network_id(self, context):
+        nets = self.get_networks(context, {'router:external': [True]})
+        if len(nets) > 1:
+            raise q_exc.TooManyExternalNetworks()
+        else:
+            return nets[0]['id'] if nets else None
diff --git a/quantum/db/l3_rpc_agent_api.py b/quantum/db/l3_rpc_agent_api.py
new file mode 100644 (file)
index 0000000..de032d3
--- /dev/null
@@ -0,0 +1,50 @@
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import topics
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
+class L3AgentNotifyAPI(proxy.RpcProxy):
+    """API for plugin to notify L3 agent."""
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic=topics.L3_AGENT):
+        super(L3AgentNotifyAPI, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+    def router_deleted(self, context, router_id):
+        LOG.debug(_('Nofity agent the router %s is deleted'), router_id)
+        self.cast(context,
+                  self.make_msg('router_deleted',
+                                router_id=router_id),
+                  topic=self.topic)
+
+    def routers_updated(self, context, routers):
+        if routers:
+            LOG.debug(_('Nofity agent routers were updated:\n %s'),
+                      jsonutils.dumps(routers, indent=5))
+            self.cast(context,
+                      self.make_msg('routers_updated',
+                                    routers=routers),
+                      topic=self.topic)
+
+
+L3AgentNofity = L3AgentNotifyAPI()
diff --git a/quantum/db/l3_rpc_base.py b/quantum/db/l3_rpc_base.py
new file mode 100644 (file)
index 0000000..2a11b70
--- /dev/null
@@ -0,0 +1,56 @@
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum import context as quantum_context
+from quantum import manager
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class L3RpcCallbackMixin(object):
+    """A mix-in that enable L3 agent rpc support in plugin implementations."""
+
+    def sync_routers(self, context, **kwargs):
+        """Sync routers according to filters to a specific agent.
+
+        @param context: contain user information
+        @param kwargs: host, or router_id
+        @return: a list of routers
+                 with their interfaces and floating_ips
+        """
+        router_id = kwargs.get('router_id')
+        # TODO(gongysh) we will use host in kwargs for multi host BP
+        context = quantum_context.get_admin_context()
+        plugin = manager.QuantumManager.get_plugin()
+        routers = plugin.get_sync_data(context, router_id)
+        LOG.debug(_("Routers returned to l3 agent:\n %s"),
+                  jsonutils.dumps(routers, indent=5))
+        return routers
+
+    def get_external_network_id(self, context, **kwargs):
+        """Get one external network id for l3 agent.
+
+        l3 agent expects only on external network when it performs
+        this query.
+        """
+        context = quantum_context.get_admin_context()
+        plugin = manager.QuantumManager.get_plugin()
+        net_id = plugin.get_external_network_id(context)
+        LOG.debug(_("External network ID returned to l3 agent: %s"),
+                  net_id)
+        return net_id
index e1cd169287640d188a709cfa0003b84bc9270535..00aca48068ce8aa62e68299f8be9ada4a90d869f 100644 (file)
 #    under the License.
 # @author: Somik Behera, Nicira Networks, Inc.
 
-"""
-Quantum's Manager class is responsible for parsing a config file and
-instantiating the correct plugin that concretely implement quantum_plugin_base
-class.
-The caller should make sure that QuantumManager is a singleton.
-"""
-
 from quantum.common.exceptions import ClassNotFound
 from quantum.openstack.common import cfg
 from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import periodic_task
 from quantum.plugins.common import constants
 
 
 LOG = logging.getLogger(__name__)
 
 
-class QuantumManager(object):
+class Manager(periodic_task.PeriodicTasks):
+
+    # Set RPC API version to 1.0 by default.
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, host=None):
+        if not host:
+            host = cfg.CONF.host
+        self.host = host
+        super(Manager, self).__init__()
+
+    def periodic_tasks(self, context, raise_on_error=False):
+        self.run_periodic_tasks(context, raise_on_error=raise_on_error)
+
+    def init_host(self):
+        """Handle initialization if this is a standalone service.
 
+        Child classes should override this method.
+
+        """
+        pass
+
+    def after_start(self):
+        """Handler post initialization stuff.
+
+        Child classes can override this method.
+        """
+        pass
+
+
+class QuantumManager(object):
+    """
+    Quantum's Manager class is responsible for parsing a config file and
+    instantiating the correct plugin that concretely implement
+    quantum_plugin_base class.
+    The caller should make sure that QuantumManager is a singleton.
+    """
     _instance = None
 
     def __init__(self, options=None, config_file=None):
diff --git a/quantum/openstack/common/periodic_task.py b/quantum/openstack/common/periodic_task.py
new file mode 100644 (file)
index 0000000..ba2f511
--- /dev/null
@@ -0,0 +1,111 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def periodic_task(*args, **kwargs):
+    """Decorator to indicate that a method is a periodic task.
+
+    This decorator can be used in two ways:
+
+        1. Without arguments '@periodic_task', this will be run on every tick
+           of the periodic scheduler.
+
+        2. With arguments, @periodic_task(ticks_between_runs=N), this will be
+           run on every N ticks of the periodic scheduler.
+    """
+    def decorator(f):
+        f._periodic_task = True
+        f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
+        return f
+
+    # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
+    # and without parens.
+    #
+    # In the 'with-parens' case (with kwargs present), this function needs to
+    # return a decorator function since the interpreter will invoke it like:
+    #
+    #   periodic_task(*args, **kwargs)(f)
+    #
+    # In the 'without-parens' case, the original function will be passed
+    # in as the first argument, like:
+    #
+    #   periodic_task(f)
+    if kwargs:
+        return decorator
+    else:
+        return decorator(args[0])
+
+
+class _PeriodicTasksMeta(type):
+    def __init__(cls, names, bases, dict_):
+        """Metaclass that allows us to collect decorated periodic tasks."""
+        super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
+
+        # NOTE(sirp): if the attribute is not present then we must be the base
+        # class, so, go ahead and initialize it. If the attribute is present,
+        # then we're a subclass so make a copy of it so we don't step on our
+        # parent's toes.
+        try:
+            cls._periodic_tasks = cls._periodic_tasks[:]
+        except AttributeError:
+            cls._periodic_tasks = []
+
+        try:
+            cls._ticks_to_skip = cls._ticks_to_skip.copy()
+        except AttributeError:
+            cls._ticks_to_skip = {}
+
+        # This uses __dict__ instead of
+        # inspect.getmembers(cls, inspect.ismethod) so only the methods of the
+        # current class are added when this class is scanned, and base classes
+        # are not added redundantly.
+        for value in cls.__dict__.values():
+            if getattr(value, '_periodic_task', False):
+                task = value
+                name = task.__name__
+                cls._periodic_tasks.append((name, task))
+                cls._ticks_to_skip[name] = task._ticks_between_runs
+
+
+class PeriodicTasks(object):
+    __metaclass__ = _PeriodicTasksMeta
+
+    def run_periodic_tasks(self, context, raise_on_error=False):
+        """Tasks to be run at a periodic interval."""
+        for task_name, task in self._periodic_tasks:
+            full_task_name = '.'.join([self.__class__.__name__, task_name])
+
+            ticks_to_skip = self._ticks_to_skip[task_name]
+            if ticks_to_skip > 0:
+                LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
+                            " ticks left until next run"), locals())
+                self._ticks_to_skip[task_name] -= 1
+                continue
+
+            self._ticks_to_skip[task_name] = task._ticks_between_runs
+            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+
+            try:
+                task(self, context)
+            except Exception as e:
+                if raise_on_error:
+                    raise
+                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+                              locals())
index 5086662b7daab2ebff7015bb34ebea0e02de22dd..711a059c2ea6629736e7ee18d3ce7cb7cbc13f0d 100644 (file)
@@ -24,6 +24,7 @@ from quantum.db import api as db_api
 from quantum.db import db_base_plugin_v2
 from quantum.db import dhcp_rpc_base
 from quantum.db import l3_db
+from quantum.db import l3_rpc_base
 from quantum.extensions import providernet as provider
 from quantum.openstack.common import cfg
 from quantum.openstack.common import log as logging
@@ -37,7 +38,8 @@ from quantum import policy
 LOG = logging.getLogger(__name__)
 
 
-class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
+class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
+                              l3_rpc_base.L3RpcCallbackMixin):
 
     # Set RPC API version to 1.0 by default.
     RPC_API_VERSION = '1.0'
index f7416b139b3e5831db3353eff52af3d7585ae98f..eea17377cb635d9c725942d14d2acd3ca54eb3c6 100644 (file)
@@ -30,6 +30,7 @@ from quantum.common import topics
 from quantum.db import db_base_plugin_v2
 from quantum.db import dhcp_rpc_base
 from quantum.db import l3_db
+from quantum.db import l3_rpc_base
 from quantum.extensions import providernet as provider
 from quantum.openstack.common import cfg
 from quantum.openstack.common import log as logging
@@ -44,7 +45,8 @@ from quantum import policy
 LOG = logging.getLogger(__name__)
 
 
-class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
+class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
+                      l3_rpc_base.L3RpcCallbackMixin):
 
     # Set RPC API version to 1.0 by default.
     RPC_API_VERSION = '1.0'
index 39c0850f89d4da645b5a3f543676efd9bed0ef53..3876bff64ffa5731f37b7d63bcdc96365ab763b3 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import inspect
 import logging as std_logging
+import os
+import random
 
 from quantum.common import config
+from quantum import context
 from quantum.openstack.common import cfg
+from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
+from quantum.openstack.common.rpc import service
 from quantum import wsgi
 
 
+LOG = logging.getLogger(__name__)
+
+service_opts = [
+    cfg.IntOpt('report_interval',
+               default=10,
+               help='seconds between nodes reporting state to datastore'),
+    cfg.IntOpt('periodic_interval',
+               default=40,
+               help='seconds between running periodic tasks'),
+    cfg.IntOpt('periodic_fuzzy_delay',
+               default=5,
+               help='range of seconds to randomly delay when starting the'
+                    ' periodic task scheduler to reduce stampeding.'
+                    ' (Disable by setting to 0)'),
+]
+CONF = cfg.CONF
+CONF.register_opts(service_opts)
+
 LOG = logging.getLogger(__name__)
 
 
@@ -91,3 +116,120 @@ def _run_wsgi(app_name):
              {'host': cfg.CONF.bind_host,
               'port': cfg.CONF.bind_port})
     return server
+
+
+class Service(service.Service):
+    """Service object for binaries running on hosts.
+
+    A service takes a manager and enables rpc by listening to queues based
+    on topic. It also periodically runs tasks on the manager."""
+
+    def __init__(self, host, binary, topic, manager, report_interval=None,
+                 periodic_interval=None, periodic_fuzzy_delay=None,
+                 *args, **kwargs):
+
+        self.binary = binary
+        self.manager_class_name = manager
+        manager_class = importutils.import_class(self.manager_class_name)
+        self.manager = manager_class(host=host, *args, **kwargs)
+        self.report_interval = report_interval
+        self.periodic_interval = periodic_interval
+        self.periodic_fuzzy_delay = periodic_fuzzy_delay
+        self.saved_args, self.saved_kwargs = args, kwargs
+        self.timers = []
+        super(Service, self).__init__(host, topic, manager=self.manager)
+
+    def start(self):
+        self.manager.init_host()
+        super(Service, self).start()
+        if self.report_interval:
+            pulse = loopingcall.LoopingCall(self.report_state)
+            pulse.start(interval=self.report_interval,
+                        initial_delay=self.report_interval)
+            self.timers.append(pulse)
+
+        if self.periodic_interval:
+            if self.periodic_fuzzy_delay:
+                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
+            else:
+                initial_delay = None
+
+            periodic = loopingcall.LoopingCall(self.periodic_tasks)
+            periodic.start(interval=self.periodic_interval,
+                           initial_delay=initial_delay)
+            self.timers.append(periodic)
+        self.manager.after_start()
+
+    def __getattr__(self, key):
+        manager = self.__dict__.get('manager', None)
+        return getattr(manager, key)
+
+    @classmethod
+    def create(cls, host=None, binary=None, topic=None, manager=None,
+               report_interval=None, periodic_interval=None,
+               periodic_fuzzy_delay=None):
+        """Instantiates class and passes back application object.
+
+        :param host: defaults to CONF.host
+        :param binary: defaults to basename of executable
+        :param topic: defaults to bin_name - 'nova-' part
+        :param manager: defaults to CONF.<topic>_manager
+        :param report_interval: defaults to CONF.report_interval
+        :param periodic_interval: defaults to CONF.periodic_interval
+        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
+
+        """
+        if not host:
+            host = CONF.host
+        if not binary:
+            binary = os.path.basename(inspect.stack()[-1][1])
+        if not topic:
+            topic = binary.rpartition('quantum-')[2]
+            topic = topic.replace("-", "_")
+        if not manager:
+            manager = CONF.get('%s_manager' % topic, None)
+        if report_interval is None:
+            report_interval = CONF.report_interval
+        if periodic_interval is None:
+            periodic_interval = CONF.periodic_interval
+        if periodic_fuzzy_delay is None:
+            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
+        service_obj = cls(host, binary, topic, manager,
+                          report_interval=report_interval,
+                          periodic_interval=periodic_interval,
+                          periodic_fuzzy_delay=periodic_fuzzy_delay)
+
+        return service_obj
+
+    def kill(self):
+        """Destroy the service object."""
+        self.stop()
+
+    def stop(self):
+        super(Service, self).stop()
+        for x in self.timers:
+            try:
+                x.stop()
+            except Exception:
+                LOG.exception("exception occurs when timer stops")
+                pass
+        self.timers = []
+
+    def wait(self):
+        super(Service, self).wait()
+        for x in self.timers:
+            try:
+                x.wait()
+            except Exception:
+                LOG.exception("exception occurs when waiting for timer")
+                pass
+
+    def periodic_tasks(self, raise_on_error=False):
+        """Tasks to be run at a periodic interval."""
+        ctxt = context.get_admin_context()
+        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
+
+    def report_state(self):
+        """Update the state of this service."""
+        # Todo(gongysh) report state to quantum server
+        pass
index 339e7c07ba1b59aef093d407b114201d39c35259..049cb072076e591d88c794f99b65abc24d3febc4 100644 (file)
@@ -445,7 +445,7 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase):
             self._delete('subnets', subnet['subnet']['id'])
 
     @contextlib.contextmanager
-    def port(self, subnet=None, fixed_ips=None, fmt='json', no_delete=False,
+    def port(self, subnet=None, fmt='json', no_delete=False,
              **kwargs):
         if not subnet:
             with self.subnet() as subnet:
index 44f16ba9d708b3bc071a8b2b201080e595b8c174..dc474c5bae96e71df187f031333ee0545e08b226 100644 (file)
 #    under the License.
 
 import copy
-import time
-import unittest
+import unittest2
 
 import mock
 
-from quantum.agent.common import config
 from quantum.agent import l3_agent
 from quantum.agent.linux import interface
-from quantum.db import l3_db
+from quantum.common import config as base_config
+from quantum.common import constants as l3_constants
+from quantum.openstack.common import cfg
 from quantum.openstack.common import uuidutils
 
 
 _uuid = uuidutils.generate_uuid
+HOSTNAME = 'myhost'
 
 
-class TestBasicRouterOperations(unittest.TestCase):
+class TestBasicRouterOperations(unittest2.TestCase):
 
     def setUp(self):
-        self.conf = config.setup_conf()
+        self.conf = cfg.CommonConfigOpts()
+        self.conf.register_opts(base_config.core_opts)
         self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
         self.conf.register_opts(interface.OPTS)
         self.conf.set_override('interface_driver',
@@ -65,14 +67,15 @@ class TestBasicRouterOperations(unittest.TestCase):
         self.mock_ip = mock.MagicMock()
         ip_cls.return_value = self.mock_ip
 
-        self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
-        client_cls = self.client_cls_p.start()
-        self.client_inst = mock.Mock()
-        client_cls.return_value = self.client_inst
+        self.l3pluginApi_cls_p = mock.patch(
+            'quantum.agent.l3_agent.L3PluginApi')
+        l3pluginApi_cls = self.l3pluginApi_cls_p.start()
+        self.plugin_api = mock.Mock()
+        l3pluginApi_cls.return_value = self.plugin_api
 
     def tearDown(self):
         self.device_exists_p.stop()
-        self.client_cls_p.stop()
+        self.l3pluginApi_cls_p.stop()
         self.ip_cls_p.stop()
         self.dvr_cls_p.stop()
         self.utils_exec_p.stop()
@@ -86,7 +89,7 @@ class TestBasicRouterOperations(unittest.TestCase):
         self.assertTrue(ri.ns_name().endswith(id))
 
     def testAgentCreate(self):
-        agent = l3_agent.L3NATAgent(self.conf)
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
 
     def _test_internal_network_action(self, action):
         port_id = _uuid()
@@ -94,7 +97,7 @@ class TestBasicRouterOperations(unittest.TestCase):
         network_id = _uuid()
         ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
                                  self.conf.use_namespaces)
-        agent = l3_agent.L3NATAgent(self.conf)
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         interface_name = agent.get_internal_device_name(port_id)
         cidr = '99.0.1.9/24'
         mac = 'ca:fe:de:ad:be:ef'
@@ -123,7 +126,7 @@ class TestBasicRouterOperations(unittest.TestCase):
         router_id = _uuid()
         ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
                                  self.conf.use_namespaces)
-        agent = l3_agent.L3NATAgent(self.conf)
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16']
         ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
                                      'subnet_id': _uuid()}],
@@ -167,7 +170,7 @@ class TestBasicRouterOperations(unittest.TestCase):
         router_id = _uuid()
         ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
                                  self.conf.use_namespaces)
-        agent = l3_agent.L3NATAgent(self.conf)
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         floating_ip = '20.0.0.100'
         fixed_ip = '10.0.0.23'
         ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
@@ -206,101 +209,79 @@ class TestBasicRouterOperations(unittest.TestCase):
 
     def testProcessRouter(self):
 
-        agent = l3_agent.L3NATAgent(self.conf)
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router_id = _uuid()
-        ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
-                                 self.conf.use_namespaces)
-
-        # return data so that state is built up
         ex_gw_port = {'id': _uuid(),
                       'network_id': _uuid(),
                       'fixed_ips': [{'ip_address': '19.4.4.4',
-                                     'subnet_id': _uuid()}]}
+                                     'subnet_id': _uuid()}],
+                      'subnet': {'cidr': '19.4.4.0/24',
+                                 'gateway_ip': '19.4.4.1'}}
         internal_port = {'id': _uuid(),
                          'network_id': _uuid(),
                          'admin_state_up': True,
                          'fixed_ips': [{'ip_address': '35.4.4.4',
                                         'subnet_id': _uuid()}],
-                         'mac_address': 'ca:fe:de:ad:be:ef'}
-
-        def fake_list_ports1(**kwargs):
-            if kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_GW:
-                return {'ports': [ex_gw_port]}
-            elif kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_INTF:
-                return {'ports': [internal_port]}
-
-        fake_subnet = {'subnet': {'cidr': '19.4.4.0/24',
-                                  'gateway_ip': '19.4.4.1'}}
+                         'mac_address': 'ca:fe:de:ad:be:ef',
+                         'subnet': {'cidr': '35.4.4.0/24',
+                                    'gateway_ip': '35.4.4.1'}}
 
         fake_floatingips1 = {'floatingips': [
             {'id': _uuid(),
              'floating_ip_address': '8.8.8.8',
              'fixed_ip_address': '7.7.7.7',
              'port_id': _uuid()}]}
-
-        self.client_inst.list_ports.side_effect = fake_list_ports1
-        self.client_inst.show_subnet.return_value = fake_subnet
-        self.client_inst.list_floatingips.return_value = fake_floatingips1
+        router = {
+            'id': router_id,
+            l3_constants.FLOATINGIP_KEY: fake_floatingips1['floatingips'],
+            l3_constants.INTERFACE_KEY: [internal_port],
+            'gw_port': ex_gw_port}
+        ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
+                                 self.conf.use_namespaces, router=router)
         agent.process_router(ri)
 
         # remap floating IP to a new fixed ip
         fake_floatingips2 = copy.deepcopy(fake_floatingips1)
         fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8'
-        self.client_inst.list_floatingips.return_value = fake_floatingips2
+        router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips']
         agent.process_router(ri)
 
         # remove just the floating ips
-        self.client_inst.list_floatingips.return_value = {'floatingips': []}
+        del router[l3_constants.FLOATINGIP_KEY]
         agent.process_router(ri)
 
-        # now return no ports so state is torn down
-        self.client_inst.list_ports.return_value = {'ports': []}
+        # now no ports so state is torn down
+        del router[l3_constants.INTERFACE_KEY]
+        del router['gw_port']
         agent.process_router(ri)
 
-    def testSingleLoopRouterRemoval(self):
-        agent = l3_agent.L3NATAgent(self.conf)
-        router_id = _uuid()
-
-        self.client_inst.list_ports.return_value = {'ports': []}
+    def testRoutersWithAdminStateDown(self):
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        self.plugin_api.get_external_network_id.return_value = None
 
-        self.client_inst.list_networks.return_value = {'networks': []}
+        routers = [
+            {'id': _uuid(),
+             'admin_state_up': False,
+             'external_gateway_info': {}}]
+        agent._process_routers(routers)
+        self.assertNotIn(routers[0]['id'], agent.router_info)
 
-        self.client_inst.list_routers.return_value = {'routers': [
-            {'id': router_id,
+    def testSingleLoopRouterRemoval(self):
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+        self.plugin_api.get_external_network_id.return_value = None
+        routers = [
+            {'id': _uuid(),
              'admin_state_up': True,
-             'external_gateway_info': {}}]}
-        agent.do_single_loop()
-
-        self.client_inst.list_routers.return_value = {'routers': []}
-        agent.do_single_loop()
-        self.external_process.assert_has_calls(
-            [mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id),
-             mock.call().enable(mock.ANY),
-             mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id),
-             mock.call().disable()])
+             'external_gateway_info': {}}]
+        agent._process_routers(routers)
 
+        agent.router_deleted(None, routers[0]['id'])
         # verify that remove is called
         self.assertEquals(self.mock_ip.get_devices.call_count, 1)
 
         self.device_exists.assert_has_calls(
             [mock.call(self.conf.external_network_bridge)])
 
-    def testDaemonLoop(self):
-
-        # just take a pass through the loop, then raise on time.sleep()
-        time_sleep_p = mock.patch('time.sleep')
-        time_sleep = time_sleep_p.start()
-
-        class ExpectedException(Exception):
-            pass
-
-        time_sleep.side_effect = ExpectedException()
-
-        agent = l3_agent.L3NATAgent(self.conf)
-        self.assertRaises(ExpectedException, agent.daemon_loop)
-
-        time_sleep_p.stop()
-
     def testDestroyNamespace(self):
 
         class FakeDev(object):
@@ -311,16 +292,5 @@ class TestBasicRouterOperations(unittest.TestCase):
         self.mock_ip.get_devices.return_value = [FakeDev('qr-aaaa'),
                                                  FakeDev('qgw-aaaa')]
 
-        agent = l3_agent.L3NATAgent(self.conf)
+        agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         agent._destroy_all_router_namespaces()
-
-    def testMain(self):
-        agent_mock_p = mock.patch('quantum.agent.l3_agent.L3NATAgent')
-        agent_mock = agent_mock_p.start()
-        agent_mock.daemon_loop.return_value = None
-        with mock.patch('quantum.agent.common.config.setup_logging'):
-            with mock.patch('quantum.agent.l3_agent.sys') as mock_sys:
-                mock_sys.argv = []
-                l3_agent.main()
-
-        agent_mock_p.stop()
index 5f252c3cf683e41f0c48883f919264f74d8d7391..556716cc2d3531bad1762d6683efd7f355f4b306 100644 (file)
@@ -30,11 +30,13 @@ import webtest
 from quantum.api import extensions
 from quantum.api.v2 import attributes
 from quantum.common import config
+from quantum.common import constants as l3_constants
 from quantum.common import exceptions as q_exc
 from quantum.common.test_lib import test_config
 from quantum import context
 from quantum.db import db_base_plugin_v2
 from quantum.db import l3_db
+from quantum.db import l3_rpc_agent_api
 from quantum.db import models_v2
 from quantum.extensions import l3
 from quantum import manager
@@ -68,7 +70,6 @@ class L3NatExtensionTestCase(unittest.TestCase):
     def setUp(self):
 
         plugin = 'quantum.extensions.l3.RouterPluginBase'
-
         # Ensure 'stale' patched copies of the plugin are never returned
         manager.QuantumManager._instance = None
 
@@ -89,7 +90,6 @@ class L3NatExtensionTestCase(unittest.TestCase):
 
         self._plugin_patcher = mock.patch(plugin, autospec=True)
         self.plugin = self._plugin_patcher.start()
-
         # Instantiate mock plugin and enable the 'router' extension
         manager.QuantumManager.get_plugin().supported_extension_aliases = (
             ["router"])
@@ -118,9 +118,7 @@ class L3NatExtensionTestCase(unittest.TestCase):
         instance = self.plugin.return_value
         instance.create_router.return_value = return_value
         instance.get_routers_count.return_value = 0
-
         res = self.api.post_json(_get_path('routers'), data)
-
         instance.create_router.assert_called_with(mock.ANY,
                                                   router=data)
         self.assertEqual(res.status_int, exc.HTTPCreated.code)
@@ -1194,3 +1192,146 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
         with self.network(router__external=True) as ext_net:
             self.assertEqual(ext_net['network'][l3.EXTERNAL],
                              True)
+
+    def _test_notify_op_agent(self, target_func, *args):
+        l3_rpc_agent_api_str = (
+            'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI')
+        oldNotify = l3_rpc_agent_api.L3AgentNofity
+        try:
+            with mock.patch(l3_rpc_agent_api_str) as notifyApi:
+                l3_rpc_agent_api.L3AgentNofity = notifyApi
+                kargs = [item for item in args]
+                kargs.append(notifyApi)
+                target_func(*kargs)
+        except:
+            l3_rpc_agent_api.L3AgentNofity = oldNotify
+            raise
+        else:
+            l3_rpc_agent_api.L3AgentNofity = oldNotify
+
+    def _test_router_gateway_op_agent(self, notifyApi):
+        with self.router() as r:
+            with self.subnet() as s:
+                self._set_net_external(s['subnet']['network_id'])
+                self._add_external_gateway_to_router(
+                    r['router']['id'],
+                    s['subnet']['network_id'])
+                self._remove_external_gateway_from_router(
+                    r['router']['id'],
+                    s['subnet']['network_id'])
+                self.assertEquals(
+                    2, notifyApi.routers_updated.call_count)
+
+    def test_router_gateway_op_agent(self):
+        self._test_notify_op_agent(self._test_router_gateway_op_agent)
+
+    def _test_interfaces_op_agent(self, r, notifyApi):
+        with self.port(no_delete=True) as p:
+            self._router_interface_action('add',
+                                          r['router']['id'],
+                                          None,
+                                          p['port']['id'])
+            # clean-up
+            self._router_interface_action('remove',
+                                          r['router']['id'],
+                                          None,
+                                          p['port']['id'])
+        self.assertEquals(2, notifyApi.routers_updated.call_count)
+
+    def test_interfaces_op_agent(self):
+        with self.router() as r:
+            self._test_notify_op_agent(
+                self._test_interfaces_op_agent, r)
+
+    def _test_floatingips_op_agent(self, notifyApi):
+        with self.floatingip_with_assoc() as fip:
+            pass
+        # add gateway, add interface, associate, deletion of floatingip,
+        # delete gateway, delete interface
+        self.assertEquals(6, notifyApi.routers_updated.call_count)
+
+    def test_floatingips_op_agent(self):
+        self._test_notify_op_agent(self._test_floatingips_op_agent)
+
+    def test_l3_agent_routers_query_interfaces(self):
+        with self.router() as r:
+            with self.port(no_delete=True) as p:
+                self._router_interface_action('add',
+                                              r['router']['id'],
+                                              None,
+                                              p['port']['id'])
+
+                plugin = TestL3NatPlugin()
+                routers = plugin.get_sync_data(context.get_admin_context(),
+                                               None)
+                self.assertEqual(1, len(routers))
+                interfaces = routers[0][l3_constants.INTERFACE_KEY]
+                self.assertEqual(1, len(interfaces))
+                subnet_id = interfaces[0]['subnet']['id']
+                wanted_subnetid = p['port']['fixed_ips'][0]['subnet_id']
+                self.assertEqual(wanted_subnetid, subnet_id)
+                # clean-up
+                self._router_interface_action('remove',
+                                              r['router']['id'],
+                                              None,
+                                              p['port']['id'])
+
+    def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
+        with self.router() as r:
+            with self.subnet(cidr='9.0.1.0/24') as subnet:
+                with self.port(subnet=subnet,
+                               no_delete=True,
+                               fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
+                    self._router_interface_action('add',
+                                                  r['router']['id'],
+                                                  None,
+                                                  p['port']['id'])
+                    port = {'port': {'fixed_ips':
+                                     [{'ip_address': '9.0.1.4',
+                                       'subnet_id': subnet['subnet']['id']},
+                                      {'ip_address': '9.0.1.5',
+                                       'subnet_id': subnet['subnet']['id']}]}}
+                    plugin = TestL3NatPlugin()
+                    ctx = context.get_admin_context()
+                    plugin.update_port(ctx, p['port']['id'], port)
+                    routers = plugin.get_sync_data(ctx, None)
+                    self.assertEqual(1, len(routers))
+                    interfaces = routers[0].get(l3_constants.INTERFACE_KEY, [])
+                    self.assertEqual(0, len(interfaces))
+                    # clean-up
+                    self._router_interface_action('remove',
+                                                  r['router']['id'],
+                                                  None,
+                                                  p['port']['id'])
+
+    def test_l3_agent_routers_query_gateway(self):
+        with self.router() as r:
+            with self.subnet() as s:
+                self._set_net_external(s['subnet']['network_id'])
+                self._add_external_gateway_to_router(
+                    r['router']['id'],
+                    s['subnet']['network_id'])
+                plugin = TestL3NatPlugin()
+                routers = plugin.get_sync_data(context.get_admin_context(),
+                                               [r['router']['id']])
+                self.assertEqual(1, len(routers))
+                gw_port = routers[0]['gw_port']
+                self.assertEquals(s['subnet']['id'], gw_port['subnet']['id'])
+                self._remove_external_gateway_from_router(
+                    r['router']['id'],
+                    s['subnet']['network_id'])
+
+    def test_l3_agent_routers_query_floatingips(self):
+        with self.floatingip_with_assoc() as fip:
+            plugin = TestL3NatPlugin()
+            routers = plugin.get_sync_data(context.get_admin_context(),
+                                           [fip['floatingip']['router_id']])
+            self.assertEqual(1, len(routers))
+            floatingips = routers[0][l3_constants.FLOATINGIP_KEY]
+            self.assertEqual(1, len(floatingips))
+            self.assertEquals(floatingips[0]['id'],
+                              fip['floatingip']['id'])
+            self.assertEquals(floatingips[0]['port_id'],
+                              fip['floatingip']['port_id'])
+            self.assertTrue(floatingips[0]['fixed_ip_address'] is not None)
+            self.assertTrue(floatingips[0]['router_id'] is not None)