# root filter facility.
# Change to "sudo" to skip the filtering and just run the comand directly
root_helper = "sudo"
-# Use RPC messaging to interface between agent and plugin
-# rpc = True
return dispatcher.RpcDispatcher([self])
-class LinuxBridgeQuantumAgentDB:
-
- def __init__(self, interface_mappings, polling_interval,
- reconnect_interval, root_helper, db_connection_url):
- self.polling_interval = polling_interval
- self.root_helper = root_helper
- self.setup_linux_bridge(interface_mappings)
- self.reconnect_interval = reconnect_interval
- self.db_connected = False
- self.db_connection_url = db_connection_url
-
- def setup_linux_bridge(self, interface_mappings):
- self.linux_br = LinuxBridge(interface_mappings, self.root_helper)
-
- def process_port_binding(self, network_id, interface_id,
- physical_network, vlan_id):
- return self.linux_br.add_interface(network_id,
- physical_network, vlan_id,
- interface_id)
-
- def remove_port_binding(self, network_id, interface_id):
- bridge_name = self.linux_br.get_bridge_name(network_id)
- tap_device_name = self.linux_br.get_tap_device_name(interface_id)
- return self.linux_br.remove_interface(bridge_name, tap_device_name)
-
- def process_unplugged_interfaces(self, plugged_interfaces):
- """
- If there are any tap devices that are not corresponding to the
- list of attached VIFs, then those are corresponding to recently
- unplugged VIFs, so we need to remove those tap devices from their
- current bridge association
- """
- plugged_tap_device_names = []
- plugged_gateway_device_names = []
- for interface in plugged_interfaces:
- if interface.startswith(GATEWAY_INTERFACE_PREFIX):
- """
- The name for the gateway devices is set by the linux net
- driver, hence we use the name as is
- """
- plugged_gateway_device_names.append(interface)
- else:
- tap_device_name = self.linux_br.get_tap_device_name(interface)
- plugged_tap_device_names.append(tap_device_name)
-
- LOG.debug("plugged tap device names %s" % plugged_tap_device_names)
- for tap_device in self.linux_br.get_all_tap_devices():
- if tap_device not in plugged_tap_device_names:
- current_bridge_name = (
- self.linux_br.get_bridge_for_tap_device(tap_device))
- if current_bridge_name:
- self.linux_br.remove_interface(current_bridge_name,
- tap_device)
-
- for gw_device in self.linux_br.get_all_gateway_devices():
- if gw_device not in plugged_gateway_device_names:
- current_bridge_name = (
- self.linux_br.get_bridge_for_tap_device(gw_device))
- if current_bridge_name:
- self.linux_br.remove_interface(current_bridge_name,
- gw_device)
-
- def process_deleted_networks(self, vlan_bindings):
- current_quantum_networks = vlan_bindings.keys()
- current_quantum_bridge_names = []
- for network in current_quantum_networks:
- bridge_name = self.linux_br.get_bridge_name(network)
- current_quantum_bridge_names.append(bridge_name)
-
- quantum_bridges_on_this_host = self.linux_br.get_all_quantum_bridges()
- for bridge in quantum_bridges_on_this_host:
- if bridge not in current_quantum_bridge_names:
- self.linux_br.delete_vlan_bridge(bridge)
-
- def manage_networks_on_host(self, db,
- old_vlan_bindings,
- old_port_bindings):
- vlan_bindings = {}
- try:
- network_binds = db.network_bindings.all()
- except Exception as e:
- LOG.info("Unable to get network bindings! Exception: %s" % e)
- self.db_connected = False
- return {VLAN_BINDINGS: {},
- PORT_BINDINGS: []}
-
- vlans_string = ""
- for bind in network_binds:
- entry = {'network_id': bind.network_id,
- 'physical_network': bind.physical_network,
- 'vlan_id': bind.vlan_id}
- vlan_bindings[bind.network_id] = entry
- vlans_string = "%s %s" % (vlans_string, entry)
-
- port_bindings = []
- try:
- port_binds = db.ports.all()
- except Exception as e:
- LOG.info("Unable to get port bindings! Exception: %s" % e)
- self.db_connected = False
- return {VLAN_BINDINGS: {},
- PORT_BINDINGS: []}
-
- all_bindings = {}
- for bind in port_binds:
- append_entry = False
- all_bindings[bind.id] = bind
- entry = {'network_id': bind.network_id,
- 'uuid': bind.id,
- 'status': bind.status,
- 'interface_id': bind.id}
- append_entry = bind.admin_state_up
- if append_entry:
- port_bindings.append(entry)
-
- plugged_interfaces = []
- ports_string = ""
- for pb in port_bindings:
- ports_string = "%s %s" % (ports_string, pb)
- port_id = pb['uuid']
- interface_id = pb['interface_id']
- network_id = pb['network_id']
-
- physical_network = vlan_bindings[network_id]['physical_network']
- vlan_id = str(vlan_bindings[network_id]['vlan_id'])
- if self.process_port_binding(network_id,
- interface_id,
- physical_network,
- vlan_id):
- all_bindings[port_id].status = constants.PORT_STATUS_ACTIVE
-
- plugged_interfaces.append(interface_id)
-
- if old_port_bindings != port_bindings:
- LOG.debug("Port-bindings: %s" % ports_string)
-
- self.process_unplugged_interfaces(plugged_interfaces)
-
- if old_vlan_bindings != vlan_bindings:
- LOG.debug("VLAN-bindings: %s" % vlans_string)
-
- self.process_deleted_networks(vlan_bindings)
-
- try:
- db.commit()
- except Exception as e:
- LOG.info("Unable to update database! Exception: %s" % e)
- db.rollback()
- vlan_bindings = {}
- port_bindings = []
-
- return {VLAN_BINDINGS: vlan_bindings,
- PORT_BINDINGS: port_bindings}
-
- def daemon_loop(self):
- old_vlan_bindings = {}
- old_port_bindings = []
- self.db_connected = False
-
- while True:
- if not self.db_connected:
- time.sleep(self.reconnect_interval)
- db = SqlSoup(self.db_connection_url)
- self.db_connected = True
- LOG.info("Connecting to database \"%s\" on %s" %
- (db.engine.url.database, db.engine.url.host))
- bindings = self.manage_networks_on_host(db,
- old_vlan_bindings,
- old_port_bindings)
- old_vlan_bindings = bindings[VLAN_BINDINGS]
- old_port_bindings = bindings[PORT_BINDINGS]
- time.sleep(self.polling_interval)
-
-
class LinuxBridgeQuantumAgentRPC:
def __init__(self, interface_mappings, polling_interval,
sys.exit(1)
polling_interval = cfg.CONF.AGENT.polling_interval
- reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
root_helper = cfg.CONF.AGENT.root_helper
- rpc = cfg.CONF.AGENT.rpc
- if rpc:
- plugin = LinuxBridgeQuantumAgentRPC(interface_mappings,
- polling_interval,
- root_helper)
- else:
- db_connection_url = cfg.CONF.DATABASE.sql_connection
- plugin = LinuxBridgeQuantumAgentDB(interface_mappings,
- polling_interval,
- reconnect_interval,
- root_helper,
- db_connection_url)
+ plugin = LinuxBridgeQuantumAgentRPC(interface_mappings,
+ polling_interval,
+ root_helper)
LOG.info("Agent initialized successfully, now running... ")
plugin.daemon_loop()
sys.exit(0)
agent_opts = [
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
- cfg.BoolOpt('rpc', default=True),
]
"Service terminated!" %
self.tenant_network_type)
sys.exit(1)
- self.agent_rpc = cfg.CONF.AGENT.rpc
self._setup_rpc()
LOG.debug("Linux Bridge Plugin initialization complete")
binding.vlan_id, self.network_vlan_ranges)
# the network_binding record is deleted via cascade from
# the network record, so explicit removal is not necessary
- if self.agent_rpc:
- self.notifier.network_delete(self.rpc_context, id)
+ self.notifier.network_delete(self.rpc_context, id)
def get_network(self, context, id, fields=None):
net = super(LinuxBridgePluginV2, self).get_network(context, id, None)
return [self._fields(net, fields) for net in nets]
def update_port(self, context, id, port):
- if self.agent_rpc:
- original_port = super(LinuxBridgePluginV2, self).get_port(context,
- id)
+ original_port = super(LinuxBridgePluginV2, self).get_port(context,
+ id)
port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
- if self.agent_rpc:
- if original_port['admin_state_up'] != port['admin_state_up']:
- binding = db.get_network_binding(context.session,
- port['network_id'])
- self.notifier.port_update(self.rpc_context, port,
- binding.physical_network,
- binding.vlan_id)
+ if original_port['admin_state_up'] != port['admin_state_up']:
+ binding = db.get_network_binding(context.session,
+ port['network_id'])
+ self.notifier.port_update(self.rpc_context, port,
+ binding.physical_network,
+ binding.vlan_id)
return port
def delete_port(self, context, id, l3_port_check=True):
def __init__(self, integ_br, tun_br, local_ip,
bridge_mappings, root_helper,
- polling_interval, reconnect_interval, rpc, enable_tunneling):
+ polling_interval, enable_tunneling):
'''Constructor.
:param integ_br: name of the integration bridge.
:param bridge_mappings: mappings from phyiscal interface to bridge.
:param root_helper: utility to use when running shell cmds.
:param polling_interval: interval (secs) to poll DB.
- :param reconnect_internal: retry interval (secs) on DB error.
- :param rpc: if True use RPC interface to interface with plugin.
:param enable_tunneling: if True enable GRE networks.
'''
self.root_helper = root_helper
self.local_vlan_map = {}
self.polling_interval = polling_interval
- self.reconnect_interval = reconnect_interval
self.enable_tunneling = enable_tunneling
self.local_ip = local_ip
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
- self.rpc = rpc
- if rpc:
- self.setup_rpc(integ_br)
+ self.setup_rpc(integ_br)
def setup_rpc(self, integ_br):
mac = utils.get_interface_mac(integ_br)
int_veth.link.set_up()
phys_veth.link.set_up()
- def manage_tunnels(self, tunnel_ips, old_tunnel_ips, db):
- if self.local_ip in tunnel_ips:
- tunnel_ips.remove(self.local_ip)
- else:
- db.ovs_tunnel_ips.insert(ip_address=self.local_ip)
-
- new_tunnel_ips = tunnel_ips - old_tunnel_ips
- if new_tunnel_ips:
- LOG.info("Adding tunnels to: %s", new_tunnel_ips)
- for ip in new_tunnel_ips:
- tun_name = "gre-" + str(self.tunnel_count)
- self.tun_br.add_tunnel_port(tun_name, ip)
- self.tunnel_count += 1
-
- def rollback_until_success(self, db):
- while True:
- time.sleep(self.reconnect_interval)
- try:
- db.rollback()
- break
- except:
- LOG.exception("Problem connecting to database")
-
- def db_loop(self, db_connection_url):
- '''Main processing loop for Tunneling Agent.
-
- :param options: database information - in the event need to reconnect
- '''
- old_local_bindings = {}
- old_vif_ports = {}
- old_tunnel_ips = set()
-
- db = sqlsoup.SqlSoup(db_connection_url)
- LOG.info("Connecting to database \"%s\" on %s",
- db.engine.url.database, db.engine.url.host)
-
- while True:
- try:
- all_bindings = dict((p.id, Port(p))
- for p in db.ports.all())
- all_bindings_vif_port_ids = set(all_bindings)
- net_bindings = dict((bind.network_id, bind)
- for bind in
- db.ovs_network_bindings.all())
-
- if self.enable_tunneling:
- tunnel_ips = set(x.ip_address for x in
- db.ovs_tunnel_ips.all())
- self.manage_tunnels(tunnel_ips, old_tunnel_ips, db)
-
- # Get bindings from OVS bridge.
- vif_ports = self.int_br.get_vif_ports()
- new_vif_ports = dict([(p.vif_id, p) for p in vif_ports])
- new_vif_ports_ids = set(new_vif_ports.keys())
-
- old_vif_ports_ids = set(old_vif_ports.keys())
- dead_vif_ports_ids = (new_vif_ports_ids -
- all_bindings_vif_port_ids)
- dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids]
- disappeared_vif_ports_ids = (old_vif_ports_ids -
- new_vif_ports_ids)
- new_local_bindings_ids = (all_bindings_vif_port_ids.
- intersection(new_vif_ports_ids))
- new_local_bindings = dict([(p, all_bindings.get(p))
- for p in new_vif_ports_ids])
- new_bindings = set(
- (p, old_local_bindings.get(p),
- new_local_bindings.get(p)) for p in new_vif_ports_ids)
- changed_bindings = set([b for b in new_bindings
- if b[2] != b[1]])
-
- LOG.debug('all_bindings: %s', all_bindings)
- LOG.debug('net_bindings: %s', net_bindings)
- LOG.debug('new_vif_ports_ids: %s', new_vif_ports_ids)
- LOG.debug('dead_vif_ports_ids: %s', dead_vif_ports_ids)
- LOG.debug('old_vif_ports_ids: %s', old_vif_ports_ids)
- LOG.debug('new_local_bindings_ids: %s',
- new_local_bindings_ids)
- LOG.debug('new_local_bindings: %s', new_local_bindings)
- LOG.debug('new_bindings: %s', new_bindings)
- LOG.debug('changed_bindings: %s', changed_bindings)
-
- # Take action.
- for p in dead_vif_ports:
- LOG.info("No quantum binding for port " + str(p)
- + "putting on dead vlan")
- self.port_dead(p)
-
- for b in changed_bindings:
- port_id, old_port, new_port = b
- p = new_vif_ports[port_id]
- if old_port:
- old_net_uuid = old_port.network_id
- LOG.info("Removing binding to net-id = " +
- old_net_uuid + " for " + str(p)
- + " added to dead vlan")
- self.port_unbound(p.vif_id, old_net_uuid)
- if p.vif_id in all_bindings:
- all_bindings[p.vif_id].status = (
- q_const.PORT_STATUS_DOWN)
- if not new_port:
- self.port_dead(p)
-
- if new_port:
- new_net_uuid = new_port.network_id
- if new_net_uuid not in net_bindings:
- LOG.warn("No network binding found for net-id"
- " '%s'", new_net_uuid)
- continue
-
- bind = net_bindings[new_net_uuid]
- self.port_bound(p, new_net_uuid,
- bind.network_type,
- bind.physical_network,
- bind.segmentation_id)
- all_bindings[p.vif_id].status = (
- q_const.PORT_STATUS_ACTIVE)
- LOG.info("Port %s on net-id = %s bound to %s ",
- str(p), new_net_uuid,
- str(self.local_vlan_map[new_net_uuid]))
-
- for vif_id in disappeared_vif_ports_ids:
- LOG.info("Port Disappeared: " + vif_id)
- if vif_id in all_bindings:
- all_bindings[vif_id].status = (
- q_const.PORT_STATUS_DOWN)
- old_port = old_local_bindings.get(vif_id)
- if old_port:
- self.port_unbound(vif_id, old_port.network_id)
- # commit any DB changes and expire
- # data loaded from the database
- db.commit()
-
- # sleep and re-initialize state for next pass
- time.sleep(self.polling_interval)
- if self.enable_tunneling:
- old_tunnel_ips = tunnel_ips
- old_vif_ports = new_vif_ports
- old_local_bindings = new_local_bindings
-
- except:
- LOG.exception("Main-loop Exception:")
- self.rollback_until_success(db)
-
def update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
self.polling_interval, elapsed)
- def daemon_loop(self, db_connection_url):
- if self.rpc:
- self.rpc_loop()
- else:
- self.db_loop(db_connection_url)
+ def daemon_loop(self):
+ self.rpc_loop()
def parse_bridge_mappings(bridge_mapping_list):
bridge_mappings=bridge_mappings,
root_helper=config.AGENT.root_helper,
polling_interval=config.AGENT.polling_interval,
- reconnect_interval=config.DATABASE.reconnect_interval,
- rpc=config.AGENT.rpc,
enable_tunneling=config.OVS.enable_tunneling,
)
# Start everything.
LOG.info("Agent initialized successfully, now running... ")
- plugin.daemon_loop(cfg.CONF.DATABASE.sql_connection)
-
+ plugin.daemon_loop()
sys.exit(0)
agent_opts = [
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
- cfg.BoolOpt('rpc', default=True),
]
LOG.error("Tunneling disabled but tenant_network_type is 'gre'. "
"Agent terminated!")
sys.exit(1)
- self.agent_rpc = cfg.CONF.AGENT.rpc
self.setup_rpc()
def setup_rpc(self):
self.network_vlan_ranges)
# the network_binding record is deleted via cascade from
# the network record, so explicit removal is not necessary
- if self.agent_rpc:
- self.notifier.network_delete(self.rpc_context, id)
+ self.notifier.network_delete(self.rpc_context, id)
def get_network(self, context, id, fields=None):
net = super(OVSQuantumPluginV2, self).get_network(context, id, None)
return [self._fields(net, fields) for net in nets]
def update_port(self, context, id, port):
- if self.agent_rpc:
- original_port = super(OVSQuantumPluginV2, self).get_port(context,
- id)
+ original_port = super(OVSQuantumPluginV2, self).get_port(context,
+ id)
port = super(OVSQuantumPluginV2, self).update_port(context, id, port)
- if self.agent_rpc:
- if original_port['admin_state_up'] != port['admin_state_up']:
- binding = ovs_db_v2.get_network_binding(None,
- port['network_id'])
- self.notifier.port_update(self.rpc_context, port,
- binding.network_type,
- binding.segmentation_id,
- binding.physical_network)
+ if original_port['admin_state_up'] != port['admin_state_up']:
+ binding = ovs_db_v2.get_network_binding(None,
+ port['network_id'])
+ self.notifier.port_update(self.rpc_context, port,
+ binding.network_type,
+ binding.segmentation_id,
+ binding.physical_network)
return port
def delete_port(self, context, id, l3_port_check=True):
self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
- self.assertTrue(cfg.CONF.AGENT.rpc)
self.assertEqual('local', cfg.CONF.OVS.tenant_network_type)
self.assertEqual(0, len(cfg.CONF.OVS.bridge_mappings))
self.assertEqual(0, len(cfg.CONF.OVS.network_vlan_ranges))
def setUp(self):
self.addCleanup(cfg.CONF.reset)
# Avoid rpc initialization for unit tests
- cfg.CONF.set_override('rpc', False, group='AGENT')
+ cfg.CONF.set_override('rpc_backend',
+ 'quantum.openstack.common.rpc.impl_fake')
kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF)
with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.'
'OVSQuantumAgent.setup_integration_br',
return_value=mock.Mock()):
- self.agent = ovs_quantum_agent.OVSQuantumAgent(**kwargs)
+ with mock.patch('quantum.agent.linux.utils.get_interface_mac',
+ return_value='000000000001'):
+ self.agent = ovs_quantum_agent.OVSQuantumAgent(**kwargs)
self.agent.plugin_rpc = mock.Mock()
self.agent.context = mock.Mock()
self.agent.agent_id = mock.Mock()
port.ofport = ofport
net_uuid = 'my-net-uuid'
with mock.patch.object(self.agent.int_br,
- 'set_db_attribute') as db_func:
- with mock.patch.object(self.agent.int_br,
- 'delete_flows') as delete_flows_func:
- self.agent.port_bound(port, net_uuid, 'local', None, None)
- self.assertTrue(db_func.called)
+ 'delete_flows') as delete_flows_func:
+ self.agent.port_bound(port, net_uuid, 'local', None, None)
self.assertEqual(delete_flows_func.called, ofport != -1)
def test_port_bound_deletes_flows_for_valid_ofport(self):
def test_port_dead(self):
with mock.patch.object(self.agent.int_br,
- 'set_db_attribute') as db_func:
- with mock.patch.object(self.agent.int_br,
- 'add_flow') as add_flow_func:
- self.agent.port_dead(mock.Mock())
- self.assertTrue(db_func.called)
+ 'add_flow') as add_flow_func:
+ self.agent.port_dead(mock.Mock())
self.assertTrue(add_flow_func.called)
def mock_update_ports(self, vif_port_set=None, registered_ports=None):
import mox
from quantum.agent.linux import ovs_lib
+from quantum.agent.linux import utils
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
# Useful global dummy variables.
self.mock_tun_bridge.remove_all_flows()
self.mock_tun_bridge.add_flow(priority=1, actions='drop')
+ self.mox.StubOutWithMock(utils, 'get_interface_mac')
+ utils.get_interface_mac(self.INT_BRIDGE).AndReturn('000000000001')
+
def tearDown(self):
self.mox.UnsetStubs()
b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', {},
- 'sudo', 2, 2, False, True)
+ 'sudo', 2, True)
self.mox.VerifyAll()
def testProvisionLocalVlan(self):
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', {},
- 'sudo', 2, 2, False, True)
+ 'sudo', 2, True)
a.available_local_vlans = set([LV_ID])
a.provision_local_vlan(NET_UUID, 'gre', None, LS_ID)
self.mox.VerifyAll()
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', {},
- 'sudo', 2, 2, False, True)
+ 'sudo', 2, True)
a.available_local_vlans = set()
a.local_vlan_map[NET_UUID] = LVM
a.reclaim_local_vlan(NET_UUID, LVM)
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', {},
- 'sudo', 2, 2, False, True)
+ 'sudo', 2, True)
a.local_vlan_map[NET_UUID] = LVM
a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
self.mox.VerifyAll()
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', {},
- 'sudo', 2, 2, False, True)
+ 'sudo', 2, True)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_unbound(VIF_ID, NET_UUID)
a = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
self.TUN_BRIDGE,
'10.0.0.1', {},
- 'sudo', 2, 2, False, True)
+ 'sudo', 2, True)
a.available_local_vlans = set([LV_ID])
a.local_vlan_map[NET_UUID] = LVM
a.port_dead(VIF_PORT)