]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
RPC support for Linux Bridge Plugin and Agent
authorGary Kotton <gkotton@redhat.com>
Tue, 10 Jul 2012 15:28:08 +0000 (11:28 -0400)
committerGary Kotton <gkotton@redhat.com>
Wed, 8 Aug 2012 14:09:47 +0000 (10:09 -0400)
blueprint scalable-agent-comms

This is the first stage of the blueprint. This adds support to the linux bridge
plugin.

The development followed the design described in:
https://docs.google.com/document/d/1MbcBA2Os4b98ybdgAw2qe_68R1NG6KMh8zdZKgOlpvg/edit?pli=1

Change-Id: I4004c05a63ce49f020c2016c8763e73238b465a7

19 files changed:
bin/quantum-server
etc/quantum.conf
etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini
quantum/agent/linux/utils.py
quantum/agent/rpc.py [new file with mode: 0644]
quantum/common/topics.py [new file with mode: 0644]
quantum/openstack/common/cfg.py
quantum/openstack/common/exception.py
quantum/openstack/common/log.py
quantum/openstack/common/rpc/common.py
quantum/openstack/common/rpc/impl_qpid.py
quantum/openstack/common/timeutils.py
quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py
quantum/plugins/linuxbridge/common/config.py
quantum/plugins/linuxbridge/db/l2network_db.py
quantum/plugins/linuxbridge/lb_quantum_plugin.py
quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py
quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py [new file with mode: 0644]
tools/pip-requires

index 1ea7a16190e47243c04d70657752bd450d12e6ef..25ed9f8dbc7c388a73da431a4bdb1bfbf7b897ef 100755 (executable)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import eventlet
 import os
 import sys
 sys.path.insert(0, os.getcwd())
 from quantum.server import main as server
 
+eventlet.monkey_patch()
 server()
index 92afe415552e9c87f295f06500e201c0e8f89b01..97584d271e14618f03225c45ec42e4e0b8d63503 100644 (file)
@@ -39,41 +39,9 @@ api_paste_config = api-paste.ini
 # Maximum amount of retries to generate a unique MAC address
 # mac_generation_retries = 16
 
-[QUOTAS]
-# number of networks allowed per tenant
-# quota_network = 10
-
-# number of subnets allowed per tenant
-# quota_subnet = 10
-
-# number of ports allowed per tenant
-# quota_port = 50
-
-# default driver to use for quota checks
-# quota_driver = quantum.quota.ConfDriver
-
-# ============ Notification System Options =====================
-
-# Notifications can be sent when network/subnet/port are create, updated or deleted.
-# There are four methods of sending notifications, logging (via the
-# log_file directive), rpc (via a message queue),
-# noop (no notifications sent, the default) or list of them
-
-# Defined in notifier api
-# notification_driver = quantum.openstack.common.notifier.no_op_notifier
-# default_notification_level = INFO
-# myhost = myhost.com
-# default_publisher_id = $myhost
-
-# Defined in rabbit_notifier for rpc way
-# notification_topics = notifications
-
-# Defined in list_notifier
-# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
-
-# Defined in rpc __init__
+# RPC configuration options. Defined in rpc __init__
 # The messaging module to use, defaults to kombu.
-# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu
+# rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu
 # Size of RPC thread pool
 # rpc_thread_pool_size = 64,
 # Size of RPC connection pool
@@ -85,8 +53,9 @@ api_paste_config = api-paste.ini
 # Modules of exceptions that are permitted to be recreated
 # upon receiving exception data from an rpc call.
 # allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception
-# AMQP exchange to connect to if using RabbitMQ or Qpid
-# control_exchange = nova
+# AMQP exchange to connect to if using RabbitMQ or QPID
+control_exchange = quantum
+
 # If passed, use a fake RabbitMQ provider
 # fake_rabbit = False
 
@@ -152,11 +121,35 @@ api_paste_config = api-paste.ini
 # ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP.
 # The "host" option should point or resolve to this address.
 # rpc_zmq_bind_address = *
-# MatchMaker driver
-# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost
-# ZeroMQ receiver listening port
-# rpc_zmq_port = 9501
-# Number of ZeroMQ contexts, defaults to 1
-# rpc_zmq_contexts = 1
-# Directory for holding IPC sockets
-# rpc_zmq_ipc_dir = /var/run/openstack
+
+[QUOTAS]
+# number of networks allowed per tenant
+# quota_network = 10
+
+# number of subnets allowed per tenant
+# quota_subnet = 10
+
+# number of ports allowed per tenant
+# quota_port = 50
+
+# default driver to use for quota checks
+# quota_driver = quantum.quota.ConfDriver
+
+# ============ Notification System Options =====================
+
+# Notifications can be sent when network/subnet/port are create, updated or deleted.
+# There are four methods of sending notifications, logging (via the
+# log_file directive), rpc (via a message queue),
+# noop (no notifications sent, the default) or list of them
+
+# Defined in notifier api
+# notification_driver = quantum.openstack.common.notifier.no_op_notifier
+# default_notification_level = INFO
+# myhost = myhost.com
+# default_publisher_id = $myhost
+
+# Defined in rabbit_notifier for rpc way
+# notification_topics = notifications
+
+# Defined in list_notifier
+# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
index 59dcd138239d9892c2e74d44d8146159ea77cd62..f494e5b28a60611246ba9c6ecd22eea576996604 100644 (file)
@@ -1,6 +1,6 @@
 [VLANS]
