From a23b1e6de07704a771c1d3a8ed9ecf4b7807c290 Mon Sep 17 00:00:00 2001 From: Ivar Lazzaro Date: Tue, 14 Jan 2014 11:17:05 -0800 Subject: [PATCH] Embrane Tempest Compliance This changeset tracks the changes needed by the Embrane's Neutron Plugin in order to consistently pass tempest tests. Changes: - Some db transactions were too long and were causing lock timeout exception. Removed useless transactions (waiting on non-db tasks to complete) to fix the problem. - The operation filter was useless, and breaking the tests. Most of the logic which guarantees the appliance correct state when an operation in executed is now in the internal library used for the heleos APIs. The filter was therefore removed (as well as the corresponding exception). - Fixed "sync" mode. The behavior was incorrect due to the queue timeout. Furthermore, parallel requests were not waiting on the correct thread. - Added missing methods for floating IPs (not all the scenarios were covered). - Minor bug fixes caught during the tests. Change-Id: If081b50b4629158016ba421b94612a4cfac82257 Closes-Bug:1269098 --- neutron/plugins/embrane/agent/dispatcher.py | 35 +- .../agent/operations/router_operations.py | 2 + neutron/plugins/embrane/base_plugin.py | 335 +++++++++--------- neutron/plugins/embrane/common/constants.py | 12 - neutron/plugins/embrane/common/exceptions.py | 11 - neutron/plugins/embrane/common/utils.py | 10 +- .../embrane/plugins/embrane_ovs_plugin.py | 1 + 7 files changed, 196 insertions(+), 210 deletions(-) diff --git a/neutron/plugins/embrane/agent/dispatcher.py b/neutron/plugins/embrane/agent/dispatcher.py index 8038b93e0..31bc15511 100644 --- a/neutron/plugins/embrane/agent/dispatcher.py +++ b/neutron/plugins/embrane/agent/dispatcher.py @@ -26,18 +26,10 @@ from neutron.openstack.common import log as logging from neutron.plugins.embrane.agent.operations import router_operations from neutron.plugins.embrane.common import constants as p_con from neutron.plugins.embrane.common import contexts as ctx -from neutron.plugins.embrane.common import exceptions as plugin_exc - LOG = logging.getLogger(__name__) -def _validate_operation(event, status, item_id): - if status and event not in p_con.operation_filter[status]: - raise plugin_exc.StateConstraintException(operation=event, - dva_id=item_id, state=status) - - class Dispatcher(object): def __init__(self, plugin, async=True): @@ -52,28 +44,29 @@ class Dispatcher(object): chain = d_context.chain item_id = item["id"] - # First round validation (Controller level) - _validate_operation(event, item["status"], item_id) - handlers = router_operations.handlers if event in handlers: for f in handlers[event]: first_run = False if item_id not in self.sync_items: - self.sync_items[item_id] = queue.Queue() + self.sync_items[item_id] = (queue.Queue(),) first_run = True - self.sync_items[item_id].put( + self.sync_items[item_id][0].put( ctx.OperationContext(event, q_context, item, chain, f, args, kwargs)) + t = None if first_run: t = greenthread.spawn(self._consume_l3, item_id, - self.sync_items[item_id], - self._plugin) + self.sync_items[item_id][0], + self._plugin, + self._async) + self.sync_items[item_id] += (t,) if not self._async: + t = self.sync_items[item_id][1] t.wait() - def _consume_l3(self, sync_item, sync_queue, plugin): + def _consume_l3(self, sync_item, sync_queue, plugin, a_sync): current_state = None while True: try: @@ -83,15 +76,13 @@ class Dispatcher(object): del self.sync_items[sync_item] return try: + # If synchronous op, empty the queue as fast as possible operation_context = sync_queue.get( + block=a_sync, timeout=p_con.QUEUE_TIMEOUT) except queue.Empty: del self.sync_items[sync_item] return - # Second round validation (enqueued level) - _validate_operation(operation_context.event, - current_state, - operation_context.item["id"]) # Execute the preliminary operations (operation_context.chain and operation_context.chain.execute_all()) @@ -134,12 +125,10 @@ class Dispatcher(object): operation_context.q_context, operation_context.item["id"]) # Error state cannot be reverted - elif current_state != p_con.Status.ERROR: + elif transient_state != p_con.Status.ERROR: current_state = plugin._update_neutron_state( operation_context.q_context, operation_context.item, transient_state) - except plugin_exc.StateConstraintException as e: - LOG.error(_("%s"), e.message) except Exception: LOG.exception(_("Unhandled exception occurred")) diff --git a/neutron/plugins/embrane/agent/operations/router_operations.py b/neutron/plugins/embrane/agent/operations/router_operations.py index d460bc47e..032a5298f 100644 --- a/neutron/plugins/embrane/agent/operations/router_operations.py +++ b/neutron/plugins/embrane/agent/operations/router_operations.py @@ -127,6 +127,8 @@ def _shrink_dva_iface(api, tenant_id, neutron_router, port_id): except h_exc.InterfaceNotFound: LOG.warning(_("Interface %s not found in the heleos back-end," "likely already deleted"), port_id) + return (p_con.Status.ACTIVE if neutron_router["admin_state_up"] else + p_con.Status.READY) except h_exc.PreliminaryOperationsFailed as ex: raise h_exc.BrokenInterface(err_msg=ex.message) state = api.extract_dva_state(dva) diff --git a/neutron/plugins/embrane/base_plugin.py b/neutron/plugins/embrane/base_plugin.py index 2db95e877..33d213888 100644 --- a/neutron/plugins/embrane/base_plugin.py +++ b/neutron/plugins/embrane/base_plugin.py @@ -21,6 +21,7 @@ from heleosapi import backend_operations as h_op from heleosapi import constants as h_con from heleosapi import exceptions as h_exc from oslo.config import cfg +from sqlalchemy.orm import exc from neutron.common import constants as l3_constants from neutron.common import exceptions as neutron_exc @@ -33,7 +34,6 @@ from neutron.plugins.embrane.agent import dispatcher from neutron.plugins.embrane.common import config # noqa from neutron.plugins.embrane.common import constants as p_con from neutron.plugins.embrane.common import contexts as embrane_ctx -from neutron.plugins.embrane.common import exceptions as c_exc from neutron.plugins.embrane.common import operation from neutron.plugins.embrane.common import utils @@ -111,7 +111,7 @@ class EmbranePlugin(object): def _retrieve_prefix_from_port(self, context, neutron_port): subnet_id = neutron_port["fixed_ips"][0]["subnet_id"] - subnet = self._get_subnet(context, subnet_id) + subnet = utils.retrieve_subnet(context, subnet_id) prefix = subnet["cidr"].split("/")[1] return prefix @@ -119,80 +119,47 @@ class EmbranePlugin(object): def create_router(self, context, router): r = router["router"] self._get_tenant_id_for_create(context, r) - with context.session.begin(subtransactions=True): - neutron_router = self._l3super.create_router(self, context, router) - network_id = None - gw_port = None - ext_gw_info = neutron_router.get(l3_db.EXTERNAL_GW_INFO) - if ext_gw_info: - network_id = ext_gw_info.get("network_id") - if network_id: - gw_ports = self.get_ports( - context, - {"device_id": [id], - "device_owner": ["network:router_gateway"]}) - if len(gw_ports) != 1: - raise c_exc.EmbranePluginException( - err_msg=_("There must be only one gateway port " - "per router at once")) - gw_port = gw_ports[0] - - # For now, only small flavor is used - utif_info = (self._plugin_support.retrieve_utif_info(context, - gw_port) - if network_id else None) - ip_allocation_info = (utils.retrieve_ip_allocation_info(self, - context, - gw_port) - if network_id else None) - neutron_router = self._l3super._get_router(self, context, - neutron_router["id"]) - neutron_router["status"] = p_con.Status.CREATING - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.CREATE_ROUTER, neutron_router, context, None), - args=(h_con.Flavor.SMALL, utif_info, ip_allocation_info)) - return self._make_router_dict(neutron_router) + db_router = self._l3super.create_router(self, context, router) + neutron_router = self._get_router(context, db_router['id']) + gw_port = neutron_router.gw_port + # For now, only small flavor is used + utif_info = (self._plugin_support.retrieve_utif_info(context, + gw_port) + if gw_port else None) + ip_allocation_info = (utils.retrieve_ip_allocation_info(context, + gw_port) + if gw_port else None) + neutron_router = self._l3super._get_router(self, context, + neutron_router["id"]) + neutron_router["status"] = p_con.Status.CREATING + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.CREATE_ROUTER, neutron_router, context, None), + args=(h_con.Flavor.SMALL, utif_info, ip_allocation_info)) + return self._make_router_dict(neutron_router) def update_router(self, context, id, router): - with context.session.begin(subtransactions=True): - db_router = self._l3super.update_router(self, context, id, router) - gw_port = None - ext_gw_info = db_router.get(l3_db.EXTERNAL_GW_INFO) - if ext_gw_info: - ext_gw_info = db_router[l3_db.EXTERNAL_GW_INFO] - network_id = (ext_gw_info.get("network_id") - if ext_gw_info else None) - if network_id: - gw_ports = self.get_ports( - context, - {"device_id": [id], - "device_owner": ["network:router_gateway"]}) - if len(gw_ports) != 1: - raise c_exc.EmbranePluginException( - err_msg=_("There must be only one gateway port " - "per router at once")) - gw_port = gw_ports[0] - - utif_info = (self._plugin_support.retrieve_utif_info(context, - gw_port) - if gw_port else None) - ip_allocation_info = (utils.retrieve_ip_allocation_info(self, - context, - gw_port) - if gw_port else None) - - routes_info = router["router"].get("routes") - - neutron_router = self._l3super._get_router(self, context, id) - state_change = operation.Operation( - self._set_db_router_state, - args=(context, neutron_router, p_con.Status.UPDATING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.UPDATE_ROUTER, neutron_router, context, - state_change), - args=(utif_info, ip_allocation_info, routes_info)) + db_router = self._l3super.update_router(self, context, id, router) + neutron_router = self._get_router(context, db_router['id']) + gw_port = neutron_router.gw_port + utif_info = (self._plugin_support.retrieve_utif_info(context, + gw_port) + if gw_port else None) + ip_allocation_info = (utils.retrieve_ip_allocation_info(context, + gw_port) + if gw_port else None) + + routes_info = router["router"].get("routes") + + neutron_router = self._l3super._get_router(self, context, id) + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.UPDATE_ROUTER, neutron_router, context, + state_change), + args=(utif_info, ip_allocation_info, routes_info)) return self._make_router_dict(neutron_router) def get_router(self, context, id, fields=None): @@ -241,58 +208,55 @@ class EmbranePlugin(object): """Deletes the DVA with the specific router id.""" # Copy of the parent validation code, shouldn't the base modules # provide functions for validating operations? - with context.session.begin(subtransactions=True): - DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF - fips = self.get_floatingips_count(context.elevated(), - filters={"router_id": [id]}) - if fips: - raise l3.RouterInUse(router_id=id) - - device_filter = {"device_id": [id], - "device_owner": [DEVICE_OWNER_ROUTER_INTF]} - ports = self.get_ports_count(context.elevated(), - filters=device_filter) - if ports: - raise l3.RouterInUse(router_id=id) - neutron_router = self._get_router(context, id) - state_change = operation.Operation(self._set_db_router_state, - args=(context, neutron_router, - p_con.Status.DELETING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.DELETE_ROUTER, neutron_router, context, - state_change), args=()) - LOG.debug(_("Deleting router=%s"), neutron_router) - return neutron_router + device_owner_router_intf = l3_constants.DEVICE_OWNER_ROUTER_INTF + fips = self.get_floatingips_count(context.elevated(), + filters={"router_id": [id]}) + if fips: + raise l3.RouterInUse(router_id=id) + + device_filter = {"device_id": [id], + "device_owner": [device_owner_router_intf]} + ports = self.get_ports_count(context.elevated(), + filters=device_filter) + if ports: + raise l3.RouterInUse(router_id=id) + neutron_router = self._get_router(context, id) + state_change = operation.Operation(self._set_db_router_state, + args=(context, neutron_router, + p_con.Status.DELETING)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.DELETE_ROUTER, neutron_router, context, + state_change), args=()) + LOG.debug(_("Deleting router=%s"), neutron_router) + return neutron_router def add_router_interface(self, context, router_id, interface_info): """Grows DVA interface in the specified subnet.""" - with context.session.begin(subtransactions=True): - neutron_router = self._get_router(context, router_id) - rport_qry = context.session.query(models_v2.Port) - ports = rport_qry.filter_by( - device_id=router_id).all() - if len(ports) >= p_con.UTIF_LIMIT: - raise neutron_exc.BadRequest( - resource=router_id, - msg=("this router doesn't support more than " - + str(p_con.UTIF_LIMIT) + " interfaces")) - neutron_router_iface = self._l3super.add_router_interface( - self, context, router_id, interface_info) - port = self._get_port(context, neutron_router_iface["port_id"]) - utif_info = self._plugin_support.retrieve_utif_info(context, port) - ip_allocation_info = utils.retrieve_ip_allocation_info(self, - context, - port) - state_change = operation.Operation(self._set_db_router_state, - args=(context, neutron_router, - p_con.Status.UPDATING)) - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.GROW_ROUTER_IF, neutron_router, context, - state_change), - args=(utif_info, ip_allocation_info)) - return neutron_router_iface + neutron_router = self._get_router(context, router_id) + rport_qry = context.session.query(models_v2.Port) + ports = rport_qry.filter_by( + device_id=router_id).all() + if len(ports) >= p_con.UTIF_LIMIT: + raise neutron_exc.BadRequest( + resource=router_id, + msg=("this router doesn't support more than " + + str(p_con.UTIF_LIMIT) + " interfaces")) + neutron_router_iface = self._l3super.add_router_interface( + self, context, router_id, interface_info) + port = self._get_port(context, neutron_router_iface["port_id"]) + utif_info = self._plugin_support.retrieve_utif_info(context, port) + ip_allocation_info = utils.retrieve_ip_allocation_info(context, + port) + state_change = operation.Operation(self._set_db_router_state, + args=(context, neutron_router, + p_con.Status.UPDATING)) + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.GROW_ROUTER_IF, neutron_router, context, + state_change), + args=(utif_info, ip_allocation_info)) + return neutron_router_iface def remove_router_interface(self, context, router_id, interface_info): port_id = None @@ -300,7 +264,7 @@ class EmbranePlugin(object): port_id = interface_info["port_id"] elif "subnet_id" in interface_info: subnet_id = interface_info["subnet_id"] - subnet = self._get_subnet(context, subnet_id) + subnet = utils.retrieve_subnet(context, subnet_id) rport_qry = context.session.query(models_v2.Port) ports = rport_qry.filter_by( device_id=router_id, @@ -322,43 +286,90 @@ class EmbranePlugin(object): state_change), args=(port_id,)) + def create_floatingip(self, context, floatingip): + result = self._l3super.create_floatingip( + self, context, floatingip) + + if result["port_id"]: + neutron_router = self._get_router(context, result["router_id"]) + db_fixed_port = self._get_port(context, result["port_id"]) + fixed_prefix = self._retrieve_prefix_from_port(context, + db_fixed_port) + db_floating_port = neutron_router["gw_port"] + floating_prefix = self._retrieve_prefix_from_port( + context, db_floating_port) + nat_info = utils.retrieve_nat_info(context, result, + fixed_prefix, + floating_prefix, + neutron_router) + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.SET_NAT_RULE, neutron_router, context, + state_change), + args=(nat_info,)) + return result + def update_floatingip(self, context, id, floatingip): - with context.session.begin(subtransactions=True): - db_fip = self._l3super.get_floatingip(self, context, id) - result = self._l3super.update_floatingip(self, context, id, - floatingip) - - if db_fip["port_id"]: - neutron_router = self._get_router(context, db_fip["router_id"]) - fip_id = db_fip["id"] - state_change = operation.Operation( - self._set_db_router_state, - args=(context, neutron_router, p_con.Status.UPDATING)) - - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.RESET_NAT_RULE, neutron_router, context, - state_change), - args=(fip_id,)) - if floatingip["floatingip"]["port_id"]: - neutron_router = self._get_router(context, result["router_id"]) - db_fixed_port = self._get_port(context, result["port_id"]) - fixed_prefix = self._retrieve_prefix_from_port(context, - db_fixed_port) - db_floating_port = neutron_router["gw_port"] - floating_prefix = self._retrieve_prefix_from_port( - context, db_floating_port) - nat_info = utils.retrieve_nat_info(context, result, - fixed_prefix, - floating_prefix, - neutron_router) - state_change = operation.Operation( - self._set_db_router_state, - args=(context, neutron_router, p_con.Status.UPDATING)) - - self._dispatcher.dispatch_l3( - d_context=embrane_ctx.DispatcherContext( - p_con.Events.SET_NAT_RULE, neutron_router, context, - state_change), - args=(nat_info,)) + db_fip = self._l3super.get_floatingip(self, context, id) + result = self._l3super.update_floatingip(self, context, id, + floatingip) + + if db_fip["port_id"] and db_fip["port_id"] != result["port_id"]: + neutron_router = self._get_router(context, db_fip["router_id"]) + fip_id = db_fip["id"] + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.RESET_NAT_RULE, neutron_router, context, + state_change), + args=(fip_id,)) + if result["port_id"]: + neutron_router = self._get_router(context, result["router_id"]) + db_fixed_port = self._get_port(context, result["port_id"]) + fixed_prefix = self._retrieve_prefix_from_port(context, + db_fixed_port) + db_floating_port = neutron_router["gw_port"] + floating_prefix = self._retrieve_prefix_from_port( + context, db_floating_port) + nat_info = utils.retrieve_nat_info(context, result, + fixed_prefix, + floating_prefix, + neutron_router) + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.SET_NAT_RULE, neutron_router, context, + state_change), + args=(nat_info,)) return result + + def disassociate_floatingips(self, context, port_id): + try: + fip_qry = context.session.query(l3_db.FloatingIP) + floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one() + router_id = floating_ip["router_id"] + except exc.NoResultFound: + return + self._l3super.disassociate_floatingips(self, context, port_id) + if router_id: + neutron_router = self._get_router(context, router_id) + fip_id = floating_ip["id"] + state_change = operation.Operation( + self._set_db_router_state, + args=(context, neutron_router, p_con.Status.UPDATING)) + + self._dispatcher.dispatch_l3( + d_context=embrane_ctx.DispatcherContext( + p_con.Events.RESET_NAT_RULE, neutron_router, context, + state_change), + args=(fip_id,)) diff --git a/neutron/plugins/embrane/common/constants.py b/neutron/plugins/embrane/common/constants.py index ae75e89d2..65f3818a2 100644 --- a/neutron/plugins/embrane/common/constants.py +++ b/neutron/plugins/embrane/common/constants.py @@ -48,18 +48,6 @@ class Events: SET_NAT_RULE = "set_nat_rule" RESET_NAT_RULE = "reset_nat_rule" -operation_filter = { - Status.ACTIVE: [Events.DELETE_ROUTER, Events.GROW_ROUTER_IF, - Events.SHRINK_ROUTER_IF, Events.UPDATE_ROUTER, - Events.SET_NAT_RULE, Events.RESET_NAT_RULE], - Status.READY: [Events.DELETE_ROUTER, Events.GROW_ROUTER_IF, - Events.SHRINK_ROUTER_IF, Events.UPDATE_ROUTER], - Status.ERROR: [Events.DELETE_ROUTER, Events.SHRINK_ROUTER_IF], - Status.UPDATING: [Events.DELETE_ROUTER, Events.SHRINK_ROUTER_IF, - Events.RESET_NAT_RULE], - Status.CREATING: [Events.DELETE_ROUTER, Events.CREATE_ROUTER], - Status.DELETING: [Events.DELETE_ROUTER]} - _DVA_PENDING_ERROR_MSG = _("Dva is pending for the following reason: %s") _DVA_NOT_FOUNT_ERROR_MSG = _("Dva can't be found to execute the operation, " "probably was cancelled through the heleos UI") diff --git a/neutron/plugins/embrane/common/exceptions.py b/neutron/plugins/embrane/common/exceptions.py index 763dabdad..d2e2c1fdd 100644 --- a/neutron/plugins/embrane/common/exceptions.py +++ b/neutron/plugins/embrane/common/exceptions.py @@ -22,14 +22,3 @@ from neutron.common import exceptions as neutron_exec class EmbranePluginException(neutron_exec.NeutronException): message = _("An unexpected error occurred:%(err_msg)s") - - -# Not permitted operation -class NonPermitted(neutron_exec.BadRequest): - pass - - -class StateConstraintException(NonPermitted): - message = _("Operation not permitted due to state constraint violation:" - "%(operation)s not allowed for DVA %(dva_id)s in state " - " %(state)s") diff --git a/neutron/plugins/embrane/common/utils.py b/neutron/plugins/embrane/common/utils.py index 78c3f8f73..8e4a92aba 100644 --- a/neutron/plugins/embrane/common/utils.py +++ b/neutron/plugins/embrane/common/utils.py @@ -19,6 +19,7 @@ from heleosapi import info as h_info +from neutron.db import models_v2 from neutron.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -31,7 +32,12 @@ def set_db_item_state(context, neutron_item, new_state): context.session.merge(neutron_item) -def retrieve_ip_allocation_info(l2_plugin, context, neutron_port): +def retrieve_subnet(context, subnet_id): + return (context.session.query( + models_v2.Subnet).filter(models_v2.Subnet.id == subnet_id).one()) + + +def retrieve_ip_allocation_info(context, neutron_port): """Retrieves ip allocation info for a specific port if any.""" try: @@ -39,7 +45,7 @@ def retrieve_ip_allocation_info(l2_plugin, context, neutron_port): except (KeyError, IndexError): LOG.info(_("No ip allocation set")) return - subnet = l2_plugin._get_subnet(context, subnet_id) + subnet = retrieve_subnet(context, subnet_id) allocated_ip = neutron_port["fixed_ips"][0]["ip_address"] is_gw_port = neutron_port["device_owner"] == "network:router_gateway" gateway_ip = subnet["gateway_ip"] diff --git a/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py b/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py index 1ad936826..d4d5ac180 100644 --- a/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py +++ b/neutron/plugins/embrane/plugins/embrane_ovs_plugin.py @@ -33,5 +33,6 @@ class EmbraneOvsPlugin(base.EmbranePlugin, l2.OVSNeutronPluginV2): def __init__(self): '''First run plugin specific initialization, then Embrane's.''' + self._supported_extension_aliases.remove("l3_agent_scheduler") l2.OVSNeutronPluginV2.__init__(self) self._run_embrane_config() -- 2.45.2