"""
# Each QoS driver should define the set of rule types that it supports, and
- # correspoding handlers that has the following names:
+ # corresponding handlers that has the following names:
#
# create_<type>
# update_<type>
# Even with an infinite lease, a client may choose to renew a
# previous lease on reboot or interface bounce so we should have
# an entry for it.
- # Dnsmasq timestamp format for an infinite lease is is 0.
+ # Dnsmasq timestamp format for an infinite lease is 0.
timestamp = 0
else:
timestamp = int(time.time()) + self.conf.dhcp_lease_duration
@staticmethod
def _make_canonical(ip_version, settings):
- """Converts settings to a canonical represention to compare easily"""
+ """Converts settings to a canonical representation to compare easily"""
def canonicalize_fwmark_string(fwmark_mask):
"""Reformats fwmark/mask into a canonical form
router['ns_name'],
'link')
# There is a delay before the LLA becomes active.
- # This is because the kernal runs DAD to make sure LLA uniqueness
+ # This is because the kernel runs DAD to ensure LLA uniqueness
# Spawn a thread to wait for the interface to be ready
self._spawn_lla_thread(router['gw_interface'],
router['ns_name'],
# functions
def create_connection(new=True):
- # NOTE(salv-orlando): This is a clever interpreation of the factory design
+ # NOTE(salv-orlando): This is a clever interpretation of the factory design
# pattern aimed at preventing plugins from initializing RPC servers upon
# initialization when they are running in the REST over HTTP API server.
# The educated reader will perfectly be able to tell that this is a fairly dirty hack
def get_dhcp_agent_device_id(network_id, host):
# Split host so as to always use only the hostname and
- # not the domain name. This will guarantee consistentcy
+ # not the domain name. This will guarantee consistency
# whether a local hostname or an fqdn is passed in.
local_hostname = host.split('.')[0]
host_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, str(local_hostname))
router_id = sa.Column(sa.String(36), sa.ForeignKey('routers.id'))
# Additional attribute for keeping track of the router where the floating
# ip was associated in order to be able to ensure consistency even if an
- # aysnchronous backend is unavailable when the floating IP is disassociated
+ # asynchronous backend is unavailable when the floating IP is disassociated
last_known_router_id = sa.Column(sa.String(36))
status = sa.Column(sa.String(16))
router = orm.relationship(Router, backref='floating_ips')
for quota in context.session.query(quota_models.Quota):
tenant_id = quota['tenant_id']
- # avoid setdefault() because only want to copy when actually req'd
+ # avoid setdefault() because only want to copy when actually
+ # required
tenant_quota = all_tenant_quotas.get(tenant_id)
if tenant_quota is None:
tenant_quota = tenant_default.copy()
# concurrent reservations.
# For this reason it might be advisable to handle contention using
# this kind of locks and paying the cost of a write set certification
- # failure when a mysql galera cluster is employed. Also, this class of
+ # failure when a MySQL Galera cluster is employed. Also, this class of
# locks should be ok to use when support for sending "hotspot" writes
- # to a single node will be avaialable.
+ # to a single node will be available.
requested_resources = deltas.keys()
with db_api.autonested_transaction(context.session):
# get_tenant_quotes needs in input a dictionary mapping resource
context, plugin, tenant_id, resync_usage=False)) for
resource in requested_resources)
# Adjust for expired reservations. Apparently it is cheaper than
- # querying everytime for active reservations and counting overall
+ # querying every time for active reservations and counting overall
# quantity of resources reserved
expired_deltas = quota_api.get_reservations_for_resources(
context, tenant_id, requested_resources, expired=True)
def commit_reservation(self, context, reservation_id):
# Do not mark resource usage as dirty. If a reservation is committed,
- # then the releveant resources have been created. Usage data for these
+ # then the relevant resources have been created. Usage data for these
# resources has therefore already been marked dirty.
quota_api.remove_reservation(context, reservation_id,
set_dirty=False)
**kwargs)
def _extend_port_dict_security_group(self, port_res, port_db):
- # Security group bindings will be retrieved from the sqlalchemy
+ # Security group bindings will be retrieved from the SQLAlchemy
# model. As they're loaded eagerly with ports because of the
# joined load they will not cause an extra query.
security_group_ids = [sec_group_mapping['security_group_id'] for
sort_key_attr = getattr(model, sort_key)
except AttributeError:
# Extension attribute doesn't support sorting. Because it
- # existed in attr_info, it will be catched at here
+ # existed in attr_info, it will be caught here
msg = _("%s is invalid attribute for sort_key") % sort_key
raise n_exc.BadRequest(resource=model.__tablename__, msg=msg)
if isinstance(sort_key_attr.property, properties.RelationshipProperty):
def create_range(self, session, allocation_pool_id,
range_start, range_end):
- """Create an availabilty range for a given pool.
+ """Create an availability range for a given pool.
This method does not perform any validation on parameters; it simply
persists data in the database.
# NOTE(cbrandily): the update disallows 2 concurrent subnet allocation
# to succeed: at most 1 transaction will succeed, others will be
- # rollbacked and be caught in neutron.db.v2.base
+ # rolled back and be caught in neutron.db.v2.base
query = self._context.session.query(models_v2.SubnetPool).filter_by(
id=self._subnetpool['id'], hash=current_hash)
count = query.update({'hash': new_hash})
priority = 130
def before(self, state):
- # TODO(salv-orlando): This hook must go when adaptin the pecan code to
+ # TODO(salv-orlando): This hook must go when adapting the pecan code to
# use reservations.
if state.request.method != 'POST':
return
# NOTE(salv-orlando): If you care about code quality, please read below
# Hackiness is strong with the piece of code below. It is used for
# populating resource plurals and registering resources with the quota
- # engine, but the method it calls were not coinceived with this aim.
+ # engine, but the method it calls were not conceived with this aim.
# Therefore it only leverages side-effects from those methods. Moreover,
# as it is really not advisable to load an instance of
# neutron.api.v2.router.APIRouter just to register resources with the
# TODO(shiv) need support for security groups
-"""Implentation of Brocade Neutron Plugin."""
+"""Implementation of Brocade Neutron Plugin."""
from oslo_config import cfg
from oslo_log import log as logging
def get_port(session, port_id):
- """Get port record for update within transcation."""
+ """Get port record for update within transaction."""
with session.begin(subtransactions=True):
try:
# under the License.
-"""Implentation of Brocade ML2 Mechanism driver for ML2 Plugin."""
+"""Implementation of Brocade ML2 Mechanism driver for ML2 Plugin."""
from oslo_config import cfg
def chain_name(vif):
- # start each chain with a common identifer for cleanup to find
+ # start each chain with a common identifier for cleanup to find
return '%s%s' % (SPOOF_CHAIN_PREFIX, vif)
def scan_devices(self, previous, sync):
device_info = {}
- # Save and reinitialise the set variable that the port_update RPC uses.
+ # Save and reinitialize the set variable that the port_update RPC uses.
# This should be thread-safe as the greenthread should not yield
# between these two statements.
updated_devices = self.updated_devices
# that subnet
class LocalDVRSubnetMapping(object):
def __init__(self, subnet, csnat_ofport=constants.OFPORT_INVALID):
- # set of commpute ports on on this dvr subnet
+ # set of compute ports on this dvr subnet
self.compute_ports = {}
self.subnet = subnet
self.csnat_ofport = csnat_ofport
br.install_flood_to_tun(lvm.vlan, lvm.segmentation_id,
lvm.tun_ofports)
else:
- # This local vlan doesn't require any more tunnelling
+ # This local vlan doesn't require any more tunneling
br.delete_flood_to_tun(lvm.vlan)
else:
self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan,
self.tun_br_ofports[tunnel_type][remote_ip] = ofport
# Add flow in default table to resubmit to the right
- # tunnelling table (lvid will be set in the latter)
+ # tunneling table (lvid will be set in the latter)
br.setup_tunnel_port(tunnel_type, ofport)
ofports = self.tun_br_ofports[tunnel_type].values()
# To prevent a possible binding loop, don't try to bind with
# this driver if the same driver has already bound at a higher
# level to one of the segments we are currently trying to
- # bind. Note that is is OK for the same driver to bind at
+ # bind. Note that it is OK for the same driver to bind at
# multiple levels using different segments.
for level in binding_levels:
if (level.driver == driver and
if original_port['admin_state_up'] != updated_port['admin_state_up']:
need_port_update_notify = True
# NOTE: In the case of DVR ports, the port-binding is done after
- # router scheduling when sync_routers is callede and so this call
+ # router scheduling when sync_routers is called and so this call
# below may not be required for DVR routed interfaces. But still
# since we don't have the mech_context for the DVR router interfaces
# at certain times, we just pass the port-context and return it, so
return quota_api.ReservationInfo('fake', None, None, None)
def commit_reservation(self, context, reservation_id):
- """Tnis is a noop as this driver does not support reservations."""
+ """This is a noop as this driver does not support reservations."""
def cancel_reservation(self, context, reservation_id):
- """Tnis is a noop as this driver does not support reservations."""
+ """This is a noop as this driver does not support reservations."""
class QuotaEngine(object):
#
-"""Implentation of Brocade SVI service Plugin."""
+"""Implementation of Brocade SVI service Plugin."""
from oslo_config import cfg
def test_list_agent(self):
body = self.admin_client.list_agents()
agents = body['agents']
- # Hearthbeats must be excluded from comparison
+ # Heartbeats must be excluded from comparison
self.agent.pop('heartbeat_timestamp', None)
self.agent.pop('configurations', None)
for agent in agents:
action='access_as_shared', target_tenant='*')['rbac_policy']
self.admin_client.delete_rbac_policy(res['policy']['id'])
- # now that wilcard is the only remainin, it should be subjected to
+ # now that wildcard is the only remaining, it should be subjected to
# the same restriction
with testtools.ExpectedException(lib_exc.Conflict):
self.admin_client.delete_rbac_policy(wild['id'])
- # similarily, we can't update the policy to a different tenant
+ # similarly, we can't update the policy to a different tenant
with testtools.ExpectedException(lib_exc.Conflict):
self.admin_client.update_rbac_policy(
wild['id'], target_tenant=self.client2.tenant_id)
self.client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id2, fw_rule_id1, '')
- # Verify the posiition of rule after insertion
+ # Verify the position of rule after insertion
fw_rule = self.client.show_firewall_rule(
fw_rule_id2)
# Insert rule to firewall policy before the first rule
self.client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id2, '', fw_rule_id1)
- # Verify the posiition of rule after insertion
+ # Verify the position of rule after insertion
fw_rule = self.client.show_firewall_rule(
fw_rule_id2)
self.assertEqual(int(fw_rule['firewall_rule']['position']), 1)
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
self.assertFalse(body['firewall_policy']['audited'])
- # Update firewall policy audited attribute to ture
+ # Update firewall policy audited attribute to true
self.client.update_firewall_policy(fw_policy_id,
audited=True)
# Insert Firewall rule to firewall policy
self.addCleanup(self.client.delete_subnetpool, pool_id_2)
- # now update the pool_id_1 with the prefix interesecting with
+ # now update the pool_id_1 with the prefix intersecting with
# pool_id_2
subnetpool_data = {'subnetpool': {'prefixes':
pool_1_updated_prefixes}}
Returns the tenant_id of the client's current user
"""
# TODO(jroovers) This is a temporary workaround to get the tenant_id
- # of the the current client. Replace this once tenant_isolation for
+ # of the current client. Replace this once tenant_isolation for
# neutron is fixed.
body = self.client.show_network(self.network['id'])
return body['network']['tenant_id']
address=self.address)
if self.protocol == self.UDP:
# Create an ASSURED entry in conntrack table for UDP packets,
- # that requires 3-way communcation
+ # that requires 3-way communication
# 1st transmission creates UNREPLIED
# 2nd transmission removes UNREPLIED
# 3rd transmission creates ASSURED
# Regarding MRO, it goes BaseOVSLinuxTestCase, WithScenarios,
-# BaseSudoTestCase, ..., UnitTest, object. setUp is not dfined in
+# BaseSudoTestCase, ..., UnitTest, object. setUp is not defined in
# WithScenarios, so it will correctly be found in BaseSudoTestCase.
class BaseOVSLinuxTestCase(testscenarios.WithScenarios, base.BaseSudoTestCase):
scenarios = [
# clear agent router_info as it will be after restart
self.agent.router_info = {}
- # Synchonize the agent with the plug-in
+ # Synchronize the agent with the plug-in
with mock.patch.object(namespace_manager.NamespaceManager, 'list_all',
return_value=ns_names_to_retrieve):
self.agent.periodic_sync_routers_task(self.agent.context)
def test_dvr_router_add_internal_network_set_arp_cache(self):
# Check that, when the router is set up and there are
- # existing ports on the the uplinked subnet, the ARP
+ # existing ports on the uplinked subnet, the ARP
# cache is properly populated.
self.agent.conf.agent_mode = 'dvr_snat'
router_info = l3_test_common.prepare_router_data()
migration.do_alembic_command(self.alembic_config, 'upgrade',
'heads')
insp = sqlalchemy.engine.reflection.Inspector.from_engine(engine)
- # Test that table creation on mysql only builds InnoDB tables
+ # Test that table creation on MySQL only builds InnoDB tables
tables = insp.get_table_names()
self.assertTrue(len(tables) > 0,
"No tables found. Wrong schema?")
def get_scenarios():
if rest_enabled():
# FIXME(marun) Remove local import once tempest config is safe
- # to import alonside neutron config
+ # to import alongside neutron config
from neutron.tests.retargetable import rest_fixture
return [('tempest', {'client': rest_fixture.RestClientFixture()})]
else:
def create_network(self, **kwargs):
# Supply defaults that are expected to be set by the api
- # framwork
+ # framework
kwargs.setdefault('admin_state_up', True)
kwargs.setdefault('shared', False)
data = dict(network=kwargs)
# We want a helper function here to check and see if admin credentials
# are available so we can do a single call from skip_checks if admin
-# creds area vailable.
+# creds are available.
def is_admin_available():
is_admin = True
# If tenant isolation is enabled admin will be available
new_index = str(roles) + '-' + str(len(self.isolated_creds))
self.isolated_creds[new_index] = exist_creds
del self.isolated_creds[str(roles)]
- # Handle isolated neutron resouces if they exist too
+ # Handle isolated neutron resources if they exist too
if CONF.service_available.neutron:
exist_net = self.isolated_net_resources.get(str(roles))
if exist_net:
return
# NOTE(afazekas): The instance is in "ready for action state"
# when no task in progress
- # NOTE(afazekas): Converted to string bacuse of the XML
+ # NOTE(afazekas): Converted to string because of the XML
# responses
if str(task_state) == "None":
# without state api extension 3 sec usually enough
qos_policy_id = port['qos_policy_id']
port_id = port['port_id']
self.qos_ext.handle_port(self.context, port)
- # we make sure the underlaying qos driver is called with the
+ # we make sure the underlying qos driver is called with the
# right parameters
self.qos_ext.qos_driver.create.assert_called_once_with(
port, TEST_POLICY)
def test_remove_unknown_port(self):
port = self._fake_port()
self.firewall.remove_port_filter(port)
- # checking no exception occures
+ # checking no exception occurs
self.assertFalse(self.v4filter_inst.called)
def test_defer_apply(self):
def setUp(self):
super(OVSHybridIptablesFirewallTestCase, self).setUp()
self.firewall = iptables_firewall.OVSHybridIptablesFirewallDriver()
- # inital data has 1, 2, and 9 in use, see RAW_TABLE_OUTPUT above.
+ # initial data has 1, 2, and 9 in use, see RAW_TABLE_OUTPUT above.
self._dev_zone_map = {'61634509-31': 2, '8f46cf18-12': 9,
'95c24827-02': 2, 'e804433b-61': 1}
self.assertEqual(self._dev_zone_map, self.firewall._device_zone_map)
def test__generate_device_zone(self):
- # inital data has 1, 2, and 9 in use.
+ # initial data has 1, 2, and 9 in use.
# we fill from top up first.
self.assertEqual(10, self.firewall._generate_device_zone('test'))
'%s:neutron/tests/unit/extensions' % self.base_path)
def test_get_extensions_path_no_extensions(self):
- # Reset to default value, as it's overriden by base class
+ # Reset to default value, as it's overridden by base class
cfg.CONF.set_override('api_extensions_path', '')
path = extensions.get_extensions_path()
self.assertEqual(path, self.base_path)
super(QuotaTest, self).setUp()
# Use mock to let the API use a different QuotaEngine instance for
# unit test in this class. This will ensure resource are registered
- # again and instanciated with neutron.quota.resource.CountableResource
+ # again and instantiated with neutron.quota.resource.CountableResource
replacement_registry = resource_registry.ResourceRegistry()
registry_patcher = mock.patch('neutron.quota.resource_registry.'
'ResourceRegistry.get_instance')
ret_port.update(port['port'])
if (delete_addr_pairs or has_addr_pairs):
- # delete address pairds and readd them
+ # delete address pairs and re-add them
self._delete_allowed_address_pairs(context, id)
self._process_create_allowed_address_pairs(
context, ret_port,
cfg.CONF.set_override('allow_pagination', True)
cfg.CONF.set_override('allow_sorting', True)
self.api = router.APIRouter()
- # Set the defualt status
+ # Set the default status
self.net_create_status = 'ACTIVE'
self.port_create_status = 'ACTIVE'
class DummyServicePlugin(service_base.ServicePluginBase):
- """This is a simple plugin for managing instantes of a fictional 'dummy'
+ """This is a simple plugin for managing instances of a fictional 'dummy'
service. This plugin is provided as a proof-of-concept of how
advanced service might leverage the service type extension.
Ideally, instances of real advanced services, such as load balancing
self.net_ext = external_net_db.ExternalNetwork(
network_id=self.ext_net_id)
self.context.session.add(self.network)
- # The following is to avoid complains from sqlite on
+ # The following is to avoid complaints from SQLite on
# foreign key violations
self.context.session.flush()
self.context.session.add(self.net_ext)
def test_create_port_with_security_group_and_net_sec_false(self):
# This tests that port_security_enabled is true when creating
# a port on a network that is marked as port_security_enabled=False
- # that has a subnet and securiy_groups are passed it.
+ # that has a subnet and security_groups are passed in.
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")
res = self._create_network('json', 'net1', True,
class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
TestNeutronDbIpamMixin):
- """Test case for Subnet interface for Nuetron's DB IPAM driver.
+ """Test case for Subnet interface for Neutron's DB IPAM driver.
This test case exercises the reference IPAM driver.
Even if it loads a plugin, the unit tests in this class do not exercise
val = res['networks'][0]['network_extension']
self.assertEqual("", val)
- # Test create with explict value.
+ # Test create with explicit value.
res = self._create_network(self.fmt,
'test-network', True,
arg_list=('network_extension', ),
self.assertEqual("", val)
with self.network() as network:
- # Test create with explict value.
+ # Test create with explicit value.
data = {'subnet':
{'network_id': network['network']['id'],
'cidr': '10.1.0.0/24',
self.assertEqual("", val)
with self.network() as network:
- # Test create with explict value.
+ # Test create with explicit value.
res = self._create_port(self.fmt,
network['network']['id'],
arg_list=('port_extension', ),
def test_full_uuids_skip_port_id_lookup(self):
plugin = manager.NeutronManager.get_plugin()
# when full UUIDs are provided, the _or statement should only
- # have one matching 'IN' critiera for all of the IDs
+ # have one matching 'IN' criteria for all of the IDs
with mock.patch('neutron.plugins.ml2.db.or_') as or_mock,\
mock.patch('sqlalchemy.orm.Session.query') as qmock:
fmock = qmock.return_value.outerjoin.return_value.filter
def test_add_delete_data_triggers_event(self):
res = self._create_resource()
other_res = self._create_other_resource()
- # Validate dirty tenants since mock does not work well with sqlalchemy
+ # Validate dirty tenants since mock does not work well with SQLAlchemy
# event handlers.
self._add_data()
self._add_data('someone_else')
def open_no_proxy(*args, **kwargs):
# NOTE(jamespage):
- # Deal with more secure certification chain verficiation
+ # Deal with more secure certification chain verification
# introduced in python 2.7.9 under PEP-0476
# https://github.com/python/peps/blob/master/pep-0476.txt
if hasattr(ssl, "_create_unverified_context"):
raise webob.exc.HTTPNotAcceptable(msg)
def _deserialize(self, data, content_type):
- """Deserialize the request body to the specefied content type.
+ """Deserialize the request body to the specified content type.
Uses self._serialization_metadata if it exists, which is a dict mapping
MIME types to information needed to serialize to that type.