-vlan_start=1000
-vlan_end=3000
+vlan_start = 1000
+vlan_end = 3000
 
 [DATABASE]
 # This line MUST be changed to actually run the plugin.
@@ -25,3 +25,7 @@ polling_interval = 2
 # Change to "sudo quantum-rootwrap" to limit commands that can be run
 # as root.
 root_helper = sudo
+# Use Quantumv2 API
+# target_v2_api = False
+# Use RPC messaging to interface between agent and plugin
+# rpc = True
index a2a0294c8bcefcfadaba0140fe4719ea3439ecb0..bb218a0abefe68939a4e58c76ec8d72ecf518bd5 100644 (file)
 #
 # @author: Juliano Martinez, Locaweb.
 
+import fcntl
 import logging
 import os
 import shlex
+import socket
+import struct
 import subprocess
 
 LOG = logging.getLogger(__name__)
@@ -50,3 +53,14 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None,
         raise RuntimeError(m)
 
     return return_stderr and (_stdout, _stderr) or _stdout
+
+
+def get_interface_mac(interface):
+    DEVICE_NAME_LEN = 15
+    MAC_START = 18
+    MAC_END = 24
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    info = fcntl.ioctl(s.fileno(), 0x8927,
+                       struct.pack('256s', interface[:DEVICE_NAME_LEN]))
+    return ''.join(['%02x:' % ord(char)
+                   for char in info[MAC_START:MAC_END]])[:-1]
diff --git a/quantum/agent/rpc.py b/quantum/agent/rpc.py
new file mode 100644 (file)
index 0000000..7eb85f1
--- /dev/null
@@ -0,0 +1,36 @@
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import topics
+from quantum.openstack.common import rpc
+
+
+def create_consumers(dispatcher, prefix, topic_details):
+    """Create agent RPC consumers.
+
+    :param dispatcher: The dispatcher to process the incoming messages.
+    :param prefix: Common prefix for the plugin/agent message queues.
+    :param topic_details: A list of topics. Each topic has a name and a
+                          operation.
+
+    :returns: A common Connection.
+    """
+
+    connection = rpc.create_connection(new=True)
+    for topic, operation in topic_details:
+        topic_name = topics.get_topic_name(prefix, topic, operation)
+        connection.create_consumer(topic_name, dispatcher, fanout=True)
+    connection.consume_in_thread()
+    return connection
diff --git a/quantum/common/topics.py b/quantum/common/topics.py
new file mode 100644 (file)
index 0000000..a5521c0
--- /dev/null
@@ -0,0 +1,42 @@
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+NETWORK = 'network'
+SUBNET = 'subnet'
+PORT = 'port'
+
+CREATE = 'create'
+DELETE = 'delete'
+UPDATE = 'update'
+
+AGENT = 'q-agent-notifier'
+PLUGIN = 'q-plugin'
+
+
+def get_topic_name(prefix, table, operation):
+    """Create a topic name.
+
+    The topic name needs to be synced between the agent and the
+    plugin. The plugin will send a fanout message to all of the
+    listening agents so that the agents in turn can perform their
+    updates accordingly.
+
+    :param prefix: Common prefix for the plugin/agent message queues.
+    :param table: The table in question (NETWORK, SUBNET, PORT).
+    :param operation: The operation that invokes notification (CREATE,
+                      DELETE, UPDATE)
+    :returns: The topic name.
+    """
+    return '%s-%s-%s' % (prefix, table, operation)
index 9ad115f0f5eb7bf5f77b6dbe9244b391af5b7528..fa3a350f090baa2934d2b5d30c343a873f2fcdc5 100644 (file)
@@ -464,7 +464,7 @@ def _is_opt_registered(opts, opt):
     :raises: DuplicateOptError if a naming conflict is detected
     """
     if opt.dest in opts:
-        if opts[opt.dest]['opt'] is not opt:
+        if opts[opt.dest]['opt'] != opt:
             raise DuplicateOptError(opt.name)
         return True
     else:
@@ -527,6 +527,9 @@ class Opt(object):
         else:
             self.deprecated_name = None
 
+    def __ne__(self, another):
+        return vars(self) != vars(another)
+
     def _get_from_config_parser(self, cparser, section):
         """Retrieves the option value from a MultiConfigParser object.
 
