import sys
import time
-from sqlalchemy.ext.sqlsoup import SqlSoup
+import sqlalchemy
+from sqlalchemy.ext import sqlsoup
+
+from quantum.plugins.openvswitch import ovs_models
logging.basicConfig()
LOG = logging.getLogger(__name__)
full_args = ["ovs-ofctl", cmd, self.br_name] + args
return self.run_cmd(full_args)
+ def count_flows(self):
+ flow_list = self.run_ofctl("dump-flows", []).split("\n")[1:]
+ return len(flow_list) - 1
+
def remove_all_flows(self):
self.run_ofctl("del-flows", [])
return "lv-id = %s ls-id = %s" % (self.vlan, self.lsw_id)
+class Port(object):
+ '''class stores port data in an ORM-free way,
+ so attributes are still available even if a
+ row has been deleted.
+ '''
+
+ def __init__(self, p):
+ self.uuid = p.uuid
+ self.network_id = p.network_id
+ self.interface_id = p.interface_id
+ self.state = p.state
+ self.op_status = p.op_status
+
+ def __eq__(self, other):
+ '''compare only fields that will cause us to re-wire
+ '''
+ try:
+ return (self and other
+ and self.interface_id == other.interface_id
+ and self.state == other.state)
+ except:
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ return hash(self.uuid)
+
+
class OVSQuantumAgent(object):
def __init__(self, integ_br, root_helper,
self.int_br.add_flow(priority=1, actions="normal")
def daemon_loop(self, db_connection_url):
+ '''Main processing loop for Non-Tunneling Agent.
+
+ :param options: database information - in the event need to reconnect
+ '''
self.local_vlan_map = {}
old_local_bindings = {}
old_vif_ports = {}
while True:
if not db_connected:
time.sleep(self.reconnect_interval)
- db = SqlSoup(db_connection_url)
+ db = sqlsoup.SqlSoup(db_connection_url)
db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
# Upper bound on available vlans.
MAX_VLAN_TAG = 4094
- def __init__(self, integ_br, tun_br, remote_ip_file, local_ip,
- root_helper, polling_interval, reconnect_interval):
+ def __init__(self, integ_br, tun_br, local_ip, root_helper,
+ polling_interval, reconnect_interval):
'''Constructor.
:param integ_br: name of the integration bridge.
:param tun_br: name of the tunnel bridge.
- :param remote_ip_file: name of file containing list of hypervisor IPs.
- :param local_ip: local IP address of this hypervisor.'''
+ :param local_ip: local IP address of this hypervisor.
+ :param root_helper: utility to use when running shell cmds.
+ :param polling_interval: interval (secs) to poll DB.
+ :param reconnect_internal: retry interval (secs) on DB error.'''
self.root_helper = root_helper
self.available_local_vlans = set(
xrange(OVSQuantumTunnelAgent.MIN_VLAN_TAG,
OVSQuantumTunnelAgent.MAX_VLAN_TAG))
self.setup_integration_br(integ_br)
self.local_vlan_map = {}
- self.setup_tunnel_br(tun_br, remote_ip_file, local_ip)
- self.db_connected = False
+
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
+ self.local_ip = local_ip
+ self.tunnel_count = 0
+ self.setup_tunnel_br(tun_br)
+
def provision_local_vlan(self, net_uuid, lsw_id):
'''Provisions a local VLAN.
# outbound
self.tun_br.add_flow(priority=4, match="in_port=%s,dl_vlan=%s" %
(self.patch_int_ofport, lvid),
- actions="set_tunnel:%s,normal" % (lsw_id))
-
+ actions="strip_vlan,set_tunnel:%s,normal" %
+ (lsw_id))
# inbound
self.tun_br.add_flow(priority=3, match="tun_id=%s" % lsw_id,
actions="mod_vlan_vid:%s,output:%s" %
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
- def setup_tunnel_br(self, tun_br, remote_ip_file, local_ip):
+ def setup_tunnel_br(self, tun_br):
'''Setup the tunnel bridge.
- Reads in list of IP addresses. Creates GRE tunnels to each of these
- addresses and then clears out existing flows. local_ip is the address
- of the local node. A tunnel is not created to this IP address.
+ Creates tunnel bridge, and links it to the integration bridge
+ using a patch port.
- :param tun_br: the name of the tunnel bridge.
- :param remote_ip_file: path to file that contains list of destination
- IP addresses.
- :param local_ip: the ip address of this node.'''
+ :param tun_br: the name of the tunnel bridge.'''
self.tun_br = OVSBridge(tun_br, self.root_helper)
self.tun_br.reset_bridge()
self.patch_int_ofport = self.tun_br.add_patch_port("patch-int",
"patch-tun")
- try:
- with open(remote_ip_file, 'r') as f:
- remote_ip_list = f.readlines()
- clean_ips = (x.rstrip() for x in remote_ip_list)
- tunnel_ips = (x for x in clean_ips if x != local_ip and x)
- for i, remote_ip in enumerate(tunnel_ips):
- self.tun_br.add_tunnel_port("gre-" + str(i), remote_ip)
- except Exception as e:
- LOG.error("Error configuring tunnels: '%s' %s" %
- (remote_ip_file, str(e)))
- raise
-
self.tun_br.remove_all_flows()
- # default drop
self.tun_br.add_flow(priority=1, actions="drop")
- def get_db_port_bindings(self, db):
- '''Get database port bindings from central Quantum database.
-
- The central quantum database 'ovs_quantum' resides on the openstack
- mysql server.
-
- :returns: a dictionary containing port bindings.'''
- ports = []
- try:
- ports = db.ports.all()
- except Exceptioni as e:
- LOG.info("Unable to get port bindings! Exception: %s" % e)
- self.db_connected = False
- return {}
-
- return dict([(port.interface_id, port) for port in ports])
-
- def get_db_vlan_bindings(self, db):
- '''Get database vlan bindings from central Quantum database.
-
- The central quantum database 'ovs_quantum' resides on the openstack
- mysql server.
-
- :returns: a dictionary containing vlan bindings.'''
- lsw_id_binds = []
- try:
- lsw_id_binds.extend(db.vlan_bindings.all())
- except Exception as e:
- LOG.info("Unable to get vlan bindings! Exception: %s" % e)
- self.db_connected = False
- return {}
-
- return dict([(bind.network_id, bind.vlan_id)
- for bind in lsw_id_binds])
+ def manage_tunnels(self, tunnel_ips, old_tunnel_ips, db):
+ if self.local_ip in tunnel_ips:
+ tunnel_ips.remove(self.local_ip)
+ else:
+ db.tunnel_ips.insert(ip_address=self.local_ip)
+
+ new_tunnel_ips = tunnel_ips - old_tunnel_ips
+ if new_tunnel_ips:
+ LOG.info("adding tunnels to: %s" % new_tunnel_ips)
+ for ip in new_tunnel_ips:
+ tun_name = "gre-" + str(self.tunnel_count)
+ self.tun_br.add_tunnel_port(tun_name, ip)
+ self.tunnel_count += 1
+
+ # adding new ports can void flows, so reset flows
+ self.tun_br.remove_all_flows()
+ self.tun_br.add_flow(priority=1, actions="drop")
+ for lv_ojb in self.local_vlan_map.values():
+ self.add_tun_br_flows_for_local_vlan(lv_obj)
+
+ def rollback_until_success(self, db):
+ while True:
+ time.sleep(self.reconnect_interval)
+ try:
+ db.rollback()
+ break
+ except:
+ LOG.exception("Problem connecting to database")
def daemon_loop(self, db_connection_url):
- '''Main processing loop (not currently used).
+ '''Main processing loop for Tunneling Agent.
:param options: database information - in the event need to reconnect
'''
old_local_bindings = {}
old_vif_ports = {}
- self.db_connected = False
+ old_tunnel_ips = set()
- while True:
- if not self.db_connected:
- time.sleep(self.reconnect_interval)
- db = SqlSoup(db_connection_url)
- self.db_connected = True
- LOG.info("Connecting to database \"%s\" on %s" %
- (db.engine.url.database, db.engine.url.host))
+ db = sqlsoup.SqlSoup(db_connection_url)
+ LOG.info("Connecting to database \"%s\" on %s" %
+ (db.engine.url.database, db.engine.url.host))
- # Get bindings from db.
- all_bindings = self.get_db_port_bindings(db)
- if not self.db_connected:
- continue
- all_bindings_vif_port_ids = set(all_bindings.keys())
- lsw_id_bindings = self.get_db_vlan_bindings(db)
- if not self.db_connected:
- continue
-
- # Get bindings from OVS bridge.
- vif_ports = self.int_br.get_vif_ports()
- new_vif_ports = dict([(p.vif_id, p) for p in vif_ports])
- new_vif_ports_ids = set(new_vif_ports.keys())
-
- old_vif_ports_ids = set(old_vif_ports.keys())
- dead_vif_ports_ids = new_vif_ports_ids - all_bindings_vif_port_ids
- dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids]
- disappeared_vif_ports_ids = old_vif_ports_ids - new_vif_ports_ids
- new_local_bindings_ids = all_bindings_vif_port_ids.intersection(
- new_vif_ports_ids)
- new_local_bindings = dict([(p, all_bindings.get(p))
- for p in new_vif_ports_ids])
- new_bindings = set(
- (p, old_local_bindings.get(p),
- new_local_bindings.get(p)) for p in new_vif_ports_ids)
- changed_bindings = set([b for b in new_bindings if b[2] != b[1]])
-
- LOG.debug('all_bindings: %s' % all_bindings)
- LOG.debug('lsw_id_bindings: %s' % lsw_id_bindings)
- LOG.debug('old_vif_ports_ids: %s' % old_vif_ports_ids)
- LOG.debug('dead_vif_ports_ids: %s' % dead_vif_ports_ids)
- LOG.debug('old_vif_ports_ids: %s' % old_vif_ports_ids)
- LOG.debug('new_local_bindings_ids: %s' % new_local_bindings_ids)
- LOG.debug('new_local_bindings: %s' % new_local_bindings)
- LOG.debug('new_bindings: %s' % new_bindings)
- LOG.debug('changed_bindings: %s' % changed_bindings)
-
- # Take action.
- for p in dead_vif_ports:
- LOG.info("No quantum binding for port " + str(p)
- + "putting on dead vlan")
- self.port_dead(p)
-
- for b in changed_bindings:
- port_id, old_port, new_port = b
- p = new_vif_ports[port_id]
- if old_port:
- old_net_uuid = old_port.network_id
- LOG.info("Removing binding to net-id = " +
- old_net_uuid + " for " + str(p)
- + " added to dead vlan")
- self.port_unbound(p, old_net_uuid)
- if not new_port:
- self.port_dead(p)
-
- if new_port:
- new_net_uuid = new_port.network_id
- if new_net_uuid not in lsw_id_bindings:
- LOG.warn("No ls-id binding found for net-id '%s'" %
- new_net_uuid)
- continue
-
- lsw_id = lsw_id_bindings[new_net_uuid]
- try:
+ while True:
+ try:
+ all_bindings = dict((p.interface_id, Port(p))
+ for p in db.ports.all())
+ all_bindings_vif_port_ids = set(all_bindings)
+ lsw_id_bindings = dict((bind.network_id, bind.vlan_id)
+ for bind in db.vlan_bindings.all())
+
+ tunnel_ips = set(x.ip_address for x in db.tunnel_ips.all())
+ self.manage_tunnels(tunnel_ips, old_tunnel_ips, db)
+
+ # Get bindings from OVS bridge.
+ vif_ports = self.int_br.get_vif_ports()
+ new_vif_ports = dict([(p.vif_id, p) for p in vif_ports])
+ new_vif_ports_ids = set(new_vif_ports.keys())
+
+ old_vif_ports_ids = set(old_vif_ports.keys())
+ dead_vif_ports_ids = (new_vif_ports_ids -
+ all_bindings_vif_port_ids)
+ dead_vif_ports = [new_vif_ports[p] for p in dead_vif_ports_ids]
+ disappeared_vif_ports_ids = (old_vif_ports_ids -
+ new_vif_ports_ids)
+ new_local_bindings_ids = (all_bindings_vif_port_ids.
+ intersection(new_vif_ports_ids))
+ new_local_bindings = dict([(p, all_bindings.get(p))
+ for p in new_vif_ports_ids])
+ new_bindings = set(
+ (p, old_local_bindings.get(p),
+ new_local_bindings.get(p)) for p in new_vif_ports_ids)
+ changed_bindings = set([b for b in new_bindings
+ if b[2] != b[1]])
+
+ LOG.debug('all_bindings: %s', all_bindings)
+ LOG.debug('lsw_id_bindings: %s', lsw_id_bindings)
+ LOG.debug('new_vif_ports_ids: %s', new_vif_ports_ids)
+ LOG.debug('dead_vif_ports_ids: %s', dead_vif_ports_ids)
+ LOG.debug('old_vif_ports_ids: %s', old_vif_ports_ids)
+ LOG.debug('new_local_bindings_ids: %s',
+ new_local_bindings_ids)
+ LOG.debug('new_local_bindings: %s', new_local_bindings)
+ LOG.debug('new_bindings: %s', new_bindings)
+ LOG.debug('changed_bindings: %s', changed_bindings)
+
+ # Take action.
+ for p in dead_vif_ports:
+ LOG.info("No quantum binding for port " + str(p)
+ + "putting on dead vlan")
+ self.port_dead(p)
+
+ for b in changed_bindings:
+ port_id, old_port, new_port = b
+ p = new_vif_ports[port_id]
+ if old_port:
+ old_net_uuid = old_port.network_id
+ LOG.info("Removing binding to net-id = " +
+ old_net_uuid + " for " + str(p)
+ + " added to dead vlan")
+ self.port_unbound(p, old_net_uuid)
+ all_bindings[p.vif_id].op_status = OP_STATUS_DOWN
+ if not new_port:
+ self.port_dead(p)
+
+ if new_port:
+ new_net_uuid = new_port.network_id
+ if new_net_uuid not in lsw_id_bindings:
+ LOG.warn("No ls-id binding found for net-id '%s'" %
+ new_net_uuid)
+ continue
+
+ lsw_id = lsw_id_bindings[new_net_uuid]
self.port_bound(p, new_net_uuid, lsw_id)
+ all_bindings[p.vif_id].op_status = OP_STATUS_UP
LOG.info("Port " + str(p) + " on net-id = "
+ new_net_uuid + " bound to " +
str(self.local_vlan_map[new_net_uuid]))
- except Exception as e:
- LOG.info("Unable to bind Port " + str(p) +
- " on netid = " + new_net_uuid + " to "
- + str(self.local_vlan_map[new_net_uuid]))
-
- for vif_id in disappeared_vif_ports_ids:
- LOG.info("Port Disappeared: " + vif_id)
- old_port = old_local_bindings.get(vif_id)
- if old_port:
- try:
+
+ for vif_id in disappeared_vif_ports_ids:
+ LOG.info("Port Disappeared: " + vif_id)
+ if vif_id in all_bindings:
+ all_bindings[vif_id].op_status = OP_STATUS_DOWN
+ old_port = old_local_bindings.get(vif_id)
+ if old_port:
self.port_unbound(old_vif_ports[vif_id],
old_port.network_id)
- except Exception:
- LOG.info("Unable to unbind Port " + str(p) +
- " on net-id = " + old_port.network_uuid)
-
- old_vif_ports = new_vif_ports
- old_local_bindings = new_local_bindings
- try:
+ # commit any DB changes and expire
+ # data loaded from the database
db.commit()
- except Exception as e:
- LOG.info("Unable to commit to database! Exception: %s" % e)
- db.rollback()
- old_local_bindings = {}
- old_vif_ports = {}
- time.sleep(self.polling_interval)
+ # sleep and re-initialize state for next pass
+ time.sleep(self.polling_interval)
+ old_tunnel_ips = tunnel_ips
+ old_vif_ports = new_vif_ports
+ old_local_bindings = new_local_bindings
+
+ except:
+ LOG.exception("Main-loop Exception:")
+ self.rollback_until_success(db)
def main():
raise Exception('Empty tunnel-bridge in configuration file.')
# Mandatory parameter.
- remote_ip_file = config.get("OVS", "remote-ip-file")
- if not len(remote_ip_file):
- raise Exception('Empty remote-ip-file in configuration file.')
-
- # Mandatory parameter.
- remote_ip_file = config.get("OVS", "remote-ip-file")
local_ip = config.get("OVS", "local-ip")
if not len(local_ip):
raise Exception('Empty local-ip in configuration file.')
(config_file, str(e)))
sys.exit(1)
- plugin = OVSQuantumTunnelAgent(integ_br, tun_br, remote_ip_file,
- local_ip, root_helper,
+ plugin = OVSQuantumTunnelAgent(integ_br, tun_br, local_ip, root_helper,
polling_interval, reconnect_interval)
else:
# Get parameters for OVSQuantumAgent.