]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Move classes out of l3_agent.py
authorCarl Baldwin <carl.baldwin@hp.com>
Mon, 1 Dec 2014 21:49:10 +0000 (16:49 -0500)
committerBrian Haley <brian.haley@hp.com>
Wed, 10 Dec 2014 22:19:28 +0000 (15:19 -0700)
The file l3_agent.py has become too large.  This patch is a simple
pure refactor to move some of the functionality in to other files
where things aren't too tangled up.  There is no functional change
with this patch and I avoided gratuitous other fixups in this patch in
order to make it easier to review.

I plan to follow up on the new l3_dvr and l3_agent_router modules with
more restructuring in the near future.

Partially-Implements: bp restructure-l3-agent

Change-Id: I3529fe4146c50c940f41eb26d0b5efc5870b3af9

14 files changed:
neutron/agent/l3/__init__.py [new file with mode: 0644]
neutron/agent/l3/agent.py [moved from neutron/agent/l3_agent.py with 86% similarity]
neutron/agent/l3/ha.py [moved from neutron/agent/l3_ha_agent.py with 100% similarity]
neutron/agent/l3/link_local_allocator.py [new file with mode: 0644]
neutron/agent/l3/router_info.py [new file with mode: 0644]
neutron/agent/l3/router_processing_queue.py [new file with mode: 0644]
neutron/agent/netns_cleanup_util.py
neutron/agent/ovs_cleanup_util.py
neutron/tests/common/agents/l3_agent.py
neutron/tests/functional/agent/test_l3_agent.py
neutron/tests/unit/test_l3_agent.py
neutron/tests/unit/test_l3_dvr.py [new file with mode: 0644]
neutron/tests/unit/test_router_processing_queue.py [new file with mode: 0644]
setup.cfg

diff --git a/neutron/agent/l3/__init__.py b/neutron/agent/l3/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
similarity index 86%
rename from neutron/agent/l3_agent.py
rename to neutron/agent/l3/agent.py
index acd4d67b2c6da8398dbb1fda9a3dd3f473239773..9ddf430f92a7bcad4decc34179b2b7db405cd309 100644 (file)
@@ -15,7 +15,6 @@
 
 import sys
 
-import datetime
 import eventlet
 eventlet.monkey_patch()
 
@@ -26,10 +25,12 @@ from oslo import messaging
 from oslo.utils import excutils
 from oslo.utils import importutils
 from oslo.utils import timeutils
-import Queue
 
 from neutron.agent.common import config
-from neutron.agent import l3_ha_agent
+from neutron.agent.l3 import ha
+from neutron.agent.l3 import link_local_allocator as lla
+from neutron.agent.l3 import router_info
+from neutron.agent.l3 import router_processing_queue as queue
 from neutron.agent.linux import external_process
 from neutron.agent.linux import interface
 from neutron.agent.linux import ip_lib
@@ -77,10 +78,6 @@ FIP_PR_START = 32768
 FIP_PR_END = FIP_PR_START + 40000
 RPC_LOOP_INTERVAL = 1
 FLOATING_IP_CIDR_SUFFIX = '/32'
-# Lower value is higher priority
-PRIORITY_RPC = 0
-PRIORITY_SYNC_ROUTERS_TASK = 1
-DELETE_ROUTER = 1
 
 
 class L3PluginApi(object):
@@ -142,295 +139,8 @@ class L3PluginApi(object):
         return cctxt.call(context, 'get_service_plugin_list')
 
 
