# License for the specific language governing permissions and limitations
# under the License.
+import eventlet
import os
import sys
sys.path.insert(0, os.getcwd())
from quantum.server import main as server
+eventlet.monkey_patch()
server()
# Maximum amount of retries to generate a unique MAC address
# mac_generation_retries = 16
-[QUOTAS]
-# number of networks allowed per tenant
-# quota_network = 10
-
-# number of subnets allowed per tenant
-# quota_subnet = 10
-
-# number of ports allowed per tenant
-# quota_port = 50
-
-# default driver to use for quota checks
-# quota_driver = quantum.quota.ConfDriver
-
-# ============ Notification System Options =====================
-
-# Notifications can be sent when network/subnet/port are create, updated or deleted.
-# There are four methods of sending notifications, logging (via the
-# log_file directive), rpc (via a message queue),
-# noop (no notifications sent, the default) or list of them
-
-# Defined in notifier api
-# notification_driver = quantum.openstack.common.notifier.no_op_notifier
-# default_notification_level = INFO
-# myhost = myhost.com
-# default_publisher_id = $myhost
-
-# Defined in rabbit_notifier for rpc way
-# notification_topics = notifications
-
-# Defined in list_notifier
-# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
-
-# Defined in rpc __init__
+# RPC configuration options. Defined in rpc __init__
# The messaging module to use, defaults to kombu.
-# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu
+# rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu
# Size of RPC thread pool
# rpc_thread_pool_size = 64,
# Size of RPC connection pool
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call.
# allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception
-# AMQP exchange to connect to if using RabbitMQ or Qpid
-# control_exchange = nova
+# AMQP exchange to connect to if using RabbitMQ or QPID
+control_exchange = quantum
+
# If passed, use a fake RabbitMQ provider
# fake_rabbit = False
# ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP.
# The "host" option should point or resolve to this address.
# rpc_zmq_bind_address = *
-# MatchMaker driver
-# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost
-# ZeroMQ receiver listening port
-# rpc_zmq_port = 9501
-# Number of ZeroMQ contexts, defaults to 1
-# rpc_zmq_contexts = 1
-# Directory for holding IPC sockets
-# rpc_zmq_ipc_dir = /var/run/openstack
+
+[QUOTAS]
+# number of networks allowed per tenant
+# quota_network = 10
+
+# number of subnets allowed per tenant
+# quota_subnet = 10
+
+# number of ports allowed per tenant
+# quota_port = 50
+
+# default driver to use for quota checks
+# quota_driver = quantum.quota.ConfDriver
+
+# ============ Notification System Options =====================
+
+# Notifications can be sent when network/subnet/port are create, updated or deleted.
+# There are four methods of sending notifications, logging (via the
+# log_file directive), rpc (via a message queue),
+# noop (no notifications sent, the default) or list of them
+
+# Defined in notifier api
+# notification_driver = quantum.openstack.common.notifier.no_op_notifier
+# default_notification_level = INFO
+# myhost = myhost.com
+# default_publisher_id = $myhost
+
+# Defined in rabbit_notifier for rpc way
+# notification_topics = notifications
+
+# Defined in list_notifier
+# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
[VLANS]
-vlan_start=1000
-vlan_end=3000
+vlan_start = 1000
+vlan_end = 3000
[DATABASE]
# This line MUST be changed to actually run the plugin.
# Change to "sudo quantum-rootwrap" to limit commands that can be run
# as root.
root_helper = sudo
+# Use Quantumv2 API
+# target_v2_api = False
+# Use RPC messaging to interface between agent and plugin
+# rpc = True
#
# @author: Juliano Martinez, Locaweb.
+import fcntl
import logging
import os
import shlex
+import socket
+import struct
import subprocess
LOG = logging.getLogger(__name__)
raise RuntimeError(m)
return return_stderr and (_stdout, _stderr) or _stdout
+
+
+def get_interface_mac(interface):
+ DEVICE_NAME_LEN = 15
+ MAC_START = 18
+ MAC_END = 24
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ info = fcntl.ioctl(s.fileno(), 0x8927,
+ struct.pack('256s', interface[:DEVICE_NAME_LEN]))
+ return ''.join(['%02x:' % ord(char)
+ for char in info[MAC_START:MAC_END]])[:-1]
--- /dev/null
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import topics
+from quantum.openstack.common import rpc
+
+
+def create_consumers(dispatcher, prefix, topic_details):
+ """Create agent RPC consumers.
+
+ :param dispatcher: The dispatcher to process the incoming messages.
+ :param prefix: Common prefix for the plugin/agent message queues.
+ :param topic_details: A list of topics. Each topic has a name and a
+ operation.
+
+ :returns: A common Connection.
+ """
+
+ connection = rpc.create_connection(new=True)
+ for topic, operation in topic_details:
+ topic_name = topics.get_topic_name(prefix, topic, operation)
+ connection.create_consumer(topic_name, dispatcher, fanout=True)
+ connection.consume_in_thread()
+ return connection
--- /dev/null
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+NETWORK = 'network'
+SUBNET = 'subnet'
+PORT = 'port'
+
+CREATE = 'create'
+DELETE = 'delete'
+UPDATE = 'update'
+
+AGENT = 'q-agent-notifier'
+PLUGIN = 'q-plugin'
+
+
+def get_topic_name(prefix, table, operation):
+ """Create a topic name.
+
+ The topic name needs to be synced between the agent and the
+ plugin. The plugin will send a fanout message to all of the
+ listening agents so that the agents in turn can perform their
+ updates accordingly.
+
+ :param prefix: Common prefix for the plugin/agent message queues.
+ :param table: The table in question (NETWORK, SUBNET, PORT).
+ :param operation: The operation that invokes notification (CREATE,
+ DELETE, UPDATE)
+ :returns: The topic name.
+ """
+ return '%s-%s-%s' % (prefix, table, operation)
:raises: DuplicateOptError if a naming conflict is detected
"""
if opt.dest in opts:
- if opts[opt.dest]['opt'] is not opt:
+ if opts[opt.dest]['opt'] != opt:
raise DuplicateOptError(opt.name)
return True
else:
else:
self.deprecated_name = None
+ def __ne__(self, another):
+ return vars(self) != vars(another)
+
def _get_from_config_parser(self, cparser, section):
"""Retrieves the option value from a MultiConfigParser object.
Exceptions common to OpenStack projects
"""
-import itertools
import logging
dict(error=record.msg))
-def handle_exception(type, value, tb):
- extra = {}
- if CONF.verbose:
- extra['exc_info'] = (type, value, tb)
- getLogger().critical(str(value), **extra)
+def _create_logging_excepthook(product_name):
+ def logging_excepthook(type, value, tb):
+ extra = {}
+ if CONF.verbose:
+ extra['exc_info'] = (type, value, tb)
+ getLogger(product_name).critical(str(value), **extra)
+ return logging_excepthook
def setup(product_name):
"""Setup logging."""
- sys.excepthook = handle_exception
+ sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config:
try:
for handler in log_root.handlers:
logger.addHandler(handler)
- # NOTE(jkoelker) Clear the handlers for the root logger that was setup
- # by basicConfig in nova/__init__.py and install the
- # NullHandler.
- root = logging.getLogger()
- for handler in root.handlers:
- root.removeHandler(handler)
- handler = NullHandler()
- handler.setFormatter(logging.Formatter())
- root.addHandler(handler)
-
-
_loggers = {}
def format(self, record):
"""Uses contextstring if request_id is set, otherwise default."""
- if 'instance' not in record.__dict__:
- record.__dict__['instance'] = ''
+ # NOTE(sdague): default the fancier formating params
+ # to an empty string so we don't throw an exception if
+ # they get used
+ for key in ('instance', 'color'):
+ if key not in record.__dict__:
+ record.__dict__[key] = ''
if record.__dict__.get('request_id', None):
self._fmt = CONF.logging_context_format_string
"""
raise NotImplementedError()
- def create_consumer(self, conf, topic, proxy, fanout=False):
+ def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer on this connection.
A consumer is associated with a message queue on the backend message
off of the queue will determine which method gets called on the proxy
object.
- :param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic. For example, all instances of nova-compute consume
"""
raise NotImplementedError()
- def create_worker(self, conf, topic, proxy, pool_name):
+ def create_worker(self, topic, proxy, pool_name):
"""Create a worker on this connection.
A worker is like a regular consumer of messages directed to a
be asked to process it. Load is distributed across the members
of the pool in round-robin fashion.
- :param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic.
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval)
- self.connection.hearbeat = self.conf.qpid_heartbeat
+ self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
import calendar
import datetime
-import time
import iso8601
import sys
import time
+import eventlet
+import pyudev
from sqlalchemy.ext.sqlsoup import SqlSoup
-from quantum.openstack.common import cfg
+from quantum.agent.rpc import create_consumers
from quantum.common import config as logging_config
+from quantum.common import topics
+from quantum.openstack.common import cfg
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.common import config
from quantum.agent.linux import utils
LOG.debug("Done deleting subinterface %s" % interface)
-class LinuxBridgeQuantumAgent:
+class PluginApi(proxy.RpcProxy):
+ '''Agent side of the linux bridge rpc API.
+
+ API version history:
+ 1.0 - Initial version.
+
+ '''
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(PluginApi, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def get_device_details(self, context, device, agent_id):
+ return self.call(context,
+ self.make_msg('get_device_details', device=device,
+ agent_id=agent_id),
+ topic=self.topic)
+
+ def update_device_down(self, context, device, agent_id):
+ return self.call(context,
+ self.make_msg('update_device_down', device=device,
+ agent_id=agent_id),
+ topic=self.topic)
+
+
+class LinuxBridgeRpcCallbacks():
+
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, context, linux_br):
+ self.context = context
+ self.linux_br = linux_br
+
+ def network_delete(self, context, **kwargs):
+ LOG.debug("network_delete received")
+ network_id = kwargs.get('network_id')
+ bridge_name = self.linux_br.get_bridge_name(network_id)
+ # (TODO) garyk delete the bridge interface
+ LOG.debug("Delete %s", bridge_name)
+
+ def port_update(self, context, **kwargs):
+ LOG.debug("port_update received")
+ port = kwargs.get('port')
+ if port['admin_state_up']:
+ vlan_id = kwargs.get('vlan_id')
+ # create the networking for the port
+ self.linux_br.add_interface(port['network_id'],
+ vlan_id,
+ port['id'])
+ else:
+ bridge_name = self.linux_br.get_bridge_name(port['network_id'])
+ tap_device_name = self.linux_br.get_tap_device_name(port['id'])
+ self.linux_br.remove_interface(bridge_name, tap_device_name)
+
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return dispatcher.RpcDispatcher([self])
+
+
+class LinuxBridgeQuantumAgentDB:
def __init__(self, br_name_prefix, physical_interface, polling_interval,
- reconnect_interval, root_helper, target_v2_api):
+ reconnect_interval, root_helper, target_v2_api,
+ db_connection_url):
self.polling_interval = polling_interval
- self.reconnect_interval = reconnect_interval
self.root_helper = root_helper
self.setup_linux_bridge(br_name_prefix, physical_interface)
- self.db_connected = False
self.target_v2_api = target_v2_api
+ self.reconnect_interval = reconnect_interval
+ self.db_connected = False
+ self.db_connection_url = db_connection_url
def setup_linux_bridge(self, br_name_prefix, physical_interface):
self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
self.root_helper)
- def process_port_binding(self, port_id, network_id, interface_id,
- vlan_id):
+ def process_port_binding(self, network_id, interface_id, vlan_id):
return self.linux_br.add_interface(network_id, vlan_id, interface_id)
+ def remove_port_binding(self, network_id, interface_id):
+ bridge_name = self.linux_br.get_bridge_name(network_id)
+ tap_device_name = self.linux_br.get_tap_device_name(interface_id)
+ return self.linux_br.remove_interface(bridge_name, tap_device_name)
+
def process_unplugged_interfaces(self, plugged_interfaces):
"""
If there are any tap devices that are not corresponding to the
interface_id = pb['interface_id']
vlan_id = str(vlan_bindings[pb['network_id']]['vlan_id'])
- if self.process_port_binding(port_id,
- pb['network_id'],
+ if self.process_port_binding(pb['network_id'],
interface_id,
vlan_id):
if self.target_v2_api:
return {VLAN_BINDINGS: vlan_bindings,
PORT_BINDINGS: port_bindings}
- def daemon_loop(self, db_connection_url):
+ def daemon_loop(self):
old_vlan_bindings = {}
old_port_bindings = []
self.db_connected = False
while True:
if not self.db_connected:
time.sleep(self.reconnect_interval)
- db = SqlSoup(db_connection_url)
+ db = SqlSoup(self.db_connection_url)
self.db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
time.sleep(self.polling_interval)
+class LinuxBridgeQuantumAgentRPC:
+
+ def __init__(self, br_name_prefix, physical_interface, polling_interval,
+ root_helper):
+ self.polling_interval = polling_interval
+ self.root_helper = root_helper
+ self.setup_linux_bridge(br_name_prefix, physical_interface)
+ self.setup_rpc(physical_interface)
+
+ def setup_rpc(self, physical_interface):
+ mac = utils.get_interface_mac(physical_interface)
+ self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
+ self.topic = topics.AGENT
+ self.plugin_rpc = PluginApi(topics.PLUGIN)
+
+ # RPC network init
+ self.context = context.RequestContext('quantum', 'quantum',
+ is_admin=False)
+ # Handle updates from service
+ self.callbacks = LinuxBridgeRpcCallbacks(self.context,
+ self.linux_br)
+ self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ # Define the listening consumers for the agent
+ consumers = [[topics.PORT, topics.UPDATE],
+ [topics.NETWORK, topics.DELETE]]
+ self.connection = create_consumers(self.dispatcher, self.topic,
+ consumers)
+ self.udev = pyudev.Context()
+ monitor = pyudev.Monitor.from_netlink(self.udev)
+ monitor.filter_by('net')
+
+ def setup_linux_bridge(self, br_name_prefix, physical_interface):
+ self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
+ self.root_helper)
+
+ def process_port_binding(self, network_id, interface_id, vlan_id):
+ return self.linux_br.add_interface(network_id, vlan_id, interface_id)
+
+ def remove_port_binding(self, network_id, interface_id):
+ bridge_name = self.linux_br.get_bridge_name(network_id)
+ tap_device_name = self.linux_br.get_tap_device_name(interface_id)
+ return self.linux_br.remove_interface(bridge_name, tap_device_name)
+
+ def update_devices(self, registered_devices):
+ devices = self.udev_get_all_tap_devices()
+ if devices == registered_devices:
+ return
+ added = devices - registered_devices
+ removed = registered_devices - devices
+ return {'current': devices,
+ 'added': added,
+ 'removed': removed}
+
+ def udev_get_all_tap_devices(self):
+ devices = set()
+ for device in self.udev.list_devices(subsystem='net'):
+ name = self.udev_get_name(device)
+ if self.is_tap_device(name):
+ devices.add(name)
+ return devices
+
+ def is_tap_device(self, name):
+ return name.startswith(TAP_INTERFACE_PREFIX)
+
+ def udev_get_name(self, device):
+ return device.sys_name
+
+ def process_network_devices(self, device_info):
+ resync_a = False
+ resync_b = False
+ if 'added' in device_info:
+ resync_a = self.treat_devices_added(device_info['added'])
+ if 'removed' in device_info:
+ resync_b = self.treat_devices_removed(device_info['removed'])
+ # If one of the above operations fails => resync with plugin
+ return (resync_a | resync_b)
+
+ def treat_devices_added(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info("Port %s added", device)
+ try:
+ details = self.plugin_rpc.get_device_details(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug("Unable to get port details for %s: %s", device, e)
+ resync = True
+ continue
+ if 'port_id' in details:
+ LOG.info("Port %s updated. Details: %s", device, details)
+ if details['admin_state_up']:
+ # create the networking for the port
+ self.process_port_binding(details['network_id'],
+ details['port_id'],
+ details['vlan_id'])
+ else:
+ self.remove_port_binding(details['network_id'],
+ details['port_id'])
+ else:
+ LOG.debug("Device %s not defined on plugin", device)
+ return resync
+
+ def treat_devices_removed(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info("Attachment %s removed", device)
+ try:
+ details = self.plugin_rpc.update_device_down(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug("port_removed failed for %s: %s", device, e)
+ resync = True
+ if details['exists']:
+ LOG.info("Port %s updated.", device)
+ # Nothing to do regarding local networking
+ else:
+ LOG.debug("Device %s not defined on plugin", device)
+ return resync
+
+ def daemon_loop(self):
+ sync = True
+ devices = set()
+
+ LOG.info("LinuxBridge Agent RPC Daemon Started!")
+
+ while True:
+ start = time.time()
+ if sync:
+ LOG.info("Agent out of sync with plugin!")
+ devices.clear()
+ sync = False
+
+ device_info = self.update_devices(devices)
+
+ # notify plugin about device deltas
+ if device_info:
+ LOG.debug("Agent loop has new devices!")
+ # If treat devices fails - indicates must resync with plugin
+ sync = self.process_network_devices(device_info)
+ devices = device_info['current']
+
+ # sleep till end of polling interval
+ elapsed = (time.time() - start)
+ if (elapsed < self.polling_interval):
+ time.sleep(self.polling_interval - elapsed)
+ else:
+ LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
+ self.polling_interval, elapsed)
+
+
def main():
cfg.CONF(args=sys.argv, project='quantum')
polling_interval = cfg.CONF.AGENT.polling_interval
reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
root_helper = cfg.CONF.AGENT.root_helper
- 'Establish database connection and load models'
- db_connection_url = cfg.CONF.DATABASE.sql_connection
- plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface,
- polling_interval, reconnect_interval,
- root_helper, cfg.CONF.AGENT.target_v2_api)
+ rpc = cfg.CONF.AGENT.rpc
+ if not cfg.CONF.AGENT.target_v2_api:
+ rpc = False
+
+ if rpc:
+ plugin = LinuxBridgeQuantumAgentRPC(br_name_prefix,
+ physical_interface,
+ polling_interval,
+ root_helper)
+ else:
+ db_connection_url = cfg.CONF.DATABASE.sql_connection
+ target_v2_api = cfg.CONF.AGENT.target_v2_api
+ plugin = LinuxBridgeQuantumAgentDB(br_name_prefix,
+ physical_interface,
+ polling_interval,
+ reconnect_interval,
+ root_helper,
+ target_v2_api,
+ db_connection_url)
LOG.info("Agent initialized successfully, now running... ")
- plugin.daemon_loop(db_connection_url)
-
+ plugin.daemon_loop()
sys.exit(0)
if __name__ == "__main__":
+ eventlet.monkey_patch()
main()
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
cfg.BoolOpt('target_v2_api', default=False),
+ cfg.BoolOpt('rpc', default=True),
]
from sqlalchemy import func
from sqlalchemy.orm import exc
+from quantum.api import api_common
from quantum.common import exceptions as q_exc
import quantum.db.api as db
+from quantum.db import models_v2
from quantum.openstack.common import cfg
from quantum.plugins.linuxbridge.common import config
from quantum.plugins.linuxbridge.common import exceptions as c_exc
return binding
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=netid)
+
+
+def get_port_from_device(device):
+ """Get port from database"""
+ LOG.debug("get_port_from_device() called")
+ session = db.get_session()
+ ports = session.query(models_v2.Port).all()
+ if not ports:
+ return
+ for port in ports:
+ if port['id'].startswith(device):
+ return port
+ return
+
+
+def set_port_status(port_id, status):
+ """Set the port status"""
+ LOG.debug("set_port_status as %s called", status)
+ session = db.get_session()
+ try:
+ port = session.query(models_v2.Port).filter_by(id=port_id).one()
+ port['status'] = status
+ if status == api_common.OperationalStatus.DOWN:
+ port['device_id'] = ''
+ session.merge(port)
+ session.flush()
+ except exc.NoResultFound:
+ raise q_exc.PortNotFound(port_id=port_id)
import logging
+from quantum.api import api_common
from quantum.api.v2 import attributes
+from quantum.common import topics
from quantum.db import db_base_plugin_v2
from quantum.db import models_v2
+from quantum.openstack.common import context
+from quantum.openstack.common import cfg
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.db import l2network_db as cdb
from quantum import policy
+
LOG = logging.getLogger(__name__)
+class LinuxBridgeRpcCallbacks():
+
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+ # Device names start with "tap"
+ TAP_PREFIX_LEN = 3
+
+ def __init__(self, context):
+ self.context = context
+
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return dispatcher.RpcDispatcher([self])
+
+ def get_device_details(self, context, **kwargs):
+ """Agent requests device details"""
+ agent_id = kwargs.get('agent_id')
+ device = kwargs.get('device')
+ LOG.debug("Device %s details requested from %s", device, agent_id)
+ port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:])
+ if port:
+ vlan_binding = cdb.get_vlan_binding(port['network_id'])
+ entry = {'device': device,
+ 'vlan_id': vlan_binding['vlan_id'],
+ 'network_id': port['network_id'],
+ 'port_id': port['id'],
+ 'admin_state_up': port['admin_state_up']}
+ # Set the port status to UP
+ cdb.set_port_status(port['id'], api_common.OperationalStatus.UP)
+ else:
+ entry = {'device': device}
+ LOG.debug("%s can not be found in database", device)
+ return entry
+
+ def update_device_down(self, context, **kwargs):
+ """Device no longer exists on agent"""
+ # (TODO) garyk - live migration and port status
+ agent_id = kwargs.get('agent_id')
+ device = kwargs.get('device')
+ LOG.debug("Device %s no longer exists on %s", device, agent_id)
+ port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:])
+ if port:
+ entry = {'device': device,
+ 'exists': True}
+ # Set port status to DOWN
+ cdb.set_port_status(port['id'], api_common.OperationalStatus.DOWN)
+ else:
+ entry = {'device': device,
+ 'exists': False}
+ LOG.debug("%s can not be found in database", device)
+ return entry
+
+
+class AgentNotifierApi(proxy.RpcProxy):
+ '''Agent side of the linux bridge rpc API.
+
+ API version history:
+ 1.0 - Initial version.
+
+ '''
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(AgentNotifierApi, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.topic_network_delete = topics.get_topic_name(topic,
+ topics.NETWORK,
+ topics.DELETE)
+ self.topic_port_update = topics.get_topic_name(topic,
+ topics.PORT,
+ topics.UPDATE)
+
+ def network_delete(self, context, network_id):
+ self.fanout_cast(context,
+ self.make_msg('network_delete',
+ network_id=network_id),
+ topic=self.topic_network_delete)
+
+ def port_update(self, context, port, vlan_id):
+ self.fanout_cast(context,
+ self.make_msg('port_update',
+ port=port,
+ vlan_id=vlan_id),
+ topic=self.topic_port_update)
+
+
class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
"""Implement the Quantum abstractions using Linux bridging.
def __init__(self):
cdb.initialize(base=models_v2.model_base.BASEV2)
+ self.rpc = cfg.CONF.AGENT.rpc
+ if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api:
+ self.setup_rpc()
+ if not cfg.CONF.AGENT.target_v2_api:
+ self.rpc = False
LOG.debug("Linux Bridge Plugin initialization complete")
+ def setup_rpc(self):
+ # RPC support
+ self.topic = topics.PLUGIN
+ self.context = context.RequestContext('quantum', 'quantum',
+ is_admin=False)
+ self.conn = rpc.create_connection(new=True)
+ self.callbacks = LinuxBridgeRpcCallbacks(self.context)
+ self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.conn.create_consumer(self.topic, self.dispatcher,
+ fanout=False)
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
+ self.notifier = AgentNotifierApi(topics.AGENT)
+
# TODO(rkukura) Use core mechanism for attribute authorization
# when available.
vlan_binding = cdb.get_vlan_binding(id)
result = super(LinuxBridgePluginV2, self).delete_network(context, id)
cdb.release_vlanid(vlan_binding['vlan_id'])
+ if self.rpc:
+ self.notifier.network_delete(self.context, id)
return result
def get_network(self, context, id, fields=None, verbose=None):
self._extend_network_dict(context, net)
# TODO(rkukura): Filter on extended attributes.
return [self._fields(net, fields) for net in nets]
+
+ def update_port(self, context, id, port):
+ if self.rpc:
+ original_port = super(LinuxBridgePluginV2, self).get_port(context,
+ id)
+ port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
+ if self.rpc:
+ if original_port['admin_state_up'] != port['admin_state_up']:
+ vlan_binding = cdb.get_vlan_binding(port['network_id'])
+ self.notifier.port_update(self.context, port,
+ vlan_binding['vlan_id'])
+ return port
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
- new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
- device_name, str(vlan_id))
+ new_network[lconst.NET_ID], device_name, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
- new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
- interface_id, str(vlan_id))
+ new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
- new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
- interface_id, str(vlan_id))
+ new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
- new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
- interface_id, str(vlan_id))
+ new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
self._linuxbridge_plugin.unplug_interface(tenant_id,
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
- new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
- interface_id, str(vlan_id))
+ new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
self._linuxbridge_plugin.unplug_interface(tenant_id,
self.gw_name_prefix = "gw-"
self.tap_name_prefix = "tap"
self.v2 = True
+ self.db_connection = 'sqlite://'
self._linuxbridge_plugin = LinuxBridgePlugin.LinuxBridgePlugin()
try:
fh = open(self.config_file)
self._linuxbridge = linux_agent.LinuxBridge(self.br_name_prefix,
self.physical_interface,
self.root_helper)
- self._linuxbridge_quantum_agent = linux_agent.LinuxBridgeQuantumAgent(
+ plugin = linux_agent.LinuxBridgeQuantumAgentDB(
self.br_name_prefix,
self.physical_interface,
self.polling_interval,
self.reconnect_interval,
self.root_helper,
- self.v2)
+ self.v2,
+ self.db_connection)
+ self._linuxbridge_quantum_agent = plugin
def run_cmd(self, args):
cmd = shlex.split(self.root_helper) + args
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for linuxbridge rpc
+"""
+
+import stubout
+import unittest2
+
+from quantum.common import topics
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb
+from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
+
+
+class rpcApiTestCase(unittest2.TestCase):
+
+ def _test_lb_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ expected_retval = 'foo' if method == 'call' else None
+ expected_msg = rpcapi.make_msg(method, **kwargs)
+ expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
+ if rpc_method == 'cast' and method == 'run_instance':
+ kwargs['call'] = False
+
+ self.fake_args = None
+ self.fake_kwargs = None
+
+ def _fake_rpc_method(*args, **kwargs):
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ if expected_retval:
+ return expected_retval
+
+ self.stubs = stubout.StubOutForTesting()
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, topic, expected_msg]
+
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ def test_delete_network(self):
+ rpcapi = plb.AgentNotifierApi(topics.AGENT)
+ self._test_lb_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.NETWORK,
+ topics.DELETE),
+ 'network_delete', rpc_method='fanout_cast',
+ network_id='fake_request_spec')
+
+ def test_port_update(self):
+ rpcapi = plb.AgentNotifierApi(topics.AGENT)
+ self._test_lb_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ port='fake_port', vlan_id='fake_vlan_id')
+
+ def test_device_details(self):
+ rpcapi = alb.PluginApi(topics.PLUGIN)
+ self._test_lb_api(rpcapi, topics.PLUGIN,
+ 'get_device_details', rpc_method='call',
+ device='fake_device',
+ agent_id='fake_agent_id')
+
+ def test_update_device_down(self):
+ rpcapi = alb.PluginApi(topics.PLUGIN)
+ self._test_lb_api(rpcapi, topics.PLUGIN,
+ 'update_device_down', rpc_method='call',
+ device='fake_device',
+ agent_id='fake_agent_id')
netaddr
python-gflags==1.3
python-quantumclient>=0.1,<0.2
+pyudev
sqlalchemy>0.6.4
webob==1.2.0