From dea9ecf4b2ab31baef56a5d80d157ab8b539bd57 Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Tue, 10 Jul 2012 11:28:08 -0400 Subject: [PATCH] RPC support for Linux Bridge Plugin and Agent blueprint scalable-agent-comms This is the first stage of the blueprint. This adds support to the linux bridge plugin. The development followed the design described in: https://docs.google.com/document/d/1MbcBA2Os4b98ybdgAw2qe_68R1NG6KMh8zdZKgOlpvg/edit?pli=1 Change-Id: I4004c05a63ce49f020c2016c8763e73238b465a7 --- bin/quantum-server | 2 + etc/quantum.conf | 81 +++-- .../plugins/linuxbridge/linuxbridge_conf.ini | 8 +- quantum/agent/linux/utils.py | 14 + quantum/agent/rpc.py | 36 +++ quantum/common/topics.py | 42 +++ quantum/openstack/common/cfg.py | 5 +- quantum/openstack/common/exception.py | 1 - quantum/openstack/common/log.py | 33 +- quantum/openstack/common/rpc/common.py | 6 +- quantum/openstack/common/rpc/impl_qpid.py | 2 +- quantum/openstack/common/timeutils.py | 1 - .../agent/linuxbridge_quantum_agent.py | 281 ++++++++++++++++-- quantum/plugins/linuxbridge/common/config.py | 1 + .../plugins/linuxbridge/db/l2network_db.py | 30 ++ .../plugins/linuxbridge/lb_quantum_plugin.py | 132 ++++++++ .../tests/unit/_test_linuxbridgeAgent.py | 22 +- .../linuxbridge/tests/unit/test_rpcapi.py | 91 ++++++ tools/pip-requires | 1 + 19 files changed, 686 insertions(+), 103 deletions(-) create mode 100644 quantum/agent/rpc.py create mode 100644 quantum/common/topics.py create mode 100644 quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py diff --git a/bin/quantum-server b/bin/quantum-server index 1ea7a1619..25ed9f8db 100755 --- a/bin/quantum-server +++ b/bin/quantum-server @@ -16,9 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet import os import sys sys.path.insert(0, os.getcwd()) from quantum.server import main as server +eventlet.monkey_patch() server() diff --git a/etc/quantum.conf b/etc/quantum.conf index 92afe4155..97584d271 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -39,41 +39,9 @@ api_paste_config = api-paste.ini # Maximum amount of retries to generate a unique MAC address # mac_generation_retries = 16 -[QUOTAS] -# number of networks allowed per tenant -# quota_network = 10 - -# number of subnets allowed per tenant -# quota_subnet = 10 - -# number of ports allowed per tenant -# quota_port = 50 - -# default driver to use for quota checks -# quota_driver = quantum.quota.ConfDriver - -# ============ Notification System Options ===================== - -# Notifications can be sent when network/subnet/port are create, updated or deleted. -# There are four methods of sending notifications, logging (via the -# log_file directive), rpc (via a message queue), -# noop (no notifications sent, the default) or list of them - -# Defined in notifier api -# notification_driver = quantum.openstack.common.notifier.no_op_notifier -# default_notification_level = INFO -# myhost = myhost.com -# default_publisher_id = $myhost - -# Defined in rabbit_notifier for rpc way -# notification_topics = notifications - -# Defined in list_notifier -# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier - -# Defined in rpc __init__ +# RPC configuration options. Defined in rpc __init__ # The messaging module to use, defaults to kombu. -# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu +# rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu # Size of RPC thread pool # rpc_thread_pool_size = 64, # Size of RPC connection pool @@ -85,8 +53,9 @@ api_paste_config = api-paste.ini # Modules of exceptions that are permitted to be recreated # upon receiving exception data from an rpc call. # allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception -# AMQP exchange to connect to if using RabbitMQ or Qpid -# control_exchange = nova +# AMQP exchange to connect to if using RabbitMQ or QPID +control_exchange = quantum + # If passed, use a fake RabbitMQ provider # fake_rabbit = False @@ -152,11 +121,35 @@ api_paste_config = api-paste.ini # ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP. # The "host" option should point or resolve to this address. # rpc_zmq_bind_address = * -# MatchMaker driver -# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost -# ZeroMQ receiver listening port -# rpc_zmq_port = 9501 -# Number of ZeroMQ contexts, defaults to 1 -# rpc_zmq_contexts = 1 -# Directory for holding IPC sockets -# rpc_zmq_ipc_dir = /var/run/openstack + +[QUOTAS] +# number of networks allowed per tenant +# quota_network = 10 + +# number of subnets allowed per tenant +# quota_subnet = 10 + +# number of ports allowed per tenant +# quota_port = 50 + +# default driver to use for quota checks +# quota_driver = quantum.quota.ConfDriver + +# ============ Notification System Options ===================== + +# Notifications can be sent when network/subnet/port are create, updated or deleted. +# There are four methods of sending notifications, logging (via the +# log_file directive), rpc (via a message queue), +# noop (no notifications sent, the default) or list of them + +# Defined in notifier api +# notification_driver = quantum.openstack.common.notifier.no_op_notifier +# default_notification_level = INFO +# myhost = myhost.com +# default_publisher_id = $myhost + +# Defined in rabbit_notifier for rpc way +# notification_topics = notifications + +# Defined in list_notifier +# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier diff --git a/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini b/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini index 59dcd1382..f494e5b28 100644 --- a/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini +++ b/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini @@ -1,6 +1,6 @@ [VLANS] -vlan_start=1000 -vlan_end=3000 +vlan_start = 1000 +vlan_end = 3000 [DATABASE] # This line MUST be changed to actually run the plugin. @@ -25,3 +25,7 @@ polling_interval = 2 # Change to "sudo quantum-rootwrap" to limit commands that can be run # as root. root_helper = sudo +# Use Quantumv2 API +# target_v2_api = False +# Use RPC messaging to interface between agent and plugin +# rpc = True diff --git a/quantum/agent/linux/utils.py b/quantum/agent/linux/utils.py index a2a0294c8..bb218a0ab 100644 --- a/quantum/agent/linux/utils.py +++ b/quantum/agent/linux/utils.py @@ -17,9 +17,12 @@ # # @author: Juliano Martinez, Locaweb. +import fcntl import logging import os import shlex +import socket +import struct import subprocess LOG = logging.getLogger(__name__) @@ -50,3 +53,14 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None, raise RuntimeError(m) return return_stderr and (_stdout, _stderr) or _stdout + + +def get_interface_mac(interface): + DEVICE_NAME_LEN = 15 + MAC_START = 18 + MAC_END = 24 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + info = fcntl.ioctl(s.fileno(), 0x8927, + struct.pack('256s', interface[:DEVICE_NAME_LEN])) + return ''.join(['%02x:' % ord(char) + for char in info[MAC_START:MAC_END]])[:-1] diff --git a/quantum/agent/rpc.py b/quantum/agent/rpc.py new file mode 100644 index 000000000..7eb85f10f --- /dev/null +++ b/quantum/agent/rpc.py @@ -0,0 +1,36 @@ +# Copyright (c) 2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from quantum.common import topics +from quantum.openstack.common import rpc + + +def create_consumers(dispatcher, prefix, topic_details): + """Create agent RPC consumers. + + :param dispatcher: The dispatcher to process the incoming messages. + :param prefix: Common prefix for the plugin/agent message queues. + :param topic_details: A list of topics. Each topic has a name and a + operation. + + :returns: A common Connection. + """ + + connection = rpc.create_connection(new=True) + for topic, operation in topic_details: + topic_name = topics.get_topic_name(prefix, topic, operation) + connection.create_consumer(topic_name, dispatcher, fanout=True) + connection.consume_in_thread() + return connection diff --git a/quantum/common/topics.py b/quantum/common/topics.py new file mode 100644 index 000000000..a5521c09c --- /dev/null +++ b/quantum/common/topics.py @@ -0,0 +1,42 @@ +# Copyright (c) 2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +NETWORK = 'network' +SUBNET = 'subnet' +PORT = 'port' + +CREATE = 'create' +DELETE = 'delete' +UPDATE = 'update' + +AGENT = 'q-agent-notifier' +PLUGIN = 'q-plugin' + + +def get_topic_name(prefix, table, operation): + """Create a topic name. + + The topic name needs to be synced between the agent and the + plugin. The plugin will send a fanout message to all of the + listening agents so that the agents in turn can perform their + updates accordingly. + + :param prefix: Common prefix for the plugin/agent message queues. + :param table: The table in question (NETWORK, SUBNET, PORT). + :param operation: The operation that invokes notification (CREATE, + DELETE, UPDATE) + :returns: The topic name. + """ + return '%s-%s-%s' % (prefix, table, operation) diff --git a/quantum/openstack/common/cfg.py b/quantum/openstack/common/cfg.py index 9ad115f0f..fa3a350f0 100644 --- a/quantum/openstack/common/cfg.py +++ b/quantum/openstack/common/cfg.py @@ -464,7 +464,7 @@ def _is_opt_registered(opts, opt): :raises: DuplicateOptError if a naming conflict is detected """ if opt.dest in opts: - if opts[opt.dest]['opt'] is not opt: + if opts[opt.dest]['opt'] != opt: raise DuplicateOptError(opt.name) return True else: @@ -527,6 +527,9 @@ class Opt(object): else: self.deprecated_name = None + def __ne__(self, another): + return vars(self) != vars(another) + def _get_from_config_parser(self, cparser, section): """Retrieves the option value from a MultiConfigParser object. diff --git a/quantum/openstack/common/exception.py b/quantum/openstack/common/exception.py index e5da94b94..ba32da550 100644 --- a/quantum/openstack/common/exception.py +++ b/quantum/openstack/common/exception.py @@ -19,7 +19,6 @@ Exceptions common to OpenStack projects """ -import itertools import logging diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index 0bf986bf1..f9af4a785 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -257,16 +257,18 @@ class PublishErrorsHandler(logging.Handler): dict(error=record.msg)) -def handle_exception(type, value, tb): - extra = {} - if CONF.verbose: - extra['exc_info'] = (type, value, tb) - getLogger().critical(str(value), **extra) +def _create_logging_excepthook(product_name): + def logging_excepthook(type, value, tb): + extra = {} + if CONF.verbose: + extra['exc_info'] = (type, value, tb) + getLogger(product_name).critical(str(value), **extra) + return logging_excepthook def setup(product_name): """Setup logging.""" - sys.excepthook = handle_exception + sys.excepthook = _create_logging_excepthook(product_name) if CONF.log_config: try: @@ -357,17 +359,6 @@ def _setup_logging_from_conf(product_name): for handler in log_root.handlers: logger.addHandler(handler) - # NOTE(jkoelker) Clear the handlers for the root logger that was setup - # by basicConfig in nova/__init__.py and install the - # NullHandler. - root = logging.getLogger() - for handler in root.handlers: - root.removeHandler(handler) - handler = NullHandler() - handler.setFormatter(logging.Formatter()) - root.addHandler(handler) - - _loggers = {} @@ -405,8 +396,12 @@ class LegacyFormatter(logging.Formatter): def format(self, record): """Uses contextstring if request_id is set, otherwise default.""" - if 'instance' not in record.__dict__: - record.__dict__['instance'] = '' + # NOTE(sdague): default the fancier formating params + # to an empty string so we don't throw an exception if + # they get used + for key in ('instance', 'color'): + if key not in record.__dict__: + record.__dict__[key] = '' if record.__dict__.get('request_id', None): self._fmt = CONF.logging_context_format_string diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py index 9ef1fc1d9..84a6c723a 100644 --- a/quantum/openstack/common/rpc/common.py +++ b/quantum/openstack/common/rpc/common.py @@ -108,7 +108,7 @@ class Connection(object): """ raise NotImplementedError() - def create_consumer(self, conf, topic, proxy, fanout=False): + def create_consumer(self, topic, proxy, fanout=False): """Create a consumer on this connection. A consumer is associated with a message queue on the backend message @@ -117,7 +117,6 @@ class Connection(object): off of the queue will determine which method gets called on the proxy object. - :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same topic. For example, all instances of nova-compute consume @@ -133,7 +132,7 @@ class Connection(object): """ raise NotImplementedError() - def create_worker(self, conf, topic, proxy, pool_name): + def create_worker(self, topic, proxy, pool_name): """Create a worker on this connection. A worker is like a regular consumer of messages directed to a @@ -143,7 +142,6 @@ class Connection(object): be asked to process it. Load is distributed across the members of the pool in round-robin fashion. - :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same topic. diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index ec9140ed1..b222a8089 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -329,7 +329,7 @@ class Connection(object): if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( self.conf.qpid_reconnect_interval) - self.connection.hearbeat = self.conf.qpid_heartbeat + self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay diff --git a/quantum/openstack/common/timeutils.py b/quantum/openstack/common/timeutils.py index 5eeaf70aa..4416a3b19 100644 --- a/quantum/openstack/common/timeutils.py +++ b/quantum/openstack/common/timeutils.py @@ -21,7 +21,6 @@ Time related utilities and helper functions. import calendar import datetime -import time import iso8601 diff --git a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py index 00629c331..1acae609e 100755 --- a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py +++ b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py @@ -30,10 +30,18 @@ import subprocess import sys import time +import eventlet +import pyudev from sqlalchemy.ext.sqlsoup import SqlSoup -from quantum.openstack.common import cfg +from quantum.agent.rpc import create_consumers from quantum.common import config as logging_config +from quantum.common import topics +from quantum.openstack.common import cfg +from quantum.openstack.common import context +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import dispatcher +from quantum.openstack.common.rpc import proxy from quantum.plugins.linuxbridge.common import config from quantum.agent.linux import utils @@ -312,25 +320,97 @@ class LinuxBridge: LOG.debug("Done deleting subinterface %s" % interface) -class LinuxBridgeQuantumAgent: +class PluginApi(proxy.RpcProxy): + '''Agent side of the linux bridge rpc API. + + API version history: + 1.0 - Initial version. + + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(PluginApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + def get_device_details(self, context, device, agent_id): + return self.call(context, + self.make_msg('get_device_details', device=device, + agent_id=agent_id), + topic=self.topic) + + def update_device_down(self, context, device, agent_id): + return self.call(context, + self.make_msg('update_device_down', device=device, + agent_id=agent_id), + topic=self.topic) + + +class LinuxBridgeRpcCallbacks(): + + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + + def __init__(self, context, linux_br): + self.context = context + self.linux_br = linux_br + + def network_delete(self, context, **kwargs): + LOG.debug("network_delete received") + network_id = kwargs.get('network_id') + bridge_name = self.linux_br.get_bridge_name(network_id) + # (TODO) garyk delete the bridge interface + LOG.debug("Delete %s", bridge_name) + + def port_update(self, context, **kwargs): + LOG.debug("port_update received") + port = kwargs.get('port') + if port['admin_state_up']: + vlan_id = kwargs.get('vlan_id') + # create the networking for the port + self.linux_br.add_interface(port['network_id'], + vlan_id, + port['id']) + else: + bridge_name = self.linux_br.get_bridge_name(port['network_id']) + tap_device_name = self.linux_br.get_tap_device_name(port['id']) + self.linux_br.remove_interface(bridge_name, tap_device_name) + + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return dispatcher.RpcDispatcher([self]) + + +class LinuxBridgeQuantumAgentDB: def __init__(self, br_name_prefix, physical_interface, polling_interval, - reconnect_interval, root_helper, target_v2_api): + reconnect_interval, root_helper, target_v2_api, + db_connection_url): self.polling_interval = polling_interval - self.reconnect_interval = reconnect_interval self.root_helper = root_helper self.setup_linux_bridge(br_name_prefix, physical_interface) - self.db_connected = False self.target_v2_api = target_v2_api + self.reconnect_interval = reconnect_interval + self.db_connected = False + self.db_connection_url = db_connection_url def setup_linux_bridge(self, br_name_prefix, physical_interface): self.linux_br = LinuxBridge(br_name_prefix, physical_interface, self.root_helper) - def process_port_binding(self, port_id, network_id, interface_id, - vlan_id): + def process_port_binding(self, network_id, interface_id, vlan_id): return self.linux_br.add_interface(network_id, vlan_id, interface_id) + def remove_port_binding(self, network_id, interface_id): + bridge_name = self.linux_br.get_bridge_name(network_id) + tap_device_name = self.linux_br.get_tap_device_name(interface_id) + return self.linux_br.remove_interface(bridge_name, tap_device_name) + def process_unplugged_interfaces(self, plugged_interfaces): """ If there are any tap devices that are not corresponding to the @@ -434,8 +514,7 @@ class LinuxBridgeQuantumAgent: interface_id = pb['interface_id'] vlan_id = str(vlan_bindings[pb['network_id']]['vlan_id']) - if self.process_port_binding(port_id, - pb['network_id'], + if self.process_port_binding(pb['network_id'], interface_id, vlan_id): if self.target_v2_api: @@ -466,7 +545,7 @@ class LinuxBridgeQuantumAgent: return {VLAN_BINDINGS: vlan_bindings, PORT_BINDINGS: port_bindings} - def daemon_loop(self, db_connection_url): + def daemon_loop(self): old_vlan_bindings = {} old_port_bindings = [] self.db_connected = False @@ -474,7 +553,7 @@ class LinuxBridgeQuantumAgent: while True: if not self.db_connected: time.sleep(self.reconnect_interval) - db = SqlSoup(db_connection_url) + db = SqlSoup(self.db_connection_url) self.db_connected = True LOG.info("Connecting to database \"%s\" on %s" % (db.engine.url.database, db.engine.url.host)) @@ -486,6 +565,158 @@ class LinuxBridgeQuantumAgent: time.sleep(self.polling_interval) +class LinuxBridgeQuantumAgentRPC: + + def __init__(self, br_name_prefix, physical_interface, polling_interval, + root_helper): + self.polling_interval = polling_interval + self.root_helper = root_helper + self.setup_linux_bridge(br_name_prefix, physical_interface) + self.setup_rpc(physical_interface) + + def setup_rpc(self, physical_interface): + mac = utils.get_interface_mac(physical_interface) + self.agent_id = '%s%s' % ('lb', (mac.replace(":", ""))) + self.topic = topics.AGENT + self.plugin_rpc = PluginApi(topics.PLUGIN) + + # RPC network init + self.context = context.RequestContext('quantum', 'quantum', + is_admin=False) + # Handle updates from service + self.callbacks = LinuxBridgeRpcCallbacks(self.context, + self.linux_br) + self.dispatcher = self.callbacks.create_rpc_dispatcher() + # Define the listening consumers for the agent + consumers = [[topics.PORT, topics.UPDATE], + [topics.NETWORK, topics.DELETE]] + self.connection = create_consumers(self.dispatcher, self.topic, + consumers) + self.udev = pyudev.Context() + monitor = pyudev.Monitor.from_netlink(self.udev) + monitor.filter_by('net') + + def setup_linux_bridge(self, br_name_prefix, physical_interface): + self.linux_br = LinuxBridge(br_name_prefix, physical_interface, + self.root_helper) + + def process_port_binding(self, network_id, interface_id, vlan_id): + return self.linux_br.add_interface(network_id, vlan_id, interface_id) + + def remove_port_binding(self, network_id, interface_id): + bridge_name = self.linux_br.get_bridge_name(network_id) + tap_device_name = self.linux_br.get_tap_device_name(interface_id) + return self.linux_br.remove_interface(bridge_name, tap_device_name) + + def update_devices(self, registered_devices): + devices = self.udev_get_all_tap_devices() + if devices == registered_devices: + return + added = devices - registered_devices + removed = registered_devices - devices + return {'current': devices, + 'added': added, + 'removed': removed} + + def udev_get_all_tap_devices(self): + devices = set() + for device in self.udev.list_devices(subsystem='net'): + name = self.udev_get_name(device) + if self.is_tap_device(name): + devices.add(name) + return devices + + def is_tap_device(self, name): + return name.startswith(TAP_INTERFACE_PREFIX) + + def udev_get_name(self, device): + return device.sys_name + + def process_network_devices(self, device_info): + resync_a = False + resync_b = False + if 'added' in device_info: + resync_a = self.treat_devices_added(device_info['added']) + if 'removed' in device_info: + resync_b = self.treat_devices_removed(device_info['removed']) + # If one of the above operations fails => resync with plugin + return (resync_a | resync_b) + + def treat_devices_added(self, devices): + resync = False + for device in devices: + LOG.info("Port %s added", device) + try: + details = self.plugin_rpc.get_device_details(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug("Unable to get port details for %s: %s", device, e) + resync = True + continue + if 'port_id' in details: + LOG.info("Port %s updated. Details: %s", device, details) + if details['admin_state_up']: + # create the networking for the port + self.process_port_binding(details['network_id'], + details['port_id'], + details['vlan_id']) + else: + self.remove_port_binding(details['network_id'], + details['port_id']) + else: + LOG.debug("Device %s not defined on plugin", device) + return resync + + def treat_devices_removed(self, devices): + resync = False + for device in devices: + LOG.info("Attachment %s removed", device) + try: + details = self.plugin_rpc.update_device_down(self.context, + device, + self.agent_id) + except Exception as e: + LOG.debug("port_removed failed for %s: %s", device, e) + resync = True + if details['exists']: + LOG.info("Port %s updated.", device) + # Nothing to do regarding local networking + else: + LOG.debug("Device %s not defined on plugin", device) + return resync + + def daemon_loop(self): + sync = True + devices = set() + + LOG.info("LinuxBridge Agent RPC Daemon Started!") + + while True: + start = time.time() + if sync: + LOG.info("Agent out of sync with plugin!") + devices.clear() + sync = False + + device_info = self.update_devices(devices) + + # notify plugin about device deltas + if device_info: + LOG.debug("Agent loop has new devices!") + # If treat devices fails - indicates must resync with plugin + sync = self.process_network_devices(device_info) + devices = device_info['current'] + + # sleep till end of polling interval + elapsed = (time.time() - start) + if (elapsed < self.polling_interval): + time.sleep(self.polling_interval - elapsed) + else: + LOG.debug("Loop iteration exceeded interval (%s vs. %s)!", + self.polling_interval, elapsed) + + def main(): cfg.CONF(args=sys.argv, project='quantum') @@ -497,15 +728,29 @@ def main(): polling_interval = cfg.CONF.AGENT.polling_interval reconnect_interval = cfg.CONF.DATABASE.reconnect_interval root_helper = cfg.CONF.AGENT.root_helper - 'Establish database connection and load models' - db_connection_url = cfg.CONF.DATABASE.sql_connection - plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface, - polling_interval, reconnect_interval, - root_helper, cfg.CONF.AGENT.target_v2_api) + rpc = cfg.CONF.AGENT.rpc + if not cfg.CONF.AGENT.target_v2_api: + rpc = False + + if rpc: + plugin = LinuxBridgeQuantumAgentRPC(br_name_prefix, + physical_interface, + polling_interval, + root_helper) + else: + db_connection_url = cfg.CONF.DATABASE.sql_connection + target_v2_api = cfg.CONF.AGENT.target_v2_api + plugin = LinuxBridgeQuantumAgentDB(br_name_prefix, + physical_interface, + polling_interval, + reconnect_interval, + root_helper, + target_v2_api, + db_connection_url) LOG.info("Agent initialized successfully, now running... ") - plugin.daemon_loop(db_connection_url) - + plugin.daemon_loop() sys.exit(0) if __name__ == "__main__": + eventlet.monkey_patch() main() diff --git a/quantum/plugins/linuxbridge/common/config.py b/quantum/plugins/linuxbridge/common/config.py index 290fb2a41..cafe455f2 100644 --- a/quantum/plugins/linuxbridge/common/config.py +++ b/quantum/plugins/linuxbridge/common/config.py @@ -39,6 +39,7 @@ agent_opts = [ cfg.IntOpt('polling_interval', default=2), cfg.StrOpt('root_helper', default='sudo'), cfg.BoolOpt('target_v2_api', default=False), + cfg.BoolOpt('rpc', default=True), ] diff --git a/quantum/plugins/linuxbridge/db/l2network_db.py b/quantum/plugins/linuxbridge/db/l2network_db.py index 80e8c9d8d..a0ca30f75 100644 --- a/quantum/plugins/linuxbridge/db/l2network_db.py +++ b/quantum/plugins/linuxbridge/db/l2network_db.py @@ -20,8 +20,10 @@ import logging from sqlalchemy import func from sqlalchemy.orm import exc +from quantum.api import api_common from quantum.common import exceptions as q_exc import quantum.db.api as db +from quantum.db import models_v2 from quantum.openstack.common import cfg from quantum.plugins.linuxbridge.common import config from quantum.plugins.linuxbridge.common import exceptions as c_exc @@ -281,3 +283,31 @@ def update_vlan_binding(netid, newvlanid=None): return binding except exc.NoResultFound: raise q_exc.NetworkNotFound(net_id=netid) + + +def get_port_from_device(device): + """Get port from database""" + LOG.debug("get_port_from_device() called") + session = db.get_session() + ports = session.query(models_v2.Port).all() + if not ports: + return + for port in ports: + if port['id'].startswith(device): + return port + return + + +def set_port_status(port_id, status): + """Set the port status""" + LOG.debug("set_port_status as %s called", status) + session = db.get_session() + try: + port = session.query(models_v2.Port).filter_by(id=port_id).one() + port['status'] = status + if status == api_common.OperationalStatus.DOWN: + port['device_id'] = '' + session.merge(port) + session.flush() + except exc.NoResultFound: + raise q_exc.PortNotFound(port_id=port_id) diff --git a/quantum/plugins/linuxbridge/lb_quantum_plugin.py b/quantum/plugins/linuxbridge/lb_quantum_plugin.py index 80f2e407f..13aaa7d15 100644 --- a/quantum/plugins/linuxbridge/lb_quantum_plugin.py +++ b/quantum/plugins/linuxbridge/lb_quantum_plugin.py @@ -15,15 +15,114 @@ import logging +from quantum.api import api_common from quantum.api.v2 import attributes +from quantum.common import topics from quantum.db import db_base_plugin_v2 from quantum.db import models_v2 +from quantum.openstack.common import context +from quantum.openstack.common import cfg +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import dispatcher +from quantum.openstack.common.rpc import proxy from quantum.plugins.linuxbridge.db import l2network_db as cdb from quantum import policy + LOG = logging.getLogger(__name__) +class LinuxBridgeRpcCallbacks(): + + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + # Device names start with "tap" + TAP_PREFIX_LEN = 3 + + def __init__(self, context): + self.context = context + + def create_rpc_dispatcher(self): + '''Get the rpc dispatcher for this manager. + + If a manager would like to set an rpc API version, or support more than + one class as the target of rpc messages, override this method. + ''' + return dispatcher.RpcDispatcher([self]) + + def get_device_details(self, context, **kwargs): + """Agent requests device details""" + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + LOG.debug("Device %s details requested from %s", device, agent_id) + port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:]) + if port: + vlan_binding = cdb.get_vlan_binding(port['network_id']) + entry = {'device': device, + 'vlan_id': vlan_binding['vlan_id'], + 'network_id': port['network_id'], + 'port_id': port['id'], + 'admin_state_up': port['admin_state_up']} + # Set the port status to UP + cdb.set_port_status(port['id'], api_common.OperationalStatus.UP) + else: + entry = {'device': device} + LOG.debug("%s can not be found in database", device) + return entry + + def update_device_down(self, context, **kwargs): + """Device no longer exists on agent""" + # (TODO) garyk - live migration and port status + agent_id = kwargs.get('agent_id') + device = kwargs.get('device') + LOG.debug("Device %s no longer exists on %s", device, agent_id) + port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:]) + if port: + entry = {'device': device, + 'exists': True} + # Set port status to DOWN + cdb.set_port_status(port['id'], api_common.OperationalStatus.DOWN) + else: + entry = {'device': device, + 'exists': False} + LOG.debug("%s can not be found in database", device) + return entry + + +class AgentNotifierApi(proxy.RpcProxy): + '''Agent side of the linux bridge rpc API. + + API version history: + 1.0 - Initial version. + + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(AgentNotifierApi, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic_network_delete = topics.get_topic_name(topic, + topics.NETWORK, + topics.DELETE) + self.topic_port_update = topics.get_topic_name(topic, + topics.PORT, + topics.UPDATE) + + def network_delete(self, context, network_id): + self.fanout_cast(context, + self.make_msg('network_delete', + network_id=network_id), + topic=self.topic_network_delete) + + def port_update(self, context, port, vlan_id): + self.fanout_cast(context, + self.make_msg('port_update', + port=port, + vlan_id=vlan_id), + topic=self.topic_port_update) + + class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2): """Implement the Quantum abstractions using Linux bridging. @@ -42,8 +141,27 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2): def __init__(self): cdb.initialize(base=models_v2.model_base.BASEV2) + self.rpc = cfg.CONF.AGENT.rpc + if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api: + self.setup_rpc() + if not cfg.CONF.AGENT.target_v2_api: + self.rpc = False LOG.debug("Linux Bridge Plugin initialization complete") + def setup_rpc(self): + # RPC support + self.topic = topics.PLUGIN + self.context = context.RequestContext('quantum', 'quantum', + is_admin=False) + self.conn = rpc.create_connection(new=True) + self.callbacks = LinuxBridgeRpcCallbacks(self.context) + self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.conn.create_consumer(self.topic, self.dispatcher, + fanout=False) + # Consume from all consumers in a thread + self.conn.consume_in_thread() + self.notifier = AgentNotifierApi(topics.AGENT) + # TODO(rkukura) Use core mechanism for attribute authorization # when available. @@ -91,6 +209,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2): vlan_binding = cdb.get_vlan_binding(id) result = super(LinuxBridgePluginV2, self).delete_network(context, id) cdb.release_vlanid(vlan_binding['vlan_id']) + if self.rpc: + self.notifier.network_delete(self.context, id) return result def get_network(self, context, id, fields=None, verbose=None): @@ -106,3 +226,15 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2): self._extend_network_dict(context, net) # TODO(rkukura): Filter on extended attributes. return [self._fields(net, fields) for net in nets] + + def update_port(self, context, id, port): + if self.rpc: + original_port = super(LinuxBridgePluginV2, self).get_port(context, + id) + port = super(LinuxBridgePluginV2, self).update_port(context, id, port) + if self.rpc: + if original_port['admin_state_up'] != port['admin_state_up']: + vlan_binding = cdb.get_vlan_binding(port['network_id']) + self.notifier.port_update(self.context, port, + vlan_binding['vlan_id']) + return port diff --git a/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py b/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py index 0ea8fcd1d..7e4bd5de9 100644 --- a/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py +++ b/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py @@ -62,8 +62,7 @@ class LinuxBridgeAgentTest(unittest.TestCase): vlan_id = vlan_bind[lconst.VLANID] self._linuxbridge_quantum_agent.process_port_binding( - new_port[lconst.PORT_ID], new_network[lconst.NET_ID], - device_name, str(vlan_id)) + new_network[lconst.NET_ID], device_name, str(vlan_id)) list_interface = (self._linuxbridge_quantum_agent.linux_br. get_interfaces_on_bridge(bridge_name)) @@ -101,8 +100,7 @@ class LinuxBridgeAgentTest(unittest.TestCase): vlan_id = vlan_bind[lconst.VLANID] self._linuxbridge_quantum_agent.process_port_binding( - new_port[lconst.PORT_ID], new_network[lconst.NET_ID], - interface_id, str(vlan_id)) + new_network[lconst.NET_ID], interface_id, str(vlan_id)) list_interface = (self._linuxbridge_quantum_agent.linux_br. get_interfaces_on_bridge(bridge_name)) @@ -140,8 +138,7 @@ class LinuxBridgeAgentTest(unittest.TestCase): vlan_id = vlan_bind[lconst.VLANID] self._linuxbridge_quantum_agent.process_port_binding( - new_port[lconst.PORT_ID], new_network[lconst.NET_ID], - interface_id, str(vlan_id)) + new_network[lconst.NET_ID], interface_id, str(vlan_id)) list_interface = (self._linuxbridge_quantum_agent.linux_br. get_interfaces_on_bridge(bridge_name)) @@ -283,8 +280,7 @@ class LinuxBridgeAgentTest(unittest.TestCase): vlan_id = vlan_bind[lconst.VLANID] self._linuxbridge_quantum_agent.process_port_binding( - new_port[lconst.PORT_ID], new_network[lconst.NET_ID], - interface_id, str(vlan_id)) + new_network[lconst.NET_ID], interface_id, str(vlan_id)) list_interface = (self._linuxbridge_quantum_agent.linux_br. get_interfaces_on_bridge(bridge_name)) self._linuxbridge_plugin.unplug_interface(tenant_id, @@ -347,8 +343,7 @@ class LinuxBridgeAgentTest(unittest.TestCase): vlan_id = vlan_bind[lconst.VLANID] self._linuxbridge_quantum_agent.process_port_binding( - new_port[lconst.PORT_ID], new_network[lconst.NET_ID], - interface_id, str(vlan_id)) + new_network[lconst.NET_ID], interface_id, str(vlan_id)) list_interface = (self._linuxbridge_quantum_agent.linux_br. get_interfaces_on_bridge(bridge_name)) self._linuxbridge_plugin.unplug_interface(tenant_id, @@ -412,6 +407,7 @@ class LinuxBridgeAgentTest(unittest.TestCase): self.gw_name_prefix = "gw-" self.tap_name_prefix = "tap" self.v2 = True + self.db_connection = 'sqlite://' self._linuxbridge_plugin = LinuxBridgePlugin.LinuxBridgePlugin() try: fh = open(self.config_file) @@ -430,13 +426,15 @@ class LinuxBridgeAgentTest(unittest.TestCase): self._linuxbridge = linux_agent.LinuxBridge(self.br_name_prefix, self.physical_interface, self.root_helper) - self._linuxbridge_quantum_agent = linux_agent.LinuxBridgeQuantumAgent( + plugin = linux_agent.LinuxBridgeQuantumAgentDB( self.br_name_prefix, self.physical_interface, self.polling_interval, self.reconnect_interval, self.root_helper, - self.v2) + self.v2, + self.db_connection) + self._linuxbridge_quantum_agent = plugin def run_cmd(self, args): cmd = shlex.split(self.root_helper) + args diff --git a/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py b/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py new file mode 100644 index 000000000..22213c5f2 --- /dev/null +++ b/quantum/plugins/linuxbridge/tests/unit/test_rpcapi.py @@ -0,0 +1,91 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for linuxbridge rpc +""" + +import stubout +import unittest2 + +from quantum.common import topics +from quantum.openstack.common import context +from quantum.openstack.common import rpc +from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb +from quantum.plugins.linuxbridge import lb_quantum_plugin as plb + + +class rpcApiTestCase(unittest2.TestCase): + + def _test_lb_api(self, rpcapi, topic, method, rpc_method, **kwargs): + ctxt = context.RequestContext('fake_user', 'fake_project') + expected_retval = 'foo' if method == 'call' else None + expected_msg = rpcapi.make_msg(method, **kwargs) + expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION + if rpc_method == 'cast' and method == 'run_instance': + kwargs['call'] = False + + self.fake_args = None + self.fake_kwargs = None + + def _fake_rpc_method(*args, **kwargs): + self.fake_args = args + self.fake_kwargs = kwargs + if expected_retval: + return expected_retval + + self.stubs = stubout.StubOutForTesting() + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) + + retval = getattr(rpcapi, method)(ctxt, **kwargs) + + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, topic, expected_msg] + + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + def test_delete_network(self): + rpcapi = plb.AgentNotifierApi(topics.AGENT) + self._test_lb_api(rpcapi, + topics.get_topic_name(topics.AGENT, + topics.NETWORK, + topics.DELETE), + 'network_delete', rpc_method='fanout_cast', + network_id='fake_request_spec') + + def test_port_update(self): + rpcapi = plb.AgentNotifierApi(topics.AGENT) + self._test_lb_api(rpcapi, + topics.get_topic_name(topics.AGENT, + topics.PORT, + topics.UPDATE), + 'port_update', rpc_method='fanout_cast', + port='fake_port', vlan_id='fake_vlan_id') + + def test_device_details(self): + rpcapi = alb.PluginApi(topics.PLUGIN) + self._test_lb_api(rpcapi, topics.PLUGIN, + 'get_device_details', rpc_method='call', + device='fake_device', + agent_id='fake_agent_id') + + def test_update_device_down(self): + rpcapi = alb.PluginApi(topics.PLUGIN) + self._test_lb_api(rpcapi, topics.PLUGIN, + 'update_device_down', rpc_method='call', + device='fake_device', + agent_id='fake_agent_id') diff --git a/tools/pip-requires b/tools/pip-requires index 5e3a1f260..4bc041050 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -8,5 +8,6 @@ lxml netaddr python-gflags==1.3 python-quantumclient>=0.1,<0.2 +pyudev sqlalchemy>0.6.4 webob==1.2.0 -- 2.45.2