index e5da94b9496518923bf69507a22065589e7f4bfa..ba32da550b01a1b353becc043b544fecde474eb7 100644 (file)
@@ -19,7 +19,6 @@
 Exceptions common to OpenStack projects
 """
 
-import itertools
 import logging
 
 
index 0bf986bf13dea615a483c7d832a2403d5ad1f34c..f9af4a785d9c8ae369702cc993bab48f3c556fcb 100644 (file)
@@ -257,16 +257,18 @@ class PublishErrorsHandler(logging.Handler):
                             dict(error=record.msg))
 
 
-def handle_exception(type, value, tb):
-    extra = {}
-    if CONF.verbose:
-        extra['exc_info'] = (type, value, tb)
-    getLogger().critical(str(value), **extra)
+def _create_logging_excepthook(product_name):
+    def logging_excepthook(type, value, tb):
+        extra = {}
+        if CONF.verbose:
+            extra['exc_info'] = (type, value, tb)
+        getLogger(product_name).critical(str(value), **extra)
+    return logging_excepthook
 
 
 def setup(product_name):
     """Setup logging."""
-    sys.excepthook = handle_exception
+    sys.excepthook = _create_logging_excepthook(product_name)
 
     if CONF.log_config:
         try:
@@ -357,17 +359,6 @@ def _setup_logging_from_conf(product_name):
         for handler in log_root.handlers:
             logger.addHandler(handler)
 
-    # NOTE(jkoelker) Clear the handlers for the root logger that was setup
-    #                by basicConfig in nova/__init__.py and install the
-    #                NullHandler.
-    root = logging.getLogger()
-    for handler in root.handlers:
-        root.removeHandler(handler)
-    handler = NullHandler()
-    handler.setFormatter(logging.Formatter())
-    root.addHandler(handler)
-
-
 _loggers = {}
 
 
@@ -405,8 +396,12 @@ class LegacyFormatter(logging.Formatter):
 
     def format(self, record):
         """Uses contextstring if request_id is set, otherwise default."""
-        if 'instance' not in record.__dict__:
-            record.__dict__['instance'] = ''
+        # NOTE(sdague): default the fancier formating params
+        # to an empty string so we don't throw an exception if
+        # they get used
+        for key in ('instance', 'color'):
+            if key not in record.__dict__:
+                record.__dict__[key] = ''
 
         if record.__dict__.get('request_id', None):
             self._fmt = CONF.logging_context_format_string
index 9ef1fc1d9cb3d02a846b2731b6dcdc8cedfa2100..84a6c723a6e1575fb19b770bf251ee57b8b971b2 100644 (file)
@@ -108,7 +108,7 @@ class Connection(object):
         """
         raise NotImplementedError()
 
-    def create_consumer(self, conf, topic, proxy, fanout=False):
+    def create_consumer(self, topic, proxy, fanout=False):
         """Create a consumer on this connection.
 
         A consumer is associated with a message queue on the backend message
@@ -117,7 +117,6 @@ class Connection(object):
         off of the queue will determine which method gets called on the proxy
         object.
 
-        :param conf:  An openstack.common.cfg configuration object.
         :param topic: This is a name associated with what to consume from.
                       Multiple instances of a service may consume from the same
                       topic. For example, all instances of nova-compute consume
@@ -133,7 +132,7 @@ class Connection(object):
         """
         raise NotImplementedError()
 
