def _build_flow_expr_arr(self, **kwargs):
flow_expr_arr = []
is_delete_expr = kwargs.get('delete', False)
-
if not is_delete_expr:
prefix = ("hard_timeout=%s,idle_timeout=%s,priority=%s" %
(kwargs.get('hard_timeout', '0'),
edge_ports.append(p)
return edge_ports
+
+ def get_vif_port_set(self):
+ edge_ports = set()
+ port_names = self.get_port_name_list()
+ for name in port_names:
+ external_ids = self.db_get_map("Interface", name, "external_ids")
+ if "iface-id" in external_ids and "attached-mac" in external_ids:
+ edge_ports.add(external_ids['iface-id'])
+ elif ("xs-vif-uuid" in external_ids and
+ "attached-mac" in external_ids):
+ # if this is a xenserver and iface-id is not automatically
+ # synced to OVS from XAPI, we grab it from XAPI directly
+ iface_id = self.get_xapi_iface_id(external_ids["xs-vif-uuid"])
+ edge_ports.add(iface_id)
+ return edge_ports
+
+ def get_vif_port(self, port_name):
+ external_ids = self.db_get_map("Interface", port_name, "external_ids")
+ ofport = self.db_get_val("Interface", port_name, "ofport")
+ return VifPort(port_name, ofport, external_ids["iface-id"],
+ external_ids["attached-mac"], self)
from quantum.common import topics
from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
def create_consumers(dispatcher, prefix, topic_details):
connection.create_consumer(topic_name, dispatcher, fanout=True)
connection.consume_in_thread()
return connection
+
+
+class PluginApi(proxy.RpcProxy):
+ '''Agent side of the rpc API.
+
+ API version history:
+ 1.0 - Initial version.
+
+ '''
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(PluginApi, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def get_device_details(self, context, device, agent_id):
+ return self.call(context,
+ self.make_msg('get_device_details', device=device,
+ agent_id=agent_id),
+ topic=self.topic)
+
+ def update_device_down(self, context, device, agent_id):
+ return self.call(context,
+ self.make_msg('update_device_down', device=device,
+ agent_id=agent_id),
+ topic=self.topic)
+
+ def tunnel_sync(self, context, tunnel_ip):
+ return self.call(context,
+ self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
+ topic=self.topic)
import pyudev
from sqlalchemy.ext.sqlsoup import SqlSoup
-from quantum.agent.rpc import create_consumers
+from quantum.agent import rpc as agent_rpc
from quantum.common import config as logging_config
from quantum.common import topics
from quantum.openstack.common import cfg
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
-from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.common import config
from quantum.agent.linux import utils
LOG.debug("Done deleting subinterface %s" % interface)
-class PluginApi(proxy.RpcProxy):
- '''Agent side of the linux bridge rpc API.
-
- API version history:
- 1.0 - Initial version.
-
- '''
-
- BASE_RPC_API_VERSION = '1.0'
-
- def __init__(self, topic):
- super(PluginApi, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
-
- def get_device_details(self, context, device, agent_id):
- return self.call(context,
- self.make_msg('get_device_details', device=device,
- agent_id=agent_id),
- topic=self.topic)
-
- def update_device_down(self, context, device, agent_id):
- return self.call(context,
- self.make_msg('update_device_down', device=device,
- agent_id=agent_id),
- topic=self.topic)
-
-
class LinuxBridgeRpcCallbacks():
# Set RPC API version to 1.0 by default.
mac = utils.get_interface_mac(physical_interface)
self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
self.topic = topics.AGENT
- self.plugin_rpc = PluginApi(topics.PLUGIN)
+ self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
# RPC network init
self.context = context.RequestContext('quantum', 'quantum',
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE]]
- self.connection = create_consumers(self.dispatcher, self.topic,
- consumers)
+ self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.topic,
+ consumers)
self.udev = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(self.udev)
monitor.filter_by('net')
import stubout
import unittest2
+from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.openstack.common import context
from quantum.openstack.common import rpc
-from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb
from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
port='fake_port', vlan_id='fake_vlan_id')
def test_device_details(self):
- rpcapi = alb.PluginApi(topics.PLUGIN)
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_update_device_down(self):
- rpcapi = alb.PluginApi(topics.PLUGIN)
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
import sys
import time
+import eventlet
from sqlalchemy.ext import sqlsoup
+from quantum.agent import rpc as agent_rpc
from quantum.agent.linux import ovs_lib
+from quantum.agent.linux import utils
from quantum.common import config as logging_config
+from quantum.common import topics
from quantum.openstack.common import cfg
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.openvswitch.common import config
logging.basicConfig()
return hash(self.id)
+class OVSRpcCallbacks():
+
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, context, int_br, local_ip=None, tun_br=None):
+ self.context = context
+ self.int_br = int_br
+ # Tunneling variables
+ self.local_ip = local_ip
+ self.tun_br = tun_br
+
+ def network_delete(self, context, **kwargs):
+ LOG.debug("network_delete received")
+ network_id = kwargs.get('network_id')
+ # (TODO) garyk delete the bridge interface
+ LOG.debug("Delete %s", network_id)
+
+ def port_update(self, context, **kwargs):
+ LOG.debug("port_update received")
+ port = kwargs.get('port')
+ port_name = 'tap%s' % port['id'][0:11]
+ vif_port = self.int_br.get_vif_port(port_name)
+ if port['admin_state_up']:
+ vlan_id = kwargs.get('vlan_id')
+ # create the networking for the port
+ self.int_br.set_db_attribute("Port", vif_port.port_name,
+ "tag", str(vlan_id))
+ self.int_br.delete_flows(in_port=vif_port.ofport)
+ else:
+ self.int_br.clear_db_attribute("Port", vif_port.port_name, "tag")
+
+ def tunnel_update(self, context, **kwargs):
+ LOG.debug("tunnel_update received")
+ tunnel_ip = kwargs.get('tunnel_ip')
+ tunnel_id = kwargs.get('tunnel_id')
+ if tunnel_ip == self.local_ip:
+ return
+ tun_name = 'gre-%s' % tunnel_id
+ self.tun_br.add_tunnel_port(tun_name, tunnel_ip)
+
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return dispatcher.RpcDispatcher([self])
+
+
class OVSQuantumAgent(object):
def __init__(self, integ_br, root_helper, polling_interval,
- reconnect_interval, target_v2_api=False):
+ reconnect_interval, target_v2_api, rpc):
self.root_helper = root_helper
self.setup_integration_br(integ_br)
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
self.target_v2_api = target_v2_api
+ self.rpc = rpc
+ if rpc:
+ self.setup_rpc(integ_br)
+
+ def setup_rpc(self, integ_br):
+ mac = utils.get_interface_mac(integ_br)
+ self.agent_id = '%s' % (mac.replace(":", ""))
+ self.topic = topics.AGENT
+ self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+
+ # RPC network init
+ self.context = context.RequestContext('quantum', 'quantum',
+ is_admin=False)
+ # Handle updates from service
+ self.callbacks = OVSRpcCallbacks(self.context, self.int_br)
+ self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ # Define the listening consumers for the agent
+ consumers = [[topics.PORT, topics.UPDATE],
+ [topics.NETWORK, topics.DELETE]]
+ self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.topic,
+ consumers)
def port_bound(self, port, vlan_id):
self.int_br.set_db_attribute("Port", port.port_name,
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
- def daemon_loop(self, db_connection_url):
+ def db_loop(self, db_connection_url):
'''Main processing loop for Non-Tunneling Agent.
:param options: database information - in the event need to reconnect
time.sleep(self.polling_interval)
+ def update_ports(self, registered_ports):
+ ports = self.int_br.get_vif_port_set()
+ if ports == registered_ports:
+ return
+ added = ports - registered_ports
+ removed = registered_ports - ports
+ return {'current': ports,
+ 'added': added,
+ 'removed': removed}
+
+ def treat_devices_added(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info("Port %s added", device)
+ try:
+ details = self.plugin_rpc.get_device_details(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug("Unable to get port details for %s: %s", device, e)
+ resync = True
+ continue
+ if 'port_id' in details:
+ LOG.info("Port %s updated. Details: %s", device, details)
+ port_name = 'tap%s' % details['port_id'][0:11]
+ port = self.int_br.get_vif_port(port_name)
+ if details['admin_state_up']:
+ self.port_bound(port, details['vlan_id'])
+ else:
+ self.port_unbound(port, True)
+ else:
+ LOG.debug("Device %s not defined on plugin", device)
+ return resync
+
+ def treat_devices_removed(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info("Attachment %s removed", device)
+ try:
+ details = self.plugin_rpc.update_device_down(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug("port_removed failed for %s: %s", device, e)
+ resync = True
+ if details['exists']:
+ LOG.info("Port %s updated.", device)
+ # Nothing to do regarding local networking
+ else:
+ LOG.debug("Device %s not defined on plugin", device)
+ return resync
+
+ def process_network_ports(self, port_info):
+ resync_a = False
+ resync_b = False
+ if 'added' in port_info:
+ resync_a = self.treat_devices_added(port_info['added'])
+ if 'removed' in port_info:
+ resync_b = self.treat_devices_removed(port_info['removed'])
+ # If one of the above opertaions fails => resync with plugin
+ return (resync_a | resync_b)
+
+ def rpc_loop(self):
+ sync = True
+ ports = set()
+
+ while True:
+ start = time.time()
+ if sync:
+ LOG.info("Agent out of sync with plugin!")
+ ports.clear()
+ sync = False
+
+ port_info = self.update_ports(ports)
+
+ # notify plugin about port deltas
+ if port_info:
+ LOG.debug("Agent loop has new devices!")
+ # If treat devices fails - indicates must resync with plugin
+ sync = self.process_network_ports(port_info)
+ ports = port_info['current']
+
+ # sleep till end of polling interval
+ elapsed = (time.time() - start)
+ if (elapsed < self.polling_interval):
+ time.sleep(self.polling_interval - elapsed)
+ else:
+ LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
+ self.polling_interval, elapsed)
+
+ def daemon_loop(self, db_connection_url):
+ if self.rpc:
+ self.rpc_loop()
+ else:
+ self.db_loop(db_connection_url)
+
class OVSQuantumTunnelAgent(object):
'''Implements OVS-based tunneling.
MAX_VLAN_TAG = 4094
def __init__(self, integ_br, tun_br, local_ip, root_helper,
- polling_interval, reconnect_interval, target_v2_api=False):
+ polling_interval, reconnect_interval, target_v2_api,
+ rpc):
'''Constructor.
:param integ_br: name of the integration bridge.
:param polling_interval: interval (secs) to poll DB.
:param reconnect_internal: retry interval (secs) on DB error.
:param target_v2_api: if True use v2 api.
+ :param rpc: if True use RPC interface to interface with plugin.
'''
self.root_helper = root_helper
self.available_local_vlans = set(
self.tunnel_count = 0
self.setup_tunnel_br(tun_br)
self.target_v2_api = target_v2_api
+ self.rpc = rpc
+ if rpc:
+ self.setup_rpc(integ_br)
+
+ def setup_rpc(self, integ_br):
+ mac = utils.get_interface_mac(integ_br)
+ self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
+ self.topic = topics.AGENT
+ self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+
+ # RPC network init
+ self.context = context.RequestContext('quantum', 'quantum',
+ is_admin=False)
+ # Handle updates from service
+ self.callbacks = OVSRpcCallbacks(self.context, self.int_br,
+ self.local_ip, self.tun_br)
+ self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ # Define the listening consumers for the agent
+ consumers = [[topics.PORT, topics.UPDATE],
+ [topics.NETWORK, topics.DELETE],
+ [config.TUNNEL, topics.UPDATE]]
+ self.connection = agent_rpc.create_consumers(self.dispatcher,
+ self.topic,
+ consumers)
def provision_local_vlan(self, net_uuid, lsw_id):
'''Provisions a local VLAN.
except:
LOG.exception("Problem connecting to database")
- def daemon_loop(self, db_connection_url):
+ def db_loop(self, db_connection_url):
'''Main processing loop for Tunneling Agent.
:param options: database information - in the event need to reconnect
LOG.exception("Main-loop Exception:")
self.rollback_until_success(db)
+ def update_ports(self, registered_ports):
+ ports = self.int_br.get_vif_port_set()
+ if ports == registered_ports:
+ return
+ added = ports - registered_ports
+ removed = registered_ports - ports
+ return {'current': ports,
+ 'added': added,
+ 'removed': removed}
+
+ def treat_devices_added(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info("Port %s added", device)
+ try:
+ details = self.plugin_rpc.get_device_details(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug("Unable to get port details for %s: %s", device, e)
+ resync = True
+ continue
+ if 'port_id' in details:
+ LOG.info("Port %s updated. Details: %s", device, details)
+ port_name = 'tap%s' % details['port_id'][0:11]
+ port = self.int_br.get_vif_port(port_name)
+ if details['admin_state_up']:
+ self.port_bound(port, details['network_id'],
+ details['vlan_id'])
+ else:
+ self.port_unbound(port, details['network_id'])
+ else:
+ LOG.debug("Device %s not defined on plugin", device)
+ return resync
+
+ def treat_devices_removed(self, devices):
+ resync = False
+ for device in devices:
+ LOG.info("Attachment %s removed", device)
+ try:
+ details = self.plugin_rpc.update_device_down(self.context,
+ device,
+ self.agent_id)
+ except Exception as e:
+ LOG.debug("port_removed failed for %s: %s", device, e)
+ resync = True
+ if details['exists']:
+ LOG.info("Port %s updated.", device)
+ # Nothing to do regarding local networking
+ else:
+ LOG.debug("Device %s not defined on plugin", device)
+ return resync
+
+ def process_network_ports(self, port_info):
+ resync_a = False
+ resync_b = False
+ if 'added' in port_info:
+ resync_a = self.treat_devices_added(port_info['added'])
+ if 'removed' in port_info:
+ resync_b = self.treat_devices_removed(port_info['removed'])
+ # If one of the above opertaions fails => resync with plugin
+ return (resync_a | resync_b)
+
+ def tunnel_sync(self):
+ resync = False
+ try:
+ details = self.plugin_rpc.tunnel_sync(self.context, self.local_ip)
+ tunnels = details['tunnels']
+ for tunnel in tunnels:
+ if self.local_ip != tunnel['ip_address']:
+ tun_name = 'gre-%s' % tunnel['id']
+ self.tun_br.add_tunnel_port(tun_name, tunnel['ip_address'])
+ except Exception as e:
+ LOG.debug("Unable to sync tunnel IP %s: %s", self.local_ip, e)
+ resync = True
+ return resync
+
+ def rpc_loop(self):
+ sync = True
+ ports = set()
+ tunnel_sync = True
+
+ while True:
+ start = time.time()
+ if sync:
+ LOG.info("Agent out of sync with plugin!")
+ ports.clear()
+ sync = False
+
+ # Notify the plugin of tunnel IP
+ if tunnel_sync:
+ LOG.info("Agent tunnel out of sync with plugin!")
+ tunnel_sync = self.tunnel_sync()
+
+ port_info = self.update_ports(ports)
+
+ # notify plugin about port deltas
+ if port_info:
+ LOG.debug("Agent loop has new devices!")
+ # If treat devices fails - indicates must resync with plugin
+ sync = self.process_network_ports(port_info)
+ ports = port_info['current']
+
+ # sleep till end of polling interval
+ elapsed = (time.time() - start)
+ if (elapsed < self.polling_interval):
+ time.sleep(self.polling_interval - elapsed)
+ else:
+ LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
+ self.polling_interval, elapsed)
+
+ def daemon_loop(self, db_connection_url):
+ if self.rpc:
+ self.rpc_loop()
+ else:
+ self.db_loop(db_connection_url)
+
def main():
cfg.CONF(args=sys.argv, project='quantum')
polling_interval = cfg.CONF.AGENT.polling_interval
reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
root_helper = cfg.CONF.AGENT.root_helper
+ rpc = cfg.CONF.AGENT.rpc
# Determine API Version to use
target_v2_api = cfg.CONF.AGENT.target_v2_api
+ # RPC only works with v2
+ if rpc and not target_v2_api:
+ rpc = False
+
if enable_tunneling:
# Get parameters for OVSQuantumTunnelAgent
tun_br = cfg.CONF.OVS.tunnel_bridge
local_ip = cfg.CONF.OVS.local_ip
plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper,
polling_interval, reconnect_interval,
- target_v2_api)
+ target_v2_api, rpc)
else:
# Get parameters for OVSQuantumAgent.
plugin = OVSQuantumAgent(integ_br, root_helper, polling_interval,
- reconnect_interval, target_v2_api)
+ reconnect_interval, target_v2_api, rpc)
# Start everything.
plugin.daemon_loop(db_connection_url)
sys.exit(0)
if __name__ == "__main__":
+ eventlet.monkey_patch()
main()
from quantum.openstack.common import cfg
+# Topic for tunnel notifications between the plugin and agent
+TUNNEL = 'tunnel'
+
database_opts = [
cfg.StrOpt('sql_connection', default='sqlite://'),
cfg.IntOpt('sql_max_retries', default=-1),
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
cfg.StrOpt('log_file', default=None),
+ cfg.BoolOpt('rpc', default=True),
]
from sqlalchemy.orm import exc
+from quantum.api import api_common
from quantum.common import exceptions as q_exc
+from quantum.db import models_v2
import quantum.db.api as db
from quantum.openstack.common import cfg
from quantum.plugins.openvswitch import ovs_models_v2
session.delete(record)
except exc.NoResultFound:
LOG.error("vlan id %s not found in release_vlan_id" % vlan_id)
+
+
+def get_port(port_id):
+ session = db.get_session()
+ try:
+ port = session.query(models_v2.Port).filter_by(id=port_id).one()
+ except exc.NoResultFound:
+ port = None
+ return port
+
+
+def set_port_status(port_id, status):
+ session = db.get_session()
+ try:
+ port = session.query(models_v2.Port).filter_by(id=port_id).one()
+ port['status'] = status
+ if status == api_common.PORT_STATUS_DOWN:
+ port['device_id'] = ''
+ session.merge(port)
+ session.flush()
+ except exc.NoResultFound:
+ raise q_exc.PortNotFound(port_id=port_id)
+
+
+def get_tunnels():
+ session = db.get_session()
+ try:
+ tunnels = session.query(ovs_models_v2.TunnelInfo).all()
+ except exc.NoResultFound:
+ return []
+ return [{'id': tunnel.id,
+ 'ip_address': tunnel.ip_address} for tunnel in tunnels]
+
+
+def generate_tunnel_id(session):
+ try:
+ tunnels = session.query(ovs_models_v2.TunnelInfo).all()
+ except exc.NoResultFound:
+ return 0
+ tunnel_ids = ([tunnel['id'] for tunnel in tunnels])
+ if tunnel_ids:
+ id = max(tunnel_ids)
+ else:
+ id = 0
+ return id + 1
+
+
+def add_tunnel(ip):
+ session = db.get_session()
+ try:
+ tunnel = (session.query(ovs_models_v2.TunnelInfo).
+ filter_by(ip_address=ip).one())
+ except exc.NoResultFound:
+ # Generate an id for the tunnel
+ id = generate_tunnel_id(session)
+ tunnel = ovs_models_v2.TunnelInfo(ip, id)
+ session.add(tunnel)
+ session.flush()
+ return tunnel
def __repr__(self):
return "<TunnelIP(%s)>" % (self.ip_address)
+
+
+class TunnelInfo(model_base.BASEV2):
+ """Represents remote tunnel information in tunnel mode."""
+ __tablename__ = 'tunnel_info'
+
+ ip_address = Column(String(64), primary_key=True)
+ id = Column(Integer, nullable=False)
+
+ def __init__(self, ip_address, id):
+ self.ip_address = ip_address
+ self.id = id
+
+ def __repr__(self):
+ return "<TunnelInfo(%s,%s)>" % (self.ip_address, self.id)
import logging
import os
+from quantum.api import api_common
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
+from quantum.common import topics
from quantum.common.utils import find_config_file
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import models_v2
+from quantum.openstack.common import context
from quantum.openstack.common import cfg
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import dispatcher
+from quantum.openstack.common.rpc import proxy
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch import ovs_db_v2
from quantum import policy
-LOG = logging.getLogger("ovs_quantum_plugin")
+LOG = logging.getLogger(__name__)
+
+
+class OVSRpcCallbacks():
+
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, context, notifier):
+ self.context = context
+ self.notifier = notifier
+
+ def create_rpc_dispatcher(self):
+ '''Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ '''
+ return dispatcher.RpcDispatcher([self])
+
+ def get_device_details(self, context, **kwargs):
+ """Agent requests device details"""
+ agent_id = kwargs.get('agent_id')
+ device = kwargs.get('device')
+ LOG.debug("Device %s details requested from %s", device, agent_id)
+ port = ovs_db_v2.get_port(device)
+ if port:
+ vlan_id = ovs_db_v2.get_vlan(port['network_id'])
+ entry = {'device': device,
+ 'vlan_id': vlan_id,
+ 'network_id': port['network_id'],
+ 'port_id': port['id'],
+ 'admin_state_up': port['admin_state_up']}
+ # Set the port status to UP
+ ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_UP)
+ else:
+ entry = {'device': device}
+ LOG.debug("%s can not be found in database", device)
+ return entry
+
+ def update_device_down(self, context, **kwargs):
+ """Device no longer exists on agent"""
+ # (TODO) garyk - live migration and port status
+ agent_id = kwargs.get('agent_id')
+ device = kwargs.get('device')
+ LOG.debug("Device %s no longer exists on %s", device, agent_id)
+ port = ovs_db_v2.get_port(device)
+ if port:
+ entry = {'device': device,
+ 'exists': True}
+ # Set port status to DOWN
+ ovs_db_v2.set_port_status(port['id'], api_common.PORT_STATUS_DOWN)
+ else:
+ entry = {'device': device,
+ 'exists': False}
+ LOG.debug("%s can not be found in database", device)
+ return entry
+
+ def tunnel_sync(self, context, **kwargs):
+ """Update new tunnel.
+
+ Updates the datbase with the tunnel IP. All listening agents will also
+ be notified about the new tunnel IP.
+ """
+ tunnel_ip = kwargs.get('tunnel_ip')
+ # Update the database with the IP
+ tunnel = ovs_db_v2.add_tunnel(tunnel_ip)
+ tunnels = ovs_db_v2.get_tunnels()
+ entry = dict()
+ entry['tunnels'] = tunnels
+ # Notify all other listening agents
+ self.notifier.tunnel_update(self.context, tunnel.ip_address,
+ tunnel.id)
+ # Return the list of tunnels IP's to the agent
+ return entry
+
+
+class AgentNotifierApi(proxy.RpcProxy):
+ '''Agent side of the linux bridge rpc API.
+
+ API version history:
+ 1.0 - Initial version.
+
+ '''
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(AgentNotifierApi, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.topic_network_delete = topics.get_topic_name(topic,
+ topics.NETWORK,
+ topics.DELETE)
+ self.topic_port_update = topics.get_topic_name(topic,
+ topics.PORT,
+ topics.UPDATE)
+ self.topic_tunnel_update = topics.get_topic_name(topic,
+ config.TUNNEL,
+ topics.UPDATE)
+
+ def network_delete(self, context, network_id):
+ self.fanout_cast(context,
+ self.make_msg('network_delete',
+ network_id=network_id),
+ topic=self.topic_network_delete)
+
+ def port_update(self, context, port, vlan_id):
+ self.fanout_cast(context,
+ self.make_msg('port_update',
+ port=port,
+ vlan_id=vlan_id),
+ topic=self.topic_port_update)
+
+ def tunnel_update(self, context, tunnel_ip, tunnel_id):
+ self.fanout_cast(context,
+ self.make_msg('tunnel_update',
+ tunnel_ip=tunnel_ip,
+ tunnel_id=tunnel_id),
+ topic=self.topic_tunnel_update)
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
# update the vlan_id table based on current configuration
ovs_db_v2.update_vlan_id_pool()
+ self.rpc = cfg.CONF.AGENT.rpc
+ if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api:
+ self.setup_rpc()
+ if not cfg.CONF.AGENT.target_v2_api:
+ self.rpc = False
+
+ def setup_rpc(self):
+ # RPC support
+ self.topic = topics.PLUGIN
+ self.context = context.RequestContext('quantum', 'quantum',
+ is_admin=False)
+ self.conn = rpc.create_connection(new=True)
+ self.notifier = AgentNotifierApi(topics.AGENT)
+ self.callbacks = OVSRpcCallbacks(self.context, self.notifier)
+ self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.conn.create_consumer(self.topic, self.dispatcher,
+ fanout=False)
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
# TODO(rkukura) Use core mechanism for attribute authorization
# when available.
vlan_id = ovs_db_v2.get_vlan(id)
result = super(OVSQuantumPluginV2, self).delete_network(context, id)
ovs_db_v2.release_vlan_id(vlan_id)
+ if self.rpc:
+ self.notifier.network_delete(self.context, id)
return result
def get_network(self, context, id, fields=None, verbose=None):
self._extend_network_dict(context, net)
# TODO(rkukura): Filter on extended attributes.
return [self._fields(net, fields) for net in nets]
+
+ def update_port(self, context, id, port):
+ if self.rpc:
+ original_port = super(OVSQuantumPluginV2, self).get_port(context,
+ id)
+ port = super(OVSQuantumPluginV2, self).update_port(context, id, port)
+ if self.rpc:
+ if original_port['admin_state_up'] != port['admin_state_up']:
+ vlan_id = ovs_db_v2.get_vlan(port['network_id'])
+ self.notifier.port_update(self.context, port, vlan_id)
+ return port
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for openvswitch rpc
+"""
+
+import stubout
+import unittest2
+
+from quantum.agent import rpc as agent_rpc
+from quantum.common import topics
+from quantum.openstack.common import context
+from quantum.openstack.common import rpc
+from quantum.plugins.openvswitch import ovs_quantum_plugin as povs
+from quantum.plugins.openvswitch.common import config
+
+
+class rpcApiTestCase(unittest2.TestCase):
+
+ def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ expected_retval = 'foo' if method == 'call' else None
+ expected_msg = rpcapi.make_msg(method, **kwargs)
+ expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
+ if rpc_method == 'cast' and method == 'run_instance':
+ kwargs['call'] = False
+
+ self.fake_args = None
+ self.fake_kwargs = None
+
+ def _fake_rpc_method(*args, **kwargs):
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ if expected_retval:
+ return expected_retval
+
+ self.stubs = stubout.StubOutForTesting()
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, topic, expected_msg]
+
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ def test_delete_network(self):
+ rpcapi = povs.AgentNotifierApi(topics.AGENT)
+ self._test_ovs_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.NETWORK,
+ topics.DELETE),
+ 'network_delete', rpc_method='fanout_cast',
+ network_id='fake_request_spec')
+
+ def test_port_update(self):
+ rpcapi = povs.AgentNotifierApi(topics.AGENT)
+ self._test_ovs_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ port='fake_port', vlan_id='fake_vlan_id')
+
+ def test_tunnel_update(self):
+ rpcapi = povs.AgentNotifierApi(topics.AGENT)
+ self._test_ovs_api(rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ config.TUNNEL,
+ topics.UPDATE),
+ 'tunnel_update', rpc_method='fanout_cast',
+ tunnel_ip='fake_ip', tunnel_id='fake_id')
+
+ def test_device_details(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ self._test_ovs_api(rpcapi, topics.PLUGIN,
+ 'get_device_details', rpc_method='call',
+ device='fake_device',
+ agent_id='fake_agent_id')
+
+ def test_update_device_down(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ self._test_ovs_api(rpcapi, topics.PLUGIN,
+ 'update_device_down', rpc_method='call',
+ device='fake_device',
+ agent_id='fake_agent_id')
+
+ def test_tunnel_sync(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ self._test_ovs_api(rpcapi, topics.PLUGIN,
+ 'tunnel_sync', rpc_method='call',
+ tunnel_ip='fake_tunnel_ip')
b = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
- 'sudo', 2, 2)
+ 'sudo', 2, 2, False, False)
self.mox.VerifyAll()
def testProvisionLocalVlan(self):
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
- 'sudo', 2, 2)
+ 'sudo', 2, 2, False, False)
a.available_local_vlans = set([LV_ID])
a.provision_local_vlan(NET_UUID, LS_ID)
self.mox.VerifyAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
- 'sudo', 2, 2)
+ 'sudo', 2, 2, False, False)
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = LVM
a.reclaim_local_vlan(NET_UUID, LVM)
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
- 'sudo', 2, 2)
+ 'sudo', 2, 2, False, False)
a.local_vlan_map[NET_UUID] = LVM
a.port_bound(VIF_PORT, NET_UUID, LS_ID)
self.mox.VerifyAll()
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
- 'sudo', 2, 2)
+ 'sudo', 2, 2, False, False)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_unbound(VIF_PORT, NET_UUID)
a = ovs_quantum_agent.OVSQuantumTunnelAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1',
- 'sudo', 2, 2)
+ 'sudo', 2, 2, False, False)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_dead(VIF_PORT)
Paste
PasteDeploy==1.5.0
Routes>=1.12.3
-eventlet>=0.9.12
+eventlet>=0.9.17
httplib2
iso8601>=0.1.4
lxml