-class LinkLocalAddressPair(netaddr.IPNetwork):
-    def __init__(self, addr):
-        super(LinkLocalAddressPair, self).__init__(addr)
-
-    def get_pair(self):
-        """Builds an address pair from the first and last addresses. """
-        return (netaddr.IPNetwork("%s/%s" % (self.network, self.prefixlen)),
-                netaddr.IPNetwork("%s/%s" % (self.broadcast, self.prefixlen)))
-
-
-class LinkLocalAllocator(object):
-    """Manages allocation of link local IP addresses.
-
-    These link local addresses are used for routing inside the fip namespaces.
-    The associations need to persist across agent restarts to maintain
-    consistency.  Without this, there is disruption in network connectivity
-    as the agent rewires the connections with the new IP address assocations.
-
-    Persisting these in the database is unnecessary and would degrade
-    performance.
-    """
-    def __init__(self, state_file, subnet):
-        """Read the file with previous allocations recorded.
-
-        See the note in the allocate method for more detail.
-        """
-        self.state_file = state_file
-        subnet = netaddr.IPNetwork(subnet)
-
-        self.allocations = {}
-
-        self.remembered = {}
-        for line in self._read():
-            key, cidr = line.strip().split(',')
-            self.remembered[key] = LinkLocalAddressPair(cidr)
-
-        self.pool = set(LinkLocalAddressPair(s) for s in subnet.subnet(31))
-        self.pool.difference_update(self.remembered.values())
-
-    def allocate(self, key):
-        """Try to allocate a link local address pair.
-
-        I expect this to work in all cases because I expect the pool size to be
-        large enough for any situation.  Nonetheless, there is some defensive
-        programming in here.
-
-        Since the allocations are persisted, there is the chance to leak
-        allocations which should have been released but were not.  This leak
-        could eventually exhaust the pool.
-
-        So, if a new allocation is needed, the code first checks to see if
-        there are any remembered allocations for the key.  If not, it checks
-        the free pool.  If the free pool is empty then it dumps the remembered
-        allocations to free the pool.  This final desparate step will not
-        happen often in practice.
-        """
-        if key in self.remembered:
-            self.allocations[key] = self.remembered.pop(key)
-            return self.allocations[key]
-
-        if not self.pool:
-            # Desparate times.  Try to get more in the pool.
-            self.pool.update(self.remembered.values())
-            self.remembered.clear()
-            if not self.pool:
-                # More than 256 routers on a compute node!
-                raise RuntimeError(_("Cannot allocate link local address"))
-
-        self.allocations[key] = self.pool.pop()
-        self._write_allocations()
-        return self.allocations[key]
-
-    def release(self, key):
-        self.pool.add(self.allocations.pop(key))
-        self._write_allocations()
-
-    def _write_allocations(self):
-        current = ["%s,%s\n" % (k, v) for k, v in self.allocations.items()]
-        remembered = ["%s,%s\n" % (k, v) for k, v in self.remembered.items()]
-        current.extend(remembered)
-        self._write(current)
-
-    def _write(self, lines):
-        with open(self.state_file, "w") as f:
-            f.writelines(lines)
-
-    def _read(self):
-        if not os.path.exists(self.state_file):
-            return []
-        with open(self.state_file) as f:
-            return f.readlines()
-
-
-class RouterInfo(l3_ha_agent.RouterMixin):
-
-    def __init__(self, router_id, root_helper, router,
-                 use_ipv6=False, ns_name=None):
-        self.router_id = router_id
-        self.ex_gw_port = None
-        self._snat_enabled = None
-        self._snat_action = None
-        self.internal_ports = []
-        self.snat_ports = []
-        self.floating_ips = set()
-        self.floating_ips_dict = {}
-        self.root_helper = root_helper
-        # Invoke the setter for establishing initial SNAT action
-        self.router = router
-        self.ns_name = ns_name
-        self.iptables_manager = iptables_manager.IptablesManager(
-            root_helper=root_helper,
-            use_ipv6=use_ipv6,
-            namespace=self.ns_name)
-        self.snat_iptables_manager = None
-        self.routes = []
-        # DVR Data
-        # Linklocal subnet for router and floating IP namespace link
-        self.rtr_fip_subnet = None
-        self.dist_fip_count = 0
-
-        super(RouterInfo, self).__init__()
-
-    @property
-    def router(self):
-        return self._router
-
-    @router.setter
-    def router(self, value):
-        self._router = value
-        if not self._router:
-            return
-        # enable_snat by default if it wasn't specified by plugin
-        self._snat_enabled = self._router.get('enable_snat', True)
-        # Set a SNAT action for the router
-        if self._router.get('gw_port'):
-            self._snat_action = ('add_rules' if self._snat_enabled
-                                 else 'remove_rules')
-        elif self.ex_gw_port:
-            # Gateway port was removed, remove rules
-            self._snat_action = 'remove_rules'
-
-    def perform_snat_action(self, snat_callback, *args):
-        # Process SNAT rules for attached subnets
-        if self._snat_action:
-            snat_callback(self, self._router.get('gw_port'),
-                          *args, action=self._snat_action)
-        self._snat_action = None
-
-
-class RouterUpdate(object):
-    """Encapsulates a router update
-
-    An instance of this object carries the information necessary to prioritize
-    and process a request to update a router.
-    """
-    def __init__(self, router_id, priority,
-                 action=None, router=None, timestamp=None):
-        self.priority = priority
-        self.timestamp = timestamp
-        if not timestamp:
-            self.timestamp = timeutils.utcnow()
-        self.id = router_id
-        self.action = action
-        self.router = router
-
-    def __lt__(self, other):
-        """Implements priority among updates
-
-        Lower numerical priority always gets precedence.  When comparing two
-        updates of the same priority then the one with the earlier timestamp
-        gets procedence.  In the unlikely event that the timestamps are also
-        equal it falls back to a simple comparison of ids meaning the
-        precedence is essentially random.
-        """
-        if self.priority != other.priority:
-            return self.priority < other.priority
-        if self.timestamp != other.timestamp:
-            return self.timestamp < other.timestamp
-        return self.id < other.id
-
-
-class ExclusiveRouterProcessor(object):
-    """Manager for access to a router for processing
-
-    This class controls access to a router in a non-blocking way.  The first
-    instance to be created for a given router_id is granted exclusive access to
-    the router.
-
-    Other instances may be created for the same router_id while the first
-    instance has exclusive access.  If that happens then it doesn't block and
-    wait for access.  Instead, it signals to the master instance that an update
-    came in with the timestamp.
-
-    This way, a thread will not block to wait for access to a router.  Instead
-    it effectively signals to the thread that is working on the router that
-    something has changed since it started working on it.  That thread will
-    simply finish its current iteration and then repeat.
-
-    This class keeps track of the last time that a router data was fetched and
-    processed.  The timestamp that it keeps must be before when the data used
-    to process the router last was fetched from the database.  But, as close as
-    possible.  The timestamp should not be recorded, however, until the router
-    has been processed using the fetch data.
-    """
-    _masters = {}
-    _router_timestamps = {}
-
-    def __init__(self, router_id):
-        self._router_id = router_id
-
-        if router_id not in self._masters:
-            self._masters[router_id] = self
-            self._queue = []
-
-        self._master = self._masters[router_id]
-
-    def _i_am_master(self):
-        return self == self._master
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, type, value, traceback):
-        if self._i_am_master():
-            del self._masters[self._router_id]
-
-    def _get_router_data_timestamp(self):
-        return self._router_timestamps.get(self._router_id,
-                                           datetime.datetime.min)
-
-    def fetched_and_processed(self, timestamp):
-        """Records the data timestamp after it is used to update the router"""
-        new_timestamp = max(timestamp, self._get_router_data_timestamp())
-        self._router_timestamps[self._router_id] = new_timestamp
-
-    def queue_update(self, update):
-        """Queues an update from a worker
-
-        This is the queue used to keep new updates that come in while a router
-        is being processed.  These updates have already bubbled to the front of
-        the RouterProcessingQueue.
-        """
-        self._master._queue.append(update)
-
-    def updates(self):
-        """Processes the router until updates stop coming
-
-        Only the master instance will process the router.  However, updates may
-        come in from other workers while it is in progress.  This method loops
-        until they stop coming.
-        """
-        if self._i_am_master():
-            while self._queue:
-                # Remove the update from the queue even if it is old.
-                update = self._queue.pop(0)
-                # Process the update only if it is fresh.
-                if self._get_router_data_timestamp() < update.timestamp:
-                    yield update
-
-
-class RouterProcessingQueue(object):
-    """Manager of the queue of routers to process."""
-    def __init__(self):
-        self._queue = Queue.PriorityQueue()
-
-    def add(self, update):
-        self._queue.put(update)
-
-    def each_update_to_next_router(self):
-        """Grabs the next router from the queue and processes
-
-        This method uses a for loop to process the router repeatedly until
-        updates stop bubbling to the front of the queue.
-        """
-        next_update = self._queue.get()
-
-        with ExclusiveRouterProcessor(next_update.id) as rp:
-            # Queue the update whether this worker is the master or not.
-            rp.queue_update(next_update)
-
-            # Here, if the current worker is not the master, the call to
-            # rp.updates() will not yield and so this will essentially be a
-            # noop.
-            for update in rp.updates():
-                yield (rp, update)
-
-
 class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