-    def create_worker(self, conf, topic, proxy, pool_name):
+    def create_worker(self, topic, proxy, pool_name):
         """Create a worker on this connection.
 
         A worker is like a regular consumer of messages directed to a
@@ -143,7 +142,6 @@ class Connection(object):
         be asked to process it. Load is distributed across the members
         of the pool in round-robin fashion.
 
-        :param conf:  An openstack.common.cfg configuration object.
         :param topic: This is a name associated with what to consume from.
                       Multiple instances of a service may consume from the same
                       topic.
index ec9140ed1082b02b84632aad9c61c499df422f12..b222a8089e672610ac27aa857e0882c00063b379 100644 (file)
@@ -329,7 +329,7 @@ class Connection(object):
         if self.conf.qpid_reconnect_interval:
             self.connection.reconnect_interval = (
                 self.conf.qpid_reconnect_interval)
-        self.connection.hearbeat = self.conf.qpid_heartbeat
+        self.connection.heartbeat = self.conf.qpid_heartbeat
         self.connection.protocol = self.conf.qpid_protocol
         self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
 
index 5eeaf70aa49fc7b3d35ad7fa5e4d4d7be0f57106..4416a3b19e5a958306ea60296c037bafae346c71 100644 (file)
@@ -21,7 +21,6 @@ Time related utilities and helper functions.
 
 import calendar
 import datetime
-import time
 
 import iso8601
 
index 00629c331265f6cdf3889f7b6bbed5e1c9649ecb..1acae609e01bf89ba84b05ca4ea749d5b9558dcd 100755 (executable)
@@ -30,10 +30,18 @@ import subprocess
 import sys
 import time
 
+import eventlet
+import pyudev
 from sqlalchemy.ext.sqlsoup import SqlSoup
 
-from quantum.openstack.common import cfg
+from quantum.agent.rpc import create_consumers
 from quantum.common import config as logging_config
+from quantum.common import topics
+from quantum.openstack.common import cfg
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
 from quantum.plugins.linuxbridge.common import config
 
 from quantum.agent.linux import utils
@@ -312,25 +320,97 @@ class LinuxBridge:
             LOG.debug("Done deleting subinterface %s" % interface)
 
 
-class LinuxBridgeQuantumAgent:
+class PluginApi(proxy.RpcProxy):
+    '''Agent side of the linux bridge rpc API.
+
+    API version history:
+        1.0 - Initial version.
+
+    '''
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic):
+        super(PluginApi, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+    def get_device_details(self, context, device, agent_id):
+        return self.call(context,
+                         self.make_msg('get_device_details', device=device,
+                                       agent_id=agent_id),
+                         topic=self.topic)
+
+    def update_device_down(self, context, device, agent_id):
+        return self.call(context,
+                         self.make_msg('update_device_down', device=device,
+                                       agent_id=agent_id),
+                         topic=self.topic)
+
+
+class LinuxBridgeRpcCallbacks():
+
+    # Set RPC API version to 1.0 by default.
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, context, linux_br):
+        self.context = context
+        self.linux_br = linux_br
+
+    def network_delete(self, context, **kwargs):
+        LOG.debug("network_delete received")
+        network_id = kwargs.get('network_id')
+        bridge_name = self.linux_br.get_bridge_name(network_id)
+        # (TODO) garyk delete the bridge interface
+        LOG.debug("Delete %s", bridge_name)
+
+    def port_update(self, context, **kwargs):
+        LOG.debug("port_update received")
+        port = kwargs.get('port')
+        if port['admin_state_up']:
+            vlan_id = kwargs.get('vlan_id')
+            # create the networking for the port
+            self.linux_br.add_interface(port['network_id'],
+                                        vlan_id,
+                                        port['id'])
+        else:
+            bridge_name = self.linux_br.get_bridge_name(port['network_id'])
+            tap_device_name = self.linux_br.get_tap_device_name(port['id'])
+            self.linux_br.remove_interface(bridge_name, tap_device_name)
+
+    def create_rpc_dispatcher(self):
+        '''Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        '''
+        return dispatcher.RpcDispatcher([self])
+
+
+class LinuxBridgeQuantumAgentDB:
 
     def __init__(self, br_name_prefix, physical_interface, polling_interval,
-                 reconnect_interval, root_helper, target_v2_api):
+                 reconnect_interval, root_helper, target_v2_api,
+                 db_connection_url):
         self.polling_interval = polling_interval
-        self.reconnect_interval = reconnect_interval
         self.root_helper = root_helper
         self.setup_linux_bridge(br_name_prefix, physical_interface)
-        self.db_connected = False
         self.target_v2_api = target_v2_api
+        self.reconnect_interval = reconnect_interval
+        self.db_connected = False
+        self.db_connection_url = db_connection_url
 
     def setup_linux_bridge(self, br_name_prefix, physical_interface):
         self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
                                     self.root_helper)
 
-    def process_port_binding(self, port_id, network_id, interface_id,
-                             vlan_id):
+    def process_port_binding(self, network_id, interface_id, vlan_id):
         return self.linux_br.add_interface(network_id, vlan_id, interface_id)
 
+    def remove_port_binding(self, network_id, interface_id):
+        bridge_name = self.linux_br.get_bridge_name(network_id)
+        tap_device_name = self.linux_br.get_tap_device_name(interface_id)
+        return self.linux_br.remove_interface(bridge_name, tap_device_name)
+
     def process_unplugged_interfaces(self, plugged_interfaces):
         """
         If there are any tap devices that are not corresponding to the
@@ -434,8 +514,7 @@ class LinuxBridgeQuantumAgent:
             interface_id = pb['interface_id']
 
             vlan_id = str(vlan_bindings[pb['network_id']]['vlan_id'])
