# the RPC agents to operate. When 'agentless' is chosen, the config option metadata_mode
# becomes ineffective. The mode 'agentless' is not supported for NVP 3.2 or below.
# agent_mode = agent
+
+[nvp_sync]
+# Interval in seconds between runs of the status synchronization task.
+# The plugin will aim at resynchronizing operational status for all
+# resources in this interval, and it should be therefore large enough
+# to ensure the task is feasible. Otherwise the plugin will be
+# constantly synchronizing resource status, ie: a new task is started
+# as soon as the previous is completed.
+# If this value is set to 0, the state synchronization thread for this
+# Neutron instance will be disabled.
+# state_sync_interval = 120
+
+# Random additional delay between two runs of the state synchronization task.
+# An additional wait time between 0 and max_random_sync_delay seconds
+# will be added on top of state_sync_interval.
+# max_random_sync_delay = 0
+
+# Minimum delay, in seconds, between two status synchronization requests for NVP.
+# Depending on chunk size, controller load, and other factors, state
+# synchronization requests might be pretty heavy. This means the
+# controller might take time to respond, and its load might be quite
+# increased by them. This parameter allows to specify a minimum
+# interval between two subsequent requests.
+# The value for this parameter must never exceed state_sync_interval.
+# If this does, an error will be raised at startup.
+# min_sync_req_delay = 10
+
+# Minimum number of resources to be retrieved from NVP in a single status
+# synchronization request.
+# The actual size of the chunk will increase if the number of resources is such
+# that using the minimum chunk size will cause the interval between two
+# requests to be less than min_sync_req_delay
+# min_chunk_size = 500
# @author: Aaron Rosen, Nicira Networks, Inc.
-import hashlib
import logging
import os
from neutron.plugins.nicira.common import config
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import securitygroups as nvp_sec
+from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.dbexts import distributedrouter as dist_rtr
from neutron.plugins.nicira.dbexts import maclearning as mac_db
from neutron.plugins.nicira.dbexts import nicira_db
if not cfg.CONF.api_extensions_path:
cfg.CONF.set_override('api_extensions_path', NVP_EXT_PATH)
self.nvp_opts = cfg.CONF.NVP
+ self.nvp_sync_opts = cfg.CONF.NVP_SYNC
self.cluster = create_nvp_cluster(cfg.CONF,
self.nvp_opts.concurrent_connections,
self.nvp_opts.nvp_gen_timeout)
# Set this flag to false as the default gateway has not
# been yet updated from the config file
self._is_default_net_gw_in_sync = False
+ # Create a synchronizer instance for backend sync
+ self._synchronizer = sync.NvpSynchronizer(
+ self, self.cluster,
+ self.nvp_sync_opts.state_sync_interval,
+ self.nvp_sync_opts.min_sync_req_delay,
+ self.nvp_sync_opts.min_chunk_size,
+ self.nvp_sync_opts.max_random_sync_delay)
def _ensure_default_network_gateway(self):
if self._is_default_net_gw_in_sync:
with context.session.begin(subtransactions=True):
# goto to the plugin DB and fetch the network
network = self._get_network(context, id)
- # if the network is external, do not go to NVP
- if not network.external:
- # verify the fabric status of the corresponding
- # logical switch(es) in nvp
- try:
- lswitches = nvplib.get_lswitches(self.cluster, id)
- nvp_net_status = constants.NET_STATUS_ACTIVE
- neutron_status = network.status
- for lswitch in lswitches:
- relations = lswitch.get('_relations')
- if relations:
- lswitch_status = relations.get(
- 'LogicalSwitchStatus')
- # FIXME(salvatore-orlando): Being unable to fetch
- # logical switch status should be an exception.
- if (lswitch_status and
- not lswitch_status.get('fabric_status',
- None)):
- nvp_net_status = constants.NET_STATUS_DOWN
- break
- LOG.debug(_("Current network status:%(nvp_net_status)s; "
- "Status in Neutron DB:%(neutron_status)s"),
- {'nvp_net_status': nvp_net_status,
- 'neutron_status': neutron_status})
- if nvp_net_status != network.status:
- # update the network status
- network.status = nvp_net_status
- except q_exc.NotFound:
- network.status = constants.NET_STATUS_ERROR
- except Exception:
- err_msg = _("Unable to get logical switches")
- LOG.exception(err_msg)
- raise nvp_exc.NvpPluginException(err_msg=err_msg)
+ if fields and 'status' in fields:
+ # External networks are not backed by nvp lswitches
+ if not network.external:
+ # Perform explicit state synchronization
+ self._synchronizer.synchronize_network(
+ context, network)
# Don't do field selection here otherwise we won't be able
# to add provider networks fields
net_result = self._make_network_dict(network)
return self._fields(net_result, fields)
def get_networks(self, context, filters=None, fields=None):
- nvp_lswitches = {}
filters = filters or {}
with context.session.begin(subtransactions=True):
- neutron_lswitches = (
- super(NvpPluginV2, self).get_networks(context, filters))
- for net in neutron_lswitches:
+ networks = super(NvpPluginV2, self).get_networks(context, filters)
+ for net in networks:
self._extend_network_dict_provider(context, net)
self._extend_network_qos_queue(context, net)
-
- tenant_ids = filters and filters.get('tenant_id') or None
- filter_fmt = "&tag=%s&tag_scope=os_tid"
- if context.is_admin and not tenant_ids:
- tenant_filter = ""
- else:
- tenant_ids = tenant_ids or [context.tenant_id]
- tenant_filter = ''.join(filter_fmt % tid for tid in tenant_ids)
- lswitch_filters = "uuid,display_name,fabric_status,tags"
- lswitch_url_path_1 = (
- "/ws.v1/lswitch?fields=%s&relations=LogicalSwitchStatus%s"
- % (lswitch_filters, tenant_filter))
- lswitch_url_path_2 = nvplib._build_uri_path(
- nvplib.LSWITCH_RESOURCE,
- fields=lswitch_filters,
- relations='LogicalSwitchStatus',
- filters={'tag': 'true', 'tag_scope': 'shared'})
- try:
- res = nvplib.get_all_query_pages(lswitch_url_path_1, self.cluster)
- nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res))
- # Issue a second query for fetching shared networks.
- # We cannot unfortunately use just a single query because tags
- # cannot be or-ed
- res_shared = nvplib.get_all_query_pages(lswitch_url_path_2,
- self.cluster)
- nvp_lswitches.update(dict((ls['uuid'], ls) for ls in res_shared))
- except Exception:
- err_msg = _("Unable to get logical switches")
- LOG.exception(err_msg)
- raise nvp_exc.NvpPluginException(err_msg=err_msg)
-
- if filters.get('id'):
- nvp_lswitches = dict(
- (uuid, ls) for (uuid, ls) in nvp_lswitches.iteritems()
- if uuid in set(filters['id']))
- for neutron_lswitch in neutron_lswitches:
- # Skip external networks as they do not exist in NVP
- if neutron_lswitch[l3.EXTERNAL]:
- continue
- elif neutron_lswitch['id'] not in nvp_lswitches:
- LOG.warning(_("Logical Switch %s found in neutron database "
- "but not in NVP."), neutron_lswitch["id"])
- neutron_lswitch["status"] = constants.NET_STATUS_ERROR
- else:
- # TODO(salvatore-orlando): be careful about "extended"
- # logical switches
- ls = nvp_lswitches.pop(neutron_lswitch['id'])
- if (ls["_relations"]["LogicalSwitchStatus"]["fabric_status"]):
- neutron_lswitch["status"] = constants.NET_STATUS_ACTIVE
- else:
- neutron_lswitch["status"] = constants.NET_STATUS_DOWN
-
- # do not make the case in which switches are found in NVP
- # but not in Neutron catastrophic.
- if nvp_lswitches:
- LOG.warning(_("Found %s logical switches not bound "
- "to Neutron networks. Neutron and NVP are "
- "potentially out of sync"), len(nvp_lswitches))
-
- LOG.debug(_("get_networks() completed for tenant %s"),
- context.tenant_id)
-
- if fields:
- ret_fields = []
- for neutron_lswitch in neutron_lswitches:
- row = {}
- for field in fields:
- row[field] = neutron_lswitch[field]
- ret_fields.append(row)
- return ret_fields
- return neutron_lswitches
+ return [self._fields(network, fields) for network in networks]
def update_network(self, context, id, network):
pnet._raise_if_updates_provider_attributes(network['network'])
def get_ports(self, context, filters=None, fields=None):
filters = filters or {}
with context.session.begin(subtransactions=True):
- neutron_lports = super(NvpPluginV2, self).get_ports(
- context, filters)
- if (filters.get('network_id') and len(filters.get('network_id')) and
- self._network_is_external(context, filters['network_id'][0])):
- # Do not perform check on NVP platform
- return neutron_lports
-
- vm_filter = ""
- tenant_filter = ""
- # This is used when calling delete_network. Neutron checks to see if
- # the network has any ports.
- if filters.get("network_id"):
- # FIXME (Aaron) If we get more than one network_id this won't work
- lswitch = filters["network_id"][0]
- else:
- lswitch = "*"
-
- if filters.get("device_id"):
- for vm_id in filters.get("device_id"):
- vm_filter = ("%stag_scope=vm_id&tag=%s&" % (vm_filter,
- hashlib.sha1(vm_id).hexdigest()))
- else:
- vm_id = ""
-
- if filters.get("tenant_id"):
- for tenant in filters.get("tenant_id"):
- tenant_filter = ("%stag_scope=os_tid&tag=%s&" %
- (tenant_filter, tenant))
-
- nvp_lports = {}
-
- lport_fields_str = ("tags,admin_status_enabled,display_name,"
- "fabric_status_up")
- try:
- lport_query_path = (
- "/ws.v1/lswitch/%s/lport?fields=%s&%s%stag_scope=q_port_id"
- "&relations=LogicalPortStatus" %
- (lswitch, lport_fields_str, vm_filter, tenant_filter))
-
- try:
- ports = nvplib.get_all_query_pages(lport_query_path,
- self.cluster)
- except q_exc.NotFound:
- LOG.warn(_("Lswitch %s not found in NVP"), lswitch)
- ports = None
-
- if ports:
- for port in ports:
- for tag in port["tags"]:
- if tag["scope"] == "q_port_id":
- nvp_lports[tag["tag"]] = port
- except Exception:
- err_msg = _("Unable to get ports")
- LOG.exception(err_msg)
- raise nvp_exc.NvpPluginException(err_msg=err_msg)
-
- lports = []
- for neutron_lport in neutron_lports:
- # if a neutron port is not found in NVP, this migth be because
- # such port is not mapped to a logical switch - ie: floating ip
- if neutron_lport['device_owner'] in (l3_db.DEVICE_OWNER_FLOATINGIP,
- l3_db.DEVICE_OWNER_ROUTER_GW):
- lports.append(neutron_lport)
- continue
- try:
- neutron_lport["admin_state_up"] = (
- nvp_lports[neutron_lport["id"]]["admin_status_enabled"])
-
- if (nvp_lports[neutron_lport["id"]]
- ["_relations"]
- ["LogicalPortStatus"]
- ["fabric_status_up"]):
- neutron_lport["status"] = constants.PORT_STATUS_ACTIVE
- else:
- neutron_lport["status"] = constants.PORT_STATUS_DOWN
-
- del nvp_lports[neutron_lport["id"]]
- except KeyError:
- neutron_lport["status"] = constants.PORT_STATUS_ERROR
- LOG.debug(_("Neutron logical port %s was not found on NVP"),
- neutron_lport['id'])
-
- lports.append(neutron_lport)
- # do not make the case in which ports are found in NVP
- # but not in Neutron catastrophic.
- if nvp_lports:
- LOG.warning(_("Found %s logical ports not bound "
- "to Neutron ports. Neutron and NVP are "
- "potentially out of sync"), len(nvp_lports))
-
- if fields:
- ret_fields = []
- for lport in lports:
- row = {}
- for field in fields:
- row[field] = lport[field]
- ret_fields.append(row)
- return ret_fields
- return lports
+ ports = super(NvpPluginV2, self).get_ports(context, filters)
+ for port in ports:
+ self._extend_port_qos_queue(context, port)
+ return [self._fields(port, fields) for port in ports]
def create_port(self, context, port):
# If PORTSECURITY is not the default value ATTR_NOT_SPECIFIED
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
- neutron_db_port = super(NvpPluginV2, self).get_port(context,
- id, fields)
- self._extend_port_qos_queue(context, neutron_db_port)
-
- if self._network_is_external(context,
- neutron_db_port['network_id']):
- return neutron_db_port
- nvp_id = self._nvp_get_port_id(context, self.cluster,
- neutron_db_port)
- # If there's no nvp IP do not bother going to NVP and put
- # the port in error state
- if nvp_id:
- # Find the NVP port corresponding to neutron port_id
- # Do not query by nvp id as the port might be on
- # an extended switch and we do not store the extended
- # switch uuid
- results = nvplib.query_lswitch_lports(
- self.cluster, '*',
- relations='LogicalPortStatus',
- filters={'tag': id, 'tag_scope': 'q_port_id'})
- if results:
- port = results[0]
- port_status = port["_relations"]["LogicalPortStatus"]
- neutron_db_port["admin_state_up"] = (
- port["admin_status_enabled"])
- if port_status["fabric_status_up"]:
- neutron_db_port["status"] = (
- constants.PORT_STATUS_ACTIVE)
- else:
- neutron_db_port["status"] = (
- constants.PORT_STATUS_DOWN)
- else:
- neutron_db_port["status"] = (
- constants.PORT_STATUS_ERROR)
+ if fields and 'status' in fields:
+ # Perform explicit state synchronization
+ db_port = self._get_port(context, id)
+ self._synchronizer.synchronize_port(
+ context, db_port)
+ port = self._make_port_dict(db_port, fields)
else:
- neutron_db_port["status"] = constants.PORT_STATUS_ERROR
- return neutron_db_port
+ port = super(NvpPluginV2, self).get_port(context, id, fields)
+ self._extend_port_qos_queue(context, port)
+ return port
+
+ def get_router(self, context, id, fields=None):
+ if fields and 'status' in fields:
+ db_router = self._get_router(context, id)
+ # Perform explicit state synchronization
+ self._synchronizer.synchronize_router(
+ context, db_router)
+ return self._make_router_dict(db_router, fields)
+ else:
+ return super(NvpPluginV2, self).get_router(context, id, fields)
def create_router(self, context, router):
# NOTE(salvatore-orlando): We completely override this method in
err_msg=(_("Unable to delete logical router '%s'"
"on NVP Platform") % router_id))
- def get_router(self, context, id, fields=None):
- router = self._get_router(context, id)
- try:
- lrouter = nvplib.get_lrouter(self.cluster, id)
- relations = lrouter.get('_relations')
- if relations:
- lrouter_status = relations.get('LogicalRouterStatus')
- # FIXME(salvatore-orlando): Being unable to fetch the
- # logical router status should be an exception.
- if lrouter_status:
- router_op_status = (lrouter_status.get('fabric_status')
- and constants.NET_STATUS_ACTIVE or
- constants.NET_STATUS_DOWN)
- except q_exc.NotFound:
- lrouter = {}
- router_op_status = constants.NET_STATUS_ERROR
- if router_op_status != router.status:
- LOG.debug(_("Current router status:%(router_status)s;"
- "Status in Neutron DB:%(db_router_status)s"),
- {'router_status': router_op_status,
- 'db_router_status': router.status})
- # update the router status
- with context.session.begin(subtransactions=True):
- router.status = router_op_status
- return self._make_router_dict(router, fields)
-
- def get_routers(self, context, filters=None, fields=None):
- router_query = self._apply_filters_to_query(
- self._model_query(context, l3_db.Router),
- l3_db.Router, filters)
- routers = router_query.all()
- # Query routers on NVP for updating operational status
- if context.is_admin and not filters.get("tenant_id"):
- tenant_id = None
- elif 'tenant_id' in filters:
- tenant_id = filters.get('tenant_id')[0]
- del filters['tenant_id']
- else:
- tenant_id = context.tenant_id
- try:
- nvp_lrouters = nvplib.get_lrouters(self.cluster,
- tenant_id,
- fields)
- except NvpApiClient.NvpApiException:
- err_msg = _("Unable to get logical routers from NVP controller")
- LOG.exception(err_msg)
- raise nvp_exc.NvpPluginException(err_msg=err_msg)
-
- nvp_lrouters_dict = {}
- for nvp_lrouter in nvp_lrouters:
- nvp_lrouters_dict[nvp_lrouter['uuid']] = nvp_lrouter
- for router in routers:
- nvp_lrouter = nvp_lrouters_dict.get(router['id'])
- if nvp_lrouter:
- if (nvp_lrouter["_relations"]["LogicalRouterStatus"]
- ["fabric_status"]):
- router.status = constants.NET_STATUS_ACTIVE
- else:
- router.status = constants.NET_STATUS_DOWN
- nvp_lrouters.remove(nvp_lrouter)
- else:
- router.status = constants.NET_STATUS_ERROR
-
- # do not make the case in which routers are found in NVP
- # but not in Neutron catastrophic.
- if nvp_lrouters:
- LOG.warning(_("Found %s logical routers not bound "
- "to Neutron routers. Neutron and NVP are "
- "potentially out of sync"), len(nvp_lrouters))
- return [self._make_router_dict(router, fields) for router in routers]
-
def add_router_interface(self, context, router_id, interface_info):
# When adding interface by port_id we need to create the
# peer port on the nvp logical router in this routine
help=_("The default network tranport type to use (stt, gre, "
"bridge, ipsec_gre, or ipsec_stt)")),
cfg.StrOpt('agent_mode', default=AgentModes.AGENT,
- help=_("The mode used to implement DHCP/metadata services.")),
+ help=_("The mode used to implement DHCP/metadata services."))
+]
+
+sync_opts = [
+ cfg.IntOpt('state_sync_interval', default=120,
+ help=_("Interval in seconds between runs of the state "
+ "synchronization task. Set it to 0 to disable it")),
+ cfg.IntOpt('max_random_sync_delay', default=0,
+ help=_("Maximum value for the additional random "
+ "delay in seconds between runs of the state "
+ "synchronization task")),
+ cfg.IntOpt('min_sync_req_delay', default=10,
+ help=_('Minimum delay, in seconds, between two state '
+ 'synchronization queries to NVP. It must not '
+ 'exceed state_sync_interval')),
+ cfg.IntOpt('min_chunk_size', default=500,
+ help=_('Minimum number of resources to be retrieved from NVP '
+ 'during state synchronization'))
]
connection_opts = [
cfg.CONF.register_opts(connection_opts)
cfg.CONF.register_opts(cluster_opts)
cfg.CONF.register_opts(nvp_opts, "NVP")
+cfg.CONF.register_opts(sync_opts, "NVP_SYNC")
+
# NOTE(armando-migliaccio): keep the following code until we support
# NVP configuration files in older format (Grizzly or older).
# ### BEGIN
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import random
+
+from neutron.common import constants
+from neutron.common import exceptions
+from neutron import context
+from neutron.db import l3_db
+from neutron.db import models_v2
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import log
+from neutron.openstack.common import loopingcall
+from neutron.openstack.common import timeutils
+from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import nvplib
+
+LOG = log.getLogger(__name__)
+
+
+class NvpCache(object):
+ """A simple Cache for NVP resources.
+
+ Associates resource id with resource hash to rapidly identify
+ updated resources.
+ Each entry in the cache also stores the following information:
+ - changed: the resource in the cache has been altered following
+ an update or a delete
+ - hit: the resource has been visited during an update (and possibly
+ left unchanged)
+ - data: current resource data
+ - data_bk: backup of resource data prior to its removal
+ """
+
+ def __init__(self):
+ # Maps a uuid to the dict containing it
+ self._uuid_dict_mappings = {}
+ # Dicts for NVP cached resources
+ self._lswitches = {}
+ self._lswitchports = {}
+ self._lrouters = {}
+
+ def __getitem__(self, key):
+ # uuids are unique across the various types of resources
+ # TODO(salv-orlando): Avoid lookups over all dictionaries
+ # when retrieving items
+ # Fetch lswitches, lports, or lrouters
+ resources = self._uuid_dict_mappings[key]
+ return resources[key]
+
+ def _update_resources(self, resources, new_resources):
+ # Clear the 'changed' attribute for all items
+ for uuid, item in resources.items():
+ if item.pop('changed', None) and not item.get('data'):
+ # The item is not anymore in NVP, so delete it
+ del resources[uuid]
+ del self._uuid_dict_mappings[uuid]
+
+ def do_hash(item):
+ return hash(jsonutils.dumps(item))
+
+ # Parse new data and identify new, deleted, and updated resources
+ for item in new_resources:
+ item_id = item['uuid']
+ if resources.get(item_id):
+ new_hash = do_hash(item)
+ if new_hash != resources[item_id]['hash']:
+ resources[item_id]['hash'] = new_hash
+ resources[item_id]['changed'] = True
+ resources[item_id]['data_bk'] = (
+ resources[item_id]['data'])
+ resources[item_id]['data'] = item
+ # Mark the item as hit in any case
+ resources[item_id]['hit'] = True
+ else:
+ resources[item_id] = {'hash': do_hash(item)}
+ resources[item_id]['hit'] = True
+ resources[item_id]['changed'] = True
+ resources[item_id]['data'] = item
+ # add a uuid to dict mapping for easy retrieval
+ # with __getitem__
+ self._uuid_dict_mappings[item_id] = resources
+
+ def _delete_resources(self, resources):
+ # Mark for removal all the elements which have not been visited.
+ # And clear the 'hit' attribute.
+ for to_delete in [k for (k, v) in resources.iteritems()
+ if not v.pop('hit', False)]:
+ resources[to_delete]['changed'] = True
+ resources[to_delete]['data_bk'] = (
+ resources[to_delete].pop('data', None))
+
+ def _get_resource_ids(self, resources, changed_only):
+ if changed_only:
+ return [k for (k, v) in resources.iteritems()
+ if v.get('changed')]
+ return resources.keys()
+
+ def get_lswitches(self, changed_only=False):
+ return self._get_resource_ids(self._lswitches, changed_only)
+
+ def get_lrouters(self, changed_only=False):
+ return self._get_resource_ids(self._lrouters, changed_only)
+
+ def get_lswitchports(self, changed_only=False):
+ return self._get_resource_ids(self._lswitchports, changed_only)
+
+ def update_lswitch(self, lswitch):
+ self._update_resources(self._lswitches, [lswitch])
+
+ def update_lrouter(self, lrouter):
+ self._update_resources(self._lrouters, [lrouter])
+
+ def update_lswitchport(self, lswitchport):
+ self._update_resources(self._lswitchports, [lswitchport])
+
+ def process_updates(self, lswitches=None,
+ lrouters=None, lswitchports=None):
+ self._update_resources(self._lswitches, lswitches)
+ self._update_resources(self._lrouters, lrouters)
+ self._update_resources(self._lswitchports, lswitchports)
+ return (self._get_resource_ids(self._lswitches, changed_only=True),
+ self._get_resource_ids(self._lrouters, changed_only=True),
+ self._get_resource_ids(self._lswitchports, changed_only=True))
+
+ def process_deletes(self):
+ self._delete_resources(self._lswitches)
+ self._delete_resources(self._lrouters)
+ self._delete_resources(self._lswitchports)
+ return (self._get_resource_ids(self._lswitches, changed_only=True),
+ self._get_resource_ids(self._lrouters, changed_only=True),
+ self._get_resource_ids(self._lswitchports, changed_only=True))
+
+
+class SyncParameters():
+ """Defines attributes used by the synchronization procedure.
+
+ chunk_size: Actual chunk size
+ extra_chunk_size: Additional data to fetch because of chunk size
+ adjustment
+ current_chunk: Counter of the current data chunk being synchronized
+ Page cursors: markers for the next resource to fetch.
+ 'start' means page cursor unset for fetching 1st page
+ init_sync_performed: True if the initial synchronization concluded
+ """
+
+ def __init__(self, min_chunk_size):
+ self.chunk_size = min_chunk_size
+ self.extra_chunk_size = 0
+ self.current_chunk = 0
+ self.ls_cursor = 'start'
+ self.lr_cursor = 'start'
+ self.lp_cursor = 'start'
+ self.init_sync_performed = False
+ self.total_size = 0
+
+
+def _start_loopingcall(min_chunk_size, state_sync_interval, func):
+ """Start a loopingcall for the synchronization task."""
+ # Start a looping call to synchronize operational status
+ # for neutron resources
+ if not state_sync_interval:
+ # do not start the looping call if specified
+ # sync interval is 0
+ return
+ state_synchronizer = loopingcall.DynamicLoopingCall(
+ func, sp=SyncParameters(min_chunk_size))
+ state_synchronizer.start(
+ periodic_interval_max=state_sync_interval)
+
+
+class NvpSynchronizer():
+
+ LS_URI = nvplib._build_uri_path(
+ nvplib.LSWITCH_RESOURCE, fields='uuid,tags,fabric_status',
+ relations='LogicalSwitchStatus')
+ LR_URI = nvplib._build_uri_path(
+ nvplib.LROUTER_RESOURCE, fields='uuid,tags,fabric_status',
+ relations='LogicalRouterStatus')
+ LP_URI = nvplib._build_uri_path(
+ nvplib.LSWITCHPORT_RESOURCE,
+ parent_resource_id='*',
+ fields='uuid,tags,fabric_status,link_status_up',
+ relations='LogicalPortStatus')
+
+ def __init__(self, plugin, cluster, state_sync_interval,
+ req_delay, min_chunk_size, max_rand_delay=0):
+ random.seed()
+ self._nvp_cache = NvpCache()
+ # Store parameters as instance members
+ # NOTE(salv-orlando): apologies if it looks java-ish
+ self._plugin = plugin
+ self._cluster = cluster
+ self._req_delay = req_delay
+ self._sync_interval = state_sync_interval
+ self._max_rand_delay = max_rand_delay
+ # Validate parameters
+ if self._sync_interval < self._req_delay:
+ err_msg = (_("Minimum request delay:%(req_delay)s must not "
+ "exceed synchronization interval:%(sync_interval)s") %
+ {'req_delay': self._req_delay,
+ 'sync_interval': self._sync_interval})
+ LOG.error(err_msg)
+ raise nvp_exc.NvpPluginException(err_msg=err_msg)
+ # Backoff time in case of failures while fetching sync data
+ self._sync_backoff = 1
+ _start_loopingcall(min_chunk_size, state_sync_interval,
+ self._synchronize_state)
+
+ def _get_tag_dict(self, tags):
+ return dict((tag.get('scope'), tag['tag']) for tag in tags)
+
+ def _update_neutron_object(self, context, neutron_data, status):
+ if status == neutron_data['status']:
+ # do nothing
+ return
+ with context.session.begin(subtransactions=True):
+ LOG.debug(_("Updating status for neutron resource %(q_id)s to: "
+ "%(status)s"), {'q_id': neutron_data['id'],
+ 'status': status})
+ neutron_data['status'] = status
+ context.session.add(neutron_data)
+
+ def synchronize_network(self, context, neutron_network_data,
+ lswitches=None):
+ """Synchronize a Neutron network with its NVP counterpart.
+
+ This routine synchronizes a set of switches when a Neutron
+ network is mapped to multiple lswitches.
+ """
+ if not lswitches:
+ # Try to get logical switches from nvp
+ try:
+ lswitches = nvplib.get_lswitches(
+ self._cluster, neutron_network_data['id'])
+ except exceptions.NetworkNotFound:
+ # TODO(salv-orlando): We should be catching
+ # NvpApiClient.ResourceNotFound here
+ # The logical switch was not found
+ LOG.warning(_("Logical switch for neutron network %s not "
+ "found on NVP."), neutron_network_data['id'])
+ lswitches = []
+ else:
+ for lswitch in lswitches:
+ self._nvp_cache.update_lswitch(lswitch)
+ # By default assume things go wrong
+ status = constants.NET_STATUS_ERROR
+ # In most cases lswitches will contain a single element
+ for ls in lswitches:
+ if not ls:
+ # Logical switch was deleted
+ break
+ ls_status = ls['_relations']['LogicalSwitchStatus']
+ if not ls_status['fabric_status']:
+ status = constants.NET_STATUS_DOWN
+ break
+ else:
+ # No switch was down or missing. Set status to ACTIVE unless
+ # there were no switches in the first place!
+ if lswitches:
+ status = constants.NET_STATUS_ACTIVE
+ # Update db object
+ self._update_neutron_object(context, neutron_network_data, status)
+
+ def _synchronize_lswitches(self, ctx, ls_uuids, scan_missing=False):
+ if not ls_uuids and not scan_missing:
+ return
+ neutron_net_ids = set()
+ neutron_nvp_mappings = {}
+ # TODO(salvatore-orlando): Deal with the case the tag
+ # has been tampered with
+ for ls_uuid in ls_uuids:
+ # If the lswitch has been deleted, get backup copy of data
+ lswitch = (self._nvp_cache[ls_uuid].get('data') or
+ self._nvp_cache[ls_uuid].get('data_bk'))
+ tags = self._get_tag_dict(lswitch['tags'])
+ neutron_id = tags.get('neutron_net_id', ls_uuid)
+ neutron_net_ids.add(neutron_id)
+ neutron_nvp_mappings[neutron_id] = (
+ neutron_nvp_mappings.get(neutron_id, []) +
+ [self._nvp_cache[ls_uuid]])
+ with ctx.session.begin(subtransactions=True):
+ # Fetch neutron networks from database
+ filters = {'router:external': [False]}
+ if not scan_missing:
+ filters['id'] = neutron_net_ids
+ # TODO(salv-orlando): Filter out external networks
+ for network in self._plugin._get_collection_query(
+ ctx, models_v2.Network, filters=filters):
+ lswitches = neutron_nvp_mappings.get(network['id'], [])
+ lswitches = [lswitch.get('data') for lswitch in lswitches]
+ self.synchronize_network(ctx, network, lswitches)
+
+ def synchronize_router(self, context, neutron_router_data,
+ lrouter=None):
+ """Synchronize a neutron router with its NVP counterpart."""
+ if not lrouter:
+ # Try to get router from nvp
+ try:
+ # This query will return the logical router status too
+ lrouter = nvplib.get_lrouter(
+ self._cluster, neutron_router_data['id'])
+ except exceptions.NotFound:
+ # NOTE(salv-orlando): We should be catching
+ # NvpApiClient.ResourceNotFound here
+ # The logical router was not found
+ LOG.warning(_("Logical router for neutron router %s not "
+ "found on NVP."), neutron_router_data['id'])
+ lrouter = None
+ else:
+ # Update the cache
+ self._nvp_cache.update_lrouter(lrouter)
+
+ # Note(salv-orlando): It might worth adding a check to verify neutron
+ # resource tag in nvp entity matches a Neutron id.
+ # By default assume things go wrong
+ status = constants.NET_STATUS_ERROR
+ if lrouter:
+ lr_status = (lrouter['_relations']
+ ['LogicalRouterStatus']
+ ['fabric_status'])
+ status = (lr_status and
+ constants.NET_STATUS_ACTIVE
+ or constants.NET_STATUS_DOWN)
+ # Update db object
+ self._update_neutron_object(context, neutron_router_data, status)
+
+ def _synchronize_lrouters(self, ctx, lr_uuids, scan_missing=False):
+ if not lr_uuids and not scan_missing:
+ return
+ neutron_router_mappings = (
+ dict((lr_uuid, self._nvp_cache[lr_uuid]) for lr_uuid in lr_uuids))
+ with ctx.session.begin(subtransactions=True):
+ # Fetch neutron routers from database
+ filters = ({} if scan_missing else
+ {'id': neutron_router_mappings.keys()})
+ for router in self._plugin._get_collection_query(
+ ctx, l3_db.Router, filters=filters):
+ lrouter = neutron_router_mappings.get(router['id'])
+ self.synchronize_router(
+ ctx, router, lrouter and lrouter.get('data'))
+
+ def synchronize_port(self, context, neutron_port_data,
+ lswitchport=None, ext_networks=None):
+ """Synchronize a Neutron port with its NVP counterpart."""
+ # Skip synchronization for ports on external networks
+ if not ext_networks:
+ ext_networks = [net['id'] for net in context.session.query(
+ models_v2.Network).join(
+ l3_db.ExternalNetwork,
+ (models_v2.Network.id ==
+ l3_db.ExternalNetwork.network_id))]
+ if neutron_port_data['network_id'] in ext_networks:
+ with context.session.begin(subtransactions=True):
+ neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
+ return
+
+ if not lswitchport:
+ # Try to get port from nvp
+ try:
+ lp_uuid = self._plugin._nvp_get_port_id(
+ context, self._cluster, neutron_port_data)
+ if lp_uuid:
+ lswitchport = nvplib.get_port(
+ self._cluster, neutron_port_data['network_id'],
+ lp_uuid, relations='LogicalPortStatus')
+ except exceptions.PortNotFoundOnNetwork:
+ # NOTE(salv-orlando): We should be catching
+ # NvpApiClient.ResourceNotFound here
+ # The logical switch port was not found
+ LOG.warning(_("Logical switch port for neutron port %s "
+ "not found on NVP."), neutron_port_data['id'])
+ lswitchport = None
+ else:
+ # Update the cache
+ self._nvp_cache.update_lswitchport(lswitchport)
+
+ # Note(salv-orlando): It might worth adding a check to verify neutron
+ # resource tag in nvp entity matches Neutron id.
+ # By default assume things go wrong
+ status = constants.PORT_STATUS_ERROR
+ if lswitchport:
+ lp_status = (lswitchport['_relations']
+ ['LogicalPortStatus']
+ ['link_status_up'])
+ status = (lp_status and
+ constants.PORT_STATUS_ACTIVE
+ or constants.PORT_STATUS_DOWN)
+ # Update db object
+ self._update_neutron_object(context, neutron_port_data, status)
+
+ def _synchronize_lswitchports(self, ctx, lp_uuids, scan_missing=False):
+ if not lp_uuids and not scan_missing:
+ return
+ # Find Neutron port id by tag - the tag is already
+ # loaded in memory, no reason for doing a db query
+ # TODO(salvatore-orlando): Deal with the case the tag
+ # has been tampered with
+ neutron_port_mappings = {}
+ for lp_uuid in lp_uuids:
+ lport = (self._nvp_cache[lp_uuid].get('data') or
+ self._nvp_cache[lp_uuid].get('data_bk'))
+ tags = self._get_tag_dict(lport['tags'])
+ neutron_port_id = tags.get('q_port_id')
+ if neutron_port_id:
+ neutron_port_mappings[neutron_port_id] = (
+ self._nvp_cache[lp_uuid])
+ with ctx.session.begin(subtransactions=True):
+ # Fetch neutron ports from database
+ # At the first sync we need to fetch all ports
+ filters = ({} if scan_missing else
+ {'id': neutron_port_mappings.keys()})
+ # TODO(salv-orlando): Work out a solution for avoiding
+ # this query
+ ext_nets = [net['id'] for net in ctx.session.query(
+ models_v2.Network).join(
+ l3_db.ExternalNetwork,
+ (models_v2.Network.id ==
+ l3_db.ExternalNetwork.network_id))]
+ for port in self._plugin._get_collection_query(
+ ctx, models_v2.Port, filters=filters):
+ lswitchport = neutron_port_mappings.get(port['id'])
+ self.synchronize_port(
+ ctx, port, lswitchport and lswitchport.get('data'),
+ ext_networks=ext_nets)
+
+ def _get_chunk_size(self, sp):
+ # NOTE(salv-orlando): Try to use __future__ for this routine only?
+ ratio = ((float(sp.total_size) / float(sp.chunk_size)) /
+ (float(self._sync_interval) / float(self._req_delay)))
+ new_size = max(1.0, ratio) * float(sp.chunk_size)
+ return int(new_size) + (new_size - int(new_size) > 0)
+
+ def _fetch_data(self, uri, cursor, page_size):
+ # If not cursor there is nothing to retrieve
+ if cursor:
+ if cursor == 'start':
+ cursor = None
+ # Chunk size tuning might, in some conditions, make it larger
+ # than 5,000, which is the maximum page size allowed by the NVP
+ # API. In this case the request should be split in multiple
+ # requests. This is not ideal, and therefore a log warning will
+ # be emitted.
+ requests = range(0, page_size / (nvplib.MAX_PAGE_SIZE + 1) + 1)
+ if len(requests) > 1:
+ LOG.warn(_("Requested page size is %(cur_chunk_size)d."
+ "It might be necessary to do %(num_requests)d "
+ "round-trips to NVP for fetching data. Please "
+ "tune sync parameters to ensure chunk size "
+ "is less than %(max_page_size)d"),
+ {'cur_chunk_size': page_size,
+ 'num_requests': len(requests),
+ 'max_page_size': nvplib.MAX_PAGE_SIZE})
+ results = []
+ actual_size = 0
+ for _req in requests:
+ req_results, cursor, req_size = nvplib.get_single_query_page(
+ uri, self._cluster, cursor,
+ min(page_size, nvplib.MAX_PAGE_SIZE))
+ results.extend(req_results)
+ actual_size = actual_size + req_size
+ # If no cursor is returned break the cycle as there is no
+ # actual need to perform multiple requests (all fetched)
+ # This happens when the overall size of resources exceeds
+ # the maximum page size, but the number for each single
+ # resource type is below this threshold
+ if not cursor:
+ break
+ # reset cursor before returning if we queried just to
+ # know the number of entities
+ return results, cursor if page_size else 'start', actual_size
+ return [], cursor, None
+
+ def _fetch_nvp_data_chunk(self, sp):
+ base_chunk_size = sp.chunk_size
+ chunk_size = base_chunk_size + sp.extra_chunk_size
+ LOG.info(_("Fetching up to %s resources "
+ "from NVP backend"), chunk_size)
+ fetched = ls_count = lr_count = lp_count = 0
+ lswitches = lrouters = lswitchports = []
+ if sp.ls_cursor or sp.ls_cursor == 'start':
+ (lswitches, sp.ls_cursor, ls_count) = self._fetch_data(
+ self.LS_URI, sp.ls_cursor, chunk_size)
+ fetched = len(lswitches)
+ if fetched < chunk_size and sp.lr_cursor or sp.lr_cursor == 'start':
+ (lrouters, sp.lr_cursor, lr_count) = self._fetch_data(
+ self.LR_URI, sp.lr_cursor, max(chunk_size - fetched, 0))
+ fetched += len(lrouters)
+ if fetched < chunk_size and sp.lp_cursor or sp.lp_cursor == 'start':
+ (lswitchports, sp.lp_cursor, lp_count) = self._fetch_data(
+ self.LP_URI, sp.lp_cursor, max(chunk_size - fetched, 0))
+ fetched += len(lswitchports)
+ if sp.current_chunk == 0:
+ # No cursors were provided. Then it must be possible to
+ # calculate the total amount of data to fetch
+ sp.total_size = ls_count + lr_count + lp_count
+ LOG.debug(_("Total data size: %d"), sp.total_size)
+ sp.chunk_size = self._get_chunk_size(sp)
+ # Calculate chunk size adjustment
+ sp.extra_chunk_size = sp.chunk_size - base_chunk_size
+ LOG.debug(_("Fetched %(num_lswitches)d logical switches, "
+ "%(num_lswitchports)d logical switch ports,"
+ "%(num_lrouters)d logical routers"),
+ {'num_lswitches': len(lswitches),
+ 'num_lswitchports': len(lswitchports),
+ 'num_lrouters': len(lrouters)})
+ return (lswitches, lrouters, lswitchports)
+
+ def _synchronize_state(self, sp):
+ # If the plugin has been destroyed, stop the LoopingCall
+ if not self._plugin:
+ raise loopingcall.LoopingCallDone
+ start = timeutils.utcnow()
+ # Reset page cursor variables if necessary
+ if sp.current_chunk == 0:
+ sp.ls_cursor = sp.lr_cursor = sp.lp_cursor = 'start'
+ LOG.info(_("Running state synchronization task. Chunk: %s"),
+ sp.current_chunk)
+ # Fetch chunk_size data from NVP
+ try:
+ (lswitches, lrouters, lswitchports) = (
+ self._fetch_nvp_data_chunk(sp))
+ except (NvpApiClient.RequestTimeout, NvpApiClient.NvpApiException):
+ sleep_interval = self._sync_backoff
+ # Cap max back off to 64 seconds
+ self._sync_backoff = min(self._sync_backoff * 2, 64)
+ LOG.exception(_("An error occured while communicating with "
+ "NVP backend. Will retry synchronization "
+ "in %d seconds"), sleep_interval)
+ return sleep_interval
+ LOG.debug(_("Time elapsed querying NVP: %s"),
+ timeutils.utcnow() - start)
+ if sp.total_size:
+ num_chunks = ((sp.total_size / sp.chunk_size) +
+ (sp.total_size % sp.chunk_size != 0))
+ else:
+ num_chunks = 1
+ LOG.debug(_("Number of chunks: %d"), num_chunks)
+ # Find objects which have changed on NVP side and need
+ # to be synchronized
+ (ls_uuids, lr_uuids, lp_uuids) = self._nvp_cache.process_updates(
+ lswitches, lrouters, lswitchports)
+ # Process removed objects only at the last chunk
+ scan_missing = (sp.current_chunk == num_chunks - 1 and
+ not sp.init_sync_performed)
+ if sp.current_chunk == num_chunks - 1:
+ self._nvp_cache.process_deletes()
+ ls_uuids = self._nvp_cache.get_lswitches(
+ changed_only=not scan_missing)
+ lr_uuids = self._nvp_cache.get_lrouters(
+ changed_only=not scan_missing)
+ lp_uuids = self._nvp_cache.get_lswitchports(
+ changed_only=not scan_missing)
+ LOG.debug(_("Time elapsed hashing data: %s"),
+ timeutils.utcnow() - start)
+ # Get an admin context
+ ctx = context.get_admin_context()
+ # Synchronize with database
+ with ctx.session.begin(subtransactions=True):
+ self._synchronize_lswitches(ctx, ls_uuids,
+ scan_missing=scan_missing)
+ self._synchronize_lrouters(ctx, lr_uuids,
+ scan_missing=scan_missing)
+ self._synchronize_lswitchports(ctx, lp_uuids,
+ scan_missing=scan_missing)
+ # Increase chunk counter
+ LOG.info(_("Synchronization for chunk %(chunk_num)d of "
+ "%(total_chunks)d performed"),
+ {'chunk_num': sp.current_chunk + 1,
+ 'total_chunks': num_chunks})
+ sp.current_chunk = (sp.current_chunk + 1) % num_chunks
+ added_delay = 0
+ if sp.current_chunk == 0:
+ # Ensure init_sync_performed is True
+ if not sp.init_sync_performed:
+ sp.init_sync_performed = True
+ # Add additional random delay
+ added_delay = random.randint(0, self._max_rand_delay)
+ LOG.debug(_("Time elapsed at end of sync: %s"),
+ timeutils.utcnow() - start)
+ return self._sync_interval / num_chunks + added_delay
"to_src_ip_max"]
DNAT_KEYS = ["to_dst_port", "to_dst_ip_min", "to_dst_ip_max"]
-
+# Maximum page size for a single request
+# NOTE(salv-orlando): This might become a version-dependent map should the
+# limit be raised in future versions
+MAX_PAGE_SIZE = 5000
# TODO(bgh): it would be more efficient to use a bitmap
taken_context_ids = []
return version
+def get_single_query_page(path, cluster, page_cursor=None,
+ page_length=1000, neutron_only=True):
+ params = []
+ if page_cursor:
+ params.append("_page_cursor=%s" % page_cursor)
+ params.append("_page_length=%s" % page_length)
+ # NOTE(salv-orlando): On the NVP backend the 'Quantum' tag is still
+ # used for marking Neutron entities in order to preserve compatibility
+ if neutron_only:
+ params.append("tag_scope=quantum")
+ query_params = "&".join(params)
+ path = "%s%s%s" % (path, "&" if (path.find("?") != -1) else "?",
+ query_params)
+ body = do_request(HTTP_GET, path, cluster=cluster)
+ # Result_count won't be returned if _page_cursor is supplied
+ return body['results'], body.get('page_cursor'), body.get('result_count')
+
+
def get_all_query_pages(path, c):
need_more_results = True
result_list = []
page_cursor = None
- query_marker = "&" if (path.find("?") != -1) else "?"
while need_more_results:
- page_cursor_str = (
- "_page_cursor=%s" % page_cursor if page_cursor else "")
- body = do_request(HTTP_GET,
- "%s%s%s" % (path, query_marker, page_cursor_str),
- cluster=c)
- page_cursor = body.get('page_cursor')
+ results, page_cursor = get_single_query_page(
+ path, c, page_cursor)[:2]
if not page_cursor:
need_more_results = False
- result_list.extend(body['results'])
+ result_list.extend(results)
return result_list
"lport_admin_up_count": %(lport_count)d,
"_schema": "/ws.v1/schema/LogicalRouterStatus",
"lport_count": %(lport_count)d,
- "fabric_status": true,
+ "fabric_status": %(status)s,
"type": "LogicalRouterStatus",
"lport_link_up_count": %(lport_count)d
}
"_href": "/ws.v1/lswitch/%(uuid)s",
"_schema": "/ws.v1/schema/LogicalSwitchConfig",
"_relations": {"LogicalSwitchStatus":
- {"fabric_status": true,
+ {"fabric_status": %(status)s,
"type": "LogicalSwitchStatus",
"lport_count": %(lport_count)d,
"_href": "/ws.v1/lswitch/%(uuid)s/status",
{"LogicalPortStatus":
{"type": "LogicalSwitchPortStatus",
"admin_status_enabled": true,
- "fabric_status_up": false,
+ "fabric_status_up": %(status)s,
+ "link_status_up": %(status)s,
"_href": "/ws.v1/lswitch/%(ls_uuid)s/lport/%(uuid)s/status",
"_schema": "/ws.v1/schema/LogicalSwitchPortStatus"},
"LogicalSwitchConfig":
LROUTER_LPORT_RESOURCE: ['LogicalPortAttachment'],
}
- _fake_lswitch_dict = {}
- _fake_lrouter_dict = {}
- _fake_lswitch_lport_dict = {}
- _fake_lrouter_lport_dict = {}
- _fake_lrouter_nat_dict = {}
- _fake_lswitch_lportstatus_dict = {}
- _fake_lrouter_lportstatus_dict = {}
- _fake_securityprofile_dict = {}
- _fake_lqueue_dict = {}
- _fake_gatewayservice_dict = {}
-
_validators = {
LSWITCH_RESOURCE: _validate_resource,
LSWITCH_LPORT_RESOURCE: _validate_resource,
def __init__(self, fake_files_path):
self.fake_files_path = fake_files_path
+ self._fake_lswitch_dict = {}
+ self._fake_lrouter_dict = {}
+ self._fake_lswitch_lport_dict = {}
+ self._fake_lrouter_lport_dict = {}
+ self._fake_lrouter_nat_dict = {}
+ self._fake_lswitch_lportstatus_dict = {}
+ self._fake_lrouter_lportstatus_dict = {}
+ self._fake_securityprofile_dict = {}
+ self._fake_lqueue_dict = {}
+ self._fake_gatewayservice_dict = {}
def _get_tag(self, resource, scope):
tags = [tag['tag'] for tag in resource['tags']
def _get_filters(self, querystring):
if not querystring:
- return (None, None)
+ return (None, None, None)
params = urlparse.parse_qs(querystring)
tag_filter = None
attr_filter = None
'tag': params['tag'][0]}
elif 'uuid' in params:
attr_filter = {'uuid': params['uuid'][0]}
- return (tag_filter, attr_filter)
+ # Handle page_length
+ # TODO(salv-orlando): Handle page cursor too
+ page_len = params.get('_page_length')
+ if page_len:
+ page_len = int(page_len[0])
+ else:
+ # Explicitly set it to None (avoid 0 or empty list)
+ page_len = None
+ return (tag_filter, attr_filter, page_len)
def _add_lswitch(self, body):
fake_lswitch = json.loads(body)
fake_lswitch['zone_uuid'] = zone_uuid
fake_lswitch['tenant_id'] = self._get_tag(fake_lswitch, 'os_tid')
fake_lswitch['lport_count'] = 0
+ # set status value
+ fake_lswitch['status'] = 'true'
return fake_lswitch
def _build_lrouter(self, body, uuid=None):
uuidutils.generate_uuid())
self._fake_lrouter_dict[fake_lrouter['uuid']] = fake_lrouter
fake_lrouter['lport_count'] = 0
+ # set status value
+ fake_lrouter['status'] = 'true'
return fake_lrouter
def _add_lqueue(self, body):
fake_lport_status['ls_uuid'] = fake_lswitch['uuid']
fake_lport_status['ls_name'] = fake_lswitch['display_name']
fake_lport_status['ls_zone_uuid'] = fake_lswitch['zone_uuid']
+ # set status value
+ fake_lport['status'] = 'true'
self._fake_lswitch_lportstatus_dict[new_uuid] = fake_lport_status
return fake_lport
def _list(self, resource_type, response_file,
parent_uuid=None, query=None, relations=None):
- (tag_filter, attr_filter) = self._get_filters(query)
+ (tag_filter, attr_filter, page_len) = self._get_filters(query)
with open("%s/%s" % (self.fake_files_path, response_file)) as f:
response_template = f.read()
res_dict = getattr(self, '_fake_%s_dict' % resource_type)
if (parent_func(res_uuid) and
_tag_match(res_uuid) and
_attr_match(res_uuid))]
- return json.dumps({'results': items,
- 'result_count': len(items)})
+ # Rather inefficient, but hey this is just a mock!
+ next_cursor = None
+ total_items = len(items)
+ if page_len:
+ try:
+ next_cursor = items[page_len]['uuid']
+ except IndexError:
+ next_cursor = None
+ items = items[:page_len]
+ response_dict = {'results': items,
+ 'result_count': total_items}
+ if next_cursor:
+ response_dict['page_cursor'] = next_cursor
+ return json.dumps(response_dict)
def _show(self, resource_type, response_file,
uuid1, uuid2=None, relations=None):
import mock
from neutron.common.test_lib import test_config
+from neutron.plugins.nicira.common import sync
from neutron.tests.unit.nicira import fake_nvpapiclient
from neutron.tests.unit.nicira import get_fake_conf
from neutron.tests.unit.nicira import NVPAPI_NAME
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
instance.return_value.request.side_effect = _fake_request
super(NVPDhcpAgentNotifierTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
+ self.addCleanup(patch_sync.stop)
self.addCleanup(self.mock_nvpapi.stop)
def _notification_mocks(self, hosts, mock_dhcp, net, subnet, port):
from neutron.common.test_lib import test_config
from neutron import context
from neutron.extensions import agent
+from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.NvpApiClient import NVPVersion
from neutron.tests.unit.nicira import fake_nvpapiclient
from neutron.tests.unit.nicira import get_fake_conf
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
cfg.CONF.set_override('metadata_mode', None, 'NVP')
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
+ self.addCleanup(patch_sync.stop)
self.addCleanup(self.restore_resource_attribute_map)
self.addCleanup(cfg.CONF.reset)
super(MacLearningDBTestCase, self).setUp()
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.dbexts import nicira_db
from neutron.plugins.nicira.dbexts import nicira_qos_db as qos_db
from neutron.plugins.nicira.extensions import distributedrouter as dist_router
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
self.mock_instance = self.mock_nvpapi.start()
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
cfg.CONF.set_override('metadata_mode', None, 'NVP')
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
+ self.addCleanup(patch_sync.stop)
class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase):
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
instance.return_value.login.return_value = "the_cookie"
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
super(NiciraPortSecurityTestCase, self).setUp(PLUGIN_NAME)
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
+ self.addCleanup(patch_sync.stop)
class TestNiciraPortSecurity(NiciraPortSecurityTestCase,
self.mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
instance.return_value.login.return_value = "the_cookie"
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
def _fake_request(*args, **kwargs):
return fc.fake_request(*args, **kwargs)
instance.return_value.request.side_effect = _fake_request
+ self.addCleanup(self.mock_nvpapi.stop)
+ self.addCleanup(patch_sync.stop)
super(NiciraSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
- def tearDown(self):
- super(NiciraSecurityGroupsTestCase, self).tearDown()
- self.mock_nvpapi.stop()
-
class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups,
NiciraSecurityGroupsTestCase):
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
- def test_list_networks_not_in_nvp(self):
- res = self._create_network('json', 'net1', True)
- self.deserialize('json', res)
- self.fc._fake_lswitch_dict.clear()
- req = self.new_list_request('networks')
- nets = self.deserialize('json', req.get_response(self.api))
- self.assertEqual(nets['networks'][0]['status'],
- constants.NET_STATUS_ERROR)
-
def test_show_network_not_in_nvp(self):
res = self._create_network('json', 'net1', True)
net = self.deserialize('json', res)
self.fc._fake_lswitch_dict.clear()
- req = self.new_show_request('networks', net['network']['id'])
+ req = self.new_show_request('networks', net['network']['id'],
+ fields=['id', 'status'])
net = self.deserialize('json', req.get_response(self.api))
self.assertEqual(net['network']['status'],
constants.NET_STATUS_ERROR)
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
- def test_list_port_not_in_nvp(self):
- res = self._create_network('json', 'net1', True)
- net1 = self.deserialize('json', res)
- res = self._create_port('json', net1['network']['id'])
- self.deserialize('json', res)
- self.fc._fake_lswitch_lport_dict.clear()
- req = self.new_list_request('ports')
- nets = self.deserialize('json', req.get_response(self.api))
- self.assertEqual(nets['ports'][0]['status'],
- constants.PORT_STATUS_ERROR)
-
def test_show_port_not_in_nvp(self):
res = self._create_network('json', 'net1', True)
net1 = self.deserialize('json', res)
port = self.deserialize('json', res)
self.fc._fake_lswitch_lport_dict.clear()
self.fc._fake_lswitch_lportstatus_dict.clear()
- req = self.new_show_request('ports', port['port']['id'])
+ req = self.new_show_request('ports', port['port']['id'],
+ fields=['id', 'status'])
net = self.deserialize('json', req.get_response(self.api))
self.assertEqual(net['port']['status'],
constants.PORT_STATUS_ERROR)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
- def test_list_routers_not_in_nvp(self):
- res = self._create_router('json', 'tenant')
- self.deserialize('json', res)
- self.fc._fake_lrouter_dict.clear()
- req = self.new_list_request('routers')
- routers = self.deserialize('json', req.get_response(self.ext_api))
- self.assertEqual(routers['routers'][0]['status'],
- constants.NET_STATUS_ERROR)
-
def test_show_router_not_in_nvp(self):
res = self._create_router('json', 'tenant')
router = self.deserialize('json', res)
self.fc._fake_lrouter_dict.clear()
- req = self.new_show_request('routers', router['router']['id'])
+ req = self.new_show_request('routers', router['router']['id'],
+ fields=['id', 'status'])
router = self.deserialize('json', req.get_response(self.ext_api))
self.assertEqual(router['router']['status'],
constants.NET_STATUS_ERROR)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira Networks, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import contextlib
+import time
+
+import mock
+from oslo.config import cfg
+
+from neutron.api.v2 import attributes as attr
+from neutron.common import config
+from neutron.common import constants
+from neutron import context
+from neutron.openstack.common import jsonutils as json
+from neutron.plugins.nicira.common import sync
+from neutron.plugins.nicira import NeutronPlugin
+from neutron.plugins.nicira import nvp_cluster
+from neutron.plugins.nicira import NvpApiClient
+from neutron.plugins.nicira import nvplib
+from neutron.tests import base
+from neutron.tests.unit.nicira import fake_nvpapiclient
+from neutron.tests.unit.nicira import get_fake_conf
+from neutron.tests.unit.nicira import NVPAPI_NAME
+from neutron.tests.unit.nicira import STUBS_PATH
+from neutron.tests.unit import test_api_v2
+
+from neutron.openstack.common import log
+
+LOG = log.getLogger(__name__)
+
+_uuid = test_api_v2._uuid
+LSWITCHES = [{'uuid': _uuid(), 'name': 'ls-1'},
+ {'uuid': _uuid(), 'name': 'ls-2'}]
+LSWITCHPORTS = [{'uuid': _uuid(), 'name': 'lp-1'},
+ {'uuid': _uuid(), 'name': 'lp-2'}]
+LROUTERS = [{'uuid': _uuid(), 'name': 'lr-1'},
+ {'uuid': _uuid(), 'name': 'lr-2'}]
+
+
+class NvpCacheTestCase(base.BaseTestCase):
+ """Test suite providing coverage for the NvpCache class."""
+
+ def setUp(self):
+ self.nvp_cache = sync.NvpCache()
+ for lswitch in LSWITCHES:
+ self.nvp_cache._uuid_dict_mappings[lswitch['uuid']] = (
+ self.nvp_cache._lswitches)
+ self.nvp_cache._lswitches[lswitch['uuid']] = (
+ {'data': lswitch,
+ 'hash': hash(json.dumps(lswitch))})
+ for lswitchport in LSWITCHPORTS:
+ self.nvp_cache._uuid_dict_mappings[lswitchport['uuid']] = (
+ self.nvp_cache._lswitchports)
+ self.nvp_cache._lswitchports[lswitchport['uuid']] = (
+ {'data': lswitchport,
+ 'hash': hash(json.dumps(lswitchport))})
+ for lrouter in LROUTERS:
+ self.nvp_cache._uuid_dict_mappings[lrouter['uuid']] = (
+ self.nvp_cache._lrouters)
+ self.nvp_cache._lrouters[lrouter['uuid']] = (
+ {'data': lrouter,
+ 'hash': hash(json.dumps(lrouter))})
+ super(NvpCacheTestCase, self).setUp()
+
+ def test_get_lswitches(self):
+ ls_uuids = self.nvp_cache.get_lswitches()
+ self.assertEqual(set(ls_uuids),
+ set([ls['uuid'] for ls in LSWITCHES]))
+
+ def test_get_lswitchports(self):
+ lp_uuids = self.nvp_cache.get_lswitchports()
+ self.assertEqual(set(lp_uuids),
+ set([lp['uuid'] for lp in LSWITCHPORTS]))
+
+ def test_get_lrouters(self):
+ lr_uuids = self.nvp_cache.get_lrouters()
+ self.assertEqual(set(lr_uuids),
+ set([lr['uuid'] for lr in LROUTERS]))
+
+ def test_get_lswitches_changed_only(self):
+ ls_uuids = self.nvp_cache.get_lswitches(changed_only=True)
+ self.assertEqual(0, len(ls_uuids))
+
+ def test_get_lswitchports_changed_only(self):
+ lp_uuids = self.nvp_cache.get_lswitchports(changed_only=True)
+ self.assertEqual(0, len(lp_uuids))
+
+ def test_get_lrouters_changed_only(self):
+ lr_uuids = self.nvp_cache.get_lrouters(changed_only=True)
+ self.assertEqual(0, len(lr_uuids))
+
+ def _verify_update(self, new_resource, changed=True, hit=True):
+ cached_resource = self.nvp_cache[new_resource['uuid']]
+ self.assertEqual(new_resource, cached_resource['data'])
+ self.assertEqual(hit, cached_resource.get('hit', False))
+ self.assertEqual(changed,
+ cached_resource.get('changed', False))
+
+ def test_update_lswitch_new_item(self):
+ new_switch_uuid = _uuid()
+ new_switch = {'uuid': new_switch_uuid, 'name': 'new_switch'}
+ self.nvp_cache.update_lswitch(new_switch)
+ self.assertIn(new_switch_uuid, self.nvp_cache._lswitches.keys())
+ self._verify_update(new_switch)
+
+ def test_update_lswitch_existing_item(self):
+ switch = LSWITCHES[0]
+ switch['name'] = 'new_name'
+ self.nvp_cache.update_lswitch(switch)
+ self.assertIn(switch['uuid'], self.nvp_cache._lswitches.keys())
+ self._verify_update(switch)
+
+ def test_update_lswitchport_new_item(self):
+ new_switchport_uuid = _uuid()
+ new_switchport = {'uuid': new_switchport_uuid,
+ 'name': 'new_switchport'}
+ self.nvp_cache.update_lswitchport(new_switchport)
+ self.assertIn(new_switchport_uuid,
+ self.nvp_cache._lswitchports.keys())
+ self._verify_update(new_switchport)
+
+ def test_update_lswitchport_existing_item(self):
+ switchport = LSWITCHPORTS[0]
+ switchport['name'] = 'new_name'
+ self.nvp_cache.update_lswitchport(switchport)
+ self.assertIn(switchport['uuid'],
+ self.nvp_cache._lswitchports.keys())
+ self._verify_update(switchport)
+
+ def test_update_lrouter_new_item(self):
+ new_router_uuid = _uuid()
+ new_router = {'uuid': new_router_uuid,
+ 'name': 'new_router'}
+ self.nvp_cache.update_lrouter(new_router)
+ self.assertIn(new_router_uuid,
+ self.nvp_cache._lrouters.keys())
+ self._verify_update(new_router)
+
+ def test_update_lrouter_existing_item(self):
+ router = LROUTERS[0]
+ router['name'] = 'new_name'
+ self.nvp_cache.update_lrouter(router)
+ self.assertIn(router['uuid'],
+ self.nvp_cache._lrouters.keys())
+ self._verify_update(router)
+
+ def test_process_updates_initial(self):
+ # Clear cache content to simulate first-time filling
+ self.nvp_cache._lswitches.clear()
+ self.nvp_cache._lswitchports.clear()
+ self.nvp_cache._lrouters.clear()
+ self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
+ for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
+ self._verify_update(resource)
+
+ def test_process_updates_no_change(self):
+ self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
+ for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
+ self._verify_update(resource, changed=False)
+
+ def test_process_updates_with_changes(self):
+ LSWITCHES[0]['name'] = 'altered'
+ self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
+ for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
+ changed = (True if resource['uuid'] == LSWITCHES[0]['uuid']
+ else False)
+ self._verify_update(resource, changed=changed)
+
+ def _test_process_updates_with_removals(self):
+ lswitches = LSWITCHES[:]
+ lswitch = lswitches.pop()
+ self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
+ for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
+ hit = (False if resource['uuid'] == lswitch['uuid']
+ else True)
+ self._verify_update(resource, changed=False, hit=hit)
+ return (lswitch, lswitches)
+
+ def test_process_updates_with_removals(self):
+ self._test_process_updates_with_removals()
+
+ def test_process_updates_cleanup_after_delete(self):
+ deleted_lswitch, lswitches = self._test_process_updates_with_removals()
+ self.nvp_cache.process_deletes()
+ self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
+ self.assertNotIn(deleted_lswitch['uuid'], self.nvp_cache._lswitches)
+
+ def _verify_delete(self, resource, deleted=True, hit=True):
+ cached_resource = self.nvp_cache[resource['uuid']]
+ data_field = 'data_bk' if deleted else 'data'
+ self.assertEqual(resource, cached_resource[data_field])
+ self.assertEqual(hit, cached_resource.get('hit', False))
+ self.assertEqual(deleted,
+ cached_resource.get('changed', False))
+
+ def _set_hit(self, resources, uuid_to_delete=None):
+ for resource in resources:
+ if resource['data']['uuid'] != uuid_to_delete:
+ resource['hit'] = True
+
+ def test_process_deletes_no_change(self):
+ # Mark all resources as hit
+ self._set_hit(self.nvp_cache._lswitches.values())
+ self._set_hit(self.nvp_cache._lswitchports.values())
+ self._set_hit(self.nvp_cache._lrouters.values())
+ self.nvp_cache.process_deletes()
+ for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
+ self._verify_delete(resource, hit=False, deleted=False)
+
+ def test_process_deletes_with_removals(self):
+ # Mark all resources but one as hit
+ uuid_to_delete = LSWITCHPORTS[0]['uuid']
+ self._set_hit(self.nvp_cache._lswitches.values(),
+ uuid_to_delete)
+ self._set_hit(self.nvp_cache._lswitchports.values(),
+ uuid_to_delete)
+ self._set_hit(self.nvp_cache._lrouters.values(),
+ uuid_to_delete)
+ self.nvp_cache.process_deletes()
+ for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
+ deleted = resource['uuid'] == uuid_to_delete
+ self._verify_delete(resource, hit=False, deleted=deleted)
+
+
+class SyncLoopingCallTestCase(base.BaseTestCase):
+
+ def test_looping_calls(self):
+ # Avoid runs of the synchronization process - just start
+ # the looping call
+ with mock.patch.object(
+ sync.NvpSynchronizer, '_synchronize_state',
+ return_value=0.01):
+ synchronizer = sync.NvpSynchronizer(None, None,
+ 100, 0, 0)
+ time.sleep(0.04999)
+ self.assertEqual(
+ 5, synchronizer._synchronize_state.call_count)
+
+
+class NvpSyncTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ # mock nvp api client
+ self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
+ mock_nvpapi = mock.patch(NVPAPI_NAME, autospec=True)
+ # Avoid runs of the synchronizer looping call
+ # These unit tests will excplicitly invoke synchronization
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ self.mock_nvpapi = mock_nvpapi.start()
+ patch_sync.start()
+ self.mock_nvpapi.return_value.login.return_value = "the_cookie"
+ # Emulate tests against NVP 3.x
+ self.mock_nvpapi.return_value.get_nvp_version.return_value = (
+ NvpApiClient.NVPVersion("3.1"))
+
+ def _fake_request(*args, **kwargs):
+ return self.fc.fake_request(*args, **kwargs)
+
+ self.mock_nvpapi.return_value.request.side_effect = _fake_request
+ self.fake_cluster = nvp_cluster.NVPCluster(
+ name='fake-cluster', nvp_controllers=['1.1.1.1:999'],
+ default_tz_uuid=_uuid(), nvp_user='foo', nvp_password='bar')
+ self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
+ ('1.1.1.1', '999', True),
+ self.fake_cluster.nvp_user, self.fake_cluster.nvp_password,
+ self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
+ self.fake_cluster.retries, self.fake_cluster.redirects)
+ # Instantiate Neutron plugin
+ # and setup needed config variables
+ args = ['--config-file', get_fake_conf('neutron.conf.test'),
+ '--config-file', get_fake_conf('nvp.ini.test')]
+ config.parse(args=args)
+ self._plugin = NeutronPlugin.NvpPluginV2()
+ super(NvpSyncTestCase, self).setUp()
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(patch_sync.stop)
+ self.addCleanup(mock_nvpapi.stop)
+
+ def tearDown(self):
+ cfg.CONF.reset()
+ super(NvpSyncTestCase, self).tearDown()
+
+ @contextlib.contextmanager
+ def _populate_data(self, ctx, net_size=2, port_size=2, router_size=2):
+
+ def network(idx):
+ return {'network': {'name': 'net-%s' % idx,
+ 'admin_state_up': True,
+ 'shared': False,
+ 'port_security_enabled': True,
+ 'tenant_id': 'foo'}}
+
+ def subnet(idx, net_id):
+ return {'subnet':
+ {'cidr': '10.10.%s.0/24' % idx,
+ 'name': 'sub-%s' % idx,
+ 'gateway_ip': attr.ATTR_NOT_SPECIFIED,
+ 'allocation_pools': attr.ATTR_NOT_SPECIFIED,
+ 'ip_version': 4,
+ 'dns_nameservers': attr.ATTR_NOT_SPECIFIED,
+ 'host_routes': attr.ATTR_NOT_SPECIFIED,
+ 'enable_dhcp': True,
+ 'network_id': net_id,
+ 'tenant_id': 'foo'}}
+
+ def port(idx, net_id):
+ return {'port': {'network_id': net_id,
+ 'name': 'port-%s' % idx,
+ 'admin_state_up': True,
+ 'device_id': 'miao',
+ 'device_owner': 'bau',
+ 'fixed_ips': attr.ATTR_NOT_SPECIFIED,
+ 'mac_address': attr.ATTR_NOT_SPECIFIED,
+ 'tenant_id': 'foo'}}
+
+ def router(idx):
+ # Use random uuids as names
+ return {'router': {'name': 'rtr-%s' % idx,
+ 'admin_state_up': True,
+ 'tenant_id': 'foo'}}
+
+ networks = []
+ ports = []
+ routers = []
+ for i in range(0, net_size):
+ net = self._plugin.create_network(ctx, network(i))
+ networks.append(net)
+ self._plugin.create_subnet(ctx, subnet(i, net['id']))
+ for j in range(0, port_size):
+ ports.append(self._plugin.create_port(
+ ctx, port("%s-%s" % (i, j), net['id'])))
+ for i in range(0, router_size):
+ routers.append(self._plugin.create_router(ctx, router(i)))
+ # Do not return anything as the user does need the actual
+ # data created
+ try:
+ yield
+ finally:
+ # Remove everything
+ for router in routers:
+ self._plugin.delete_router(ctx, router['id'])
+ for port in ports:
+ self._plugin.delete_port(ctx, port['id'])
+ # This will remove networks and subnets
+ for network in networks:
+ self._plugin.delete_network(ctx, network['id'])
+
+ def _get_tag_dict(self, tags):
+ return dict((tag['scope'], tag['tag']) for tag in tags)
+
+ def _test_sync(self, exp_net_status,
+ exp_port_status, exp_router_status,
+ action_callback=None, sp=None):
+ neutron_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0]
+ lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
+ neutron_port_id = self._get_tag_dict(
+ self.fc._fake_lswitch_lport_dict[lp_uuid]['tags'])['q_port_id']
+ neutron_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
+ if action_callback:
+ action_callback(ls_uuid, lp_uuid, lr_uuid)
+ # Make chunk big enough to read everything
+ if not sp:
+ sp = sync.SyncParameters(100)
+ self._plugin._synchronizer._synchronize_state(sp)
+ # Verify element is in expected status
+ # TODO(salv-orlando): Verify status for all elements
+ ctx = context.get_admin_context()
+ neutron_net = self._plugin.get_network(ctx, neutron_net_id)
+ neutron_port = self._plugin.get_port(ctx, neutron_port_id)
+ neutron_rtr = self._plugin.get_router(ctx, neutron_rtr_id)
+ self.assertEqual(exp_net_status, neutron_net['status'])
+ self.assertEqual(exp_port_status, neutron_port['status'])
+ self.assertEqual(exp_router_status, neutron_rtr['status'])
+
+ def _action_callback_status_down(self, ls_uuid, lp_uuid, lr_uuid):
+ self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false'
+ self.fc._fake_lswitch_lport_dict[lp_uuid]['status'] = 'false'
+ self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
+
+ def test_initial_sync(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ self._test_sync(
+ constants.NET_STATUS_ACTIVE,
+ constants.PORT_STATUS_ACTIVE,
+ constants.NET_STATUS_ACTIVE)
+
+ def test_initial_sync_with_resources_down(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ self._test_sync(
+ constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN,
+ constants.NET_STATUS_DOWN, self._action_callback_status_down)
+
+ def test_resync_with_resources_down(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ sp = sync.SyncParameters(100)
+ self._plugin._synchronizer._synchronize_state(sp)
+ self._test_sync(
+ constants.NET_STATUS_DOWN, constants.PORT_STATUS_DOWN,
+ constants.NET_STATUS_DOWN, self._action_callback_status_down)
+
+ def _action_callback_del_resource(self, ls_uuid, lp_uuid, lr_uuid):
+ del self.fc._fake_lswitch_dict[ls_uuid]
+ del self.fc._fake_lswitch_lport_dict[lp_uuid]
+ del self.fc._fake_lrouter_dict[lr_uuid]
+
+ def test_initial_sync_with_resources_removed(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ self._test_sync(
+ constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR,
+ constants.NET_STATUS_ERROR, self._action_callback_del_resource)
+
+ def test_resync_with_resources_removed(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ sp = sync.SyncParameters(100)
+ self._plugin._synchronizer._synchronize_state(sp)
+ self._test_sync(
+ constants.NET_STATUS_ERROR, constants.PORT_STATUS_ERROR,
+ constants.NET_STATUS_ERROR, self._action_callback_del_resource)
+
+ def _test_sync_with_chunk_larger_maxpagesize(
+ self, net_size, port_size, router_size, chunk_size, exp_calls):
+ ctx = context.get_admin_context()
+ real_func = nvplib.get_single_query_page
+ sp = sync.SyncParameters(chunk_size)
+ with self._populate_data(ctx, net_size=net_size,
+ port_size=port_size,
+ router_size=router_size):
+ with mock.patch.object(nvplib, 'MAX_PAGE_SIZE', 15):
+ # The following mock is just for counting calls,
+ # but we will still run the actual function
+ with mock.patch.object(
+ nvplib, 'get_single_query_page',
+ side_effect=real_func) as mock_get_page:
+ self._test_sync(
+ constants.NET_STATUS_ACTIVE,
+ constants.PORT_STATUS_ACTIVE,
+ constants.NET_STATUS_ACTIVE,
+ sp=sp)
+ # As each resource type does not exceed the maximum page size,
+ # the method should be called once for each resource type
+ self.assertEqual(exp_calls, mock_get_page.call_count)
+
+ def test_sync_chunk_larger_maxpagesize_no_multiple_requests(self):
+ # total resource size = 20
+ # total size for each resource does not exceed max page size (15)
+ self._test_sync_with_chunk_larger_maxpagesize(
+ net_size=5, port_size=2, router_size=5,
+ chunk_size=20, exp_calls=3)
+
+ def test_sync_chunk_larger_maxpagesize_triggers_multiple_requests(self):
+ # total resource size = 48
+ # total size for each resource does exceed max page size (15)
+ self._test_sync_with_chunk_larger_maxpagesize(
+ net_size=16, port_size=1, router_size=16,
+ chunk_size=48, exp_calls=6)
+
+ def test_sync_multi_chunk(self):
+ # The fake NVP API client cannot be used for this test
+ ctx = context.get_admin_context()
+ # Generate 4 networks, 1 port per network, and 4 routers
+ with self._populate_data(ctx, net_size=4, port_size=1, router_size=4):
+ fake_lswitches = json.loads(
+ self.fc.handle_get('/ws.v1/lswitch'))['results']
+ fake_lrouters = json.loads(
+ self.fc.handle_get('/ws.v1/lrouter'))['results']
+ fake_lswitchports = json.loads(
+ self.fc.handle_get('/ws.v1/lswitch/*/lport'))['results']
+ return_values = [
+ # Chunk 0 - lswitches
+ (fake_lswitches, None, 4),
+ # Chunk 0 - lrouters
+ (fake_lrouters[:2], 'xxx', 4),
+ # Chunk 0 - lports (size only)
+ ([], 'start', 4),
+ # Chunk 1 - lrouters (2 more) (lswitches are skipped)
+ (fake_lrouters[2:], None, None),
+ # Chunk 1 - lports
+ (fake_lswitchports, None, 4)]
+
+ def fake_fetch_data(*args, **kwargs):
+ return return_values.pop(0)
+
+ # 2 Chunks, with 6 resources each.
+ # 1st chunk lswitches and lrouters
+ # 2nd chunk lrouters and lports
+ # Mock _fetch_data
+ with mock.patch.object(
+ self._plugin._synchronizer, '_fetch_data',
+ side_effect=fake_fetch_data):
+ sp = sync.SyncParameters(6)
+
+ def do_chunk(chunk_idx, ls_cursor, lr_cursor, lp_cursor):
+ self._plugin._synchronizer._synchronize_state(sp)
+ self.assertEqual(chunk_idx, sp.current_chunk)
+ self.assertEqual(ls_cursor, sp.ls_cursor)
+ self.assertEqual(lr_cursor, sp.lr_cursor)
+ self.assertEqual(lp_cursor, sp.lp_cursor)
+
+ # check 1st chunk
+ do_chunk(1, None, 'xxx', 'start')
+ # check 2nd chunk
+ do_chunk(0, None, None, None)
+ # Chunk size should have stayed the same
+ self.assertEqual(sp.chunk_size, 6)
+
+ def test_synchronize_network(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a network down to verify synchronization
+ q_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0]
+ self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false'
+ q_net_data = self._plugin._get_network(ctx, q_net_id)
+ self._plugin._synchronizer.synchronize_network(ctx, q_net_data)
+ # Reload from db
+ q_nets = self._plugin.get_networks(ctx)
+ for q_net in q_nets:
+ if q_net['id'] == q_net_id:
+ exp_status = constants.NET_STATUS_DOWN
+ else:
+ exp_status = constants.NET_STATUS_ACTIVE
+ self.assertEqual(exp_status, q_net['status'])
+
+ def test_synchronize_port(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a network down to verify synchronization
+ lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
+ lport = self.fc._fake_lswitch_lport_dict[lp_uuid]
+ q_port_id = self._get_tag_dict(lport['tags'])['q_port_id']
+ lport['status'] = 'false'
+ q_port_data = self._plugin._get_port(ctx, q_port_id)
+ self._plugin._synchronizer.synchronize_port(ctx, q_port_data)
+ # Reload from db
+ q_ports = self._plugin.get_ports(ctx)
+ for q_port in q_ports:
+ if q_port['id'] == q_port_id:
+ exp_status = constants.PORT_STATUS_DOWN
+ else:
+ exp_status = constants.PORT_STATUS_ACTIVE
+ self.assertEqual(exp_status, q_port['status'])
+
+ def test_synchronize_router(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a network down to verify synchronization
+ q_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
+ self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
+ q_rtr_data = self._plugin._get_router(ctx, q_rtr_id)
+ self._plugin._synchronizer.synchronize_router(ctx, q_rtr_data)
+ # Reload from db
+ q_routers = self._plugin.get_routers(ctx)
+ for q_rtr in q_routers:
+ if q_rtr['id'] == q_rtr_id:
+ exp_status = constants.NET_STATUS_DOWN
+ else:
+ exp_status = constants.NET_STATUS_ACTIVE
+ self.assertEqual(exp_status, q_rtr['status'])
+
+ def test_sync_nvp_failure_backoff(self):
+ self.mock_nvpapi.return_value.request.side_effect = (
+ NvpApiClient.RequestTimeout)
+ # chunk size won't matter here
+ sp = sync.SyncParameters(999)
+ for i in range(0, 10):
+ self.assertEqual(
+ min(64, 2 ** i),
+ self._plugin._synchronizer._synchronize_state(sp))
import fixtures
import testtools
+import mock
from oslo.config import cfg
from neutron.common import config as q_config
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira.common import exceptions
+from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira import nvp_cluster
from neutron.tests.unit.nicira import get_fake_conf
from neutron.tests.unit.nicira import PLUGIN_NAME
self.useFixture(fixtures.MonkeyPatch(
'neutron.manager.NeutronManager._instance',
None))
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
+ self.addCleanup(patch_sync.stop)
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443'])
self.useFixture(fixtures.MonkeyPatch(
'neutron.manager.NeutronManager._instance',
None))
+ # Avoid runs of the synchronizer looping call
+ patch_sync = mock.patch.object(sync, '_start_loopingcall')
+ patch_sync.start()
+ self.addCleanup(patch_sync.stop)
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443'])
'GET', resource, None, fmt, params=params, subresource=subresource
)
- def new_show_request(self, resource, id, fmt=None, subresource=None):
- return self._req(
- 'GET', resource, None, fmt, id=id, subresource=subresource
- )
+ def new_show_request(self, resource, id, fmt=None,
+ subresource=None, fields=None):
+ if fields:
+ params = "&".join(["fields=%s" % x for x in fields])
+ else:
+ params = None
+ return self._req('GET', resource, None, fmt, id=id,
+ params=params, subresource=subresource)
def new_delete_request(self, resource, id, fmt=None, subresource=None,
sub_id=None):