]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
RPC support for OVS Plugin and Agent
authorGary Kotton <gkotton@redhat.com>
Mon, 6 Aug 2012 11:45:34 +0000 (07:45 -0400)
committerGary Kotton <gkotton@redhat.com>
Mon, 13 Aug 2012 09:20:58 +0000 (05:20 -0400)
blueprint scalable-agent-comms

This adds support for the OVS plugin.

Change-Id: I613de63f5c7f374be87520f32a2f7129d86ef109

12 files changed:
quantum/agent/linux/ovs_lib.py
quantum/agent/rpc.py
quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py
quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py
quantum/plugins/openvswitch/agent/ovs_quantum_agent.py
quantum/plugins/openvswitch/common/config.py
quantum/plugins/openvswitch/ovs_db_v2.py
quantum/plugins/openvswitch/ovs_models_v2.py
quantum/plugins/openvswitch/ovs_quantum_plugin.py
quantum/plugins/openvswitch/tests/unit/test_rpcapi.py [new file with mode: 0644]
quantum/plugins/openvswitch/tests/unit/test_tunnel.py
tools/pip-requires

index 204c98157296b1e324bdc8ed740fbe80236c25f9..67f06580e2aaf180c16d03bddd2aade3457a3aed 100644 (file)
@@ -89,7 +89,6 @@ class OVSBridge:
     def _build_flow_expr_arr(self, **kwargs):
         flow_expr_arr = []
         is_delete_expr = kwargs.get('delete', False)
-
         if not is_delete_expr:
             prefix = ("hard_timeout=%s,idle_timeout=%s,priority=%s" %
                      (kwargs.get('hard_timeout', '0'),
@@ -206,3 +205,24 @@ class OVSBridge:
                 edge_ports.append(p)
 
         return edge_ports
+
+    def get_vif_port_set(self):
+        edge_ports = set()
+        port_names = self.get_port_name_list()
+        for name in port_names:
+            external_ids = self.db_get_map("Interface", name, "external_ids")
+            if "iface-id" in external_ids and "attached-mac" in external_ids:
+                edge_ports.add(external_ids['iface-id'])
+            elif ("xs-vif-uuid" in external_ids and
+                  "attached-mac" in external_ids):
+                # if this is a xenserver and iface-id is not automatically
+                # synced to OVS from XAPI, we grab it from XAPI directly
+                iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
+                edge_ports.add(iface_id)
+        return edge_ports
+
+    def get_vif_port(self, port_name):
+        external_ids = self.db_get_map("Interface", port_name, "external_ids")
+        ofport = self.db_get_val("Interface", port_name, "ofport")
+        return VifPort(port_name, ofport, external_ids["iface-id"],
+                       external_ids["attached-mac"], self)
index 7eb85f10f4aab981c9401bfeba4a784de82a7ccb..7e7fe791f0f784df4cc01962539ee7a562c6553d 100644 (file)
@@ -15,6 +15,7 @@
 
 from quantum.common import topics
 from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
 
 
 def create_consumers(dispatcher, prefix, topic_details):
@@ -34,3 +35,35 @@ def create_consumers(dispatcher, prefix, topic_details):
         connection.create_consumer(topic_name, dispatcher, fanout=True)
     connection.consume_in_thread()
     return connection
+
+
+class PluginApi(proxy.RpcProxy):
+    '''Agent side of the rpc API.
+
+    API version history:
+        1.0 - Initial version.
+
+    '''
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic):
+        super(PluginApi, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+    def get_device_details(self, context, device, agent_id):
+        return self.call(context,
+                         self.make_msg('get_device_details', device=device,
+                                       agent_id=agent_id),
+                         topic=self.topic)
+
+    def update_device_down(self, context, device, agent_id):
+        return self.call(context,
+                         self.make_msg('update_device_down', device=device,
+                                       agent_id=agent_id),
+                         topic=self.topic)
+
+    def tunnel_sync(self, context, tunnel_ip):
+        return self.call(context,
+                         self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
+                         topic=self.topic)
index 1acae609e01bf89ba84b05ca4ea749d5b9558dcd..2679a6474a1961d902ad2246a3025638d63a4316 100755 (executable)
@@ -34,14 +34,13 @@ import eventlet
 import pyudev
 from sqlalchemy.ext.sqlsoup import SqlSoup
 
-from quantum.agent.rpc import create_consumers
+from quantum.agent import rpc as agent_rpc
 from quantum.common import config as logging_config
 from quantum.common import topics
 from quantum.openstack.common import cfg
 from quantum.openstack.common import context
 from quantum.openstack.common import rpc
 from quantum.openstack.common.rpc import dispatcher
-from quantum.openstack.common.rpc import proxy
 from quantum.plugins.linuxbridge.common import config
 
 from quantum.agent.linux import utils
@@ -320,33 +319,6 @@ class LinuxBridge:
             LOG.debug("Done deleting subinterface %s" % interface)
 
 
-class PluginApi(proxy.RpcProxy):
-    '''Agent side of the linux bridge rpc API.
-
-    API version history:
-        1.0 - Initial version.
-
-    '''
-
-    BASE_RPC_API_VERSION = '1.0'
-
-    def __init__(self, topic):
-        super(PluginApi, self).__init__(
-            topic=topic, default_version=self.BASE_RPC_API_VERSION)
-
-    def get_device_details(self, context, device, agent_id):
-        return self.call(context,
-                         self.make_msg('get_device_details', device=device,
-                                       agent_id=agent_id),
-                         topic=self.topic)
-
-    def update_device_down(self, context, device, agent_id):
-        return self.call(context,
-                         self.make_msg('update_device_down', device=device,
-                                       agent_id=agent_id),
-                         topic=self.topic)
-
-
 class LinuxBridgeRpcCallbacks():
 
     # Set RPC API version to 1.0 by default.
@@ -578,7 +550,7 @@ class LinuxBridgeQuantumAgentRPC:
         mac = utils.get_interface_mac(physical_interface)
         self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
         self.topic = topics.AGENT
-        self.plugin_rpc = PluginApi(topics.PLUGIN)
+        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
 
         # RPC network init
         self.context = context.RequestContext('quantum', 'quantum',
@@ -590,8 +562,9 @@ class LinuxBridgeQuantumAgentRPC:
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE]]
-        self.connection = create_consumers(self.dispatcher, self.topic,
-                                           consumers)
+        self.connection = agent_rpc.create_consumers(self.dispatcher,
+                                                     self.topic,
+                                                     consumers)
         self.udev = pyudev.Context()
         monitor = pyudev.Monitor.from_netlink(self.udev)
         monitor.filter_by('net')
index 22213c5f2d25560026c99e43ad2a7df17d84585e..1fc3f6fd6a1d3a79baf251504417b437a580a96f 100644 (file)
@@ -21,10 +21,10 @@ Unit Tests for linuxbridge rpc
 import stubout
 import unittest2
 
+from quantum.agent import rpc as agent_rpc
 from quantum.common import topics
 from quantum.openstack.common import context
 from quantum.openstack.common import rpc
-from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb
 from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
 
 
@@ -77,14 +77,14 @@ class rpcApiTestCase(unittest2.TestCase):
                           port='fake_port', vlan_id='fake_vlan_id')
 
     def test_device_details(self):