-            if self.process_port_binding(port_id,
-                                         pb['network_id'],
+            if self.process_port_binding(pb['network_id'],
                                          interface_id,
                                          vlan_id):
                 if self.target_v2_api:
@@ -466,7 +545,7 @@ class LinuxBridgeQuantumAgent:
         return {VLAN_BINDINGS: vlan_bindings,
                 PORT_BINDINGS: port_bindings}
 
-    def daemon_loop(self, db_connection_url):
+    def daemon_loop(self):
         old_vlan_bindings = {}
         old_port_bindings = []
         self.db_connected = False
@@ -474,7 +553,7 @@ class LinuxBridgeQuantumAgent:
         while True:
             if not self.db_connected:
                 time.sleep(self.reconnect_interval)
-                db = SqlSoup(db_connection_url)
+                db = SqlSoup(self.db_connection_url)
                 self.db_connected = True
                 LOG.info("Connecting to database \"%s\" on %s" %
                          (db.engine.url.database, db.engine.url.host))
@@ -486,6 +565,158 @@ class LinuxBridgeQuantumAgent:
             time.sleep(self.polling_interval)
 
 
+class LinuxBridgeQuantumAgentRPC:
+
+    def __init__(self, br_name_prefix, physical_interface, polling_interval,
+                 root_helper):
+        self.polling_interval = polling_interval
+        self.root_helper = root_helper
+        self.setup_linux_bridge(br_name_prefix, physical_interface)
+        self.setup_rpc(physical_interface)
+
+    def setup_rpc(self, physical_interface):
+        mac = utils.get_interface_mac(physical_interface)
+        self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
+        self.topic = topics.AGENT
+        self.plugin_rpc = PluginApi(topics.PLUGIN)
+
+        # RPC network init
+        self.context = context.RequestContext('quantum', 'quantum',
+                                              is_admin=False)
+        # Handle updates from service
+        self.callbacks = LinuxBridgeRpcCallbacks(self.context,
+                                                 self.linux_br)
+        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        # Define the listening consumers for the agent
+        consumers = [[topics.PORT, topics.UPDATE],
+                     [topics.NETWORK, topics.DELETE]]
+        self.connection = create_consumers(self.dispatcher, self.topic,
+                                           consumers)
+        self.udev = pyudev.Context()
+        monitor = pyudev.Monitor.from_netlink(self.udev)
+        monitor.filter_by('net')
+
+    def setup_linux_bridge(self, br_name_prefix, physical_interface):
+        self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
+                                    self.root_helper)
+
+    def process_port_binding(self, network_id, interface_id, vlan_id):
+        return self.linux_br.add_interface(network_id, vlan_id, interface_id)
+
+    def remove_port_binding(self, network_id, interface_id):
+        bridge_name = self.linux_br.get_bridge_name(network_id)
+        tap_device_name = self.linux_br.get_tap_device_name(interface_id)
+        return self.linux_br.remove_interface(bridge_name, tap_device_name)
+
+    def update_devices(self, registered_devices):
+        devices = self.udev_get_all_tap_devices()
+        if devices == registered_devices:
+            return
+        added = devices - registered_devices
+        removed = registered_devices - devices
+        return {'current': devices,
+                'added': added,
+                'removed': removed}
+
+    def udev_get_all_tap_devices(self):
+        devices = set()
+        for device in self.udev.list_devices(subsystem='net'):
+            name = self.udev_get_name(device)
+            if self.is_tap_device(name):
+                devices.add(name)
+        return devices
+
+    def is_tap_device(self, name):
+        return name.startswith(TAP_INTERFACE_PREFIX)
+
+    def udev_get_name(self, device):
+        return device.sys_name
+
+    def process_network_devices(self, device_info):
+        resync_a = False
+        resync_b = False
+        if 'added' in device_info:
+            resync_a = self.treat_devices_added(device_info['added'])
+        if 'removed' in device_info:
+            resync_b = self.treat_devices_removed(device_info['removed'])
+        # If one of the above operations fails => resync with plugin
+        return (resync_a | resync_b)
+
+    def treat_devices_added(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info("Port %s added", device)
+            try:
+                details = self.plugin_rpc.get_device_details(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug("Unable to get port details for %s: %s", device, e)
+                resync = True
+                continue
+            if 'port_id' in details:
+                LOG.info("Port %s updated. Details: %s", device, details)
+                if details['admin_state_up']:
+                    # create the networking for the port
+                    self.process_port_binding(details['network_id'],
+                                              details['port_id'],
+                                              details['vlan_id'])
+                else:
+                    self.remove_port_binding(details['network_id'],
+                                             details['port_id'])
+            else:
+                LOG.debug("Device %s not defined on plugin", device)
+        return resync
+
+    def treat_devices_removed(self, devices):
+        resync = False
+        for device in devices:
+            LOG.info("Attachment %s removed", device)
+            try:
+                details = self.plugin_rpc.update_device_down(self.context,
+                                                             device,
+                                                             self.agent_id)
+            except Exception as e:
+                LOG.debug("port_removed failed for %s: %s", device, e)
+                resync = True
+            if details['exists']:
+                LOG.info("Port %s updated.", device)
+                # Nothing to do regarding local networking
+            else:
+                LOG.debug("Device %s not defined on plugin", device)
+        return resync
+
+    def daemon_loop(self):
+        sync = True
+        devices = set()
+
+        LOG.info("LinuxBridge Agent RPC Daemon Started!")
+
+        while True:
+            start = time.time()
+            if sync:
+                LOG.info("Agent out of sync with plugin!")
+                devices.clear()
+                sync = False
+
+            device_info = self.update_devices(devices)
+
+            # notify plugin about device deltas
+            if device_info:
+                LOG.debug("Agent loop has new devices!")
+                # If treat devices fails - indicates must resync with plugin
+                sync = self.process_network_devices(device_info)
+                devices = device_info['current']
+
+            # sleep till end of polling interval
+            elapsed = (time.time() - start)
+            if (elapsed < self.polling_interval):
+                time.sleep(self.polling_interval - elapsed)
+            else:
+                LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
+                          self.polling_interval, elapsed)
+
+
 def main():
     cfg.CONF(args=sys.argv, project='quantum')
 
@@ -497,15 +728,29 @@ def main():
     polling_interval = cfg.CONF.AGENT.polling_interval
     reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
     root_helper = cfg.CONF.AGENT.root_helper
-    'Establish database connection and load models'
-    db_connection_url = cfg.CONF.DATABASE.sql_connection
-    plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface,
-                                     polling_interval, reconnect_interval,
-                                     root_helper, cfg.CONF.AGENT.target_v2_api)
+    rpc = cfg.CONF.AGENT.rpc
+    if not cfg.CONF.AGENT.target_v2_api:
+        rpc = False
+
+    if rpc:
+        plugin = LinuxBridgeQuantumAgentRPC(br_name_prefix,
+                                            physical_interface,
+                                            polling_interval,
+                                            root_helper)
+    else:
+        db_connection_url = cfg.CONF.DATABASE.sql_connection
+        target_v2_api = cfg.CONF.AGENT.target_v2_api
+        plugin = LinuxBridgeQuantumAgentDB(br_name_prefix,
+                                           physical_interface,
+                                           polling_interval,
+                                           reconnect_interval,
+                                           root_helper,
+                                           target_v2_api,
+                                           db_connection_url)
     LOG.info("Agent initialized successfully, now running... ")
