From: Gary Kotton Date: Tue, 24 Apr 2012 06:02:03 +0000 (-0400) Subject: blueprint agent-db-ha X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=be45b704ac6fc39e9f934f886403a622c30803cb;p=openstack-build%2Fneutron-build.git blueprint agent-db-ha bug 985470 bug 985646 The fixes enable the OVS and linuxbridge agents to "keep alive" when the host running the server/plugin is down. Fixes after comments. Better logging Fixes after comments - added reconnect interval + cleanup Fixes after comments - simplify code + ovs intervals moved to configuration file Fixes after comments - move int conversion to configuration Fixes after comments - if either the polling interval or reconnect interval is not defined in the relevant ini files then a default value is used. Fixes after comments and merges with HACKING.rst fixes Fixes after port binding comments Fixes after comments from gongysh Fixes after comments - align comments in agent ini files Fixes - revert some code Change-Id: I9194f142478b130e8ef198b019539357a9916d7f --- diff --git a/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini b/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini index dd25e0786..93b66d4d7 100644 --- a/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini +++ b/etc/quantum/plugins/linuxbridge/linuxbridge_conf.ini @@ -16,12 +16,14 @@ host = port = 3306 [LINUX_BRIDGE] -#this is the interface connected to the switch on your Quantum network +# This is the interface connected to the switch on your Quantum network physical_interface = eth1 [AGENT] -#agent's polling interval in seconds +# Agent's polling interval in seconds polling_interval = 2 +# Agent's database reconnection interval in seconds - in event connectivity is lost +reconnect_interval = 2 # Change to "sudo quantum-rootwrap" to limit commands that can be run # as root.
root_helper = sudo diff --git a/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini b/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini index a7a7f1a6b..12796aa0c 100644 --- a/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini +++ b/etc/quantum/plugins/openvswitch/ovs_quantum_plugin.ini @@ -32,6 +32,10 @@ integration-bridge = br-int # local-ip = 10.0.0.3 [AGENT] +# Agent's polling interval in seconds +polling_interval = 2 +# Agent's database reconnection interval in seconds - in event connectivity is lost +reconnect_interval = 2 # Change to "sudo quantum-rootwrap" to limit commands that can be run # as root. root_helper = sudo diff --git a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py index 863a8a546..0c1a896df 100755 --- a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py +++ b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py @@ -28,14 +28,13 @@ from optparse import OptionParser import os import shlex import signal -import sqlite3 import subprocess import sys import time -import MySQLdb - +from sqlalchemy.ext.sqlsoup import SqlSoup +logging.basicConfig() LOG = logging.getLogger(__name__) @@ -52,7 +51,9 @@ VLAN_BINDINGS = "vlan_bindings" PORT_BINDINGS = "port_bindings" OP_STATUS_UP = "UP" OP_STATUS_DOWN = "DOWN" -DB_CONNECTION = None +# Default interval values +DEFAULT_POLLING_INTERVAL = 2 +DEFAULT_RECONNECT_INTERVAL = 2 class LinuxBridge: @@ -288,10 +289,12 @@ class LinuxBridge: class LinuxBridgeQuantumAgent: def __init__(self, br_name_prefix, physical_interface, polling_interval, - root_helper): - self.polling_interval = int(polling_interval) + reconnect_interval, root_helper): + self.polling_interval = polling_interval + self.reconnect_interval = reconnect_interval self.root_helper = root_helper self.setup_linux_bridge(br_name_prefix, physical_interface) + self.db_connected = False def setup_linux_bridge(self, br_name_prefix, 
physical_interface): self.linux_br = LinuxBridge(br_name_prefix, physical_interface, @@ -350,27 +353,43 @@ class LinuxBridgeQuantumAgent: if bridge not in current_quantum_bridge_names: self.linux_br.delete_vlan_bridge(bridge) - def manage_networks_on_host(self, conn, old_vlan_bindings, + def manage_networks_on_host(self, db, + old_vlan_bindings, old_port_bindings): - if DB_CONNECTION != 'sqlite': - cursor = MySQLdb.cursors.DictCursor(conn) - else: - cursor = conn.cursor() - cursor.execute("SELECT * FROM vlan_bindings") - rows = cursor.fetchall() - cursor.close() vlan_bindings = {} + try: + vlan_binds = db.vlan_bindings.all() + except Exception as e: + LOG.info("Unable to get vlan bindings! Exception: %s" % e) + self.db_connected = False + return {VLAN_BINDINGS: {}, + PORT_BINDINGS: []} + vlans_string = "" - for row in rows: - vlan_bindings[row['network_id']] = row - vlans_string = "%s %s" % (vlans_string, row) + for bind in vlan_binds: + entry = {'network_id': bind.network_id, 'vlan_id': bind.vlan_id} + vlan_bindings[bind.network_id] = entry + vlans_string = "%s %s" % (vlans_string, entry) + + port_bindings = [] + try: + port_binds = db.ports.all() + except Exception as e: + LOG.info("Unable to get port bindings! 
Exception: %s" % e) + self.db_connected = False + return {VLAN_BINDINGS: {}, + PORT_BINDINGS: []} + + all_bindings = {} + for bind in port_binds: + all_bindings[bind.uuid] = bind + entry = {'network_id': bind.network_id, 'state': bind.state, + 'op_status': bind.op_status, 'uuid': bind.uuid, + 'interface_id': bind.interface_id} + if bind.state == 'ACTIVE': + port_bindings.append(entry) plugged_interfaces = [] - cursor = MySQLdb.cursors.DictCursor(conn) - cursor.execute("SELECT * FROM ports where state = 'ACTIVE'") - port_bindings = cursor.fetchall() - cursor.close() - ports_string = "" for pb in port_bindings: ports_string = "%s %s" % (ports_string, pb) @@ -380,10 +399,7 @@ class LinuxBridgeQuantumAgent: pb['network_id'], pb['interface_id'], vlan_id): - cursor = MySQLdb.cursors.DictCursor(conn) - sql = PORT_OPSTATUS_UPDATESQL % (pb['uuid'], OP_STATUS_UP) - cursor.execute(sql) - cursor.close() + all_bindings[pb['uuid']].op_status = OP_STATUS_UP plugged_interfaces.append(pb['interface_id']) if old_port_bindings != port_bindings: @@ -396,16 +412,30 @@ class LinuxBridgeQuantumAgent: self.process_deleted_networks(vlan_bindings) - conn.commit() + try: + db.commit() + except Exception as e: + LOG.info("Unable to update database! 
Exception: %s" % e) + db.rollback() + vlan_bindings = {} + port_bindings = [] + return {VLAN_BINDINGS: vlan_bindings, PORT_BINDINGS: port_bindings} - def daemon_loop(self, conn): + def daemon_loop(self, db_connection_url): old_vlan_bindings = {} - old_port_bindings = {} + old_port_bindings = [] + self.db_connected = False while True: - bindings = self.manage_networks_on_host(conn, + if not self.db_connected: + time.sleep(self.reconnect_interval) + db = SqlSoup(db_connection_url) + self.db_connected = True + LOG.info("Connecting to database \"%s\" on %s" % + (db.engine.url.database, db.engine.url.host)) + bindings = self.manage_networks_on_host(db, old_vlan_bindings, old_port_bindings) old_vlan_bindings = bindings[VLAN_BINDINGS] @@ -422,9 +452,9 @@ def main(): options, args = parser.parse_args() if options.verbose: - LOG.basicConfig(level=LOG.DEBUG) + LOG.setLevel(logging.DEBUG) else: - LOG.basicConfig(level=LOG.WARN) + LOG.setLevel(logging.WARNING) if len(args) != 1: parser.print_help() @@ -432,22 +462,28 @@ def main(): config_file = args[0] config = ConfigParser.ConfigParser() - conn = None try: fh = open(config_file) fh.close() config.read(config_file) br_name_prefix = BRIDGE_NAME_PREFIX physical_interface = config.get("LINUX_BRIDGE", "physical_interface") - polling_interval = config.get("AGENT", "polling_interval") + if config.has_option("AGENT", "polling_interval"): + polling_interval = config.getint("AGENT", "polling_interval") + else: + polling_interval = DEFAULT_POLLING_INTERVAL + LOG.info("Polling interval not defined. Using default.") + if config.has_option("AGENT", "reconnect_interval"): + reconnect_interval = config.getint("AGENT", "reconnect_interval") + else: + reconnect_interval = DEFAULT_RECONNECT_INTERVAL + LOG.info("Reconnect interval not defined. 
Using default.") root_helper = config.get("AGENT", "root_helper") 'Establish database connection and load models' - global DB_CONNECTION - DB_CONNECTION = config.get("DATABASE", "connection") - if DB_CONNECTION == 'sqlite': + connection = config.get("DATABASE", "connection") + if connection == 'sqlite': LOG.info("Connecting to sqlite DB") - conn = sqlite3.connect(":memory:") - conn.row_factory = sqlite3.Row + db_connection_url = "sqlite:///:memory:" else: db_name = config.get("DATABASE", "name") db_user = config.get("DATABASE", "user") @@ -455,21 +491,18 @@ def main(): db_host = config.get("DATABASE", "host") db_port = int(config.get("DATABASE", "port")) LOG.info("Connecting to database %s on %s" % (db_name, db_host)) - conn = MySQLdb.connect(host=db_host, user=db_user, port=db_port, - passwd=db_pass, db=db_name) - except Exception, e: - LOG.error("Unable to parse config file \"%s\": \nException%s" - % (config_file, str(e))) + db_connection_url = ("%s://%s:%s@%s:%d/%s" % + (connection, db_user, db_pass, db_host, db_port, db_name)) + except Exception as e: + LOG.error("Unable to parse config file \"%s\": \nException %s" % + (config_file, str(e))) sys.exit(1) - try: - plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface, - polling_interval, root_helper) - LOG.info("Agent initialized successfully, now running...") - plugin.daemon_loop(conn) - finally: - if conn: - conn.close() + plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface, + polling_interval, reconnect_interval, + root_helper) + LOG.info("Agent initialized successfully, now running... 
") + plugin.daemon_loop(db_connection_url) sys.exit(0) diff --git a/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py b/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py index 3fd7f5bb0..21b142acb 100644 --- a/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py +++ b/quantum/plugins/linuxbridge/tests/unit/_test_linuxbridgeAgent.py @@ -34,7 +34,7 @@ from quantum.plugins.linuxbridge.common import constants as lconst from quantum.plugins.linuxbridge.db import l2network_db as cdb -LOG = logger.getLogger(__name__) +LOG = logging.getLogger(__name__) class LinuxBridgeAgentTest(unittest.TestCase): diff --git a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py index bb51b84d8..a01289d73 100755 --- a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py +++ b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py @@ -30,10 +30,9 @@ import time from sqlalchemy.ext.sqlsoup import SqlSoup - +logging.basicConfig() LOG = logging.getLogger(__name__) - # Global constants. OP_STATUS_UP = "UP" OP_STATUS_DOWN = "DOWN" @@ -41,7 +40,9 @@ OP_STATUS_DOWN = "DOWN" # A placeholder for dead vlans. 
DEAD_VLAN_TAG = "4095" -REFRESH_INTERVAL = 2 +# Default interval values +DEFAULT_POLLING_INTERVAL = 2 +DEFAULT_RECONNECT_INTERVAL = 2 # A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac' @@ -215,9 +216,12 @@ class LocalVLANMapping: class OVSQuantumAgent(object): - def __init__(self, integ_br, root_helper): + def __init__(self, integ_br, root_helper, + polling_interval, reconnect_interval): self.root_helper = root_helper self.setup_integration_br(integ_br) + self.polling_interval = polling_interval + self.reconnect_interval = reconnect_interval def port_bound(self, port, vlan_id): self.int_br.set_db_attribute("Port", port.port_name, @@ -234,26 +238,39 @@ class OVSQuantumAgent(object): # switch all traffic using L2 learning self.int_br.add_flow(priority=1, actions="normal") - def daemon_loop(self, db): + def daemon_loop(self, db_connection_url): self.local_vlan_map = {} old_local_bindings = {} old_vif_ports = {} + db_connected = False while True: + if not db_connected: + time.sleep(self.reconnect_interval) + db = SqlSoup(db_connection_url) + db_connected = True + LOG.info("Connecting to database \"%s\" on %s" % + (db.engine.url.database, db.engine.url.host)) all_bindings = {} try: ports = db.ports.all() - except: - ports = [] + except Exception as e: + LOG.info("Unable to get port bindings! Exception: %s" % e) + db_connected = False + continue + for port in ports: all_bindings[port.interface_id] = port vlan_bindings = {} try: vlan_binds = db.vlan_bindings.all() - except: - vlan_binds = [] + except Exception as e: + LOG.info("Unable to get vlan bindings! Exception: %s" % e) + db_connected = False + continue + for bind in vlan_binds: vlan_bindings[bind.network_id] = bind.vlan_id @@ -306,8 +323,15 @@ class OVSQuantumAgent(object): old_vif_ports = new_vif_ports old_local_bindings = new_local_bindings - db.commit() - time.sleep(REFRESH_INTERVAL) + try: + db.commit() + except Exception as e: + LOG.info("Unable to commit to database! 
Exception: %s" % e) + db.rollback() + old_local_bindings = {} + old_vif_ports = {} + + time.sleep(self.polling_interval) class OVSQuantumTunnelAgent(object): @@ -335,7 +359,7 @@ class OVSQuantumTunnelAgent(object): MAX_VLAN_TAG = 4094 def __init__(self, integ_br, tun_br, remote_ip_file, local_ip, - root_helper): + root_helper, polling_interval, reconnect_interval): '''Constructor. :param integ_br: name of the integration bridge. @@ -349,6 +373,9 @@ class OVSQuantumTunnelAgent(object): self.setup_integration_br(integ_br) self.local_vlan_map = {} self.setup_tunnel_br(tun_br, remote_ip_file, local_ip) + self.db_connected = False + self.polling_interval = polling_interval + self.reconnect_interval = reconnect_interval def provision_local_vlan(self, net_uuid, lsw_id): '''Provisions a local VLAN. @@ -466,7 +493,7 @@ class OVSQuantumTunnelAgent(object): tunnel_ips = (x for x in clean_ips if x != local_ip and x) for i, remote_ip in enumerate(tunnel_ips): self.tun_br.add_tunnel_port("gre-" + str(i), remote_ip) - except Exception, e: + except Exception as e: LOG.error("Error configuring tunnels: '%s' %s" % (remote_ip_file, str(e))) raise @@ -485,8 +512,10 @@ class OVSQuantumTunnelAgent(object): ports = [] try: ports = db.ports.all() - except Exception, e: - LOG.info("Exception accessing db.ports: %s" % e) + except Exceptioni as e: + LOG.info("Unable to get port bindings! Exception: %s" % e) + self.db_connected = False + return {} return dict([(port.interface_id, port) for port in ports]) @@ -500,25 +529,39 @@ class OVSQuantumTunnelAgent(object): lsw_id_binds = [] try: lsw_id_binds.extend(db.vlan_bindings.all()) - except Exception, e: - LOG.info("Exception accessing db.vlan_bindings: %s" % e) + except Exception as e: + LOG.info("Unable to get vlan bindings! 
Exception: %s" % e) + self.db_connected = False + return {} return dict([(bind.network_id, bind.vlan_id) for bind in lsw_id_binds]) - def daemon_loop(self, db): + def daemon_loop(self, db_connection_url): '''Main processing loop (not currently used). - :param db: reference to database layer. + :param db_connection_url: database connection URL - used to (re)connect whenever connectivity is lost ''' old_local_bindings = {} old_vif_ports = {} + self.db_connected = False while True: + if not self.db_connected: + time.sleep(self.reconnect_interval) + db = SqlSoup(db_connection_url) + self.db_connected = True + LOG.info("Connecting to database \"%s\" on %s" % + (db.engine.url.database, db.engine.url.host)) + # Get bindings from db. all_bindings = self.get_db_port_bindings(db) + if not self.db_connected: + continue all_bindings_vif_port_ids = set(all_bindings.keys()) lsw_id_bindings = self.get_db_vlan_bindings(db) + if not self.db_connected: + continue # Get bindings from OVS bridge. vif_ports = self.int_br.get_vif_ports() @@ -579,7 +622,7 @@ class OVSQuantumTunnelAgent(object): LOG.info("Port " + str(p) + " on net-id = " + new_net_uuid + " bound to " + str(self.local_vlan_map[new_net_uuid])) - except Exception, e: + except Exception as e: LOG.info("Unable to bind Port " + str(p) + " on netid = " + new_net_uuid + " to " + str(self.local_vlan_map[new_net_uuid])) @@ -597,7 +640,7 @@ class OVSQuantumTunnelAgent(object): old_vif_ports = new_vif_ports old_local_bindings = new_local_bindings - time.sleep(REFRESH_INTERVAL) + time.sleep(self.polling_interval) def main(): @@ -609,9 +652,9 @@ def main(): options, args = parser.parse_args() if options.verbose: - LOG.basicConfig(level=LOG.DEBUG) + LOG.setLevel(logging.DEBUG) else: - LOG.basicConfig(level=LOG.WARN) + LOG.setLevel(logging.WARNING) if len(args) != 1: parser.print_help() @@ -621,7 +664,7 @@ def main(): config = ConfigParser.ConfigParser() try: config.read(config_file) - except Exception, e: + except Exception as e: LOG.error("Unable to parse config 
file \"%s\": %s" % (config_file, str(e))) raise e @@ -630,7 +673,7 @@ def main(): enable_tunneling = False try: enable_tunneling = config.getboolean("OVS", "enable-tunneling") - except Exception, e: + except Exception as e: pass # Get common parameters. @@ -643,9 +686,19 @@ def main(): if not len(db_connection_url): raise Exception('Empty db_connection_url in configuration file.') + if config.has_option("AGENT", "polling_interval"): + polling_interval = config.getint("AGENT", "polling_interval") + else: + polling_interval = DEFAULT_POLLING_INTERVAL + LOG.info("Polling interval not defined. Using default.") + if config.has_option("AGENT", "reconnect_interval"): + reconnect_interval = config.getint("AGENT", "reconnect_interval") + else: + reconnect_interval = DEFAULT_RECONNECT_INTERVAL + LOG.info("Reconnect interval not defined. Using default.") root_helper = config.get("AGENT", "root_helper") - except Exception, e: + except Exception as e: LOG.error("Error parsing common params in config_file: '%s': %s" % (config_file, str(e))) sys.exit(1) @@ -668,24 +721,22 @@ def main(): local_ip = config.get("OVS", "local-ip") if not len(local_ip): raise Exception('Empty local-ip in configuration file.') - except Exception, e: + + except Exception as e: LOG.error("Error parsing tunnel params in config_file: '%s': %s" % (config_file, str(e))) sys.exit(1) plugin = OVSQuantumTunnelAgent(integ_br, tun_br, remote_ip_file, - local_ip, root_helper) + local_ip, root_helper, + polling_interval, reconnect_interval) else: # Get parameters for OVSQuantumAgent. - plugin = OVSQuantumAgent(integ_br, root_helper) + plugin = OVSQuantumAgent(integ_br, root_helper, + polling_interval, reconnect_interval) # Start everything. 
- options = {"sql_connection": db_connection_url} - db = SqlSoup(options["sql_connection"]) - LOG.info("Connecting to database \"%s\" on %s" % - (db.engine.url.database, db.engine.url.host)) - - plugin.daemon_loop(db) + plugin.daemon_loop(db_connection_url) sys.exit(0) diff --git a/quantum/plugins/openvswitch/tests/unit/test_tunnel.py b/quantum/plugins/openvswitch/tests/unit/test_tunnel.py index c0bf2b35d..7d00434ca 100644 --- a/quantum/plugins/openvswitch/tests/unit/test_tunnel.py +++ b/quantum/plugins/openvswitch/tests/unit/test_tunnel.py @@ -93,7 +93,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) self.mox.VerifyAll() def testProvisionLocalVlan(self): @@ -113,7 +113,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) a.available_local_vlans = set([LV_ID]) a.provision_local_vlan(NET_UUID, LS_ID) self.mox.VerifyAll() @@ -130,7 +130,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) a.available_local_vlans = set() a.local_vlan_map[NET_UUID] = LVM a.reclaim_local_vlan(NET_UUID, LVM) @@ -147,7 +147,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) a.local_vlan_map[NET_UUID] = LVM a.port_bound(VIF_PORT, NET_UUID, LS_ID) self.mox.VerifyAll() @@ -158,7 +158,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) a.available_local_vlans = set([LV_ID]) a.local_vlan_map[NET_UUID] = LVM a.port_unbound(VIF_PORT, NET_UUID) @@ -177,7 +177,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) a.available_local_vlans = set([LV_ID]) a.local_vlan_map[NET_UUID] = LVM a.port_dead(VIF_PORT) @@ -200,7 +200,7 @@ class TunnelTest(unittest.TestCase): self.TUN_BRIDGE, REMOTE_IP_FILE, '10.0.0.1', - 'sudo') + 'sudo', 2, 2) 
all_bindings = a.get_db_port_bindings(db) lsw_id_bindings = a.get_db_vlan_bindings(db)