-        rpcapi = alb.PluginApi(topics.PLUGIN)
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
         self._test_lb_api(rpcapi, topics.PLUGIN,
                           'get_device_details', rpc_method='call',
                           device='fake_device',
                           agent_id='fake_agent_id')
 
     def test_update_device_down(self):
-        rpcapi = alb.PluginApi(topics.PLUGIN)
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
         self._test_lb_api(rpcapi, topics.PLUGIN,
                           'update_device_down', rpc_method='call',
                           device='fake_device',
index 6a239a14866668c89a34874c703fd195a8ba4c89..cb4b473c5c1bf2bc4c07e33538add978899e2c82 100755 (executable)
@@ -24,11 +24,18 @@ import logging
 import sys
 import time
 
+import eventlet
 from sqlalchemy.ext import sqlsoup
 
+from quantum.agent import rpc as agent_rpc
 from quantum.agent.linux import ovs_lib
+from quantum.agent.linux import utils
 from quantum.common import config as logging_config
+from quantum.common import topics
 from quantum.openstack.common import cfg
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
 from quantum.plugins.openvswitch.common import config
 
 logging.basicConfig()
@@ -120,15 +127,87 @@ class Portv2(object):
         return hash(self.id)
 
 
+class OVSRpcCallbacks():
+
+    # Set RPC API version to 1.0 by default.
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, context, int_br, local_ip=None, tun_br=None):
+        self.context = context
+        self.int_br = int_br
+        # Tunneling variables
+        self.local_ip = local_ip
+        self.tun_br = tun_br
+
+    def network_delete(self, context, **kwargs):
+        LOG.debug("network_delete received")
+        network_id = kwargs.get('network_id')
+        # (TODO) garyk delete the bridge interface
+        LOG.debug("Delete %s", network_id)
+
+    def port_update(self, context, **kwargs):
+        LOG.debug("port_update received")
+        port = kwargs.get('port')
+        port_name = 'tap%s' % port['id'][0:11]
+        vif_port = self.int_br.get_vif_port(port_name)
+        if port['admin_state_up']:
+            vlan_id = kwargs.get('vlan_id')
+            # create the networking for the port
+            self.int_br.set_db_attribute("Port", vif_port.port_name,
+                                         "tag", str(vlan_id))
+            self.int_br.delete_flows(in_port=vif_port.ofport)
+        else:
+            self.int_br.clear_db_attribute("Port", vif_port.port_name, "tag")
+
+    def tunnel_update(self, context, **kwargs):
+        LOG.debug("tunnel_update received")
+        tunnel_ip = kwargs.get('tunnel_ip')
+        tunnel_id = kwargs.get('tunnel_id')
+        if tunnel_ip == self.local_ip:
+            return
+        tun_name = 'gre-%s' % tunnel_id
+        self.tun_br.add_tunnel_port(tun_name, tunnel_ip)
+
+    def create_rpc_dispatcher(self):
+        '''Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        '''
+        return dispatcher.RpcDispatcher([self])
+
+
 class OVSQuantumAgent(object):
 
     def __init__(self, integ_br, root_helper, polling_interval,
-                 reconnect_interval, target_v2_api=False):
+                 reconnect_interval, target_v2_api, rpc):
         self.root_helper = root_helper
         self.setup_integration_br(integ_br)
         self.polling_interval = polling_interval
         self.reconnect_interval = reconnect_interval
         self.target_v2_api = target_v2_api