-    plugin.daemon_loop(db_connection_url)
-
+    plugin.daemon_loop()
     sys.exit(0)
 
 if __name__ == "__main__":
+    eventlet.monkey_patch()
     main()
index 290fb2a418ed4b43676a4b0047f2d41ec1386376..cafe455f245749a9cc69a0078faff31fc423beba 100644 (file)
@@ -39,6 +39,7 @@ agent_opts = [
     cfg.IntOpt('polling_interval', default=2),
     cfg.StrOpt('root_helper', default='sudo'),
     cfg.BoolOpt('target_v2_api', default=False),
+    cfg.BoolOpt('rpc', default=True),
 ]
 
 
index 80e8c9d8d6f1917b5f042aa28f592f283232df6b..a0ca30f75e48b096b9b220e702589873bbc59e17 100644 (file)
@@ -20,8 +20,10 @@ import logging
 from sqlalchemy import func
 from sqlalchemy.orm import exc
 
+from quantum.api import api_common
 from quantum.common import exceptions as q_exc
 import quantum.db.api as db
+from quantum.db import models_v2
 from quantum.openstack.common import cfg
 from quantum.plugins.linuxbridge.common import config
 from quantum.plugins.linuxbridge.common import exceptions as c_exc
@@ -281,3 +283,31 @@ def update_vlan_binding(netid, newvlanid=None):
         return binding
     except exc.NoResultFound:
         raise q_exc.NetworkNotFound(net_id=netid)
+
+
+def get_port_from_device(device):
+    """Get port from database"""
+    LOG.debug("get_port_from_device() called")
+    session = db.get_session()
+    ports = session.query(models_v2.Port).all()
+    if not ports:
+        return
+    for port in ports:
+        if port['id'].startswith(device):
+            return port
+    return
+
+
+def set_port_status(port_id, status):
+    """Set the port status"""
+    LOG.debug("set_port_status as %s called", status)
+    session = db.get_session()
+    try:
+        port = session.query(models_v2.Port).filter_by(id=port_id).one()
+        port['status'] = status
+        if status == api_common.OperationalStatus.DOWN:
+            port['device_id'] = ''
+        session.merge(port)
+        session.flush()
+    except exc.NoResultFound:
+        raise q_exc.PortNotFound(port_id=port_id)
index 80f2e407fc6edf10c956c368b315d762d96e9f18..13aaa7d154618d2803bfbe9445d80b4d6cbbdd88 100644 (file)
 
 import logging
 
+from quantum.api import api_common
 from quantum.api.v2 import attributes
+from quantum.common import topics
 from quantum.db import db_base_plugin_v2
 from quantum.db import models_v2
+from quantum.openstack.common import context
+from quantum.openstack.common import cfg
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
 from quantum.plugins.linuxbridge.db import l2network_db as cdb
 from quantum import policy
 
+
 LOG = logging.getLogger(__name__)
 
 
