From c7cf2edcefe49a636092ad43a621569c3cefd4a8 Mon Sep 17 00:00:00 2001 From: Salvatore Orlando Date: Fri, 21 Jun 2013 00:20:45 +0200 Subject: [PATCH] Introduce periodic state synchronization with backend Blueprint nicira-plugin-get-improvements With this patch GET operations on the Nicira plugin will not be forwarded anymore to the NVP backend. Resource operational status will be periodically retrieved from the NVP backend using a DynamicLoopingCall. The process has been designed with the aim of avoiding: 1) frequent queries to NVP for retrieving resource status 2) execution of large queries to NVP for retrieving the status of a consistent number of resources. The process can be tuned using a set of configuration variables. GET operations will now return a status which might differ from the actual status of the resource. For retrieving status in a punctual way, the field 'status' should be explicitly specified in the GET request (only 'show' support has been implemented in this patch) This patchs also makes some changes to the fake nvp api client in order to ensure each instance has a private set of dictionaries for fake nvp entities. Change-Id: Ia745b80d2826de32ba8d6883c0d6e0893047e123 --- etc/neutron/plugins/nicira/nvp.ini | 33 + neutron/plugins/nicira/NeutronPlugin.py | 356 ++--------- neutron/plugins/nicira/common/config.py | 21 +- neutron/plugins/nicira/common/sync.py | 596 ++++++++++++++++++ neutron/plugins/nicira/nvplib.py | 34 +- .../unit/nicira/etc/fake_get_lrouter.json | 2 +- .../unit/nicira/etc/fake_get_lswitch.json | 2 +- .../nicira/etc/fake_get_lswitch_lport.json | 3 +- .../tests/unit/nicira/fake_nvpapiclient.py | 57 +- .../tests/unit/nicira/test_agent_scheduler.py | 5 + neutron/tests/unit/nicira/test_maclearning.py | 5 + .../tests/unit/nicira/test_nicira_plugin.py | 56 +- neutron/tests/unit/nicira/test_nvp_sync.py | 587 +++++++++++++++++ neutron/tests/unit/nicira/test_nvpopts.py | 10 + neutron/tests/unit/test_db_plugin.py | 12 +- 15 files changed, 1395 insertions(+), 384 deletions(-) create mode 100644 neutron/plugins/nicira/common/sync.py create mode 100644 neutron/tests/unit/nicira/test_nvp_sync.py diff --git a/etc/neutron/plugins/nicira/nvp.ini b/etc/neutron/plugins/nicira/nvp.ini index a4d59737f..a5e0059b9 100644 --- a/etc/neutron/plugins/nicira/nvp.ini +++ b/etc/neutron/plugins/nicira/nvp.ini @@ -101,3 +101,36 @@ # the RPC agents to operate. When 'agentless' is chosen, the config option metadata_mode # becomes ineffective. The mode 'agentless' is not supported for NVP 3.2 or below. # agent_mode = agent + +[nvp_sync] +# Interval in seconds between runs of the status synchronization task. +# The plugin will aim at resynchronizing operational status for all +# resources in this interval, and it should be therefore large enough +# to ensure the task is feasible. Otherwise the plugin will be +# constantly synchronizing resource status, ie: a new task is started +# as soon as the previous is completed. +# If this value is set to 0, the state synchronization thread for this +# Neutron instance will be disabled. +# state_sync_interval = 120 + +# Random additional delay between two runs of the state synchronization task. +# An additional wait time between 0 and max_random_sync_delay seconds +# will be added on top of state_sync_interval. +# max_random_sync_delay = 0 + +# Minimum delay, in seconds, between two status synchronization requests for NVP. +# Depending on chunk size, controller load, and other factors, state +# synchronization requests might be pretty heavy. This means the +# controller might take time to respond, and its load might be quite +# increased by them. This parameter allows to specify a minimum +# interval between two subsequent requests. +# The value for this parameter must never exceed state_sync_interval. +# If this does, an error will be raised at startup. +# min_sync_req_delay = 10 + +# Minimum number of resources to be retrieved from NVP in a single status +# synchronization request. +# The actual size of the chunk will increase if the number of resources is such +# that using the minimum chunk size will cause the interval between two +# requests to be less than min_sync_req_delay +# min_chunk_size = 500 diff --git a/neutron/plugins/nicira/NeutronPlugin.py b/neutron/plugins/nicira/NeutronPlugin.py index 5b2b19814..f3c1d1eae 100644 --- a/neutron/plugins/nicira/NeutronPlugin.py +++ b/neutron/plugins/nicira/NeutronPlugin.py @@ -20,7 +20,6 @@ # @author: Aaron Rosen, Nicira Networks, Inc. -import hashlib import logging import os @@ -56,6 +55,7 @@ from neutron.openstack.common import excutils from neutron.plugins.nicira.common import config from neutron.plugins.nicira.common import exceptions as nvp_exc from neutron.plugins.nicira.common import securitygroups as nvp_sec +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira.dbexts import distributedrouter as dist_rtr from neutron.plugins.nicira.dbexts import maclearning as mac_db from neutron.plugins.nicira.dbexts import nicira_db @@ -180,6 +180,7 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, if not cfg.CONF.api_extensions_path: cfg.CONF.set_override('api_extensions_path', NVP_EXT_PATH) self.nvp_opts = cfg.CONF.NVP + self.nvp_sync_opts = cfg.CONF.NVP_SYNC self.cluster = create_nvp_cluster(cfg.CONF, self.nvp_opts.concurrent_connections, self.nvp_opts.nvp_gen_timeout) @@ -196,6 +197,13 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, # Set this flag to false as the default gateway has not # been yet updated from the config file self._is_default_net_gw_in_sync = False + # Create a synchronizer instance for backend sync + self._synchronizer = sync.NvpSynchronizer( + self, self.cluster, + self.nvp_sync_opts.state_sync_interval, + self.nvp_sync_opts.min_sync_req_delay, + self.nvp_sync_opts.min_chunk_size, + self.nvp_sync_opts.max_random_sync_delay) def _ensure_default_network_gateway(self): if self._is_default_net_gw_in_sync: @@ -1049,39 +1057,12 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, with context.session.begin(subtransactions=True): # goto to the plugin DB and fetch the network network = self._get_network(context, id) - # if the network is external, do not go to NVP - if not network.external: - # verify the fabric status of the corresponding - # logical switch(es) in nvp - try: - lswitches = nvplib.get_lswitches(self.cluster, id) - nvp_net_status = constants.NET_STATUS_ACTIVE - neutron_status = network.status - for lswitch in lswitches: - relations = lswitch.get('_relations') - if relations: - lswitch_status = relations.get( - 'LogicalSwitchStatus') - # FIXME(salvatore-orlando): Being unable to fetch - # logical switch status should be an exception. - if (lswitch_status and - not lswitch_status.get('fabric_status', - None)): - nvp_net_status = constants.NET_STATUS_DOWN - break - LOG.debug(_("Current network status:%(nvp_net_status)s; " - "Status in Neutron DB:%(neutron_status)s"), - {'nvp_net_status': nvp_net_status, - 'neutron_status': neutron_status}) - if nvp_net_status != network.status: - # update the network status - network.status = nvp_net_status - except q_exc.NotFound: - network.status = constants.NET_STATUS_ERROR - except Exception: - err_msg = _("Unable to get logical switches") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) + if fields and 'status' in fields: + # External networks are not backed by nvp lswitches + if not network.external: + # Perform explicit state synchronization + self._synchronizer.synchronize_network( + context, network) # Don't do field selection here otherwise we won't be able # to add provider networks fields net_result = self._make_network_dict(network) @@ -1090,85 +1071,13 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, return self._fields(net_result, fields) def get_networks(self, context, filters=None, fields=None): - nvp_lswitches = {} filters = filters or {} with context.session.begin(subtransactions=True): - neutron_lswitches = ( - super(NvpPluginV2, self).get_networks(context, filters)) - for net in neutron_lswitches: + networks = super(NvpPluginV2, self).get_networks(context, filters) + for net in networks: self._extend_network_dict_provider(context, net) self._extend_network_qos_queue(context, net) - - tenant_ids = filters and filters.get('tenant_id') or None - filter_fmt = "&tag=%s&tag_scope=os_tid" - if context.is_admin and not tenant_ids: - tenant_filter = "" - else: - tenant_ids = tenant_ids or [context.tenant_id] - tenant_filter = ''.join(filter_fmt % tid for tid in tenant_ids) - lswitch_filters = "uuid,display_name,fabric_status,tags" - lswitch_url_path_1 = ( - "/ws.v1/lswitch?fields=%s&relations=LogicalSwitchStatus%s" - % (lswitch_filters, tenant_filter)) - lswitch_url_path_2 = nvplib._build_uri_path( - nvplib.LSWITCH_RESOURCE, - fields=lswitch_filters, - relations='LogicalSwitchStatus', - filters={'tag': 'true', 'tag_scope': 'shared'}) - try: - res = nvplib.get_all_query_pages(lswitch_url_path_1, self.cluster) - nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res)) - # Issue a second query for fetching shared networks. - # We cannot unfortunately use just a single query because tags - # cannot be or-ed - res_shared = nvplib.get_all_query_pages(lswitch_url_path_2, - self.cluster) - nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res_shared)) - except Exception: - err_msg = _("Unable to get logical switches") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) - - if filters.get('id'): - nvp_lswitches = dict( - (uuid, ls) for (uuid, ls) in nvp_lswitches.iteritems() - if uuid in set(filters['id'])) - for neutron_lswitch in neutron_lswitches: - # Skip external networks as they do not exist in NVP - if neutron_lswitch[l3.EXTERNAL]: - continue - elif neutron_lswitch['id'] not in nvp_lswitches: - LOG.warning(_("Logical Switch %s found in neutron database " - "but not in NVP."), neutron_lswitch["id"]) - neutron_lswitch["status"] = constants.NET_STATUS_ERROR - else: - # TODO(salvatore-orlando): be careful about "extended" - # logical switches - ls = nvp_lswitches.pop(neutron_lswitch['id']) - if (ls["_relations"]["LogicalSwitchStatus"]["fabric_status"]): - neutron_lswitch["status"] = constants.NET_STATUS_ACTIVE - else: - neutron_lswitch["status"] = constants.NET_STATUS_DOWN - - # do not make the case in which switches are found in NVP - # but not in Neutron catastrophic. - if nvp_lswitches: - LOG.warning(_("Found %s logical switches not bound " - "to Neutron networks. Neutron and NVP are " - "potentially out of sync"), len(nvp_lswitches)) - - LOG.debug(_("get_networks() completed for tenant %s"), - context.tenant_id) - - if fields: - ret_fields = [] - for neutron_lswitch in neutron_lswitches: - row = {} - for field in fields: - row[field] = neutron_lswitch[field] - ret_fields.append(row) - return ret_fields - return neutron_lswitches + return [self._fields(network, fields) for network in networks] def update_network(self, context, id, network): pnet._raise_if_updates_provider_attributes(network['network']) @@ -1194,105 +1103,10 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, def get_ports(self, context, filters=None, fields=None): filters = filters or {} with context.session.begin(subtransactions=True): - neutron_lports = super(NvpPluginV2, self).get_ports( - context, filters) - if (filters.get('network_id') and len(filters.get('network_id')) and - self._network_is_external(context, filters['network_id'][0])): - # Do not perform check on NVP platform - return neutron_lports - - vm_filter = "" - tenant_filter = "" - # This is used when calling delete_network. Neutron checks to see if - # the network has any ports. - if filters.get("network_id"): - # FIXME (Aaron) If we get more than one network_id this won't work - lswitch = filters["network_id"][0] - else: - lswitch = "*" - - if filters.get("device_id"): - for vm_id in filters.get("device_id"): - vm_filter = ("%stag_scope=vm_id&tag=%s&" % (vm_filter, - hashlib.sha1(vm_id).hexdigest())) - else: - vm_id = "" - - if filters.get("tenant_id"): - for tenant in filters.get("tenant_id"): - tenant_filter = ("%stag_scope=os_tid&tag=%s&" % - (tenant_filter, tenant)) - - nvp_lports = {} - - lport_fields_str = ("tags,admin_status_enabled,display_name," - "fabric_status_up") - try: - lport_query_path = ( - "/ws.v1/lswitch/%s/lport?fields=%s&%s%stag_scope=q_port_id" - "&relations=LogicalPortStatus" % - (lswitch, lport_fields_str, vm_filter, tenant_filter)) - - try: - ports = nvplib.get_all_query_pages(lport_query_path, - self.cluster) - except q_exc.NotFound: - LOG.warn(_("Lswitch %s not found in NVP"), lswitch) - ports = None - - if ports: - for port in ports: - for tag in port["tags"]: - if tag["scope"] == "q_port_id": - nvp_lports[tag["tag"]] = port - except Exception: - err_msg = _("Unable to get ports") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) - - lports = [] - for neutron_lport in neutron_lports: - # if a neutron port is not found in NVP, this migth be because - # such port is not mapped to a logical switch - ie: floating ip - if neutron_lport['device_owner'] in (l3_db.DEVICE_OWNER_FLOATINGIP, - l3_db.DEVICE_OWNER_ROUTER_GW): - lports.append(neutron_lport) - continue - try: - neutron_lport["admin_state_up"] = ( - nvp_lports[neutron_lport["id"]]["admin_status_enabled"]) - - if (nvp_lports[neutron_lport["id"]] - ["_relations"] - ["LogicalPortStatus"] - ["fabric_status_up"]): - neutron_lport["status"] = constants.PORT_STATUS_ACTIVE - else: - neutron_lport["status"] = constants.PORT_STATUS_DOWN - - del nvp_lports[neutron_lport["id"]] - except KeyError: - neutron_lport["status"] = constants.PORT_STATUS_ERROR - LOG.debug(_("Neutron logical port %s was not found on NVP"), - neutron_lport['id']) - - lports.append(neutron_lport) - # do not make the case in which ports are found in NVP - # but not in Neutron catastrophic. - if nvp_lports: - LOG.warning(_("Found %s logical ports not bound " - "to Neutron ports. Neutron and NVP are " - "potentially out of sync"), len(nvp_lports)) - - if fields: - ret_fields = [] - for lport in lports: - row = {} - for field in fields: - row[field] = lport[field] - ret_fields.append(row) - return ret_fields - return lports + ports = super(NvpPluginV2, self).get_ports(context, filters) + for port in ports: + self._extend_port_qos_queue(context, port) + return [self._fields(port, fields) for port in ports] def create_port(self, context, port): # If PORTSECURITY is not the default value ATTR_NOT_SPECIFIED @@ -1504,43 +1318,26 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, def get_port(self, context, id, fields=None): with context.session.begin(subtransactions=True): - neutron_db_port = super(NvpPluginV2, self).get_port(context, - id, fields) - self._extend_port_qos_queue(context, neutron_db_port) - - if self._network_is_external(context, - neutron_db_port['network_id']): - return neutron_db_port - nvp_id = self._nvp_get_port_id(context, self.cluster, - neutron_db_port) - # If there's no nvp IP do not bother going to NVP and put - # the port in error state - if nvp_id: - # Find the NVP port corresponding to neutron port_id - # Do not query by nvp id as the port might be on - # an extended switch and we do not store the extended - # switch uuid - results = nvplib.query_lswitch_lports( - self.cluster, '*', - relations='LogicalPortStatus', - filters={'tag': id, 'tag_scope': 'q_port_id'}) - if results: - port = results[0] - port_status = port["_relations"]["LogicalPortStatus"] - neutron_db_port["admin_state_up"] = ( - port["admin_status_enabled"]) - if port_status["fabric_status_up"]: - neutron_db_port["status"] = ( - constants.PORT_STATUS_ACTIVE) - else: - neutron_db_port["status"] = ( - constants.PORT_STATUS_DOWN) - else: - neutron_db_port["status"] = ( - constants.PORT_STATUS_ERROR) + if fields and 'status' in fields: + # Perform explicit state synchronization + db_port = self._get_port(context, id) + self._synchronizer.synchronize_port( + context, db_port) + port = self._make_port_dict(db_port, fields) else: - neutron_db_port["status"] = constants.PORT_STATUS_ERROR - return neutron_db_port + port = super(NvpPluginV2, self).get_port(context, id, fields) + self._extend_port_qos_queue(context, port) + return port + + def get_router(self, context, id, fields=None): + if fields and 'status' in fields: + db_router = self._get_router(context, id) + # Perform explicit state synchronization + self._synchronizer.synchronize_router( + context, db_router) + return self._make_router_dict(db_router, fields) + else: + return super(NvpPluginV2, self).get_router(context, id, fields) def create_router(self, context, router): # NOTE(salvatore-orlando): We completely override this method in @@ -1713,77 +1510,6 @@ class NvpPluginV2(agentschedulers_db.DhcpAgentSchedulerDbMixin, err_msg=(_("Unable to delete logical router '%s'" "on NVP Platform") % router_id)) - def get_router(self, context, id, fields=None): - router = self._get_router(context, id) - try: - lrouter = nvplib.get_lrouter(self.cluster, id) - relations = lrouter.get('_relations') - if relations: - lrouter_status = relations.get('LogicalRouterStatus') - # FIXME(salvatore-orlando): Being unable to fetch the - # logical router status should be an exception. - if lrouter_status: - router_op_status = (lrouter_status.get('fabric_status') - and constants.NET_STATUS_ACTIVE or - constants.NET_STATUS_DOWN) - except q_exc.NotFound: - lrouter = {} - router_op_status = constants.NET_STATUS_ERROR - if router_op_status != router.status: - LOG.debug(_("Current router status:%(router_status)s;" - "Status in Neutron DB:%(db_router_status)s"), - {'router_status': router_op_status, - 'db_router_status': router.status}) - # update the router status - with context.session.begin(subtransactions=True): - router.status = router_op_status - return self._make_router_dict(router, fields) - - def get_routers(self, context, filters=None, fields=None): - router_query = self._apply_filters_to_query( - self._model_query(context, l3_db.Router), - l3_db.Router, filters) - routers = router_query.all() - # Query routers on NVP for updating operational status - if context.is_admin and not filters.get("tenant_id"): - tenant_id = None - elif 'tenant_id' in filters: - tenant_id = filters.get('tenant_id')[0] - del filters['tenant_id'] - else: - tenant_id = context.tenant_id - try: - nvp_lrouters = nvplib.get_lrouters(self.cluster, - tenant_id, - fields) - except NvpApiClient.NvpApiException: - err_msg = _("Unable to get logical routers from NVP controller") - LOG.exception(err_msg) - raise nvp_exc.NvpPluginException(err_msg=err_msg) - - nvp_lrouters_dict = {} - for nvp_lrouter in nvp_lrouters: - nvp_lrouters_dict[nvp_lrouter['uuid']] = nvp_lrouter - for router in routers: - nvp_lrouter = nvp_lrouters_dict.get(router['id']) - if nvp_lrouter: - if (nvp_lrouter["_relations"]["LogicalRouterStatus"] - ["fabric_status"]): - router.status = constants.NET_STATUS_ACTIVE - else: - router.status = constants.NET_STATUS_DOWN - nvp_lrouters.remove(nvp_lrouter) - else: - router.status = constants.NET_STATUS_ERROR - - # do not make the case in which routers are found in NVP - # but not in Neutron catastrophic. - if nvp_lrouters: - LOG.warning(_("Found %s logical routers not bound " - "to Neutron routers. Neutron and NVP are " - "potentially out of sync"), len(nvp_lrouters)) - return [self._make_router_dict(router, fields) for router in routers] - def add_router_interface(self, context, router_id, interface_info): # When adding interface by port_id we need to create the # peer port on the nvp logical router in this routine diff --git a/neutron/plugins/nicira/common/config.py b/neutron/plugins/nicira/common/config.py index 8f0c08d1c..88c9ea348 100644 --- a/neutron/plugins/nicira/common/config.py +++ b/neutron/plugins/nicira/common/config.py @@ -53,7 +53,24 @@ nvp_opts = [ help=_("The default network tranport type to use (stt, gre, " "bridge, ipsec_gre, or ipsec_stt)")), cfg.StrOpt('agent_mode', default=AgentModes.AGENT, - help=_("The mode used to implement DHCP/metadata services.")), + help=_("The mode used to implement DHCP/metadata services.")) +] + +sync_opts = [ + cfg.IntOpt('state_sync_interval', default=120, + help=_("Interval in seconds between runs of the state " + "synchronization task. Set it to 0 to disable it")), + cfg.IntOpt('max_random_sync_delay', default=0, + help=_("Maximum value for the additional random " + "delay in seconds between runs of the state " + "synchronization task")), + cfg.IntOpt('min_sync_req_delay', default=10, + help=_('Minimum delay, in seconds, between two state ' + 'synchronization queries to NVP. It must not ' + 'exceed state_sync_interval')), + cfg.IntOpt('min_chunk_size', default=500, + help=_('Minimum number of resources to be retrieved from NVP ' + 'during state synchronization')) ] connection_opts = [ @@ -107,6 +124,8 @@ cluster_opts = [ cfg.CONF.register_opts(connection_opts) cfg.CONF.register_opts(cluster_opts) cfg.CONF.register_opts(nvp_opts, "NVP") +cfg.CONF.register_opts(sync_opts, "NVP_SYNC") + # NOTE(armando-migliaccio): keep the following code until we support # NVP configuration files in older format (Grizzly or older). # ### BEGIN diff --git a/neutron/plugins/nicira/common/sync.py b/neutron/plugins/nicira/common/sync.py new file mode 100644 index 000000000..a321d6941 --- /dev/null +++ b/neutron/plugins/nicira/common/sync.py @@ -0,0 +1,596 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Nicira, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from neutron.common import constants +from neutron.common import exceptions +from neutron import context +from neutron.db import l3_db +from neutron.db import models_v2 +from neutron.openstack.common import jsonutils +from neutron.openstack.common import log +from neutron.openstack.common import loopingcall +from neutron.openstack.common import timeutils +from neutron.plugins.nicira.common import exceptions as nvp_exc +from neutron.plugins.nicira import NvpApiClient +from neutron.plugins.nicira import nvplib + +LOG = log.getLogger(__name__) + + +class NvpCache(object): + """A simple Cache for NVP resources. + + Associates resource id with resource hash to rapidly identify + updated resources. + Each entry in the cache also stores the following information: + - changed: the resource in the cache has been altered following + an update or a delete + - hit: the resource has been visited during an update (and possibly + left unchanged) + - data: current resource data + - data_bk: backup of resource data prior to its removal + """ + + def __init__(self): + # Maps a uuid to the dict containing it + self._uuid_dict_mappings = {} + # Dicts for NVP cached resources + self._lswitches = {} + self._lswitchports = {} + self._lrouters = {} + + def __getitem__(self, key): + # uuids are unique across the various types of resources + # TODO(salv-orlando): Avoid lookups over all dictionaries + # when retrieving items + # Fetch lswitches, lports, or lrouters + resources = self._uuid_dict_mappings[key] + return resources[key] + + def _update_resources(self, resources, new_resources): + # Clear the 'changed' attribute for all items + for uuid, item in resources.items(): + if item.pop('changed', None) and not item.get('data'): + # The item is not anymore in NVP, so delete it + del resources[uuid] + del self._uuid_dict_mappings[uuid] + + def do_hash(item): + return hash(jsonutils.dumps(item)) + + # Parse new data and identify new, deleted, and updated resources + for item in new_resources: + item_id = item['uuid'] + if resources.get(item_id): + new_hash = do_hash(item) + if new_hash != resources[item_id]['hash']: + resources[item_id]['hash'] = new_hash + resources[item_id]['changed'] = True + resources[item_id]['data_bk'] = ( + resources[item_id]['data']) + resources[item_id]['data'] = item + # Mark the item as hit in any case + resources[item_id]['hit'] = True + else: + resources[item_id] = {'hash': do_hash(item)} + resources[item_id]['hit'] = True + resources[item_id]['changed'] = True + resources[item_id]['data'] = item + # add a uuid to dict mapping for easy retrieval + # with __getitem__ + self._uuid_dict_mappings[item_id] = resources + + def _delete_resources(self, resources): + # Mark for removal all the elements which have not been visited. + # And clear the 'hit' attribute. + for to_delete in [k for (k, v) in resources.iteritems() + if not v.pop('hit', False)]: + resources[to_delete]['changed'] = True + resources[to_delete]['data_bk'] = ( + resources[to_delete].pop('data', None)) + + def _get_resource_ids(self, resources, changed_only): + if changed_only: + return [k for (k, v) in resources.iteritems() + if v.get('changed')] + return resources.keys() + + def get_lswitches(self, changed_only=False): + return self._get_resource_ids(self._lswitches, changed_only) + + def get_lrouters(self, changed_only=False): + return self._get_resource_ids(self._lrouters, changed_only) + + def get_lswitchports(self, changed_only=False): + return self._get_resource_ids(self._lswitchports, changed_only) + + def update_lswitch(self, lswitch): + self._update_resources(self._lswitches, [lswitch]) + + def update_lrouter(self, lrouter): + self._update_resources(self._lrouters, [lrouter]) + + def update_lswitchport(self, lswitchport): + self._update_resources(self._lswitchports, [lswitchport]) + + def process_updates(self, lswitches=None, + lrouters=None, lswitchports=None): + self._update_resources(self._lswitches, lswitches) + self._update_resources(self._lrouters, lrouters) + self._update_resources(self._lswitchports, lswitchports) + return (self._get_resource_ids(self._lswitches, changed_only=True), + self._get_resource_ids(self._lrouters, changed_only=True), + self._get_resource_ids(self._lswitchports, changed_only=True)) + + def process_deletes(self): + self._delete_resources(self._lswitches) + self._delete_resources(self._lrouters) + self._delete_resources(self._lswitchports) + return (self._get_resource_ids(self._lswitches, changed_only=True), + self._get_resource_ids(self._lrouters, changed_only=True), + self._get_resource_ids(self._lswitchports, changed_only=True)) + + +class SyncParameters(): + """Defines attributes used by the synchronization procedure. + + chunk_size: Actual chunk size + extra_chunk_size: Additional data to fetch because of chunk size + adjustment + current_chunk: Counter of the current data chunk being synchronized + Page cursors: markers for the next resource to fetch. + 'start' means page cursor unset for fetching 1st page + init_sync_performed: True if the initial synchronization concluded + """ + + def __init__(self, min_chunk_size): + self.chunk_size = min_chunk_size + self.extra_chunk_size = 0 + self.current_chunk = 0 + self.ls_cursor = 'start' + self.lr_cursor = 'start' + self.lp_cursor = 'start' + self.init_sync_performed = False + self.total_size = 0 + + +def _start_loopingcall(min_chunk_size, state_sync_interval, func): + """Start a loopingcall for the synchronization task.""" + # Start a looping call to synchronize operational status + # for neutron resources + if not state_sync_interval: + # do not start the looping call if specified + # sync interval is 0 + return + state_synchronizer = loopingcall.DynamicLoopingCall( + func, sp=SyncParameters(min_chunk_size)) + state_synchronizer.start( + periodic_interval_max=state_sync_interval) + + +class NvpSynchronizer(): + + LS_URI = nvplib._build_uri_path( + nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status', + relations='LogicalSwitchStatus') + LR_URI = nvplib._build_uri_path( + nvplib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status', + relations='LogicalRouterStatus') + LP_URI = nvplib._build_uri_path( + nvplib.LSWITCHPORT_RESOURCE, + parent_resource_id='*', + fields='uuid,tags,fabric_status,link_status_up', + relations='LogicalPortStatus') + + def __init__(self, plugin, cluster, state_sync_interval, + req_delay, min_chunk_size, max_rand_delay=0): + random.seed() + self._nvp_cache = NvpCache() + # Store parameters as instance members + # NOTE(salv-orlando): apologies if it looks java-ish + self._plugin = plugin + self._cluster = cluster + self._req_delay = req_delay + self._sync_interval = state_sync_interval + self._max_rand_delay = max_rand_delay + # Validate parameters + if self._sync_interval < self._req_delay: + err_msg = (_("Minimum request delay:%(req_delay)s must not " + "exceed synchronization interval:%(sync_interval)s") % + {'req_delay': self._req_delay, + 'sync_interval': self._sync_interval}) + LOG.error(err_msg) + raise nvp_exc.NvpPluginException(err_msg=err_msg) + # Backoff time in case of failures while fetching sync data + self._sync_backoff = 1 + _start_loopingcall(min_chunk_size, state_sync_interval, + self._synchronize_state) + + def _get_tag_dict(self, tags): + return dict((tag.get('scope'), tag['tag']) for tag in tags) + + def _update_neutron_object(self, context, neutron_data, status): + if status == neutron_data['status']: + # do nothing + return + with context.session.begin(subtransactions=True): + LOG.debug(_("Updating status for neutron resource %(q_id)s to: " + "%(status)s"), {'q_id': neutron_data['id'], + 'status': status}) + neutron_data['status'] = status + context.session.add(neutron_data) + + def synchronize_network(self, context, neutron_network_data, + lswitches=None): + """Synchronize a Neutron network with its NVP counterpart. + + This routine synchronizes a set of switches when a Neutron + network is mapped to multiple lswitches. + """ + if not lswitches: + # Try to get logical switches from nvp + try: + lswitches = nvplib.get_lswitches( + self._cluster, neutron_network_data['id']) + except exceptions.NetworkNotFound: + # TODO(salv-orlando): We should be catching + # NvpApiClient.ResourceNotFound here + # The logical switch was not found + LOG.warning(_("Logical switch for neutron network %s not " + "found on NVP."), neutron_network_data['id']) + lswitches = [] + else: + for lswitch in lswitches: + self._nvp_cache.update_lswitch(lswitch) + # By default assume things go wrong + status = constants.NET_STATUS_ERROR + # In most cases lswitches will contain a single element + for ls in lswitches: + if not ls: + # Logical switch was deleted + break + ls_status = ls['_relations']['LogicalSwitchStatus'] + if not ls_status['fabric_status']: + status = constants.NET_STATUS_DOWN + break + else: + # No switch was down or missing. Set status to ACTIVE unless + # there were no switches in the first place! + if lswitches: + status = constants.NET_STATUS_ACTIVE + # Update db object + self._update_neutron_object(context, neutron_network_data, status) + + def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False): + if not ls_uuids and not scan_missing: + return + neutron_net_ids = set() + neutron_nvp_mappings = {} + # TODO(salvatore-orlando): Deal with the case the tag + # has been tampered with + for ls_uuid in ls_uuids: + # If the lswitch has been deleted, get backup copy of data + lswitch = (self._nvp_cache[ls_uuid].get('data') or + self._nvp_cache[ls_uuid].get('data_bk')) + tags = self._get_tag_dict(lswitch['tags']) + neutron_id = tags.get('neutron_net_id', ls_uuid) + neutron_net_ids.add(neutron_id) + neutron_nvp_mappings[neutron_id] = ( + neutron_nvp_mappings.get(neutron_id, []) + + [self._nvp_cache[ls_uuid]]) + with ctx.session.begin(subtransactions=True): + # Fetch neutron networks from database + filters = {'router:external': [False]} + if not scan_missing: + filters['id'] = neutron_net_ids + # TODO(salv-orlando): Filter out external networks + for network in self._plugin._get_collection_query( + ctx, models_v2.Network, filters=filters): + lswitches = neutron_nvp_mappings.get(network['id'], []) + lswitches = [lswitch.get('data') for lswitch in lswitches] + self.synchronize_network(ctx, network, lswitches) + + def synchronize_router(self, context, neutron_router_data, + lrouter=None): + """Synchronize a neutron router with its NVP counterpart.""" + if not lrouter: + # Try to get router from nvp + try: + # This query will return the logical router status too + lrouter = nvplib.get_lrouter( + self._cluster, neutron_router_data['id']) + except exceptions.NotFound: + # NOTE(salv-orlando): We should be catching + # NvpApiClient.ResourceNotFound here + # The logical router was not found + LOG.warning(_("Logical router for neutron router %s not " + "found on NVP."), neutron_router_data['id']) + lrouter = None + else: + # Update the cache + self._nvp_cache.update_lrouter(lrouter) + + # Note(salv-orlando): It might worth adding a check to verify neutron + # resource tag in nvp entity matches a Neutron id. + # By default assume things go wrong + status = constants.NET_STATUS_ERROR + if lrouter: + lr_status = (lrouter['_relations'] + ['LogicalRouterStatus'] + ['fabric_status']) + status = (lr_status and + constants.NET_STATUS_ACTIVE + or constants.NET_STATUS_DOWN) + # Update db object + self._update_neutron_object(context, neutron_router_data, status) + + def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False): + if not lr_uuids and not scan_missing: + return + neutron_router_mappings = ( + dict((lr_uuid, self._nvp_cache[lr_uuid]) for lr_uuid in lr_uuids)) + with ctx.session.begin(subtransactions=True): + # Fetch neutron routers from database + filters = ({} if scan_missing else + {'id': neutron_router_mappings.keys()}) + for router in self._plugin._get_collection_query( + ctx, l3_db.Router, filters=filters): + lrouter = neutron_router_mappings.get(router['id']) + self.synchronize_router( + ctx, router, lrouter and lrouter.get('data')) + + def synchronize_port(self, context, neutron_port_data, + lswitchport=None, ext_networks=None): + """Synchronize a Neutron port with its NVP counterpart.""" + # Skip synchronization for ports on external networks + if not ext_networks: + ext_networks = [net['id'] for net in context.session.query( + models_v2.Network).join( + l3_db.ExternalNetwork, + (models_v2.Network.id == + l3_db.ExternalNetwork.network_id))] + if neutron_port_data['network_id'] in ext_networks: + with context.session.begin(subtransactions=True): + neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE + return + + if not lswitchport: + # Try to get port from nvp + try: + lp_uuid = self._plugin._nvp_get_port_id( + context, self._cluster, neutron_port_data) + if lp_uuid: + lswitchport = nvplib.get_port( + self._cluster, neutron_port_data['network_id'], + lp_uuid, relations='LogicalPortStatus') + except exceptions.PortNotFoundOnNetwork: + # NOTE(salv-orlando): We should be catching + # NvpApiClient.ResourceNotFound here + # The logical switch port was not found + LOG.warning(_("Logical switch port for neutron port %s " + "not found on NVP."), neutron_port_data['id']) + lswitchport = None + else: + # Update the cache + self._nvp_cache.update_lswitchport(lswitchport) + + # Note(salv-orlando): It might worth adding a check to verify neutron + # resource tag in nvp entity matches Neutron id. + # By default assume things go wrong + status = constants.PORT_STATUS_ERROR + if lswitchport: + lp_status = (lswitchport['_relations'] + ['LogicalPortStatus'] + ['link_status_up']) + status = (lp_status and + constants.PORT_STATUS_ACTIVE + or constants.PORT_STATUS_DOWN) + # Update db object + self._update_neutron_object(context, neutron_port_data, status) + + def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False): + if not lp_uuids and not scan_missing: + return + # Find Neutron port id by tag - the tag is already + # loaded in memory, no reason for doing a db query + # TODO(salvatore-orlando): Deal with the case the tag + # has been tampered with + neutron_port_mappings = {} + for lp_uuid in lp_uuids: + lport = (self._nvp_cache[lp_uuid].get('data') or + self._nvp_cache[lp_uuid].get('data_bk')) + tags = self._get_tag_dict(lport['tags']) + neutron_port_id = tags.get('q_port_id') + if neutron_port_id: + neutron_port_mappings[neutron_port_id] = ( + self._nvp_cache[lp_uuid]) + with ctx.session.begin(subtransactions=True): + # Fetch neutron ports from database + # At the first sync we need to fetch all ports + filters = ({} if scan_missing else + {'id': neutron_port_mappings.keys()}) + # TODO(salv-orlando): Work out a solution for avoiding + # this query + ext_nets = [net['id'] for net in ctx.session.query( + models_v2.Network).join( + l3_db.ExternalNetwork, + (models_v2.Network.id == + l3_db.ExternalNetwork.network_id))] + for port in self._plugin._get_collection_query( + ctx, models_v2.Port, filters=filters): + lswitchport = neutron_port_mappings.get(port['id']) + self.synchronize_port( + ctx, port, lswitchport and lswitchport.get('data'), + ext_networks=ext_nets) + + def _get_chunk_size(self, sp): + # NOTE(salv-orlando): Try to use __future__ for this routine only? + ratio = ((float(sp.total_size) / float(sp.chunk_size)) / + (float(self._sync_interval) / float(self._req_delay))) + new_size = max(1.0, ratio) * float(sp.chunk_size) + return int(new_size) + (new_size - int(new_size) > 0) + + def _fetch_data(self, uri, cursor, page_size): + # If not cursor there is nothing to retrieve + if cursor: + if cursor == 'start': + cursor = None + # Chunk size tuning might, in some conditions, make it larger + # than 5,000, which is the maximum page size allowed by the NVP + # API. In this case the request should be split in multiple + # requests. This is not ideal, and therefore a log warning will + # be emitted. + requests = range(0, page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1) + if len(requests) > 1: + LOG.warn(_("Requested page size is %(cur_chunk_size)d." + "It might be necessary to do %(num_requests)d " + "round-trips to NVP for fetching data. Please " + "tune sync parameters to ensure chunk size " + "is less than %(max_page_size)d"), + {'cur_chunk_size': page_size, + 'num_requests': len(requests), + 'max_page_size': nvplib.MAX_PAGE_SIZE}) + results = [] + actual_size = 0 + for _req in requests: + req_results, cursor, req_size = nvplib.get_single_query_page( + uri, self._cluster, cursor, + min(page_size, nvplib.MAX_PAGE_SIZE)) + results.extend(req_results) + actual_size = actual_size + req_size + # If no cursor is returned break the cycle as there is no + # actual need to perform multiple requests (all fetched) + # This happens when the overall size of resources exceeds + # the maximum page size, but the number for each single + # resource type is below this threshold + if not cursor: + break + # reset cursor before returning if we queried just to + # know the number of entities + return results, cursor if page_size else 'start', actual_size + return [], cursor, None + + def _fetch_nvp_data_chunk(self, sp): + base_chunk_size = sp.chunk_size + chunk_size = base_chunk_size + sp.extra_chunk_size + LOG.info(_("Fetching up to %s resources " + "from NVP backend"), chunk_size) + fetched = ls_count = lr_count = lp_count = 0 + lswitches = lrouters = lswitchports = [] + if sp.ls_cursor or sp.ls_cursor == 'start': + (lswitches, sp.ls_cursor, ls_count) = self._fetch_data( + self.LS_URI, sp.ls_cursor, chunk_size) + fetched = len(lswitches) + if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start': + (lrouters, sp.lr_cursor, lr_count) = self._fetch_data( + self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0)) + fetched += len(lrouters) + if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start': + (lswitchports, sp.lp_cursor, lp_count) = self._fetch_data( + self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0)) + fetched += len(lswitchports) + if sp.current_chunk == 0: + # No cursors were provided. Then it must be possible to + # calculate the total amount of data to fetch + sp.total_size = ls_count + lr_count + lp_count + LOG.debug(_("Total data size: %d"), sp.total_size) + sp.chunk_size = self._get_chunk_size(sp) + # Calculate chunk size adjustment + sp.extra_chunk_size = sp.chunk_size - base_chunk_size + LOG.debug(_("Fetched %(num_lswitches)d logical switches, " + "%(num_lswitchports)d logical switch ports," + "%(num_lrouters)d logical routers"), + {'num_lswitches': len(lswitches), + 'num_lswitchports': len(lswitchports), + 'num_lrouters': len(lrouters)}) + return (lswitches, lrouters, lswitchports) + + def _synchronize_state(self, sp): + # If the plugin has been destroyed, stop the LoopingCall + if not self._plugin: + raise loopingcall.LoopingCallDone + start = timeutils.utcnow() + # Reset page cursor variables if necessary + if sp.current_chunk == 0: + sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start' + LOG.info(_("Running state synchronization task. Chunk: %s"), + sp.current_chunk) + # Fetch chunk_size data from NVP + try: + (lswitches, lrouters, lswitchports) = ( + self._fetch_nvp_data_chunk(sp)) + except (NvpApiClient.RequestTimeout, NvpApiClient.NvpApiException): + sleep_interval = self._sync_backoff + # Cap max back off to 64 seconds + self._sync_backoff = min(self._sync_backoff * 2, 64) + LOG.exception(_("An error occured while communicating with " + "NVP backend. Will retry synchronization " + "in %d seconds"), sleep_interval) + return sleep_interval + LOG.debug(_("Time elapsed querying NVP: %s"), + timeutils.utcnow() - start) + if sp.total_size: + num_chunks = ((sp.total_size / sp.chunk_size) + + (sp.total_size % sp.chunk_size != 0)) + else: + num_chunks = 1 + LOG.debug(_("Number of chunks: %d"), num_chunks) + # Find objects which have changed on NVP side and need + # to be synchronized + (ls_uuids, lr_uuids, lp_uuids) = self._nvp_cache.process_updates( + lswitches, lrouters, lswitchports) + # Process removed objects only at the last chunk + scan_missing = (sp.current_chunk == num_chunks - 1 and + not sp.init_sync_performed) + if sp.current_chunk == num_chunks - 1: + self._nvp_cache.process_deletes() + ls_uuids = self._nvp_cache.get_lswitches( + changed_only=not scan_missing) + lr_uuids = self._nvp_cache.get_lrouters( + changed_only=not scan_missing) + lp_uuids = self._nvp_cache.get_lswitchports( + changed_only=not scan_missing) + LOG.debug(_("Time elapsed hashing data: %s"), + timeutils.utcnow() - start) + # Get an admin context + ctx = context.get_admin_context() + # Synchronize with database + with ctx.session.begin(subtransactions=True): + self._synchronize_lswitches(ctx, ls_uuids, + scan_missing=scan_missing) + self._synchronize_lrouters(ctx, lr_uuids, + scan_missing=scan_missing) + self._synchronize_lswitchports(ctx, lp_uuids, + scan_missing=scan_missing) + # Increase chunk counter + LOG.info(_("Synchronization for chunk %(chunk_num)d of " + "%(total_chunks)d performed"), + {'chunk_num': sp.current_chunk + 1, + 'total_chunks': num_chunks}) + sp.current_chunk = (sp.current_chunk + 1) % num_chunks + added_delay = 0 + if sp.current_chunk == 0: + # Ensure init_sync_performed is True + if not sp.init_sync_performed: + sp.init_sync_performed = True + # Add additional random delay + added_delay = random.randint(0, self._max_rand_delay) + LOG.debug(_("Time elapsed at end of sync: %s"), + timeutils.utcnow() - start) + return self._sync_interval / num_chunks + added_delay diff --git a/neutron/plugins/nicira/nvplib.py b/neutron/plugins/nicira/nvplib.py index 2b0e6e5b4..646a21490 100644 --- a/neutron/plugins/nicira/nvplib.py +++ b/neutron/plugins/nicira/nvplib.py @@ -66,7 +66,10 @@ SNAT_KEYS = ["to_src_port_min", "to_src_port_max", "to_src_ip_min", "to_src_ip_max"] DNAT_KEYS = ["to_dst_port", "to_dst_ip_min", "to_dst_ip_max"] - +# Maximum page size for a single request +# NOTE(salv-orlando): This might become a version-dependent map should the +# limit be raised in future versions +MAX_PAGE_SIZE = 5000 # TODO(bgh): it would be more efficient to use a bitmap taken_context_ids = [] @@ -157,21 +160,34 @@ def get_cluster_version(cluster): return version +def get_single_query_page(path, cluster, page_cursor=None, + page_length=1000, neutron_only=True): + params = [] + if page_cursor: + params.append("_page_cursor=%s" % page_cursor) + params.append("_page_length=%s" % page_length) + # NOTE(salv-orlando): On the NVP backend the 'Quantum' tag is still + # used for marking Neutron entities in order to preserve compatibility + if neutron_only: + params.append("tag_scope=quantum") + query_params = "&".join(params) + path = "%s%s%s" % (path, "&" if (path.find("?") != -1) else "?", + query_params) + body = do_request(HTTP_GET, path, cluster=cluster) + # Result_count won't be returned if _page_cursor is supplied + return body['results'], body.get('page_cursor'), body.get('result_count') + + def get_all_query_pages(path, c): need_more_results = True result_list = [] page_cursor = None - query_marker = "&" if (path.find("?") != -1) else "?" while need_more_results: - page_cursor_str = ( - "_page_cursor=%s" % page_cursor if page_cursor else "") - body = do_request(HTTP_GET, - "%s%s%s" % (path, query_marker, page_cursor_str), - cluster=c) - page_cursor = body.get('page_cursor') + results, page_cursor = get_single_query_page( + path, c, page_cursor)[:2] if not page_cursor: need_more_results = False - result_list.extend(body['results']) + result_list.extend(results) return result_list diff --git a/neutron/tests/unit/nicira/etc/fake_get_lrouter.json b/neutron/tests/unit/nicira/etc/fake_get_lrouter.json index 9bda5b476..9425ad654 100644 --- a/neutron/tests/unit/nicira/etc/fake_get_lrouter.json +++ b/neutron/tests/unit/nicira/etc/fake_get_lrouter.json @@ -19,7 +19,7 @@ "lport_admin_up_count": %(lport_count)d, "_schema": "/ws.v1/schema/LogicalRouterStatus", "lport_count": %(lport_count)d, - "fabric_status": true, + "fabric_status": %(status)s, "type": "LogicalRouterStatus", "lport_link_up_count": %(lport_count)d } diff --git a/neutron/tests/unit/nicira/etc/fake_get_lswitch.json b/neutron/tests/unit/nicira/etc/fake_get_lswitch.json index 58b132b30..a55d508c7 100644 --- a/neutron/tests/unit/nicira/etc/fake_get_lswitch.json +++ b/neutron/tests/unit/nicira/etc/fake_get_lswitch.json @@ -2,7 +2,7 @@ "_href": "/ws.v1/lswitch/%(uuid)s", "_schema": "/ws.v1/schema/LogicalSwitchConfig", "_relations": {"LogicalSwitchStatus": - {"fabric_status": true, + {"fabric_status": %(status)s, "type": "LogicalSwitchStatus", "lport_count": %(lport_count)d, "_href": "/ws.v1/lswitch/%(uuid)s/status", diff --git a/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json b/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json index cfa7aed46..3e5cb90c2 100644 --- a/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json +++ b/neutron/tests/unit/nicira/etc/fake_get_lswitch_lport.json @@ -3,7 +3,8 @@ {"LogicalPortStatus": {"type": "LogicalSwitchPortStatus", "admin_status_enabled": true, - "fabric_status_up": false, + "fabric_status_up": %(status)s, + "link_status_up": %(status)s, "_href": "/ws.v1/lswitch/%(ls_uuid)s/lport/%(uuid)s/status", "_schema": "/ws.v1/schema/LogicalSwitchPortStatus"}, "LogicalSwitchConfig": diff --git a/neutron/tests/unit/nicira/fake_nvpapiclient.py b/neutron/tests/unit/nicira/fake_nvpapiclient.py index 1133b0e7e..e8b6f5b41 100644 --- a/neutron/tests/unit/nicira/fake_nvpapiclient.py +++ b/neutron/tests/unit/nicira/fake_nvpapiclient.py @@ -106,17 +106,6 @@ class FakeClient: LROUTER_LPORT_RESOURCE: ['LogicalPortAttachment'], } - _fake_lswitch_dict = {} - _fake_lrouter_dict = {} - _fake_lswitch_lport_dict = {} - _fake_lrouter_lport_dict = {} - _fake_lrouter_nat_dict = {} - _fake_lswitch_lportstatus_dict = {} - _fake_lrouter_lportstatus_dict = {} - _fake_securityprofile_dict = {} - _fake_lqueue_dict = {} - _fake_gatewayservice_dict = {} - _validators = { LSWITCH_RESOURCE: _validate_resource, LSWITCH_LPORT_RESOURCE: _validate_resource, @@ -128,6 +117,16 @@ class FakeClient: def __init__(self, fake_files_path): self.fake_files_path = fake_files_path + self._fake_lswitch_dict = {} + self._fake_lrouter_dict = {} + self._fake_lswitch_lport_dict = {} + self._fake_lrouter_lport_dict = {} + self._fake_lrouter_nat_dict = {} + self._fake_lswitch_lportstatus_dict = {} + self._fake_lrouter_lportstatus_dict = {} + self._fake_securityprofile_dict = {} + self._fake_lqueue_dict = {} + self._fake_gatewayservice_dict = {} def _get_tag(self, resource, scope): tags = [tag['tag'] for tag in resource['tags'] @@ -136,7 +135,7 @@ class FakeClient: def _get_filters(self, querystring): if not querystring: - return (None, None) + return (None, None, None) params = urlparse.parse_qs(querystring) tag_filter = None attr_filter = None @@ -145,7 +144,15 @@ class FakeClient: 'tag': params['tag'][0]} elif 'uuid' in params: attr_filter = {'uuid': params['uuid'][0]} - return (tag_filter, attr_filter) + # Handle page_length + # TODO(salv-orlando): Handle page cursor too + page_len = params.get('_page_length') + if page_len: + page_len = int(page_len[0]) + else: + # Explicitly set it to None (avoid 0 or empty list) + page_len = None + return (tag_filter, attr_filter, page_len) def _add_lswitch(self, body): fake_lswitch = json.loads(body) @@ -157,6 +164,8 @@ class FakeClient: fake_lswitch['zone_uuid'] = zone_uuid fake_lswitch['tenant_id'] = self._get_tag(fake_lswitch, 'os_tid') fake_lswitch['lport_count'] = 0 + # set status value + fake_lswitch['status'] = 'true' return fake_lswitch def _build_lrouter(self, body, uuid=None): @@ -183,6 +192,8 @@ class FakeClient: uuidutils.generate_uuid()) self._fake_lrouter_dict[fake_lrouter['uuid']] = fake_lrouter fake_lrouter['lport_count'] = 0 + # set status value + fake_lrouter['status'] = 'true' return fake_lrouter def _add_lqueue(self, body): @@ -213,6 +224,8 @@ class FakeClient: fake_lport_status['ls_uuid'] = fake_lswitch['uuid'] fake_lport_status['ls_name'] = fake_lswitch['display_name'] fake_lport_status['ls_zone_uuid'] = fake_lswitch['zone_uuid'] + # set status value + fake_lport['status'] = 'true' self._fake_lswitch_lportstatus_dict[new_uuid] = fake_lport_status return fake_lport @@ -356,7 +369,7 @@ class FakeClient: def _list(self, resource_type, response_file, parent_uuid=None, query=None, relations=None): - (tag_filter, attr_filter) = self._get_filters(query) + (tag_filter, attr_filter, page_len) = self._get_filters(query) with open("%s/%s" % (self.fake_files_path, response_file)) as f: response_template = f.read() res_dict = getattr(self, '_fake_%s_dict' % resource_type) @@ -425,8 +438,20 @@ class FakeClient: if (parent_func(res_uuid) and _tag_match(res_uuid) and _attr_match(res_uuid))] - return json.dumps({'results': items, - 'result_count': len(items)}) + # Rather inefficient, but hey this is just a mock! + next_cursor = None + total_items = len(items) + if page_len: + try: + next_cursor = items[page_len]['uuid'] + except IndexError: + next_cursor = None + items = items[:page_len] + response_dict = {'results': items, + 'result_count': total_items} + if next_cursor: + response_dict['page_cursor'] = next_cursor + return json.dumps(response_dict) def _show(self, resource_type, response_file, uuid1, uuid2=None, relations=None): diff --git a/neutron/tests/unit/nicira/test_agent_scheduler.py b/neutron/tests/unit/nicira/test_agent_scheduler.py index aad015405..7bd2af700 100644 --- a/neutron/tests/unit/nicira/test_agent_scheduler.py +++ b/neutron/tests/unit/nicira/test_agent_scheduler.py @@ -16,6 +16,7 @@ import mock from neutron.common.test_lib import test_config +from neutron.plugins.nicira.common import sync from neutron.tests.unit.nicira import fake_nvpapiclient from neutron.tests.unit.nicira import get_fake_conf from neutron.tests.unit.nicira import NVPAPI_NAME @@ -35,6 +36,9 @@ class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase): self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -44,6 +48,7 @@ class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase): instance.return_value.request.side_effect = _fake_request super(NVPDhcpAgentNotifierTestCase, self).setUp() self.addCleanup(self.fc.reset_all) + self.addCleanup(patch_sync.stop) self.addCleanup(self.mock_nvpapi.stop) def _notification_mocks(self, hosts, mock_dhcp, net, subnet, port): diff --git a/neutron/tests/unit/nicira/test_maclearning.py b/neutron/tests/unit/nicira/test_maclearning.py index 9534ccae0..2df2e7419 100644 --- a/neutron/tests/unit/nicira/test_maclearning.py +++ b/neutron/tests/unit/nicira/test_maclearning.py @@ -24,6 +24,7 @@ from neutron.api.v2 import attributes from neutron.common.test_lib import test_config from neutron import context from neutron.extensions import agent +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira.NvpApiClient import NVPVersion from neutron.tests.unit.nicira import fake_nvpapiclient from neutron.tests.unit.nicira import get_fake_conf @@ -70,6 +71,9 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase): self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -80,6 +84,7 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase): cfg.CONF.set_override('metadata_mode', None, 'NVP') self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) self.addCleanup(self.restore_resource_attribute_map) self.addCleanup(cfg.CONF.reset) super(MacLearningDBTestCase, self).setUp() diff --git a/neutron/tests/unit/nicira/test_nicira_plugin.py b/neutron/tests/unit/nicira/test_nicira_plugin.py index f2dc40a6e..d51d527c0 100644 --- a/neutron/tests/unit/nicira/test_nicira_plugin.py +++ b/neutron/tests/unit/nicira/test_nicira_plugin.py @@ -34,6 +34,7 @@ from neutron.extensions import securitygroup as secgrp from neutron import manager from neutron.openstack.common import uuidutils from neutron.plugins.nicira.common import exceptions as nvp_exc +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira.dbexts import nicira_db from neutron.plugins.nicira.dbexts import nicira_qos_db as qos_db from neutron.plugins.nicira.extensions import distributedrouter as dist_router @@ -92,6 +93,9 @@ class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) self.mock_instance = self.mock_nvpapi.start() + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -106,6 +110,7 @@ class NiciraPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase): cfg.CONF.set_override('metadata_mode', None, 'NVP') self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase): @@ -325,6 +330,9 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase): self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() instance.return_value.login.return_value = "the_cookie" + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return self.fc.fake_request(*args, **kwargs) @@ -333,6 +341,7 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase): super(NiciraPortSecurityTestCase, self).setUp(PLUGIN_NAME) self.addCleanup(self.fc.reset_all) self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) class TestNiciraPortSecurity(NiciraPortSecurityTestCase, @@ -349,17 +358,18 @@ class NiciraSecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase): self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) instance = self.mock_nvpapi.start() instance.return_value.login.return_value = "the_cookie" + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() def _fake_request(*args, **kwargs): return fc.fake_request(*args, **kwargs) instance.return_value.request.side_effect = _fake_request + self.addCleanup(self.mock_nvpapi.stop) + self.addCleanup(patch_sync.stop) super(NiciraSecurityGroupsTestCase, self).setUp(PLUGIN_NAME) - def tearDown(self): - super(NiciraSecurityGroupsTestCase, self).tearDown() - self.mock_nvpapi.stop() - class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups, NiciraSecurityGroupsTestCase): @@ -1125,20 +1135,12 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, res = req.get_response(self.api) self.assertEqual(res.status_int, 204) - def test_list_networks_not_in_nvp(self): - res = self._create_network('json', 'net1', True) - self.deserialize('json', res) - self.fc._fake_lswitch_dict.clear() - req = self.new_list_request('networks') - nets = self.deserialize('json', req.get_response(self.api)) - self.assertEqual(nets['networks'][0]['status'], - constants.NET_STATUS_ERROR) - def test_show_network_not_in_nvp(self): res = self._create_network('json', 'net1', True) net = self.deserialize('json', res) self.fc._fake_lswitch_dict.clear() - req = self.new_show_request('networks', net['network']['id']) + req = self.new_show_request('networks', net['network']['id'], + fields=['id', 'status']) net = self.deserialize('json', req.get_response(self.api)) self.assertEqual(net['network']['status'], constants.NET_STATUS_ERROR) @@ -1153,17 +1155,6 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, res = req.get_response(self.api) self.assertEqual(res.status_int, 204) - def test_list_port_not_in_nvp(self): - res = self._create_network('json', 'net1', True) - net1 = self.deserialize('json', res) - res = self._create_port('json', net1['network']['id']) - self.deserialize('json', res) - self.fc._fake_lswitch_lport_dict.clear() - req = self.new_list_request('ports') - nets = self.deserialize('json', req.get_response(self.api)) - self.assertEqual(nets['ports'][0]['status'], - constants.PORT_STATUS_ERROR) - def test_show_port_not_in_nvp(self): res = self._create_network('json', 'net1', True) net1 = self.deserialize('json', res) @@ -1171,7 +1162,8 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, port = self.deserialize('json', res) self.fc._fake_lswitch_lport_dict.clear() self.fc._fake_lswitch_lportstatus_dict.clear() - req = self.new_show_request('ports', port['port']['id']) + req = self.new_show_request('ports', port['port']['id'], + fields=['id', 'status']) net = self.deserialize('json', req.get_response(self.api)) self.assertEqual(net['port']['status'], constants.PORT_STATUS_ERROR) @@ -1218,20 +1210,12 @@ class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase, res = req.get_response(self.ext_api) self.assertEqual(res.status_int, 204) - def test_list_routers_not_in_nvp(self): - res = self._create_router('json', 'tenant') - self.deserialize('json', res) - self.fc._fake_lrouter_dict.clear() - req = self.new_list_request('routers') - routers = self.deserialize('json', req.get_response(self.ext_api)) - self.assertEqual(routers['routers'][0]['status'], - constants.NET_STATUS_ERROR) - def test_show_router_not_in_nvp(self): res = self._create_router('json', 'tenant') router = self.deserialize('json', res) self.fc._fake_lrouter_dict.clear() - req = self.new_show_request('routers', router['router']['id']) + req = self.new_show_request('routers', router['router']['id'], + fields=['id', 'status']) router = self.deserialize('json', req.get_response(self.ext_api)) self.assertEqual(router['router']['status'], constants.NET_STATUS_ERROR) diff --git a/neutron/tests/unit/nicira/test_nvp_sync.py b/neutron/tests/unit/nicira/test_nvp_sync.py new file mode 100644 index 000000000..9b2b28fa0 --- /dev/null +++ b/neutron/tests/unit/nicira/test_nvp_sync.py @@ -0,0 +1,587 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Nicira Networks, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import contextlib +import time + +import mock +from oslo.config import cfg + +from neutron.api.v2 import attributes as attr +from neutron.common import config +from neutron.common import constants +from neutron import context +from neutron.openstack.common import jsonutils as json +from neutron.plugins.nicira.common import sync +from neutron.plugins.nicira import NeutronPlugin +from neutron.plugins.nicira import nvp_cluster +from neutron.plugins.nicira import NvpApiClient +from neutron.plugins.nicira import nvplib +from neutron.tests import base +from neutron.tests.unit.nicira import fake_nvpapiclient +from neutron.tests.unit.nicira import get_fake_conf +from neutron.tests.unit.nicira import NVPAPI_NAME +from neutron.tests.unit.nicira import STUBS_PATH +from neutron.tests.unit import test_api_v2 + +from neutron.openstack.common import log + +LOG = log.getLogger(__name__) + +_uuid = test_api_v2._uuid +LSWITCHES = [{'uuid': _uuid(), 'name': 'ls-1'}, + {'uuid': _uuid(), 'name': 'ls-2'}] +LSWITCHPORTS = [{'uuid': _uuid(), 'name': 'lp-1'}, + {'uuid': _uuid(), 'name': 'lp-2'}] +LROUTERS = [{'uuid': _uuid(), 'name': 'lr-1'}, + {'uuid': _uuid(), 'name': 'lr-2'}] + + +class NvpCacheTestCase(base.BaseTestCase): + """Test suite providing coverage for the NvpCache class.""" + + def setUp(self): + self.nvp_cache = sync.NvpCache() + for lswitch in LSWITCHES: + self.nvp_cache._uuid_dict_mappings[lswitch['uuid']] = ( + self.nvp_cache._lswitches) + self.nvp_cache._lswitches[lswitch['uuid']] = ( + {'data': lswitch, + 'hash': hash(json.dumps(lswitch))}) + for lswitchport in LSWITCHPORTS: + self.nvp_cache._uuid_dict_mappings[lswitchport['uuid']] = ( + self.nvp_cache._lswitchports) + self.nvp_cache._lswitchports[lswitchport['uuid']] = ( + {'data': lswitchport, + 'hash': hash(json.dumps(lswitchport))}) + for lrouter in LROUTERS: + self.nvp_cache._uuid_dict_mappings[lrouter['uuid']] = ( + self.nvp_cache._lrouters) + self.nvp_cache._lrouters[lrouter['uuid']] = ( + {'data': lrouter, + 'hash': hash(json.dumps(lrouter))}) + super(NvpCacheTestCase, self).setUp() + + def test_get_lswitches(self): + ls_uuids = self.nvp_cache.get_lswitches() + self.assertEqual(set(ls_uuids), + set([ls['uuid'] for ls in LSWITCHES])) + + def test_get_lswitchports(self): + lp_uuids = self.nvp_cache.get_lswitchports() + self.assertEqual(set(lp_uuids), + set([lp['uuid'] for lp in LSWITCHPORTS])) + + def test_get_lrouters(self): + lr_uuids = self.nvp_cache.get_lrouters() + self.assertEqual(set(lr_uuids), + set([lr['uuid'] for lr in LROUTERS])) + + def test_get_lswitches_changed_only(self): + ls_uuids = self.nvp_cache.get_lswitches(changed_only=True) + self.assertEqual(0, len(ls_uuids)) + + def test_get_lswitchports_changed_only(self): + lp_uuids = self.nvp_cache.get_lswitchports(changed_only=True) + self.assertEqual(0, len(lp_uuids)) + + def test_get_lrouters_changed_only(self): + lr_uuids = self.nvp_cache.get_lrouters(changed_only=True) + self.assertEqual(0, len(lr_uuids)) + + def _verify_update(self, new_resource, changed=True, hit=True): + cached_resource = self.nvp_cache[new_resource['uuid']] + self.assertEqual(new_resource, cached_resource['data']) + self.assertEqual(hit, cached_resource.get('hit', False)) + self.assertEqual(changed, + cached_resource.get('changed', False)) + + def test_update_lswitch_new_item(self): + new_switch_uuid = _uuid() + new_switch = {'uuid': new_switch_uuid, 'name': 'new_switch'} + self.nvp_cache.update_lswitch(new_switch) + self.assertIn(new_switch_uuid, self.nvp_cache._lswitches.keys()) + self._verify_update(new_switch) + + def test_update_lswitch_existing_item(self): + switch = LSWITCHES[0] + switch['name'] = 'new_name' + self.nvp_cache.update_lswitch(switch) + self.assertIn(switch['uuid'], self.nvp_cache._lswitches.keys()) + self._verify_update(switch) + + def test_update_lswitchport_new_item(self): + new_switchport_uuid = _uuid() + new_switchport = {'uuid': new_switchport_uuid, + 'name': 'new_switchport'} + self.nvp_cache.update_lswitchport(new_switchport) + self.assertIn(new_switchport_uuid, + self.nvp_cache._lswitchports.keys()) + self._verify_update(new_switchport) + + def test_update_lswitchport_existing_item(self): + switchport = LSWITCHPORTS[0] + switchport['name'] = 'new_name' + self.nvp_cache.update_lswitchport(switchport) + self.assertIn(switchport['uuid'], + self.nvp_cache._lswitchports.keys()) + self._verify_update(switchport) + + def test_update_lrouter_new_item(self): + new_router_uuid = _uuid() + new_router = {'uuid': new_router_uuid, + 'name': 'new_router'} + self.nvp_cache.update_lrouter(new_router) + self.assertIn(new_router_uuid, + self.nvp_cache._lrouters.keys()) + self._verify_update(new_router) + + def test_update_lrouter_existing_item(self): + router = LROUTERS[0] + router['name'] = 'new_name' + self.nvp_cache.update_lrouter(router) + self.assertIn(router['uuid'], + self.nvp_cache._lrouters.keys()) + self._verify_update(router) + + def test_process_updates_initial(self): + # Clear cache content to simulate first-time filling + self.nvp_cache._lswitches.clear() + self.nvp_cache._lswitchports.clear() + self.nvp_cache._lrouters.clear() + self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + self._verify_update(resource) + + def test_process_updates_no_change(self): + self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + self._verify_update(resource, changed=False) + + def test_process_updates_with_changes(self): + LSWITCHES[0]['name'] = 'altered' + self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + changed = (True if resource['uuid'] == LSWITCHES[0]['uuid'] + else False) + self._verify_update(resource, changed=changed) + + def _test_process_updates_with_removals(self): + lswitches = LSWITCHES[:] + lswitch = lswitches.pop() + self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS) + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + hit = (False if resource['uuid'] == lswitch['uuid'] + else True) + self._verify_update(resource, changed=False, hit=hit) + return (lswitch, lswitches) + + def test_process_updates_with_removals(self): + self._test_process_updates_with_removals() + + def test_process_updates_cleanup_after_delete(self): + deleted_lswitch, lswitches = self._test_process_updates_with_removals() + self.nvp_cache.process_deletes() + self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS) + self.assertNotIn(deleted_lswitch['uuid'], self.nvp_cache._lswitches) + + def _verify_delete(self, resource, deleted=True, hit=True): + cached_resource = self.nvp_cache[resource['uuid']] + data_field = 'data_bk' if deleted else 'data' + self.assertEqual(resource, cached_resource[data_field]) + self.assertEqual(hit, cached_resource.get('hit', False)) + self.assertEqual(deleted, + cached_resource.get('changed', False)) + + def _set_hit(self, resources, uuid_to_delete=None): + for resource in resources: + if resource['data']['uuid'] != uuid_to_delete: + resource['hit'] = True + + def test_process_deletes_no_change(self): + # Mark all resources as hit + self._set_hit(self.nvp_cache._lswitches.values()) + self._set_hit(self.nvp_cache._lswitchports.values()) + self._set_hit(self.nvp_cache._lrouters.values()) + self.nvp_cache.process_deletes() + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + self._verify_delete(resource, hit=False, deleted=False) + + def test_process_deletes_with_removals(self): + # Mark all resources but one as hit + uuid_to_delete = LSWITCHPORTS[0]['uuid'] + self._set_hit(self.nvp_cache._lswitches.values(), + uuid_to_delete) + self._set_hit(self.nvp_cache._lswitchports.values(), + uuid_to_delete) + self._set_hit(self.nvp_cache._lrouters.values(), + uuid_to_delete) + self.nvp_cache.process_deletes() + for resource in LSWITCHES + LROUTERS + LSWITCHPORTS: + deleted = resource['uuid'] == uuid_to_delete + self._verify_delete(resource, hit=False, deleted=deleted) + + +class SyncLoopingCallTestCase(base.BaseTestCase): + + def test_looping_calls(self): + # Avoid runs of the synchronization process - just start + # the looping call + with mock.patch.object( + sync.NvpSynchronizer, '_synchronize_state', + return_value=0.01): + synchronizer = sync.NvpSynchronizer(None, None, + 100, 0, 0) + time.sleep(0.04999) + self.assertEqual( + 5, synchronizer._synchronize_state.call_count) + + +class NvpSyncTestCase(base.BaseTestCase): + + def setUp(self): + # mock nvp api client + self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH) + mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True) + # Avoid runs of the synchronizer looping call + # These unit tests will excplicitly invoke synchronization + patch_sync = mock.patch.object(sync, '_start_loopingcall') + self.mock_nvpapi = mock_nvpapi.start() + patch_sync.start() + self.mock_nvpapi.return_value.login.return_value = "the_cookie" + # Emulate tests against NVP 3.x + self.mock_nvpapi.return_value.get_nvp_version.return_value = ( + NvpApiClient.NVPVersion("3.1")) + + def _fake_request(*args, **kwargs): + return self.fc.fake_request(*args, **kwargs) + + self.mock_nvpapi.return_value.request.side_effect = _fake_request + self.fake_cluster = nvp_cluster.NVPCluster( + name='fake-cluster', nvp_controllers=['1.1.1.1:999'], + default_tz_uuid=_uuid(), nvp_user='foo', nvp_password='bar') + self.fake_cluster.api_client = NvpApiClient.NVPApiHelper( + ('1.1.1.1', '999', True), + self.fake_cluster.nvp_user, self.fake_cluster.nvp_password, + self.fake_cluster.req_timeout, self.fake_cluster.http_timeout, + self.fake_cluster.retries, self.fake_cluster.redirects) + # Instantiate Neutron plugin + # and setup needed config variables + args = ['--config-file', get_fake_conf('neutron.conf.test'), + '--config-file', get_fake_conf('nvp.ini.test')] + config.parse(args=args) + self._plugin = NeutronPlugin.NvpPluginV2() + super(NvpSyncTestCase, self).setUp() + self.addCleanup(self.fc.reset_all) + self.addCleanup(patch_sync.stop) + self.addCleanup(mock_nvpapi.stop) + + def tearDown(self): + cfg.CONF.reset() + super(NvpSyncTestCase, self).tearDown() + + @contextlib.contextmanager + def _populate_data(self, ctx, net_size=2, port_size=2, router_size=2): + + def network(idx): + return {'network': {'name': 'net-%s' % idx, + 'admin_state_up': True, + 'shared': False, + 'port_security_enabled': True, + 'tenant_id': 'foo'}} + + def subnet(idx, net_id): + return {'subnet': + {'cidr': '10.10.%s.0/24' % idx, + 'name': 'sub-%s' % idx, + 'gateway_ip': attr.ATTR_NOT_SPECIFIED, + 'allocation_pools': attr.ATTR_NOT_SPECIFIED, + 'ip_version': 4, + 'dns_nameservers': attr.ATTR_NOT_SPECIFIED, + 'host_routes': attr.ATTR_NOT_SPECIFIED, + 'enable_dhcp': True, + 'network_id': net_id, + 'tenant_id': 'foo'}} + + def port(idx, net_id): + return {'port': {'network_id': net_id, + 'name': 'port-%s' % idx, + 'admin_state_up': True, + 'device_id': 'miao', + 'device_owner': 'bau', + 'fixed_ips': attr.ATTR_NOT_SPECIFIED, + 'mac_address': attr.ATTR_NOT_SPECIFIED, + 'tenant_id': 'foo'}} + + def router(idx): + # Use random uuids as names + return {'router': {'name': 'rtr-%s' % idx, + 'admin_state_up': True, + 'tenant_id': 'foo'}} + + networks = [] + ports = [] + routers = [] + for i in range(0, net_size): + net = self._plugin.create_network(ctx, network(i)) + networks.append(net) + self._plugin.create_subnet(ctx, subnet(i, net['id'])) + for j in range(0, port_size): + ports.append(self._plugin.create_port( + ctx, port("%s-%s" % (i, j), net['id']))) + for i in range(0, router_size): + routers.append(self._plugin.create_router(ctx, router(i))) + # Do not return anything as the user does need the actual + # data created + try: + yield + finally: + # Remove everything + for router in routers: + self._plugin.delete_router(ctx, router['id']) + for port in ports: + self._plugin.delete_port(ctx, port['id']) + # This will remove networks and subnets + for network in networks: + self._plugin.delete_network(ctx, network['id']) + + def _get_tag_dict(self, tags): + return dict((tag['scope'], tag['tag']) for tag in tags) + + def _test_sync(self, exp_net_status, + exp_port_status, exp_router_status, + action_callback=None, sp=None): + neutron_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0] + lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0] + neutron_port_id = self._get_tag_dict( + self.fc._fake_lswitch_lport_dict[lp_uuid]['tags'])['q_port_id'] + neutron_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0] + if action_callback: + action_callback(ls_uuid, lp_uuid, lr_uuid) + # Make chunk big enough to read everything + if not sp: + sp = sync.SyncParameters(100) + self._plugin._synchronizer._synchronize_state(sp) + # Verify element is in expected status + # TODO(salv-orlando): Verify status for all elements + ctx = context.get_admin_context() + neutron_net = self._plugin.get_network(ctx, neutron_net_id) + neutron_port = self._plugin.get_port(ctx, neutron_port_id) + neutron_rtr = self._plugin.get_router(ctx, neutron_rtr_id) + self.assertEqual(exp_net_status, neutron_net['status']) + self.assertEqual(exp_port_status, neutron_port['status']) + self.assertEqual(exp_router_status, neutron_rtr['status']) + + def _action_callback_status_down(self, ls_uuid, lp_uuid, lr_uuid): + self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false' + self.fc._fake_lswitch_lport_dict[lp_uuid]['status'] = 'false' + self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false' + + def test_initial_sync(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + self._test_sync( + constants.NET_STATUS_ACTIVE, + constants.PORT_STATUS_ACTIVE, + constants.NET_STATUS_ACTIVE) + + def test_initial_sync_with_resources_down(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + self._test_sync( + constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN, + constants.NET_STATUS_DOWN, self._action_callback_status_down) + + def test_resync_with_resources_down(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + sp = sync.SyncParameters(100) + self._plugin._synchronizer._synchronize_state(sp) + self._test_sync( + constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN, + constants.NET_STATUS_DOWN, self._action_callback_status_down) + + def _action_callback_del_resource(self, ls_uuid, lp_uuid, lr_uuid): + del self.fc._fake_lswitch_dict[ls_uuid] + del self.fc._fake_lswitch_lport_dict[lp_uuid] + del self.fc._fake_lrouter_dict[lr_uuid] + + def test_initial_sync_with_resources_removed(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + self._test_sync( + constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR, + constants.NET_STATUS_ERROR, self._action_callback_del_resource) + + def test_resync_with_resources_removed(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + sp = sync.SyncParameters(100) + self._plugin._synchronizer._synchronize_state(sp) + self._test_sync( + constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR, + constants.NET_STATUS_ERROR, self._action_callback_del_resource) + + def _test_sync_with_chunk_larger_maxpagesize( + self, net_size, port_size, router_size, chunk_size, exp_calls): + ctx = context.get_admin_context() + real_func = nvplib.get_single_query_page + sp = sync.SyncParameters(chunk_size) + with self._populate_data(ctx, net_size=net_size, + port_size=port_size, + router_size=router_size): + with mock.patch.object(nvplib, 'MAX_PAGE_SIZE', 15): + # The following mock is just for counting calls, + # but we will still run the actual function + with mock.patch.object( + nvplib, 'get_single_query_page', + side_effect=real_func) as mock_get_page: + self._test_sync( + constants.NET_STATUS_ACTIVE, + constants.PORT_STATUS_ACTIVE, + constants.NET_STATUS_ACTIVE, + sp=sp) + # As each resource type does not exceed the maximum page size, + # the method should be called once for each resource type + self.assertEqual(exp_calls, mock_get_page.call_count) + + def test_sync_chunk_larger_maxpagesize_no_multiple_requests(self): + # total resource size = 20 + # total size for each resource does not exceed max page size (15) + self._test_sync_with_chunk_larger_maxpagesize( + net_size=5, port_size=2, router_size=5, + chunk_size=20, exp_calls=3) + + def test_sync_chunk_larger_maxpagesize_triggers_multiple_requests(self): + # total resource size = 48 + # total size for each resource does exceed max page size (15) + self._test_sync_with_chunk_larger_maxpagesize( + net_size=16, port_size=1, router_size=16, + chunk_size=48, exp_calls=6) + + def test_sync_multi_chunk(self): + # The fake NVP API client cannot be used for this test + ctx = context.get_admin_context() + # Generate 4 networks, 1 port per network, and 4 routers + with self._populate_data(ctx, net_size=4, port_size=1, router_size=4): + fake_lswitches = json.loads( + self.fc.handle_get('/ws.v1/lswitch'))['results'] + fake_lrouters = json.loads( + self.fc.handle_get('/ws.v1/lrouter'))['results'] + fake_lswitchports = json.loads( + self.fc.handle_get('/ws.v1/lswitch/*/lport'))['results'] + return_values = [ + # Chunk 0 - lswitches + (fake_lswitches, None, 4), + # Chunk 0 - lrouters + (fake_lrouters[:2], 'xxx', 4), + # Chunk 0 - lports (size only) + ([], 'start', 4), + # Chunk 1 - lrouters (2 more) (lswitches are skipped) + (fake_lrouters[2:], None, None), + # Chunk 1 - lports + (fake_lswitchports, None, 4)] + + def fake_fetch_data(*args, **kwargs): + return return_values.pop(0) + + # 2 Chunks, with 6 resources each. + # 1st chunk lswitches and lrouters + # 2nd chunk lrouters and lports + # Mock _fetch_data + with mock.patch.object( + self._plugin._synchronizer, '_fetch_data', + side_effect=fake_fetch_data): + sp = sync.SyncParameters(6) + + def do_chunk(chunk_idx, ls_cursor, lr_cursor, lp_cursor): + self._plugin._synchronizer._synchronize_state(sp) + self.assertEqual(chunk_idx, sp.current_chunk) + self.assertEqual(ls_cursor, sp.ls_cursor) + self.assertEqual(lr_cursor, sp.lr_cursor) + self.assertEqual(lp_cursor, sp.lp_cursor) + + # check 1st chunk + do_chunk(1, None, 'xxx', 'start') + # check 2nd chunk + do_chunk(0, None, None, None) + # Chunk size should have stayed the same + self.assertEqual(sp.chunk_size, 6) + + def test_synchronize_network(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + # Put a network down to verify synchronization + q_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0] + self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false' + q_net_data = self._plugin._get_network(ctx, q_net_id) + self._plugin._synchronizer.synchronize_network(ctx, q_net_data) + # Reload from db + q_nets = self._plugin.get_networks(ctx) + for q_net in q_nets: + if q_net['id'] == q_net_id: + exp_status = constants.NET_STATUS_DOWN + else: + exp_status = constants.NET_STATUS_ACTIVE + self.assertEqual(exp_status, q_net['status']) + + def test_synchronize_port(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + # Put a network down to verify synchronization + lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0] + lport = self.fc._fake_lswitch_lport_dict[lp_uuid] + q_port_id = self._get_tag_dict(lport['tags'])['q_port_id'] + lport['status'] = 'false' + q_port_data = self._plugin._get_port(ctx, q_port_id) + self._plugin._synchronizer.synchronize_port(ctx, q_port_data) + # Reload from db + q_ports = self._plugin.get_ports(ctx) + for q_port in q_ports: + if q_port['id'] == q_port_id: + exp_status = constants.PORT_STATUS_DOWN + else: + exp_status = constants.PORT_STATUS_ACTIVE + self.assertEqual(exp_status, q_port['status']) + + def test_synchronize_router(self): + ctx = context.get_admin_context() + with self._populate_data(ctx): + # Put a network down to verify synchronization + q_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0] + self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false' + q_rtr_data = self._plugin._get_router(ctx, q_rtr_id) + self._plugin._synchronizer.synchronize_router(ctx, q_rtr_data) + # Reload from db + q_routers = self._plugin.get_routers(ctx) + for q_rtr in q_routers: + if q_rtr['id'] == q_rtr_id: + exp_status = constants.NET_STATUS_DOWN + else: + exp_status = constants.NET_STATUS_ACTIVE + self.assertEqual(exp_status, q_rtr['status']) + + def test_sync_nvp_failure_backoff(self): + self.mock_nvpapi.return_value.request.side_effect = ( + NvpApiClient.RequestTimeout) + # chunk size won't matter here + sp = sync.SyncParameters(999) + for i in range(0, 10): + self.assertEqual( + min(64, 2 ** i), + self._plugin._synchronizer._synchronize_state(sp)) diff --git a/neutron/tests/unit/nicira/test_nvpopts.py b/neutron/tests/unit/nicira/test_nvpopts.py index 3d5b2587e..891f11fbd 100644 --- a/neutron/tests/unit/nicira/test_nvpopts.py +++ b/neutron/tests/unit/nicira/test_nvpopts.py @@ -16,6 +16,7 @@ import fixtures import testtools +import mock from oslo.config import cfg from neutron.common import config as q_config @@ -23,6 +24,7 @@ from neutron.manager import NeutronManager from neutron.openstack.common import uuidutils from neutron.plugins.nicira.common import config # noqa from neutron.plugins.nicira.common import exceptions +from neutron.plugins.nicira.common import sync from neutron.plugins.nicira import nvp_cluster from neutron.tests.unit.nicira import get_fake_conf from neutron.tests.unit.nicira import PLUGIN_NAME @@ -81,6 +83,10 @@ class ConfigurationTest(testtools.TestCase): self.useFixture(fixtures.MonkeyPatch( 'neutron.manager.NeutronManager._instance', None)) + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() + self.addCleanup(patch_sync.stop) def _assert_required_options(self, cluster): self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443']) @@ -175,6 +181,10 @@ class OldConfigurationTest(testtools.TestCase): self.useFixture(fixtures.MonkeyPatch( 'neutron.manager.NeutronManager._instance', None)) + # Avoid runs of the synchronizer looping call + patch_sync = mock.patch.object(sync, '_start_loopingcall') + patch_sync.start() + self.addCleanup(patch_sync.stop) def _assert_required_options(self, cluster): self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443']) diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py index e628192f4..b89743ccd 100644 --- a/neutron/tests/unit/test_db_plugin.py +++ b/neutron/tests/unit/test_db_plugin.py @@ -202,10 +202,14 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): 'GET', resource, None, fmt, params=params, subresource=subresource ) - def new_show_request(self, resource, id, fmt=None, subresource=None): - return self._req( - 'GET', resource, None, fmt, id=id, subresource=subresource - ) + def new_show_request(self, resource, id, fmt=None, + subresource=None, fields=None): + if fields: + params = "&".join(["fields=%s" % x for x in fields]) + else: + params = None + return self._req('GET', resource, None, fmt, id=id, + params=params, subresource=subresource) def new_delete_request(self, resource, id, fmt=None, subresource=None, sub_id=None): -- 2.45.2