-                 l3_ha_agent.AgentMixin,
+                 ha.AgentMixin,
                  manager.Manager):
     """Manager for L3NatAgent
 
@@ -554,12 +264,12 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
         # dvr data
         self.agent_gateway_port = None
         self.fip_ns_subscribers = set()
-        self.local_subnets = LinkLocalAllocator(
+        self.local_subnets = lla.LinkLocalAllocator(
             os.path.join(self.conf.state_path, 'fip-linklocal-networks'),
             FIP_LL_SUBNET)
         self.fip_priorities = set(range(FIP_PR_START, FIP_PR_END))
 
-        self._queue = RouterProcessingQueue()
+        self._queue = queue.RouterProcessingQueue()
         super(L3NATAgent, self).__init__(conf=self.conf)
 
         self.target_ex_net_id = None
@@ -745,11 +455,11 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
     def _router_added(self, router_id, router):
         ns_name = (self.get_ns_name(router_id)
                    if self.conf.use_namespaces else None)
-        ri = RouterInfo(router_id=router_id,
-                        root_helper=self.root_helper,
-                        router=router,
-                        use_ipv6=self.use_ipv6,
-                        ns_name=ns_name)
+        ri = router_info.RouterInfo(router_id=router_id,
+                                    root_helper=self.root_helper,
+                                    router=router,
+                                    use_ipv6=self.use_ipv6,
+                                    ns_name=ns_name)
         self.router_info[router_id] = ri
         if self.conf.use_namespaces:
             self._create_router_namespace(ri)
@@ -1698,7 +1408,9 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
     def router_deleted(self, context, router_id):
         """Deal with router deletion RPC message."""
         LOG.debug('Got router deleted notification for %s', router_id)
-        update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
+        update = queue.RouterUpdate(router_id,
+                                    queue.PRIORITY_RPC,
+                                    action=queue.DELETE_ROUTER)
         self._queue.add(update)
 
     def _update_arp_entry(self, ri, ip, mac, subnet_id, operation):
@@ -1751,13 +1463,15 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
             if isinstance(routers[0], dict):
                 routers = [router['id'] for router in routers]
             for id in routers:
-                update = RouterUpdate(id, PRIORITY_RPC)
+                update = queue.RouterUpdate(id, queue.PRIORITY_RPC)
                 self._queue.add(update)
 
     def router_removed_from_agent(self, context, payload):
         LOG.debug('Got router removed from agent :%r', payload)
         router_id = payload['router_id']
-        update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
+        update = queue.RouterUpdate(router_id,
+                                    queue.PRIORITY_RPC,
+                                    action=queue.DELETE_ROUTER)
         self._queue.add(update)
 
     def router_added_to_agent(self, context, payload):
@@ -1801,7 +1515,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
         for rp, update in self._queue.each_update_to_next_router():
             LOG.debug("Starting router update for %s", update.id)
             router = update.router
-            if update.action != DELETE_ROUTER and not router:
+            if update.action != queue.DELETE_ROUTER and not router:
                 try:
                     update.timestamp = timeutils.utcnow()
                     routers = self.plugin_rpc.get_routers(self.context,
@@ -1870,10 +1584,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
         else:
             LOG.debug('Processing :%r', routers)
             for r in routers:
-                update = RouterUpdate(r['id'],
-                                      PRIORITY_SYNC_ROUTERS_TASK,
-                                      router=r,
-                                      timestamp=timestamp)
+                update = queue.RouterUpdate(r['id'],
+                                            queue.PRIORITY_SYNC_ROUTERS_TASK,
+                                            router=r,
+                                            timestamp=timestamp)
                 self._queue.add(update)
             self.fullsync = False
             LOG.debug("periodic_sync_routers_task successfully completed")
@@ -1884,10 +1598,10 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
             # Two kinds of stale routers:  Routers for which info is cached in
             # self.router_info and the others.  First, handle the former.
             for router_id in prev_router_ids - curr_router_ids:
-                update = RouterUpdate(router_id,
-                                      PRIORITY_SYNC_ROUTERS_TASK,
-                                      timestamp=timestamp,
-                                      action=DELETE_ROUTER)
+                update = queue.RouterUpdate(router_id,
+                                            queue.PRIORITY_SYNC_ROUTERS_TASK,
+                                            timestamp=timestamp,
+                                            action=queue.DELETE_ROUTER)
                 self._queue.add(update)
 
             # Next, one effort to clean out namespaces for which we don't have
@@ -2001,7 +1715,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
 
 def _register_opts(conf):
     conf.register_opts(L3NATAgent.OPTS)
-    conf.register_opts(l3_ha_agent.OPTS)
+    conf.register_opts(ha.OPTS)
     config.register_interface_driver_opts_helper(conf)
     config.register_use_namespaces_opts_helper(conf)
     config.register_agent_state_opts_helper(conf)
@@ -2010,7 +1724,7 @@ def _register_opts(conf):
     conf.register_opts(external_process.OPTS)
 
 
-def main(manager='neutron.agent.l3_agent.L3NATAgentWithStateReport'):
+def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
     _register_opts(cfg.CONF)
     common_config.init(sys.argv[1:])
     config.setup_logging()
diff --git a/neutron/agent/l3/link_local_allocator.py b/neutron/agent/l3/link_local_allocator.py
new file mode 100644 (file)
index 0000000..594daa7
--- /dev/null
@@ -0,0 +1,109 @@
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import netaddr
+import os
+
+
+class LinkLocalAddressPair(netaddr.IPNetwork):
+    def __init__(self, addr):
+        super(LinkLocalAddressPair, self).__init__(addr)
+
+    def get_pair(self):
+        """Builds an address pair from the first and last addresses. """
+        return (netaddr.IPNetwork("%s/%s" % (self.network, self.prefixlen)),
+                netaddr.IPNetwork("%s/%s" % (self.broadcast, self.prefixlen)))
+
+
+class LinkLocalAllocator(object):
+    """Manages allocation of link local IP addresses.
+
+    These link local addresses are used for routing inside the fip namespaces.
+    The associations need to persist across agent restarts to maintain
+    consistency.  Without this, there is disruption in network connectivity
+    as the agent rewires the connections with the new IP address assocations.
+
+    Persisting these in the database is unnecessary and would degrade
+    performance.
+    """
+    def __init__(self, state_file, subnet):
+        """Read the file with previous allocations recorded.
+
+        See the note in the allocate method for more detail.
+        """
+        self.state_file = state_file
+        subnet = netaddr.IPNetwork(subnet)
+
+        self.allocations = {}
+
+        self.remembered = {}
+        for line in self._read():
+            key, cidr = line.strip().split(',')
+            self.remembered[key] = LinkLocalAddressPair(cidr)
+
+        self.pool = set(LinkLocalAddressPair(s) for s in subnet.subnet(31))
+        self.pool.difference_update(self.remembered.values())
+
+    def allocate(self, key):
+        """Try to allocate a link local address pair.
+
+        I expect this to work in all cases because I expect the pool size to be
+        large enough for any situation.  Nonetheless, there is some defensive
+        programming in here.
+
+        Since the allocations are persisted, there is the chance to leak
+        allocations which should have been released but were not.  This leak
+        could eventually exhaust the pool.
+
+        So, if a new allocation is needed, the code first checks to see if
+        there are any remembered allocations for the key.  If not, it checks
+        the free pool.  If the free pool is empty then it dumps the remembered
+        allocations to free the pool.  This final desparate step will not
+        happen often in practice.
+        """
+        if key in self.remembered:
+            self.allocations[key] = self.remembered.pop(key)
+            return self.allocations[key]
+
+        if not self.pool:
+            # Desparate times.  Try to get more in the pool.
+            self.pool.update(self.remembered.values())
+            self.remembered.clear()
+            if not self.pool:
+                # More than 256 routers on a compute node!
+                raise RuntimeError(_("Cannot allocate link local address"))
+
+        self.allocations[key] = self.pool.pop()
+        self._write_allocations()
+        return self.allocations[key]
+
+    def release(self, key):
+        self.pool.add(self.allocations.pop(key))
+        self._write_allocations()
+
+    def _write_allocations(self):
+        current = ["%s,%s\n" % (k, v) for k, v in self.allocations.items()]
+        remembered = ["%s,%s\n" % (k, v) for k, v in self.remembered.items()]
+        current.extend(remembered)
+        self._write(current)
+
+    def _write(self, lines):
+        with open(self.state_file, "w") as f:
+            f.writelines(lines)
+
+    def _read(self):
+        if not os.path.exists(self.state_file):
+            return []
+        with open(self.state_file) as f:
+            return f.readlines()
diff --git a/neutron/agent/l3/router_info.py b/neutron/agent/l3/router_info.py
new file mode 100644 (file)
index 0000000..2999ed8
--- /dev/null
@@ -0,0 +1,72 @@
+# Copyright (c) 2014 Openstack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.agent.l3 import ha
+from neutron.agent.linux import iptables_manager
+
+
+class RouterInfo(ha.RouterMixin):
+
+    def __init__(self, router_id, root_helper, router,
+                 use_ipv6=False, ns_name=None):
+        self.router_id = router_id
+        self.ex_gw_port = None
+        self._snat_enabled = None
+        self._snat_action = None
+        self.internal_ports = []
+        self.snat_ports = []
+        self.floating_ips = set()
+        self.floating_ips_dict = {}
+        self.root_helper = root_helper
+        # Invoke the setter for establishing initial SNAT action
+        self.router = router
+        self.ns_name = ns_name
+        self.iptables_manager = iptables_manager.IptablesManager(
+            root_helper=root_helper,
+            use_ipv6=use_ipv6,
+            namespace=self.ns_name)
+        self.snat_iptables_manager = None
+        self.routes = []
+        # DVR Data
+        # Linklocal subnet for router and floating IP namespace link
+        self.rtr_fip_subnet = None
+        self.dist_fip_count = 0
+
+        super(RouterInfo, self).__init__()
+
+    @property
+    def router(self):
+        return self._router
+
+    @router.setter
+    def router(self, value):
+        self._router = value
+        if not self._router:
+            return
+        # enable_snat by default if it wasn't specified by plugin
+        self._snat_enabled = self._router.get('enable_snat', True)
+        # Set a SNAT action for the router
+        if self._router.get('gw_port'):
+            self._snat_action = ('add_rules' if self._snat_enabled
+                                 else 'remove_rules')
+        elif self.ex_gw_port:
+            # Gateway port was removed, remove rules
+            self._snat_action = 'remove_rules'
+
+    def perform_snat_action(self, snat_callback, *args):
+        # Process SNAT rules for attached subnets
+        if self._snat_action:
+            snat_callback(self, self._router.get('gw_port'),
+                          *args, action=self._snat_action)
+        self._snat_action = None
diff --git a/neutron/agent/l3/router_processing_queue.py b/neutron/agent/l3/router_processing_queue.py
new file mode 100644 (file)
index 0000000..a206437
--- /dev/null
@@ -0,0 +1,162 @@
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+import datetime
+import Queue
+
+from oslo.utils import timeutils
+
+# Lower value is higher priority
+PRIORITY_RPC = 0
+PRIORITY_SYNC_ROUTERS_TASK = 1
+DELETE_ROUTER = 1
+
+
+class RouterUpdate(object):
+    """Encapsulates a router update
+
+    An instance of this object carries the information necessary to prioritize
+    and process a request to update a router.
+    """
+    def __init__(self, router_id, priority,
+                 action=None, router=None, timestamp=None):
+        self.priority = priority
+        self.timestamp = timestamp
+        if not timestamp:
+            self.timestamp = timeutils.utcnow()
+        self.id = router_id
+        self.action = action
+        self.router = router
+
+    def __lt__(self, other):
+        """Implements priority among updates
+
+        Lower numerical priority always gets precedence.  When comparing two
+        updates of the same priority then the one with the earlier timestamp
+        gets procedence.  In the unlikely event that the timestamps are also
+        equal it falls back to a simple comparison of ids meaning the
+        precedence is essentially random.
+        """
+        if self.priority != other.priority:
+            return self.priority < other.priority
+        if self.timestamp != other.timestamp:
+            return self.timestamp < other.timestamp
+        return self.id < other.id
+
+
+class ExclusiveRouterProcessor(object):
+    """Manager for access to a router for processing
+
+    This class controls access to a router in a non-blocking way.  The first
+    instance to be created for a given router_id is granted exclusive access to
+    the router.
+
+    Other instances may be created for the same router_id while the first
+    instance has exclusive access.  If that happens then it doesn't block and
+    wait for access.  Instead, it signals to the master instance that an update
+    came in with the timestamp.
+
+    This way, a thread will not block to wait for access to a router.  Instead
+    it effectively signals to the thread that is working on the router that
+    something has changed since it started working on it.  That thread will
+    simply finish its current iteration and then repeat.
+
+    This class keeps track of the last time that a router data was fetched and
+    processed.  The timestamp that it keeps must be before when the data used
+    to process the router last was fetched from the database.  But, as close as
+    possible.  The timestamp should not be recorded, however, until the router
+    has been processed using the fetch data.
+    """
+    _masters = {}
+    _router_timestamps = {}
+
+    def __init__(self, router_id):
+        self._router_id = router_id
+
+        if router_id not in self._masters:
+            self._masters[router_id] = self
+            self._queue = []
+
+        self._master = self._masters[router_id]
+
+    def _i_am_master(self):
+        return self == self._master
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        if self._i_am_master():
+            del self._masters[self._router_id]
+
+    def _get_router_data_timestamp(self):
+        return self._router_timestamps.get(self._router_id,
+                                           datetime.datetime.min)
+
+    def fetched_and_processed(self, timestamp):
+        """Records the data timestamp after it is used to update the router"""
+        new_timestamp = max(timestamp, self._get_router_data_timestamp())
+        self._router_timestamps[self._router_id] = new_timestamp
+
+    def queue_update(self, update):
+        """Queues an update from a worker
+
+        This is the queue used to keep new updates that come in while a router
+        is being processed.  These updates have already bubbled to the front of
+        the RouterProcessingQueue.
+        """
+        self._master._queue.append(update)
+
+    def updates(self):
+        """Processes the router until updates stop coming
+
+        Only the master instance will process the router.  However, updates may
+        come in from other workers while it is in progress.  This method loops
+        until they stop coming.
+        """
+        if self._i_am_master():
+            while self._queue:
+                # Remove the update from the queue even if it is old.
+                update = self._queue.pop(0)
+                # Process the update only if it is fresh.
+                if self._get_router_data_timestamp() < update.timestamp:
+                    yield update
+
+
+class RouterProcessingQueue(object):
+    """Manager of the queue of routers to process."""
+    def __init__(self):
+        self._queue = Queue.PriorityQueue()
+
+    def add(self, update):
+        self._queue.put(update)
+
+    def each_update_to_next_router(self):
+        """Grabs the next router from the queue and processes
+
+        This method uses a for loop to process the router repeatedly until
+        updates stop bubbling to the front of the queue.
+        """
+        next_update = self._queue.get()
+
+        with ExclusiveRouterProcessor(next_update.id) as rp:
+            # Queue the update whether this worker is the master or not.
+            rp.queue_update(next_update)
+
+            # Here, if the current worker is not the master, the call to
+            # rp.updates() will not yield and so this will essentially be a
+            # noop.
+            for update in rp.updates():
+                yield (rp, update)
index 60e3f82fd533db64a06206292c4f40bc0a95f5df..5f3d6dca6cd9bb5960b1bb830654e1729a57de35 100644 (file)
@@ -23,7 +23,7 @@ from oslo.utils import importutils
 
 from neutron.agent.common import config as agent_config
 from neutron.agent import dhcp_agent
-from neutron.agent import l3_agent
+from neutron.agent.l3 import agent as l3_agent
 from neutron.agent.linux import dhcp
 from neutron.agent.linux import interface
 from neutron.agent.linux import ip_lib
index 9d365abbaae80abedc5aafe18862ad2591088650..2f9ca68ca479e608f22feb608b9047cc2b270837 100644 (file)
@@ -16,7 +16,7 @@
 from oslo.config import cfg
 
 from neutron.agent.common import config as agent_config
-from neutron.agent import l3_agent
+from neutron.agent.l3 import agent
 from neutron.agent.linux import interface
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ovs_lib
@@ -45,7 +45,7 @@ def setup_conf():
 
     conf = cfg.CONF
     conf.register_cli_opts(opts)
-    conf.register_opts(l3_agent.L3NATAgent.OPTS)
+    conf.register_opts(agent.L3NATAgent.OPTS)
     conf.register_opts(interface.OPTS)
     agent_config.register_interface_driver_opts_helper(conf)
     agent_config.register_use_namespaces_opts_helper(conf)
index 92ad587fa477d7152c45f01c1e7bd60fb05ac9cb..529ecefe06900968a654e01b3707ef1c9db768af 100644 (file)
 #    under the License.
 
 
-from neutron.agent import l3_agent
+from neutron.agent.l3 import agent
 
 
-class TestL3NATAgent(l3_agent.L3NATAgentWithStateReport):
+class TestL3NATAgent(agent.L3NATAgentWithStateReport):
     NESTED_NAMESPACE_SEPARATOR = '@'
 
     def get_ns_name(self, router_id):
index 99f1d1edd4e98baa644772464c1a6e958b02da76..e7b86d5b39ae5535736434597cffecc1860c220f 100644 (file)
@@ -20,7 +20,7 @@ import mock
 from oslo.config import cfg
 
 from neutron.agent.common import config as agent_config
-from neutron.agent import l3_agent
+from neutron.agent.l3 import agent as l3_agent
 from neutron.agent.linux import external_process
 from neutron.agent.linux import ip_lib
 from neutron.common import config as common_config
@@ -39,7 +39,7 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
     def setUp(self):
         super(L3AgentTestFramework, self).setUp()
         self.check_sudo_enabled()
-        mock.patch('neutron.agent.l3_agent.L3PluginApi').start()
+        mock.patch('neutron.agent.l3.agent.L3PluginApi').start()
         self.agent = self._configure_agent('agent1')
 
     def _get_config_opts(self):
index 54ae156cf6befc1765f8d15edc91dca83640b33b..e50eda7cc00aef1119f083500bb39b989fe9edeb 100644 (file)
@@ -15,7 +15,6 @@
 
 import contextlib
 import copy
-import datetime
 
 import mock
 import netaddr
@@ -24,8 +23,10 @@ from oslo import messaging
 from testtools import matchers
 
 from neutron.agent.common import config as agent_config
-from neutron.agent import l3_agent
-from neutron.agent import l3_ha_agent
+from neutron.agent.l3 import agent as l3_agent
+from neutron.agent.l3 import ha
+from neutron.agent.l3 import link_local_allocator as lla
+from neutron.agent.l3 import router_info as l3router
 from neutron.agent.linux import interface
 from neutron.common import config as base_config
 from neutron.common import constants as l3_constants
@@ -49,161 +50,6 @@ class FakeDev(object):
         self.name = name
 
 
-class TestExclusiveRouterProcessor(base.BaseTestCase):
-    def setUp(self):
-        super(TestExclusiveRouterProcessor, self).setUp()
-
-    def test_i_am_master(self):
-        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
-        not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
-
-        self.assertTrue(master._i_am_master())
-        self.assertFalse(not_master._i_am_master())
-        self.assertTrue(master_2._i_am_master())
-        self.assertFalse(not_master_2._i_am_master())
-
-        master.__exit__(None, None, None)
-        master_2.__exit__(None, None, None)
-
-    def test_master(self):
-        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
-        not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
-
-        self.assertEqual(master._master, master)
-        self.assertEqual(not_master._master, master)
-        self.assertEqual(master_2._master, master_2)
-        self.assertEqual(not_master_2._master, master_2)
-
-        master.__exit__(None, None, None)
-        master_2.__exit__(None, None, None)
-
-    def test__enter__(self):
-        self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
-        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        master.__enter__()
-        self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
-        master.__exit__(None, None, None)
-
-    def test__exit__(self):
-        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        master.__enter__()
-        self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
-        not_master.__enter__()
-        not_master.__exit__(None, None, None)
-        self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
-        master.__exit__(None, None, None)
-        self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
-
-    def test_data_fetched_since(self):
-        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        self.assertEqual(master._get_router_data_timestamp(),
-                         datetime.datetime.min)
-
-        ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
-        ts2 = datetime.datetime.utcnow()
-
-        master.fetched_and_processed(ts2)
-        self.assertEqual(master._get_router_data_timestamp(), ts2)
-        master.fetched_and_processed(ts1)
-        self.assertEqual(master._get_router_data_timestamp(), ts2)
-
-        master.__exit__(None, None, None)
-
-    def test_updates(self):
-        master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-        not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
-
-        master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
-        not_master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
-
-        for update in not_master.updates():
-            raise Exception("Only the master should process a router")
-
-        self.assertEqual(2, len([i for i in master.updates()]))
-
-
-class TestLinkLocalAddrAllocator(base.BaseTestCase):
-    def setUp(self):
-        super(TestLinkLocalAddrAllocator, self).setUp()
-        self.subnet = netaddr.IPNetwork('169.254.31.0/24')
-
-    def test__init__(self):
-        a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr)
-        self.assertEqual('/file', a.state_file)
-        self.assertEqual({}, a.allocations)
-
-    def test__init__readfile(self):
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_read') as read:
-            read.return_value = ["da873ca2,169.254.31.28/31\n"]
-            a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr)
-
-        self.assertTrue('da873ca2' in a.remembered)
-        self.assertEqual({}, a.allocations)
-
-    def test_allocate(self):
-        a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr)
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write:
-            subnet = a.allocate('deadbeef')
-
-        self.assertTrue('deadbeef' in a.allocations)
-        self.assertTrue(subnet not in a.pool)
-        self._check_allocations(a.allocations)
-        write.assert_called_once_with(['deadbeef,%s\n' % subnet.cidr])
-
-    def test_allocate_from_file(self):
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_read') as read:
-            read.return_value = ["deadbeef,169.254.31.88/31\n"]
-            a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr)
-
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write:
-            subnet = a.allocate('deadbeef')
-
-        self.assertEqual(netaddr.IPNetwork('169.254.31.88/31'), subnet)
-        self.assertTrue(subnet not in a.pool)
-        self._check_allocations(a.allocations)
-        self.assertFalse(write.called)
-
-    def test_allocate_exhausted_pool(self):
-        subnet = netaddr.IPNetwork('169.254.31.0/31')
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_read') as read:
-            read.return_value = ["deadbeef,169.254.31.0/31\n"]
-            a = l3_agent.LinkLocalAllocator('/file', subnet.cidr)
-
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write:
-            allocation = a.allocate('abcdef12')
-
-        self.assertEqual(subnet, allocation)
-        self.assertFalse('deadbeef' in a.allocations)
-        self.assertTrue('abcdef12' in a.allocations)
-        self.assertTrue(allocation not in a.pool)
-        self._check_allocations(a.allocations)
-        write.assert_called_once_with(['abcdef12,%s\n' % allocation.cidr])
-
-        self.assertRaises(RuntimeError, a.allocate, 'deadbeef')
-
-    def test_release(self):
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write') as write:
-            a = l3_agent.LinkLocalAllocator('/file', self.subnet.cidr)
-            subnet = a.allocate('deadbeef')
-            write.reset_mock()
-            a.release('deadbeef')
-
-        self.assertTrue('deadbeef' not in a.allocations)
-        self.assertTrue(subnet in a.pool)
-        self.assertEqual({}, a.allocations)
-        write.assert_called_once_with([])
-
-    def _check_allocations(self, allocations):
-        for key, subnet in allocations.items():
-            self.assertTrue(subnet in self.subnet)
-            self.assertEqual(subnet.prefixlen, 31)
-
-
 def router_append_interface(router, count=1, ip_version=4, ra_mode=None,
                             addr_mode=None):
     if ip_version == 4:
@@ -318,7 +164,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         self.conf = agent_config.setup_conf()
         self.conf.register_opts(base_config.core_opts)
         self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
-        self.conf.register_opts(l3_ha_agent.OPTS)
+        self.conf.register_opts(ha.OPTS)
         agent_config.register_interface_driver_opts_helper(self.conf)
         agent_config.register_use_namespaces_opts_helper(self.conf)
         agent_config.register_root_helper(self.conf)
@@ -334,7 +180,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
             'neutron.agent.linux.ip_lib.device_exists')
         self.device_exists = self.device_exists_p.start()
 
-        mock.patch('neutron.agent.l3_ha_agent.AgentMixin'
+        mock.patch('neutron.agent.l3.ha.AgentMixin'
                    '._init_ha_conf_path').start()
         mock.patch('neutron.agent.linux.keepalived.KeepalivedNotifierMixin'
                    '._get_full_config_file_path').start()
@@ -352,7 +198,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         self.external_process = self.external_process_p.start()
 
         self.send_arp_p = mock.patch(
-            'neutron.agent.l3_agent.L3NATAgent._send_gratuitous_arp_packet')
+            'neutron.agent.l3.agent.L3NATAgent._send_gratuitous_arp_packet')
         self.send_arp = self.send_arp_p.start()
 
         self.dvr_cls_p = mock.patch('neutron.agent.linux.interface.NullDriver')
@@ -376,7 +222,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         ip_dev.return_value = self.mock_ip_dev
 
         self.l3pluginApi_cls_p = mock.patch(
-            'neutron.agent.l3_agent.L3PluginApi')
+            'neutron.agent.l3.agent.L3PluginApi')
         l3pluginApi_cls = self.l3pluginApi_cls_p.start()
         self.plugin_api = mock.MagicMock()
         l3pluginApi_cls.return_value = self.plugin_api
@@ -412,7 +258,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         network_id = _uuid()
         router = prepare_router_data(num_internal_ports=2)
         router_id = router['id']
-        ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
+        ri = l3router.RouterInfo(router_id, self.conf.root_helper,
                                  router=router)
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         cidr = '99.0.1.9/24'
@@ -440,7 +286,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
     def test_router_info_create(self):
         id = _uuid()
         ns = "ns-" + id
-        ri = l3_agent.RouterInfo(id, self.conf.root_helper, {}, ns_name=ns)
+        ri = l3router.RouterInfo(id, self.conf.root_helper, {}, ns_name=ns)
 
         self.assertTrue(ri.ns_name.endswith(id))
 
@@ -458,7 +304,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
             'routes': [],
             'gw_port': ex_gw_port}
         ns = "ns-" + id
-        ri = l3_agent.RouterInfo(id, self.conf.root_helper, router, ns_name=ns)
+        ri = l3router.RouterInfo(id, self.conf.root_helper, router, ns_name=ns)
         self.assertTrue(ri.ns_name.endswith(id))
         self.assertEqual(ri.router, router)
 
@@ -529,7 +375,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
 
     def _test_external_gateway_action(self, action, router):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router,
                                  ns_name=agent.get_ns_name(router['id']))
         # Special setup for dvr routers
@@ -600,7 +446,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
     def test_external_gateway_updated(self):
         router = prepare_router_data(num_internal_ports=2)
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router,
                                  ns_name=agent.get_ns_name(router['id']))
         interface_name, ex_gw_port = self._prepare_ext_gw_test(agent)
@@ -627,7 +473,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
     def _test_ext_gw_updated_dvr_agent_mode(self, host,
                                             agent_mode, expected_call_count):
         router = prepare_router_data(num_internal_ports=2)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         interface_name, ex_gw_port = self._prepare_ext_gw_test(agent)
@@ -672,7 +518,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
             self.conf.set_override('use_namespaces', False)
 
         router_id = _uuid()
-        ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, {})
+        ri = l3router.RouterInfo(router_id, self.conf.root_helper, {})
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         floating_ip = '20.0.0.101'
         interface_name = agent.get_external_device_name(router_id)
@@ -711,7 +557,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
             self.conf.set_override('use_namespaces', False)
 
         router_id = _uuid()
-        ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, {})
+        ri = l3router.RouterInfo(router_id, self.conf.root_helper, {})
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
 
         fake_route1 = {'destination': '135.207.0.0/16',
@@ -757,7 +603,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router_id = _uuid()
 
-        ri = l3_agent.RouterInfo(router_id, self.conf.root_helper, {})
+        ri = l3router.RouterInfo(router_id, self.conf.root_helper, {})
         ri.router = {}
 
         fake_old_routes = []
@@ -815,7 +661,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
     def test__map_internal_interfaces(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data(num_internal_ports=4)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         test_port = {
             'mac_address': '00:12:23:34:45:56',
@@ -841,7 +687,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         router = prepare_router_data(num_internal_ports=4)
         subnet_ids = [_get_subnet_id(port) for port in
                       router[l3_constants.INTERFACE_KEY]]
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
 
         # Test Basic cases
@@ -870,7 +716,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data(num_internal_ports=2)
         router['distributed'] = True
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
         test_ports = [{'mac_address': '00:11:22:33:44:55',
@@ -921,7 +767,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
 
     def test__update_arp_entry_with_no_subnet(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
-        ri = l3_agent.RouterInfo(
+        ri = l3router.RouterInfo(
             'foo_router_id', mock.ANY,
             {'distributed': True, 'gw_port_host': HOSTNAME})
         with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as f:
@@ -949,13 +795,13 @@ class TestBasicRouterOperations(base.BaseTestCase):
 
     def test_process_cent_router(self):
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         self._test_process_router(ri)
 
     def test_process_dist_router(self):
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         subnet_id = _get_subnet_id(router[l3_constants.INTERFACE_KEY][0])
         ri.router['distributed'] = True
@@ -1047,7 +893,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
         router['routes'] = [
             {'destination': '8.8.8.8/32', 'nexthop': '35.4.0.10'},
             {'destination': '8.8.4.4/32', 'nexthop': '35.4.0.11'}]
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         ri.router = router
         with contextlib.nested(mock.patch.object(agent,
@@ -1104,7 +950,7 @@ vrrp_instance VR_1 {
         device.addr.list.return_value = []
         ri.iptables_manager.ipv4['nat'] = mock.MagicMock()
 
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write'):
+        with mock.patch.object(lla.LinkLocalAllocator, '_write'):
             fip_statuses = agent.process_router_floating_ip_addresses(
                 ri, {'id': _uuid()})
         self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
@@ -1141,7 +987,7 @@ vrrp_instance VR_1 {
 
         router = prepare_router_data(enable_snat=True)
         router[l3_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         ri.iptables_manager.ipv4['nat'] = mock.MagicMock()
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@@ -1165,7 +1011,7 @@ vrrp_instance VR_1 {
         router = prepare_router_data(enable_snat=True)
         router[l3_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
         router['distributed'] = True
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         ri.iptables_manager.ipv4['nat'] = mock.MagicMock()
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@@ -1288,7 +1134,7 @@ vrrp_instance VR_1 {
     def test_process_router_snat_disabled(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data(enable_snat=True)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # Process with NAT
@@ -1310,7 +1156,7 @@ vrrp_instance VR_1 {
     def test_process_router_snat_enabled(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data(enable_snat=False)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # Process without NAT
@@ -1332,7 +1178,7 @@ vrrp_instance VR_1 {
     def test_process_router_interface_added(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # Process with NAT
@@ -1351,7 +1197,7 @@ vrrp_instance VR_1 {
         # Get NAT rules without the gw_port
         gw_port = router['gw_port']
         router['gw_port'] = None
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         agent.process_router(ri)
@@ -1359,7 +1205,7 @@ vrrp_instance VR_1 {
 
         # Get NAT rules with the gw_port
         router['gw_port'] = gw_port
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         with mock.patch.object(
                 agent,
@@ -1374,7 +1220,7 @@ vrrp_instance VR_1 {
     def _process_router_ipv6_interface_added(
             self, router, ra_mode=None, addr_mode=None):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # Process with NAT
@@ -1432,7 +1278,7 @@ vrrp_instance VR_1 {
     def test_process_router_ipv6v4_interface_added(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # Process with NAT
@@ -1448,7 +1294,7 @@ vrrp_instance VR_1 {
     def test_process_router_interface_removed(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data(num_internal_ports=2)
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # Process with NAT
@@ -1464,7 +1310,7 @@ vrrp_instance VR_1 {
     def test_process_router_ipv6_interface_removed(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         ri.router = router
@@ -1483,7 +1329,7 @@ vrrp_instance VR_1 {
     def test_process_router_internal_network_added_unexpected_error(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         with mock.patch.object(
@@ -1509,7 +1355,7 @@ vrrp_instance VR_1 {
     def test_process_router_internal_network_removed_unexpected_error(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent.external_gateway_added = mock.Mock()
         # add an internal port
@@ -1550,7 +1396,7 @@ vrrp_instance VR_1 {
                  'fixed_ip_address': '7.7.7.7',
                  'port_id': router[l3_constants.INTERFACE_KEY][0]['id']}]
 
-            ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+            ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                      router=router)
             agent.external_gateway_added = mock.Mock()
             agent.process_router(ri)
@@ -1583,7 +1429,7 @@ vrrp_instance VR_1 {
                  'fixed_ip_address': '7.7.7.7',
                  'port_id': router[l3_constants.INTERFACE_KEY][0]['id']}]
 
-            ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+            ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                      router=router)
             agent.external_gateway_added = mock.Mock()
             agent.process_router(ri)
@@ -1594,7 +1440,7 @@ vrrp_instance VR_1 {
                 {fip_id: l3_constants.FLOATINGIP_STATUS_ERROR})
 
     def test_handle_router_snat_rules_distributed_without_snat_manager(self):
-        ri = l3_agent.RouterInfo(
+        ri = l3router.RouterInfo(
             'foo_router_id', mock.ANY, {'distributed': True})
         ri.iptables_manager = mock.Mock()
 
@@ -1626,7 +1472,7 @@ vrrp_instance VR_1 {
 
     def test_handle_router_snat_rules_add_rules(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
-        ri = l3_agent.RouterInfo(_uuid(), self.conf.root_helper, {})
+        ri = l3router.RouterInfo(_uuid(), self.conf.root_helper, {})
         ex_gw_port = {'fixed_ips': [{'ip_address': '192.168.1.4'}]}
         ri.router = {'distributed': False}
         agent._handle_router_snat_rules(ri, ex_gw_port,
@@ -1657,7 +1503,7 @@ vrrp_instance VR_1 {
         self.mock_ip.get_devices.return_value = get_devices_return
 
         router = prepare_router_data(enable_snat=True, num_internal_ports=1)
-        ri = l3_agent.RouterInfo(router['id'],
+        ri = l3router.RouterInfo(router['id'],
                                  self.conf.root_helper,
                                  router=router)
 
@@ -1700,7 +1546,7 @@ vrrp_instance VR_1 {
 
         router = prepare_router_data(enable_snat=True, num_internal_ports=1)
         del router['gw_port']
-        ri = l3_agent.RouterInfo(router['id'],
+        ri = l3router.RouterInfo(router['id'],
                                  self.conf.root_helper,
                                  router=router)
 
@@ -2030,7 +1876,7 @@ vrrp_instance VR_1 {
     def test_create_dvr_gateway(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
 
         port_id = _uuid()
@@ -2091,14 +1937,14 @@ vrrp_instance VR_1 {
                'floating_network_id': _uuid(),
                'port_id': _uuid()}
 
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
 
         rtr_2_fip_name = agent.get_rtr_int_device_name(ri.router_id)
         fip_2_rtr_name = agent.get_fip_int_device_name(ri.router_id)
         fip_ns_name = agent.get_fip_ns_name(str(fip['floating_network_id']))
 
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write'):
+        with mock.patch.object(lla.LinkLocalAllocator, '_write'):
             self.device_exists.return_value = False
             agent.create_rtr_2_fip_link(ri, fip['floating_network_id'])
         self.mock_ip.add_veth.assert_called_with(rtr_2_fip_name,
@@ -2113,17 +1959,17 @@ vrrp_instance VR_1 {
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
 
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         self.device_exists.return_value = True
-        with mock.patch.object(l3_agent.LinkLocalAllocator, '_write'):
+        with mock.patch.object(lla.LinkLocalAllocator, '_write'):
             agent.create_rtr_2_fip_link(ri, {})
         self.assertFalse(self.mock_ip.add_veth.called)
 
     def test_floating_ip_added_dist(self):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         agent_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
                                         'subnet_id': _uuid()}],
@@ -2140,14 +1986,14 @@ vrrp_instance VR_1 {
                'floating_network_id': _uuid(),
                'port_id': _uuid()}
         agent.agent_gateway_port = agent_gw_port
-        ri.rtr_fip_subnet = l3_agent.LinkLocalAddressPair('169.254.30.42/31')
+        ri.rtr_fip_subnet = lla.LinkLocalAddressPair('169.254.30.42/31')
         agent.floating_ip_added_dist(ri, fip)
         self.mock_rule.add_rule_from.assert_called_with('192.168.0.1',
                                                         16, FIP_PRI)
         # TODO(mrsmith): add more asserts
 
     @mock.patch.object(l3_agent.L3NATAgent, '_fip_ns_unsubscribe')
-    @mock.patch.object(l3_agent.LinkLocalAllocator, '_write')
+    @mock.patch.object(lla.LinkLocalAllocator, '_write')
     def test_floating_ip_removed_dist(self, write, unsubscribe):
         agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
         router = prepare_router_data()
@@ -2160,7 +2006,7 @@ vrrp_instance VR_1 {
                          'ip_cidr': '20.0.0.30/24'}
         fip_cidr = '11.22.33.44/24'
 
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper,
                                  router=router)
         ri.dist_fip_count = 2
         agent.fip_ns_subscribers.add(ri.router_id)
@@ -2168,7 +2014,7 @@ vrrp_instance VR_1 {
         ri.fip_2_rtr = '11.22.33.42'
         ri.rtr_2_fip = '11.22.33.40'
         agent.agent_gateway_port = agent_gw_port
-        s = l3_agent.LinkLocalAddressPair('169.254.30.42/31')
+        s = lla.LinkLocalAddressPair('169.254.30.42/31')
         ri.rtr_fip_subnet = s
         agent.floating_ip_removed_dist(ri, fip_cidr)
         self.mock_rule.delete_rule_priority.assert_called_with(FIP_PRI)
@@ -2278,7 +2124,7 @@ vrrp_instance VR_1 {
         router['distributed'] = True
         router['gw_port_host'] = HOSTNAME
 
-        ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper, router)
+        ri = l3router.RouterInfo(router['id'], self.conf.root_helper, router)
         vm_floating_ip = '19.4.4.2'
         ri.floating_ips_dict[vm_floating_ip] = FIP_PRI
         ri.dist_fip_count = 1
@@ -2321,7 +2167,7 @@ class TestL3AgentEventHandler(base.BaseTestCase):
     def setUp(self):
         super(TestL3AgentEventHandler, self).setUp()
         cfg.CONF.register_opts(l3_agent.L3NATAgent.OPTS)
-        cfg.CONF.register_opts(l3_ha_agent.OPTS)
+        cfg.CONF.register_opts(ha.OPTS)
         agent_config.register_interface_driver_opts_helper(cfg.CONF)
         agent_config.register_use_namespaces_opts_helper(cfg.CONF)
         cfg.CONF.set_override(
@@ -2347,7 +2193,7 @@ class TestL3AgentEventHandler(base.BaseTestCase):
         driver_cls.return_value = mock_driver
 
         l3_plugin_p = mock.patch(
-            'neutron.agent.l3_agent.L3PluginApi')
+            'neutron.agent.l3.agent.L3PluginApi')
         l3_plugin_cls = l3_plugin_p.start()
         l3_plugin_cls.return_value = mock.MagicMock()
 
@@ -2370,7 +2216,7 @@ class TestL3AgentEventHandler(base.BaseTestCase):
         cfg.CONF.set_override('debug', True)
 
         self.external_process_p.stop()
-        ri = l3_agent.RouterInfo(router_id, None, None)
+        ri = l3router.RouterInfo(router_id, None, None)
         try:
             with mock.patch(ip_class_path) as ip_mock:
                 self.agent._spawn_metadata_proxy(ri.router_id, ri.ns_name)
diff --git a/neutron/tests/unit/test_l3_dvr.py b/neutron/tests/unit/test_l3_dvr.py
new file mode 100644 (file)
index 0000000..89ad856
--- /dev/null
@@ -0,0 +1,96 @@
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+import netaddr
+
+from neutron.agent.l3 import link_local_allocator as lla
+from neutron.tests import base
+
+
+class TestLinkLocalAddrAllocator(base.BaseTestCase):
+    def setUp(self):
+        super(TestLinkLocalAddrAllocator, self).setUp()
+        self.subnet = netaddr.IPNetwork('169.254.31.0/24')
+
+    def test__init__(self):
+        a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
+        self.assertEqual('/file', a.state_file)
+        self.assertEqual({}, a.allocations)
+
+    def test__init__readfile(self):
+        with mock.patch.object(lla.LinkLocalAllocator, '_read') as read:
+            read.return_value = ["da873ca2,169.254.31.28/31\n"]
+            a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
+
+        self.assertTrue('da873ca2' in a.remembered)
+        self.assertEqual({}, a.allocations)
+
+    def test_allocate(self):
+        a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
+        with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
+            subnet = a.allocate('deadbeef')
+
+        self.assertTrue('deadbeef' in a.allocations)
+        self.assertTrue(subnet not in a.pool)
+        self._check_allocations(a.allocations)
+        write.assert_called_once_with(['deadbeef,%s\n' % subnet.cidr])
+
+    def test_allocate_from_file(self):
+        with mock.patch.object(lla.LinkLocalAllocator, '_read') as read:
+            read.return_value = ["deadbeef,169.254.31.88/31\n"]
+            a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
+
+        with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
+            subnet = a.allocate('deadbeef')
+
+        self.assertEqual(netaddr.IPNetwork('169.254.31.88/31'), subnet)
+        self.assertTrue(subnet not in a.pool)
+        self._check_allocations(a.allocations)
+        self.assertFalse(write.called)
+
+    def test_allocate_exhausted_pool(self):
+        subnet = netaddr.IPNetwork('169.254.31.0/31')
+        with mock.patch.object(lla.LinkLocalAllocator, '_read') as read:
+            read.return_value = ["deadbeef,169.254.31.0/31\n"]
+            a = lla.LinkLocalAllocator('/file', subnet.cidr)
+
+        with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
+            allocation = a.allocate('abcdef12')
+
+        self.assertEqual(subnet, allocation)
+        self.assertFalse('deadbeef' in a.allocations)
+        self.assertTrue('abcdef12' in a.allocations)
+        self.assertTrue(allocation not in a.pool)
+        self._check_allocations(a.allocations)
+        write.assert_called_once_with(['abcdef12,%s\n' % allocation.cidr])
+
+        self.assertRaises(RuntimeError, a.allocate, 'deadbeef')
+
+    def test_release(self):
+        with mock.patch.object(lla.LinkLocalAllocator, '_write') as write:
+            a = lla.LinkLocalAllocator('/file', self.subnet.cidr)
+            subnet = a.allocate('deadbeef')
+            write.reset_mock()
+            a.release('deadbeef')
+
+        self.assertTrue('deadbeef' not in a.allocations)
+        self.assertTrue(subnet in a.pool)
+        self.assertEqual({}, a.allocations)
+        write.assert_called_once_with([])
+
+    def _check_allocations(self, allocations):
+        for key, subnet in allocations.items():
+            self.assertTrue(subnet in self.subnet)
+            self.assertEqual(subnet.prefixlen, 31)
diff --git a/neutron/tests/unit/test_router_processing_queue.py b/neutron/tests/unit/test_router_processing_queue.py
new file mode 100644 (file)
index 0000000..0f7bfcd
--- /dev/null
@@ -0,0 +1,102 @@
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+import datetime
+
+from neutron.agent.l3 import router_processing_queue as l3_queue
+from neutron.openstack.common import uuidutils
+from neutron.tests import base
+
+_uuid = uuidutils.generate_uuid
+FAKE_ID = _uuid()
+FAKE_ID_2 = _uuid()
+
+
+class TestExclusiveRouterProcessor(base.BaseTestCase):
+    def setUp(self):
+        super(TestExclusiveRouterProcessor, self).setUp()
+
+    def test_i_am_master(self):
+        master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
+        not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
+
+        self.assertTrue(master._i_am_master())
+        self.assertFalse(not_master._i_am_master())
+        self.assertTrue(master_2._i_am_master())
+        self.assertFalse(not_master_2._i_am_master())
+
+        master.__exit__(None, None, None)
+        master_2.__exit__(None, None, None)
+
+    def test_master(self):
+        master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
+        not_master_2 = l3_queue.ExclusiveRouterProcessor(FAKE_ID_2)
+
+        self.assertEqual(master._master, master)
+        self.assertEqual(not_master._master, master)
+        self.assertEqual(master_2._master, master_2)
+        self.assertEqual(not_master_2._master, master_2)
+
+        master.__exit__(None, None, None)
+        master_2.__exit__(None, None, None)
+
+    def test__enter__(self):
+        self.assertFalse(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters)
+        master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        master.__enter__()
+        self.assertTrue(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters)
+        master.__exit__(None, None, None)
+
+    def test__exit__(self):
+        master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        master.__enter__()
+        self.assertTrue(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters)
+        not_master.__enter__()
+        not_master.__exit__(None, None, None)
+        self.assertTrue(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters)
+        master.__exit__(None, None, None)
+        self.assertFalse(FAKE_ID in l3_queue.ExclusiveRouterProcessor._masters)
+
+    def test_data_fetched_since(self):
+        master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        self.assertEqual(master._get_router_data_timestamp(),
+                         datetime.datetime.min)
+
+        ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
+        ts2 = datetime.datetime.utcnow()
+
+        master.fetched_and_processed(ts2)
+        self.assertEqual(master._get_router_data_timestamp(), ts2)
+        master.fetched_and_processed(ts1)
+        self.assertEqual(master._get_router_data_timestamp(), ts2)
+
+        master.__exit__(None, None, None)
+
+    def test_updates(self):
+        master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+        not_master = l3_queue.ExclusiveRouterProcessor(FAKE_ID)
+
+        master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0))
+        not_master.queue_update(l3_queue.RouterUpdate(FAKE_ID, 0))
+
+        for update in not_master.updates():
+            raise Exception("Only the master should process a router")
+
+        self.assertEqual(2, len([i for i in master.updates()]))
index 83cfc9aeebd2edfd468cd79d4e32fa98b52033b3..2feb253afbacb56d977d527ddde409d0608d46e7 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -103,7 +103,7 @@ console_scripts =
     neutron-dhcp-agent = neutron.agent.dhcp_agent:main
     neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
     neutron-ibm-agent = neutron.plugins.ibm.agent.sdnve_neutron_agent:main
-    neutron-l3-agent = neutron.agent.l3_agent:main
+    neutron-l3-agent = neutron.agent.l3.agent:main
     neutron-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
     neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
     neutron-metadata-agent = neutron.agent.metadata.agent:main