+class LinuxBridgeRpcCallbacks():
+
+    # Set RPC API version to 1.0 by default.
+    RPC_API_VERSION = '1.0'
+    # Device names start with "tap"
+    TAP_PREFIX_LEN = 3
+
+    def __init__(self, context):
+        self.context = context
+
+    def create_rpc_dispatcher(self):
+        '''Get the rpc dispatcher for this manager.
+
+        If a manager would like to set an rpc API version, or support more than
+        one class as the target of rpc messages, override this method.
+        '''
+        return dispatcher.RpcDispatcher([self])
+
+    def get_device_details(self, context, **kwargs):
+        """Agent requests device details"""
+        agent_id = kwargs.get('agent_id')
+        device = kwargs.get('device')
+        LOG.debug("Device %s details requested from %s", device, agent_id)
+        port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:])
+        if port:
+            vlan_binding = cdb.get_vlan_binding(port['network_id'])
+            entry = {'device': device,
+                     'vlan_id': vlan_binding['vlan_id'],
+                     'network_id': port['network_id'],
+                     'port_id': port['id'],
+                     'admin_state_up': port['admin_state_up']}
+            # Set the port status to UP
+            cdb.set_port_status(port['id'], api_common.OperationalStatus.UP)
+        else:
+            entry = {'device': device}
+            LOG.debug("%s can not be found in database", device)
+        return entry
+
+    def update_device_down(self, context, **kwargs):
+        """Device no longer exists on agent"""
+        # (TODO) garyk - live migration and port status
+        agent_id = kwargs.get('agent_id')
+        device = kwargs.get('device')
+        LOG.debug("Device %s no longer exists on %s", device, agent_id)
+        port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:])
+        if port:
+            entry = {'device': device,
+                     'exists': True}
+            # Set port status to DOWN
+            cdb.set_port_status(port['id'], api_common.OperationalStatus.DOWN)
+        else:
+            entry = {'device': device,
+                     'exists': False}
+            LOG.debug("%s can not be found in database", device)
+        return entry
+
+
+class AgentNotifierApi(proxy.RpcProxy):
+    '''Agent side of the linux bridge rpc API.
+
+    API version history:
+        1.0 - Initial version.
+
+    '''
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic):
+        super(AgentNotifierApi, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+        self.topic_network_delete = topics.get_topic_name(topic,
+                                                          topics.NETWORK,
+                                                          topics.DELETE)
+        self.topic_port_update = topics.get_topic_name(topic,
+                                                       topics.PORT,
+                                                       topics.UPDATE)
+
+    def network_delete(self, context, network_id):
+        self.fanout_cast(context,
+                         self.make_msg('network_delete',
+                                       network_id=network_id),
+                         topic=self.topic_network_delete)
+
+    def port_update(self, context, port, vlan_id):
+        self.fanout_cast(context,
+                         self.make_msg('port_update',
+                                       port=port,
+                                       vlan_id=vlan_id),
+                         topic=self.topic_port_update)
+
+
 class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
     """Implement the Quantum abstractions using Linux bridging.
 
@@ -42,8 +141,27 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
 
     def __init__(self):
         cdb.initialize(base=models_v2.model_base.BASEV2)
+        self.rpc = cfg.CONF.AGENT.rpc
+        if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api:
+            self.setup_rpc()
+        if not cfg.CONF.AGENT.target_v2_api:
+            self.rpc = False
         LOG.debug("Linux Bridge Plugin initialization complete")
 
+    def setup_rpc(self):
+        # RPC support
+        self.topic = topics.PLUGIN
+        self.context = context.RequestContext('quantum', 'quantum',
+                                              is_admin=False)
+        self.conn = rpc.create_connection(new=True)
+        self.callbacks = LinuxBridgeRpcCallbacks(self.context)
+        self.dispatcher = self.callbacks.create_rpc_dispatcher()
+        self.conn.create_consumer(self.topic, self.dispatcher,
+                                  fanout=False)
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
+        self.notifier = AgentNotifierApi(topics.AGENT)
+
     # TODO(rkukura) Use core mechanism for attribute authorization
     # when available.
 
@@ -91,6 +209,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
         vlan_binding = cdb.get_vlan_binding(id)
         result = super(LinuxBridgePluginV2, self).delete_network(context, id)
         cdb.release_vlanid(vlan_binding['vlan_id'])
+        if self.rpc:
+            self.notifier.network_delete(self.context, id)
         return result
 
     def get_network(self, context, id, fields=None, verbose=None):
@@ -106,3 +226,15 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
             self._extend_network_dict(context, net)
         # TODO(rkukura): Filter on extended attributes.
         return [self._fields(net, fields) for net in nets]
+
+    def update_port(self, context, id, port):
+        if self.rpc:
+            original_port = super(LinuxBridgePluginV2, self).get_port(context,
+                                                                      id)
+        port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
+        if self.rpc:
+            if original_port['admin_state_up'] != port['admin_state_up']:
+                vlan_binding = cdb.get_vlan_binding(port['network_id'])
+                self.notifier.port_update(self.context, port,
+                                          vlan_binding['vlan_id'])
+        return port
index 0ea8fcd1db08cf2d2ccffa8673c64daf896fd1d6..7e4bd5de9ede9c7af15f333853df33265f03e021 100644 (file)
@@ -62,8 +62,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         vlan_id = vlan_bind[lconst.VLANID]
 
         self._linuxbridge_quantum_agent.process_port_binding(
-            new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
-            device_name, str(vlan_id))
+            new_network[lconst.NET_ID], device_name, str(vlan_id))
         list_interface = (self._linuxbridge_quantum_agent.linux_br.
                           get_interfaces_on_bridge(bridge_name))
 
@@ -101,8 +100,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         vlan_id = vlan_bind[lconst.VLANID]
 
         self._linuxbridge_quantum_agent.process_port_binding(
-            new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
-            interface_id, str(vlan_id))
+            new_network[lconst.NET_ID], interface_id, str(vlan_id))
         list_interface = (self._linuxbridge_quantum_agent.linux_br.
                           get_interfaces_on_bridge(bridge_name))
 
@@ -140,8 +138,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         vlan_id = vlan_bind[lconst.VLANID]
 
         self._linuxbridge_quantum_agent.process_port_binding(
-            new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
-            interface_id, str(vlan_id))
+            new_network[lconst.NET_ID], interface_id, str(vlan_id))
         list_interface = (self._linuxbridge_quantum_agent.linux_br.
                           get_interfaces_on_bridge(bridge_name))
 
@@ -283,8 +280,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         vlan_id = vlan_bind[lconst.VLANID]
 
         self._linuxbridge_quantum_agent.process_port_binding(
-            new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
-            interface_id, str(vlan_id))
+            new_network[lconst.NET_ID], interface_id, str(vlan_id))
         list_interface = (self._linuxbridge_quantum_agent.linux_br.
                           get_interfaces_on_bridge(bridge_name))
         self._linuxbridge_plugin.unplug_interface(tenant_id,
@@ -347,8 +343,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         vlan_id = vlan_bind[lconst.VLANID]
 
         self._linuxbridge_quantum_agent.process_port_binding(
-            new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
-            interface_id, str(vlan_id))
+            new_network[lconst.NET_ID], interface_id, str(vlan_id))
         list_interface = (self._linuxbridge_quantum_agent.linux_br.
                           get_interfaces_on_bridge(bridge_name))
         self._linuxbridge_plugin.unplug_interface(tenant_id,
@@ -412,6 +407,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         self.gw_name_prefix = "gw-"
         self.tap_name_prefix = "tap"
         self.v2 = True
+        self.db_connection = 'sqlite://'
         self._linuxbridge_plugin = LinuxBridgePlugin.LinuxBridgePlugin()
         try:
             fh = open(self.config_file)
@@ -430,13 +426,15 @@ class LinuxBridgeAgentTest(unittest.TestCase):
         self._linuxbridge = linux_agent.LinuxBridge(self.br_name_prefix,
                                                     self.physical_interface,
                                                     self.root_helper)
-        self._linuxbridge_quantum_agent = linux_agent.LinuxBridgeQuantumAgent(
+        plugin = linux_agent.LinuxBridgeQuantumAgentDB(
             self.br_name_prefix,
             self.physical_interface,
             self.polling_interval,
             self.reconnect_interval,
             self.root_helper,
-            self.v2)
+            self.v2,
+            self.db_connection)
+        self._linuxbridge_quantum_agent = plugin
 
     def run_cmd(self, args):
         cmd = shlex.split(self.root_helper) + args
diff --git a/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py b/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py
new file mode 100644 (file)
index 0000000..22213c5
--- /dev/null
@@ -0,0 +1,91 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Unit Tests for linuxbridge rpc
+"""
+
+import stubout
+import unittest2
+
+from quantum.common import topics
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb
+from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
+
+
+class rpcApiTestCase(unittest2.TestCase):
+
+    def _test_lb_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+        ctxt = context.RequestContext('fake_user', 'fake_project')
+        expected_retval = 'foo' if method == 'call' else None
+        expected_msg = rpcapi.make_msg(method, **kwargs)
+        expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
+        if rpc_method == 'cast' and method == 'run_instance':
+            kwargs['call'] = False
+
+        self.fake_args = None
+        self.fake_kwargs = None
+
+        def _fake_rpc_method(*args, **kwargs):
+            self.fake_args = args
+            self.fake_kwargs = kwargs
+            if expected_retval:
+                return expected_retval
+
+        self.stubs = stubout.StubOutForTesting()
+        self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+        retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+        self.assertEqual(retval, expected_retval)
+        expected_args = [ctxt, topic, expected_msg]
+
+        for arg, expected_arg in zip(self.fake_args, expected_args):
+            self.assertEqual(arg, expected_arg)
+
+    def test_delete_network(self):
+        rpcapi = plb.AgentNotifierApi(topics.AGENT)
+        self._test_lb_api(rpcapi,
+                          topics.get_topic_name(topics.AGENT,
+                                                topics.NETWORK,
+                                                topics.DELETE),
+                          'network_delete', rpc_method='fanout_cast',
+                          network_id='fake_request_spec')
+
+    def test_port_update(self):
+        rpcapi = plb.AgentNotifierApi(topics.AGENT)
+        self._test_lb_api(rpcapi,
+                          topics.get_topic_name(topics.AGENT,
+                                                topics.PORT,
+                                                topics.UPDATE),
+                          'port_update', rpc_method='fanout_cast',
+                          port='fake_port', vlan_id='fake_vlan_id')
+
+    def test_device_details(self):
+        rpcapi = alb.PluginApi(topics.PLUGIN)
+        self._test_lb_api(rpcapi, topics.PLUGIN,
+                          'get_device_details', rpc_method='call',
+                          device='fake_device',
+                          agent_id='fake_agent_id')
+
+    def test_update_device_down(self):
+        rpcapi = alb.PluginApi(topics.PLUGIN)
+        self._test_lb_api(rpcapi, topics.PLUGIN,
+                          'update_device_down', rpc_method='call',
+                          device='fake_device',
+                          agent_id='fake_agent_id')
index 5e3a1f260873c028524bc48a7a749bd18c96b288..4bc041050348b1b2db7394c525fa11e8010d1784 100644 (file)
@@ -8,5 +8,6 @@ lxml
 netaddr
 python-gflags==1.3
 python-quantumclient>=0.1,<0.2
+pyudev
 sqlalchemy>0.6.4
 webob==1.2.0