+        self.rpc = rpc
+        if rpc:
+            self.setup_rpc(integ_br)
+
+    def setup_rpc(self, integ_br):
+        mac = utils.get_interface_mac(integ_br)
+        self.agent_id = '%s' % (mac.replace(":", ""))
+        self.topic = topics.AGENT
+        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+
+        # RPC network init
+        self.context = context.RequestContext('quantum', 'quantum',
+                                              is_admin=False)
+        # Handle updates from service
+        self.callbacks = OVSRpcCallbacks(self.context, self.int_br)
+        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        # Define the listening consumers for the agent
+        consumers = [[topics.PORT, topics.UPDATE],
+                     [topics.NETWORK, topics.DELETE]]
+        self.connection = agent_rpc.create_consumers(self.dispatcher,
+                                                     self.topic,
+                                                     consumers)
 
     def port_bound(self, port, vlan_id):
         self.int_br.set_db_attribute("Port", port.port_name,
@@ -145,7 +224,7 @@ class OVSQuantumAgent(object):
         # switch all traffic using L2 learning
         self.int_br.add_flow(priority=1, actions="normal")
 
-    def daemon_loop(self, db_connection_url):
+    def db_loop(self, db_connection_url):
         '''Main processing loop for Non-Tunneling Agent.
 
         :param options: database information - in the event need to reconnect
@@ -247,6 +326,102 @@ class OVSQuantumAgent(object):
 
             time.sleep(self.polling_interval)
 
+    def update_ports(self, registered_ports):
+        ports = self.int_br.get_vif_port_set()
+        if ports == registered_ports:
+            return
+        added = ports - registered_ports
+        removed = registered_ports - ports
+        return {'current': ports,
+                'added': added,
+                'removed': removed}
+
+    def treat_devices_added(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info("Port %s added", device)
+            try:
+                details = self.plugin_rpc.get_device_details(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug("Unable to get port details for %s: %s", device, e)
+                resync = True
+                continue
+            if 'port_id' in details:
+                LOG.info("Port %s updated. Details: %s", device, details)
+                port_name = 'tap%s' % details['port_id'][0:11]
+                port = self.int_br.get_vif_port(port_name)
+                if details['admin_state_up']:
+                    self.port_bound(port, details['vlan_id'])
+                else:
+                    self.port_unbound(port, True)
+            else:
+                LOG.debug("Device %s not defined on plugin", device)
+        return resync
+
+    def treat_devices_removed(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info("Attachment %s removed", device)
+            try:
+                details = self.plugin_rpc.update_device_down(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug("port_removed failed for %s: %s", device, e)
+                resync = True
+            if details['exists']:
+                LOG.info("Port %s updated.", device)
+                # Nothing to do regarding local networking
+            else:
+                LOG.debug("Device %s not defined on plugin", device)
+        return resync
+
+    def process_network_ports(self, port_info):
+        resync_a = False
+        resync_b = False
+        if 'added' in port_info:
+            resync_a = self.treat_devices_added(port_info['added'])
+        if 'removed' in port_info:
+            resync_b = self.treat_devices_removed(port_info['removed'])
+        # If one of the above opertaions fails => resync with plugin
+        return (resync_a | resync_b)
+
+    def rpc_loop(self):
+        sync = True
+        ports = set()
+
+        while True:
+            start = time.time()
+            if sync:
+                LOG.info("Agent out of sync with plugin!")
+                ports.clear()
+                sync = False
+
+            port_info = self.update_ports(ports)
+
+            # notify plugin about port deltas
+            if port_info:
+                LOG.debug("Agent loop has new devices!")
+                # If treat devices fails - indicates must resync with plugin
+                sync = self.process_network_ports(port_info)
+                ports = port_info['current']
+
+            # sleep till end of polling interval
+            elapsed = (time.time() - start)
+            if (elapsed < self.polling_interval):
+                time.sleep(self.polling_interval - elapsed)
+            else:
+                LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
+                          self.polling_interval, elapsed)
+
+    def daemon_loop(self, db_connection_url):
+        if self.rpc:
+            self.rpc_loop()
+        else:
+            self.db_loop(db_connection_url)
+
 
 class OVSQuantumTunnelAgent(object):
     '''Implements OVS-based tunneling.
@@ -273,7 +448,8 @@ class OVSQuantumTunnelAgent(object):
     MAX_VLAN_TAG = 4094
 
     def __init__(self, integ_br, tun_br, local_ip, root_helper,
-                 polling_interval, reconnect_interval, target_v2_api=False):
+                 polling_interval, reconnect_interval, target_v2_api,
+                 rpc):
         '''Constructor.
 
         :param integ_br: name of the integration bridge.
@@ -283,6 +459,7 @@ class OVSQuantumTunnelAgent(object):
         :param polling_interval: interval (secs) to poll DB.
         :param reconnect_internal: retry interval (secs) on DB error.
         :param target_v2_api: if True  use v2 api.
+        :param rpc: if True use RPC interface to interface with plugin.
         '''
         self.root_helper = root_helper
         self.available_local_vlans = set(
@@ -298,6 +475,30 @@ class OVSQuantumTunnelAgent(object):
         self.tunnel_count = 0
         self.setup_tunnel_br(tun_br)
         self.target_v2_api = target_v2_api
+        self.rpc = rpc
+        if rpc:
+            self.setup_rpc(integ_br)
+
+    def setup_rpc(self, integ_br):
+        mac = utils.get_interface_mac(integ_br)
+        self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
+        self.topic = topics.AGENT
+        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+
+        # RPC network init
+        self.context = context.RequestContext('quantum', 'quantum',
+                                              is_admin=False)
+        # Handle updates from service
+        self.callbacks = OVSRpcCallbacks(self.context, self.int_br,
+                                         self.local_ip, self.tun_br)
+        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        # Define the listening consumers for the agent
+        consumers = [[topics.PORT, topics.UPDATE],
+                     [topics.NETWORK, topics.DELETE],
+                     [config.TUNNEL, topics.UPDATE]]
+        self.connection = agent_rpc.create_consumers(self.dispatcher,
+                                                     self.topic,
+                                                     consumers)
 
     def provision_local_vlan(self, net_uuid, lsw_id):
         '''Provisions a local VLAN.
@@ -431,7 +632,7 @@ class OVSQuantumTunnelAgent(object):
             except:
                 LOG.exception("Problem connecting to database")
 
-    def daemon_loop(self, db_connection_url):
+    def db_loop(self, db_connection_url):
         '''Main processing loop for Tunneling Agent.
 
         :param options: database information - in the event need to reconnect
@@ -547,6 +748,123 @@ class OVSQuantumTunnelAgent(object):
                 LOG.exception("Main-loop Exception:")
                 self.rollback_until_success(db)
 
+    def update_ports(self, registered_ports):
+        ports = self.int_br.get_vif_port_set()
+        if ports == registered_ports:
+            return
+        added = ports - registered_ports
+        removed = registered_ports - ports
+        return {'current': ports,
+                'added': added,
+                'removed': removed}
+
+    def treat_devices_added(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info("Port %s added", device)
+            try:
+                details = self.plugin_rpc.get_device_details(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug("Unable to get port details for %s: %s", device, e)
+                resync = True
+                continue
+            if 'port_id' in details:
+                LOG.info("Port %s updated. Details: %s", device, details)
+                port_name = 'tap%s' % details['port_id'][0:11]
+                port = self.int_br.get_vif_port(port_name)
+                if details['admin_state_up']:
+                    self.port_bound(port, details['network_id'],
+                                    details['vlan_id'])
+                else:
+                    self.port_unbound(port, details['network_id'])
+            else:
+                LOG.debug("Device %s not defined on plugin", device)
+        return resync
+
+    def treat_devices_removed(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info("Attachment %s removed", device)
+            try:
+                details = self.plugin_rpc.update_device_down(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug("port_removed failed for %s: %s", device, e)
+                resync = True
+            if details['exists']:
+                LOG.info("Port %s updated.", device)
+                # Nothing to do regarding local networking
+            else:
+                LOG.debug("Device %s not defined on plugin", device)
+        return resync
+
+    def process_network_ports(self, port_info):
+        resync_a = False
+        resync_b = False
+        if 'added' in port_info:
+            resync_a = self.treat_devices_added(port_info['added'])
+        if 'removed' in port_info:
+            resync_b = self.treat_devices_removed(port_info['removed'])
+        # If one of the above opertaions fails => resync with plugin
+        return (resync_a | resync_b)
+
+    def tunnel_sync(self):
+        resync = False
+        try:
+            details = self.plugin_rpc.tunnel_sync(self.context, self.local_ip)
+            tunnels = details['tunnels']
+            for tunnel in tunnels:
+                if self.local_ip != tunnel['ip_address']:
+                    tun_name = 'gre-%s' % tunnel['id']
+                    self.tun_br.add_tunnel_port(tun_name, tunnel['ip_address'])
+        except Exception as e:
+            LOG.debug("Unable to sync tunnel IP %s: %s", self.local_ip, e)
+            resync = True
+        return resync
+
+    def rpc_loop(self):
+        sync = True
+        ports = set()
+        tunnel_sync = True
+
+        while True:
+            start = time.time()
+            if sync:
+                LOG.info("Agent out of sync with plugin!")
+                ports.clear()
+                sync = False
+
+            # Notify the plugin of tunnel IP
+            if tunnel_sync:
+                LOG.info("Agent tunnel out of sync with plugin!")
+                tunnel_sync = self.tunnel_sync()
+
+            port_info = self.update_ports(ports)
+
+            # notify plugin about port deltas
+            if port_info:
+                LOG.debug("Agent loop has new devices!")
+                # If treat devices fails - indicates must resync with plugin
+                sync = self.process_network_ports(port_info)
+                ports = port_info['current']
+
+            # sleep till end of polling interval
+            elapsed = (time.time() - start)
+            if (elapsed < self.polling_interval):
+                time.sleep(self.polling_interval - elapsed)
+            else:
+                LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
+                          self.polling_interval, elapsed)
+
+    def daemon_loop(self, db_connection_url):
+        if self.rpc:
+            self.rpc_loop()
+        else:
+            self.db_loop(db_connection_url)
+
 
 def main():
     cfg.CONF(args=sys.argv, project='quantum')
@@ -561,10 +879,15 @@ def main():
     polling_interval = cfg.CONF.AGENT.polling_interval
     reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
     root_helper = cfg.CONF.AGENT.root_helper
+    rpc = cfg.CONF.AGENT.rpc
 
     # Determine API Version to use
     target_v2_api = cfg.CONF.AGENT.target_v2_api
 
+    # RPC only works with v2
+    if rpc and not target_v2_api:
+        rpc = False
+
     if enable_tunneling:
         # Get parameters for OVSQuantumTunnelAgent
         tun_br = cfg.CONF.OVS.tunnel_bridge
@@ -572,11 +895,11 @@ def main():
         local_ip = cfg.CONF.OVS.local_ip
         plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper,
                                        polling_interval, reconnect_interval,
-                                       target_v2_api)
+                                       target_v2_api, rpc)
     else:
         # Get parameters for OVSQuantumAgent.
         plugin = OVSQuantumAgent(integ_br, root_helper, polling_interval,
-                                 reconnect_interval, target_v2_api)
+                                 reconnect_interval, target_v2_api, rpc)
 
     # Start everything.
     plugin.daemon_loop(db_connection_url)
@@ -584,4 +907,5 @@ def main():
     sys.exit(0)
 
 if __name__ == "__main__":
+    eventlet.monkey_patch()
     main()
index e11ac94c7fcd939a2d04566fddb547f563196ec8..36b787bbbbc356dcf5d4562484fbed9b693245ee 100644 (file)
@@ -17,6 +17,9 @@
 from quantum.openstack.common import cfg
 
 
+# Topic for tunnel notifications between the plugin and agent
+TUNNEL = 'tunnel'
+
 database_opts = [
     cfg.StrOpt('sql_connection', default='sqlite://'),
     cfg.IntOpt('sql_max_retries', default=-1),
@@ -37,6 +40,7 @@ agent_opts = [
     cfg.IntOpt('polling_interval', default=2),
     cfg.StrOpt('root_helper', default='sudo'),
     cfg.StrOpt('log_file', default=None),
+    cfg.BoolOpt('rpc', default=True),
 ]
 
 
index 45a391165ab082bcfae28e8e9edc0792bce12452..7298e439d7882409b635204d5aa3f7e42cd3a36f 100644 (file)
@@ -20,7 +20,9 @@ import logging
 
 from sqlalchemy.orm import exc
 
+from quantum.api import api_common
 from quantum.common import exceptions as q_exc
+from quantum.db import models_v2
 import quantum.db.api as db
 from quantum.openstack.common import cfg
 from quantum.plugins.openvswitch import ovs_models_v2
@@ -169,3 +171,62 @@ def release_vlan_id(vlan_id):
                 session.delete(record)
         except exc.NoResultFound:
             LOG.error("vlan id %s not found in release_vlan_id" % vlan_id)
+
+
+def get_port(port_id):
+    session = db.get_session()
+    try:
+        port = session.query(models_v2.Port).filter_by(id=port_id).one()
+    except exc.NoResultFound:
+        port = None
+    return port
+
+
+def set_port_status(port_id, status):
+    session = db.get_session()
+    try:
+        port = session.query(models_v2.Port).filter_by(id=port_id).one()
+        port['status'] = status
+        if status == api_common.PORT_STATUS_DOWN:
+            port['device_id'] = ''
+        session.merge(port)
+        session.flush()
+    except exc.NoResultFound:
+        raise q_exc.PortNotFound(port_id=port_id)
+
+
+def get_tunnels():
+    session = db.get_session()
+    try:
+        tunnels = session.query(ovs_models_v2.TunnelInfo).all()
+    except exc.NoResultFound:
+        return []
+    return [{'id': tunnel.id,
+             'ip_address': tunnel.ip_address} for tunnel in tunnels]
+
+
+def generate_tunnel_id(session):
+    try:
+        tunnels = session.query(ovs_models_v2.TunnelInfo).all()
+    except exc.NoResultFound:
+        return 0
+    tunnel_ids = ([tunnel['id'] for tunnel in tunnels])
+    if tunnel_ids:
+        id = max(tunnel_ids)
+    else:
+        id = 0
+    return id + 1
+
+
+def add_tunnel(ip):
+    session = db.get_session()
+    try:
+        tunnel = (session.query(ovs_models_v2.TunnelInfo).
+                  filter_by(ip_address=ip).one())
+    except exc.NoResultFound:
+        # Generate an id for the tunnel
+        id = generate_tunnel_id(session)
+        tunnel = ovs_models_v2.TunnelInfo(ip, id)
+        session.add(tunnel)
+        session.flush()
+    return tunnel
index 755dc8ee20a8a394aac495f0bd0f8ef62fd6d21b..aa427a41988f28ce7264e7b5b42aee63c849a384 100644 (file)
@@ -65,3 +65,18 @@ class TunnelIP(model_base.BASEV2):
 
     def __repr__(self):
         return "<TunnelIP(%s)>" % (self.ip_address)
+
+
+class TunnelInfo(model_base.BASEV2):
+    """Represents remote tunnel information in tunnel mode."""
+    __tablename__ = 'tunnel_info'
+
+    ip_address = Column(String(64), primary_key=True)
+    id = Column(Integer, nullable=False)
+
+    def __init__(self, ip_address, id):
+        self.ip_address = ip_address
+        self.id = id
+
+    def __repr__(self):
+        return "<TunnelInfo(%s,%s)>" % (self.ip_address, self.id)
index aa24cbdcaa1099a90a4d4119a7b0ca58baf73ec8..068f4adc4d1b53508c5c5a74e7821d9fe19e72bd 100644 (file)
 import logging
 import os
 
+from quantum.api import api_common
 from quantum.api.v2 import attributes
 from quantum.common import exceptions as q_exc
+from quantum.common import topics
 from quantum.common.utils import find_config_file
 from quantum.db import api as db
 from quantum.db import db_base_plugin_v2
 from quantum.db import models_v2
+from quantum.openstack.common import context
 from quantum.openstack.common import cfg
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
 from quantum.plugins.openvswitch.common import config
 from quantum.plugins.openvswitch import ovs_db_v2
 from quantum import policy
 
 
-LOG = logging.getLogger("ovs_quantum_plugin")
+LOG = logging.getLogger(__name__)
+
+
+class OVSRpcCallbacks():
+
+    # Set RPC API version to 1.0 by default.
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, context, notifier):
+        self.context = context
+        self.notifier = notifier
+
+    def create_rpc_dispatcher(self):
+        '''Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        '''
+        return dispatcher.RpcDispatcher([self])
+
+    def get_device_details(self, context, **kwargs):
+        """Agent requests device details"""
+        agent_id = kwargs.get('agent_id')
+        device = kwargs.get('device')
+        LOG.debug("Device %s details requested from %s", device, agent_id)
+        port = ovs_db_v2.get_port(device)
+        if port:
+            vlan_id = ovs_db_v2.get_vlan(port['network_id'])
+            entry = {'device': device,
+                     'vlan_id': vlan_id,
+                     'network_id': port['network_id'],
+                     'port_id': port['id'],
+                     'admin_state_up': port['admin_state_up']}
+            # Set the port status to UP
+            ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_UP)
+        else:
+            entry = {'device': device}
+            LOG.debug("%s can not be found in database", device)
+        return entry
+
+    def update_device_down(self, context, **kwargs):
+        """Device no longer exists on agent"""
+        # (TODO) garyk - live migration and port status
+        agent_id = kwargs.get('agent_id')
+        device = kwargs.get('device')
+        LOG.debug("Device %s no longer exists on %s", device, agent_id)
+        port = ovs_db_v2.get_port(device)
+        if port:
+            entry = {'device': device,
+                     'exists': True}
+            # Set port status to DOWN
+            ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_DOWN)
+        else:
+            entry = {'device': device,
+                     'exists': False}
+            LOG.debug("%s can not be found in database", device)
+        return entry
+
+    def tunnel_sync(self, context, **kwargs):
+        """Update new tunnel.
+
+        Updates the datbase with the tunnel IP. All listening agents will also
+        be notified about the new tunnel IP.
+        """
+        tunnel_ip = kwargs.get('tunnel_ip')
+        # Update the database with the IP
+        tunnel = ovs_db_v2.add_tunnel(tunnel_ip)
+        tunnels = ovs_db_v2.get_tunnels()
+        entry = dict()
+        entry['tunnels'] = tunnels
+        # Notify all other listening agents
+        self.notifier.tunnel_update(self.context, tunnel.ip_address,
+                                    tunnel.id)
+        # Return the list of tunnels IP's to the agent
+        return entry
+
+
+class AgentNotifierApi(proxy.RpcProxy):
+    '''Agent side of the linux bridge rpc API.
+
+    API version history:
+        1.0 - Initial version.
+
+    '''
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic):
+        super(AgentNotifierApi, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+        self.topic_network_delete = topics.get_topic_name(topic,
+                                                          topics.NETWORK,
+                                                          topics.DELETE)
+        self.topic_port_update = topics.get_topic_name(topic,
+                                                       topics.PORT,
+                                                       topics.UPDATE)
+        self.topic_tunnel_update = topics.get_topic_name(topic,
+                                                         config.TUNNEL,
+                                                         topics.UPDATE)
+
+    def network_delete(self, context, network_id):
+        self.fanout_cast(context,
+                         self.make_msg('network_delete',
+                                       network_id=network_id),
+                         topic=self.topic_network_delete)
+
+    def port_update(self, context, port, vlan_id):
+        self.fanout_cast(context,
+                         self.make_msg('port_update',
+                                       port=port,
+                                       vlan_id=vlan_id),
+                         topic=self.topic_port_update)
+
+    def tunnel_update(self, context, tunnel_ip, tunnel_id):
+        self.fanout_cast(context,
+                         self.make_msg('tunnel_update',
+                                       tunnel_ip=tunnel_ip,
+                                       tunnel_id=tunnel_id),
+                         topic=self.topic_tunnel_update)
 
 
 class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
@@ -67,6 +191,25 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
 
         # update the vlan_id table based on current configuration
         ovs_db_v2.update_vlan_id_pool()
+        self.rpc = cfg.CONF.AGENT.rpc
+        if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api:
+            self.setup_rpc()
+        if not cfg.CONF.AGENT.target_v2_api:
+            self.rpc = False
+
+    def setup_rpc(self):
+        # RPC support
+        self.topic = topics.PLUGIN
+        self.context = context.RequestContext('quantum', 'quantum',
+                                              is_admin=False)
+        self.conn = rpc.create_connection(new=True)
+        self.notifier = AgentNotifierApi(topics.AGENT)
+        self.callbacks = OVSRpcCallbacks(self.context, self.notifier)
+        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.conn.create_consumer(self.topic, self.dispatcher,
+                                  fanout=False)
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
 
     # TODO(rkukura) Use core mechanism for attribute authorization
     # when available.
@@ -114,6 +257,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
         vlan_id = ovs_db_v2.get_vlan(id)
         result = super(OVSQuantumPluginV2, self).delete_network(context, id)
         ovs_db_v2.release_vlan_id(vlan_id)
+        if self.rpc:
+            self.notifier.network_delete(self.context, id)
         return result
 
     def get_network(self, context, id, fields=None, verbose=None):
@@ -129,3 +274,14 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
             self._extend_network_dict(context, net)
         # TODO(rkukura): Filter on extended attributes.
         return [self._fields(net, fields) for net in nets]
+
+    def update_port(self, context, id, port):
+        if self.rpc:
+            original_port = super(OVSQuantumPluginV2, self).get_port(context,
+                                                                     id)
+        port = super(OVSQuantumPluginV2, self).update_port(context, id, port)
+        if self.rpc:
+            if original_port['admin_state_up'] != port['admin_state_up']:
+                vlan_id = ovs_db_v2.get_vlan(port['network_id'])
+                self.notifier.port_update(self.context, port, vlan_id)
+        return port
diff --git a/quantum/plugins/openvswitch/tests/unit/test_rpcapi.py b/quantum/plugins/openvswitch/tests/unit/test_rpcapi.py
new file mode 100644 (file)
index 0000000..11c3506
--- /dev/null
@@ -0,0 +1,107 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Unit Tests for openvswitch rpc
+"""
+
+import stubout
+import unittest2
+
+from quantum.agent import rpc as agent_rpc
+from quantum.common import topics
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.plugins.openvswitch import ovs_quantum_plugin as povs
+from quantum.plugins.openvswitch.common import config
+
+
+class rpcApiTestCase(unittest2.TestCase):
+
+    def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+        ctxt = context.RequestContext('fake_user', 'fake_project')
+        expected_retval = 'foo' if method == 'call' else None
+        expected_msg = rpcapi.make_msg(method, **kwargs)
+        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
+        if rpc_method == 'cast' and method == 'run_instance':
+            kwargs['call'] = False
+
+        self.fake_args = None
+        self.fake_kwargs = None
+
+        def _fake_rpc_method(*args, **kwargs):
+            self.fake_args = args
+            self.fake_kwargs = kwargs
+            if expected_retval:
+                return expected_retval
+
+        self.stubs = stubout.StubOutForTesting()
+        self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+        retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+        self.assertEqual(retval, expected_retval)
+        expected_args = [ctxt, topic, expected_msg]
+
+        for arg, expected_arg in zip(self.fake_args, expected_args):
+            self.assertEqual(arg, expected_arg)
+
+    def test_delete_network(self):
+        rpcapi = povs.AgentNotifierApi(topics.AGENT)
+        self._test_ovs_api(rpcapi,
+                           topics.get_topic_name(topics.AGENT,
+                                                 topics.NETWORK,
+                                                 topics.DELETE),
+                           'network_delete', rpc_method='fanout_cast',
+                           network_id='fake_request_spec')
+
+    def test_port_update(self):
+        rpcapi = povs.AgentNotifierApi(topics.AGENT)
+        self._test_ovs_api(rpcapi,
+                           topics.get_topic_name(topics.AGENT,
+                                                 topics.PORT,
+                                                 topics.UPDATE),
+                           'port_update', rpc_method='fanout_cast',
+                           port='fake_port', vlan_id='fake_vlan_id')
+
+    def test_tunnel_update(self):
+        rpcapi = povs.AgentNotifierApi(topics.AGENT)
+        self._test_ovs_api(rpcapi,
+                           topics.get_topic_name(topics.AGENT,
+                                                 config.TUNNEL,
+                                                 topics.UPDATE),
+                           'tunnel_update', rpc_method='fanout_cast',
+                           tunnel_ip='fake_ip', tunnel_id='fake_id')
+
+    def test_device_details(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        self._test_ovs_api(rpcapi, topics.PLUGIN,
+                           'get_device_details', rpc_method='call',
+                           device='fake_device',
+                           agent_id='fake_agent_id')
+
+    def test_update_device_down(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        self._test_ovs_api(rpcapi, topics.PLUGIN,
+                           'update_device_down', rpc_method='call',
+                           device='fake_device',
+                           agent_id='fake_agent_id')
+
+    def test_tunnel_sync(self):
+        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+        self._test_ovs_api(rpcapi, topics.PLUGIN,
+                           'tunnel_sync', rpc_method='call',
+                           tunnel_ip='fake_tunnel_ip')
index 5fd954591c9f359f8a8ba30331f98ef43dd3d72f..6589b6425dafb194d74c674aa7d4610dd8fb455c 100644 (file)
@@ -81,7 +81,7 @@ class TunnelTest(unittest.TestCase):
         b = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
                                                     self.TUN_BRIDGE,
                                                     '10.0.0.1',
-                                                    'sudo', 2, 2)
+                                                    'sudo', 2, 2, False, False)
         self.mox.VerifyAll()
 
     def testProvisionLocalVlan(self):
@@ -98,7 +98,7 @@ class TunnelTest(unittest.TestCase):
         a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
                                                     self.TUN_BRIDGE,
                                                     '10.0.0.1',
-                                                    'sudo', 2, 2)
+                                                    'sudo', 2, 2, False, False)
         a.available_local_vlans = set([LV_ID])
         a.provision_local_vlan(NET_UUID, LS_ID)
         self.mox.VerifyAll()
@@ -112,7 +112,7 @@ class TunnelTest(unittest.TestCase):
         a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
                                                     self.TUN_BRIDGE,
                                                     '10.0.0.1',
-                                                    'sudo', 2, 2)
+                                                    'sudo', 2, 2, False, False)
         a.available_local_vlans = set()
         a.local_vlan_map[NET_UUID] = LVM
         a.reclaim_local_vlan(NET_UUID, LVM)
@@ -128,7 +128,7 @@ class TunnelTest(unittest.TestCase):
         a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
                                                     self.TUN_BRIDGE,
                                                     '10.0.0.1',
-                                                    'sudo', 2, 2)
+                                                    'sudo', 2, 2, False, False)
         a.local_vlan_map[NET_UUID] = LVM
         a.port_bound(VIF_PORT, NET_UUID, LS_ID)
         self.mox.VerifyAll()
@@ -138,7 +138,7 @@ class TunnelTest(unittest.TestCase):
         a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
                                                     self.TUN_BRIDGE,
                                                     '10.0.0.1',
-                                                    'sudo', 2, 2)
+                                                    'sudo', 2, 2, False, False)
         a.available_local_vlans = set([LV_ID])
         a.local_vlan_map[NET_UUID] = LVM
         a.port_unbound(VIF_PORT, NET_UUID)
@@ -155,7 +155,7 @@ class TunnelTest(unittest.TestCase):
         a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
                                                     self.TUN_BRIDGE,
                                                     '10.0.0.1',
-                                                    'sudo', 2, 2)
+                                                    'sudo', 2, 2, False, False)
         a.available_local_vlans = set([LV_ID])
         a.local_vlan_map[NET_UUID] = LVM
         a.port_dead(VIF_PORT)
index 4bc041050348b1b2db7394c525fa11e8010d1784..44165790103bc71538f347002b70779139f8e2a7 100644 (file)
@@ -1,7 +1,7 @@
 Paste
 PasteDeploy==1.5.0
 Routes>=1.12.3
-eventlet>=0.9.12
+eventlet>=0.9.17
 httplib2
 iso8601>=0.1.4
 lxml