"""Make a remote process call to get the dhcp port."""
return dhcp.DictModel(self.call(self.context,
self.make_msg('get_dhcp_port',
- network_id=network_id,
- device_id=device_id,
- host=self.host),
+ network_id=network_id,
+ device_id=device_id,
+ host=self.host),
topic=self.topic))
def create_dhcp_port(self, port):
def __init__(self, application,
ext_mgr=None):
self.ext_mgr = (ext_mgr
- or ExtensionManager(
- get_extensions_path()))
+ or ExtensionManager(get_extensions_path()))
mapper = routes.Mapper()
# extended resources
### commands auto generated by Alembic - please adjust! ###
op.create_table('networksecuritybindings',
sa.Column('network_id', sa.String(length=36),
- nullable=False),
+ nullable=False),
sa.Column('port_security_enabled', sa.Boolean(),
- nullable=False),
+ nullable=False),
sa.ForeignKeyConstraint(['network_id'], ['networks.id'],
- ondelete='CASCADE'),
+ ondelete='CASCADE'),
sa.PrimaryKeyConstraint('network_id'))
op.create_table('portsecuritybindings',
sa.Column('port_id', sa.String(length=36),
- nullable=False),
+ nullable=False),
sa.Column('port_security_enabled', sa.Boolean(),
- nullable=False),
+ nullable=False),
sa.ForeignKeyConstraint(['port_id'], ['ports.id'],
- ondelete='CASCADE'),
+ ondelete='CASCADE'),
sa.PrimaryKeyConstraint('port_id'))
### end Alembic commands ###
sa.Column('network_id', sa.String(length=36),
primary_key=True),
sa.Column('binding_type',
- sa.Enum('flat', 'vlan', 'stt', 'gre', 'l3_ext',
- name=(
+ sa.Enum(
+ 'flat', 'vlan', 'stt', 'gre', 'l3_ext',
+ name=(
'nvp_network_bindings_binding_type')),
nullable=False, primary_key=True),
sa.Column('phy_uuid', sa.String(36), primary_key=True,
"(SELECT network_id from nvp_multi_provider_networks)")
# create table with previous contents
- op.create_table('rename_nvp_network_bindings',
- sa.Column('network_id', sa.String(length=36),
- primary_key=True),
- sa.Column('binding_type',
- sa.Enum('flat', 'vlan', 'stt', 'gre', 'l3_ext',
- name=(
- 'nvp_network_bindings_binding_type')),
- nullable=False),
- sa.Column('phy_uuid', sa.String(36),
- nullable=True),
- sa.Column('vlan_id', sa.Integer,
- nullable=True, autoincrement=False))
+ op.create_table(
+ 'rename_nvp_network_bindings',
+ sa.Column('network_id', sa.String(length=36), primary_key=True),
+ sa.Column('binding_type',
+ sa.Enum('flat', 'vlan', 'stt', 'gre', 'l3_ext',
+ name=('nvp_network_bindings_binding_type')),
+ nullable=False),
+ sa.Column('phy_uuid', sa.String(36), nullable=True),
+ sa.Column('vlan_id', sa.Integer, nullable=True, autoincrement=False))
# copy data from nvp_network_bindings into rename_nvp_network_bindings
op.execute("INSERT INTO rename_nvp_network_bindings SELECT network_id, "
sa.Column('port_id', sa.String(length=36), nullable=False),
sa.Column('mac_learning_enabled', sa.Boolean(), nullable=False),
sa.ForeignKeyConstraint(
- ['port_id'], ['ports.id'], ondelete='CASCADE'),
+ ['port_id'], ['ports.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('port_id'))
)
op.add_column(
u'nsxrouterextattributess',
- sa.Column('service_router',
- sa.Boolean(),
- nullable=False))
+ sa.Column('service_router', sa.Boolean(), nullable=False))
op.execute("UPDATE nsxrouterextattributess set service_router=False")
sa.Column(
'encryption_algorithm',
sa.Enum('3des', 'aes-128', 'aes-256', 'aes-192',
- name='vpn_encrypt_algorithms'), nullable=False),
+ name='vpn_encrypt_algorithms'), nullable=False),
sa.Column(
'phase1_negotiation_mode',
sa.Enum('main', name='ike_phase1_mode'), nullable=False),
'neutron.debug.commands.ExecProbe'),
'ping-all': utils.import_class(
'neutron.debug.commands.PingAll'),
-#TODO(nati) ping, netcat , nmap, bench
+ #TODO(nati) ping, netcat, nmap, bench
}
COMMANDS = {'2.0': COMMAND_V2}
FLAVOR_ATTRIBUTE = {
'networks': {
- FLAVOR_NETWORK: {'allow_post': True,
- 'allow_put': False,
- 'is_visible': True,
- 'default': attributes.ATTR_NOT_SPECIFIED}
+ FLAVOR_NETWORK: {'allow_post': True,
+ 'allow_put': False,
+ 'is_visible': True,
+ 'default': attributes.ATTR_NOT_SPECIFIED}
},
'routers': {
- FLAVOR_ROUTER: {'allow_post': True,
- 'allow_put': False,
- 'is_visible': True,
- 'default': attributes.ATTR_NOT_SPECIFIED}
+ FLAVOR_ROUTER: {'allow_post': True,
+ 'allow_put': False,
+ 'is_visible': True,
+ 'default': attributes.ATTR_NOT_SPECIFIED}
}
}
for (segment1_id, segment2_id) in segment_pairs:
(db_session.query(n1kv_models_v2.
N1kvMultiSegmentNetworkBinding).filter_by(
- multi_segment_id=multi_segment_id,
- segment1_id=segment1_id,
- segment2_id=segment2_id).delete())
+ multi_segment_id=multi_segment_id,
+ segment1_id=segment1_id,
+ segment2_id=segment2_id).delete())
def add_trunk_segment_binding(db_session, trunk_segment_id, segment_pairs):
with db_session.begin(subtransactions=True):
a_set_q = (db_session.query(n1kv_models_v2.ProfileBinding).
filter_by(tenant_id=c_const.TENANT_ID_NOT_SET,
- profile_type=c_const.POLICY))
+ profile_type=c_const.POLICY))
a_set = set(i.profile_id for i in a_set_q)
b_set_q = (db_session.query(n1kv_models_v2.ProfileBinding).
filter(and_(n1kv_models_v2.ProfileBinding.
trunk_segment_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id',
- ondelete="CASCADE"),
+ ondelete="CASCADE"),
primary_key=True)
segment_id = sa.Column(sa.String(36), nullable=False, primary_key=True)
dot1qtag = sa.Column(sa.String(36), nullable=False, primary_key=True)
multi_segment_id = sa.Column(sa.String(36),
sa.ForeignKey('networks.id',
- ondelete="CASCADE"),
+ ondelete="CASCADE"),
primary_key=True)
segment1_id = sa.Column(sa.String(36), nullable=False, primary_key=True)
segment2_id = sa.Column(sa.String(36), nullable=False, primary_key=True)
n1kv_db_v2.add_multi_segment_encap_profile_name(session,
net_id,
(segment1,
- segment2),
+ segment2),
encap_profile)
else:
raise cisco_exceptions.NoClusterFound
binding = (
n1kv_db_v2.get_multi_segment_network_binding(session, net_id,
(segment1,
- segment2)))
+ segment2)))
encap_profile = binding['encap_profile_name']
if encap_profile in encap_dict:
profile_dict = encap_dict[encap_profile]
'physical_network_vswitch_mappings',
default=[],
help=_('List of <physical_network>:<vswitch> '
- 'where the physical networks can be expressed with '
- 'wildcards, e.g.: ."*:external"')),
+ 'where the physical networks can be expressed with '
+ 'wildcards, e.g.: "*:external"')),
cfg.StrOpt(
'local_network_vswitch',
default='private',
hyperv_opts = [
cfg.StrOpt('tenant_network_type', default='local',
help=_("Network type for tenant networks "
- "(local, flat, vlan or none)")),
+ "(local, flat, vlan or none)")),
cfg.ListOpt('network_vlan_ranges',
default=DEFAULT_VLAN_RANGES,
help=_("List of <physical_network>:<vlan_min>:<vlan_max> "
- "or <physical_network>")),
+ "or <physical_network>")),
]
cfg.CONF.register_opts(hyperv_opts, "HYPERV")
"""
cmds = ['auth url %s user %s password %s' %
(self._keystone_url(),
- self.keystone_conf.admin_user,
- self.keystone_conf.admin_password)]
+ self.keystone_conf.admin_user,
+ self.keystone_conf.admin_password)]
self._run_openstack_cmds(cmds)
vlan_opts = [
cfg.StrOpt('tenant_network_type', default='vlan',
help=_("Network type for tenant networks "
- "(local, ib, vlan, or none)")),
+ "(local, ib, vlan, or none)")),
cfg.ListOpt('network_vlan_ranges',
default=DEFAULT_VLAN_RANGES,
help=_("List of <physical_network>:<vlan_min>:<vlan_max> "
try:
entry = (session.query(mlnx_models_v2.SegmentationIdAllocation).
filter_by(physical_network=physical_network,
- segmentation_id=segmentation_id).
- with_lockmode('update').
- one())
+ segmentation_id=segmentation_id).
+ with_lockmode('update').one())
if entry.allocated:
raise q_exc.VlanIdInUse(vlan_id=segmentation_id,
physical_network=physical_network)
in securitygroups_db.IP_PROTOCOL_MAP.values())
if (not port_based_proto and
(r['port_range_min'] is not None or
- r['port_range_max'] is not None)):
+ r['port_range_max'] is not None)):
msg = (_("Port values not valid for "
"protocol: %s") % r['protocol'])
raise q_exc.BadRequest(resource='security_group_rule',
if (default_gateways[svc_type] and
default_gateways[svc_type] not in gateway_services):
print("\t\t\tError: specified default %s gateway (%s) is "
- "missing from NVP Gateway Services!" % (svc_type,
- default_gateways[svc_type]))
+ "missing from NVP Gateway Services!" % (
+ svc_type,
+ default_gateways[svc_type]))
errors += 1
transport_zones = get_transport_zones(cluster)
print("\tTransport zones: %s" % transport_zones)
except vcns_exc.VcnsApiException as e:
LOG.exception(_("Failed to get firewall rule: %(rule_id)s "
"with edge_id: %(edge_id)s"), {
- 'rule_id': id,
- 'edge_id': edge_id})
+ 'rule_id': id,
+ 'edge_id': edge_id})
raise e
return self._restore_firewall_rule(context, edge_id, response)
context.session, id, edge_id)
if not pool_binding:
msg = (_("pool_binding not found with id: %(id)s "
- "edge_id: %(edge_id)s") % {
- 'id': id,
- 'edge_id': edge_id})
+ "edge_id: %(edge_id)s") % {'id': id, 'edge_id': edge_id})
LOG.error(msg)
raise vcns_exc.VcnsNotFound(
resource='router_service_binding', msg=msg)
context.session, id, edge_id)
if not monitor_binding:
msg = (_("monitor_binding not found with id: %(id)s "
- "edge_id: %(edge_id)s") % {
- 'id': id,
- 'edge_id': edge_id})
+ "edge_id: %(edge_id)s") % {'id': id, 'edge_id': edge_id})
LOG.error(msg)
raise vcns_exc.VcnsNotFound(
resource='router_service_binding', msg=msg)
# install
LOG.debug(_("Apply fw on Router List: '%s'"),
[ri.router['id']
- for ri in router_info_list])
+ for ri in router_info_list])
# no need to apply sync data for ACTIVE fw
if fw['status'] != constants.ACTIVE:
self._invoke_driver_for_sync_from_plugin(
'nexthops': ['4.4.4.4', '4.4.4.5']}]
body1 = self._update('routers', r1_id,
{'router':
- {'router_rules': router1_rules}})
+ {'router_rules': router1_rules}})
body2 = self._update('routers', r2_id,
{'router':
- {'router_rules': router2_rules}})
+ {'router_rules': router2_rules}})
body1 = self._show('routers', r1_id)
body2 = self._show('routers', r2_id)
_db_profile = (n1kv_db_v2.create_network_profile(
self.session, TEST_NETWORK_PROFILE_MULTI_SEGMENT))
self.assertIsNotNone(_db_profile)
- db_profile = (self.session.query(n1kv_models_v2.NetworkProfile).
- filter_by(
- name=TEST_NETWORK_PROFILE_MULTI_SEGMENT['name']).
- one())
+ db_profile = (
+ self.session.query(
+ n1kv_models_v2.NetworkProfile).filter_by(
+ name=TEST_NETWORK_PROFILE_MULTI_SEGMENT['name'])
+ .one())
self.assertIsNotNone(db_profile)
self.assertEqual(_db_profile.id, db_profile.id)
self.assertEqual(_db_profile.name, db_profile.name)
self._delete(
'ipsec-site-connections',
ipsec_site_connection[
- 'ipsec_site_connection']['id']
+ 'ipsec_site_connection']['id']
)
req = self.new_show_request(
'ipsec-site-connections',
ipsec_site_connection[
- 'ipsec_site_connection']['id'],
+ 'ipsec_site_connection']['id'],
fmt=self.fmt
)
res = self.deserialize(
'192.168.1.10',
'192.168.1.10',
['192.168.2.0/24',
- '192.168.3.0/24'],
+ '192.168.3.0/24'],
1500,
'abcdef',
'bi-directional',
'192.168.1.10',
'192.168.1.10',
['192.168.2.0/24',
- '192.168.3.0/24'],
+ '192.168.3.0/24'],
1500,
'abcdef',
'bi-directional',
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1},
- {pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 2}],
+ {pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 2}],
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
network = self.deserialize(self.fmt,
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1},
- {pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1}],
+ {pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1}],
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
res = network_req.get_response(self.api)
self.assertIsNone(self.driver.
get_vxlan_allocation(self.session,
- (TUN_MIN + 5 - 1)))
+ (TUN_MIN + 5 - 1)))
self.assertFalse(self.driver.
get_vxlan_allocation(self.session, (TUN_MIN + 5)).
allocated)
allocated)
self.assertIsNone(self.driver.
get_vxlan_allocation(self.session,
- (TUN_MAX + 5 + 1)))
+ (TUN_MAX + 5 + 1)))
def test_reserve_provider_segment(self):
segment = {api.NETWORK_TYPE: 'vxlan',
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1},
- {pnet.NETWORK_TYPE: 'stt',
- pnet.PHYSICAL_NETWORK: 'physnet1'}],
+ {pnet.NETWORK_TYPE: 'stt',
+ pnet.PHYSICAL_NETWORK: 'physnet1'}],
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
network = self.deserialize(self.fmt,
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1},
- {pnet.NETWORK_TYPE: 'vlan',
- pnet.PHYSICAL_NETWORK: 'physnet1',
- pnet.SEGMENTATION_ID: 1}],
+ {pnet.NETWORK_TYPE: 'vlan',
+ pnet.PHYSICAL_NETWORK: 'physnet1',
+ pnet.SEGMENTATION_ID: 1}],
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
res = network_req.get_response(self.api)
'fake-tenant',
'fake-gateway',
[{'id': _uuid(),
- 'interface_name': 'xxx'}])
+ 'interface_name': 'xxx'}])
def test_delete_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
'default_route_next_hop':
{'gateway_ip_address': 'fake_address',
'type': 'RouterNextHop'},
- 'type': 'SingleDefaultRouteImplicitRoutingConfig'},
+ 'type': 'SingleDefaultRouteImplicitRoutingConfig'},
'tags': [{'scope': 'os_tid', 'tag': 'fake_tenant_id'},
{'scope': 'quantum',
'tag': nvplib.NEUTRON_VERSION}],
host_routes = [{'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'},
{'destination': '12.0.0.0/8',
- 'nexthop': '4.3.2.1'}]
+ 'nexthop': '4.3.2.1'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
mock.call.delete_port('fake_port')])
self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),
mock.call.unplug('tap12345678-12',
- namespace=namespace,
- bridge=None)])
+ namespace=namespace,
+ bridge=None)])
def test_delete_probe_external(self):
fake_network = {'network': {'id': 'fake_net',
mock.call.delete_port('fake_port')])
self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),
mock.call.unplug('tap12345678-12',
- namespace=namespace,
- bridge='br-ex')])
+ namespace=namespace,
+ bridge='br-ex')])
def test_delete_probe_without_namespace(self):
cfg.CONF.set_override('use_namespaces', False)
subnets=[fake_subnet1, fake_subnet2],
ports=[fake_port1]))
-fake_meta_network = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- admin_state_up=True,
- subnets=[fake_meta_subnet],
- ports=[fake_meta_port]))
+fake_meta_network = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ admin_state_up=True,
+ subnets=[fake_meta_subnet],
+ ports=[fake_meta_port]))
-fake_down_network = dhcp.NetModel(True,
- dict(id='12345678-dddd-dddd-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- admin_state_up=False,
- subnets=[],
- ports=[]))
+fake_down_network = dhcp.NetModel(
+ True, dict(id='12345678-dddd-dddd-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ admin_state_up=False,
+ subnets=[],
+ ports=[]))
class TestDhcpAgent(base.BaseTestCase):
fake_network)
def test_put_port(self):
- fake_net = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- subnets=[fake_subnet1],
- ports=[fake_port1]))
+ fake_net = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ subnets=[fake_subnet1],
+ ports=[fake_port1]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.put_port(fake_port2)
self.assertIn(fake_port2, fake_net.ports)
def test_put_port_existing(self):
- fake_net = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- subnets=[fake_subnet1],
- ports=[fake_port1, fake_port2]))
+ fake_net = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ subnets=[fake_subnet1],
+ ports=[fake_port1, fake_port2]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.put_port(fake_port2)
self.assertIn(fake_port2, fake_net.ports)
def test_remove_port_existing(self):
- fake_net = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- subnets=[fake_subnet1],
- ports=[fake_port1, fake_port2]))
+ fake_net = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ subnets=[fake_subnet1],
+ ports=[fake_port1, fake_port2]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.remove_port(fake_port2)
self.assertFalse(plugin.update_dhcp_port.called)
def test_destroy(self):
- fake_net = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ fake_net = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
- fake_port = dhcp.DictModel(dict(id='12345678-1234-aaaa-1234567890ab',
- mac_address='aa:bb:cc:dd:ee:ff'))
+ fake_port = dhcp.DictModel(
+ dict(id='12345678-1234-aaaa-1234567890ab',
+ mac_address='aa:bb:cc:dd:ee:ff'))
with mock.patch('neutron.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
[mock.call.release_dhcp_port(fake_net.id, mock.ANY)])
def test_get_interface_name(self):
- fake_net = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ fake_net = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
- fake_port = dhcp.DictModel(dict(id='12345678-1234-aaaa-1234567890ab',
- mac_address='aa:bb:cc:dd:ee:ff'))
+ fake_port = dhcp.DictModel(
+ dict(id='12345678-1234-aaaa-1234567890ab',
+ mac_address='aa:bb:cc:dd:ee:ff'))
with mock.patch('neutron.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
self.assertEqual(len(plugin.mock_calls), 0)
def test_get_device_id(self):
- fake_net = dhcp.NetModel(True,
- dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ fake_net = dhcp.NetModel(
+ True, dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
expected = ('dhcp1ae5f96c-c527-5079-82ea-371a01645457-12345678-1234-'
'5678-1234567890ab')
filters = {'port_id': [id]}
security_groups = (super(PortSecurityTestPlugin, self).
_get_port_security_group_bindings(
- context, filters))
+ context, filters))
if security_groups and not delete_security_groups:
raise psec.PortSecurityPortHasSecurityGroup()
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'],
security_groups=(
- [sg['security_group']['id']]))
+ [sg['security_group']['id']]))
port = self.deserialize(self.fmt, res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'],
security_groups=(
- [sg['security_group']['id']]))
+ [sg['security_group']['id']]))
port = self.deserialize(self.fmt, res)
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
with self.security_group() as sg:
res = self._create_port(self.fmt, n['network']['id'],
security_groups=(
- [sg['security_group']['id']]))
+ [sg['security_group']['id']]))
port = self.deserialize(self.fmt, res)
self.assertEqual(port['port'][ext_sg.SECURITYGROUPS][0],
sg['security_group']['id'])
calls += [call.add_rule(
'ofake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
- 'ofake_dev',
- '-m state --state RELATED,ESTABLISHED -j RETURN')]
+ 'ofake_dev',
+ '-m state --state RELATED,ESTABLISHED -j RETURN')]
if egress_expected_call:
calls.append(egress_expected_call)
expected = [['ip', 'route', 'replace', 'to', '110.100.30.0/24',
'via', '10.100.10.30'],
['ip', 'route', 'replace', 'to', '110.100.31.0/24',
- 'via', '10.100.10.30']]
+ 'via', '10.100.10.30']]
self._check_agent_method_called(agent, expected, namespace)
def test_enable(self):
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
- ['active', 'get_conf_file_name', 'interface_name']]
+ ['active', 'get_conf_file_name', 'interface_name']]
)
with mock.patch.multiple(LocalChild, **attrs_to_mock) as mocks:
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
- ['_output_opts_file', 'get_conf_file_name', 'interface_name']]
+ ['_output_opts_file', 'get_conf_file_name', 'interface_name']]
)
with mock.patch.multiple(dhcp.Dnsmasq, **attrs_to_mock) as mocks:
ip_lib.IPWrapper('sudo').add_veth('tap0', 'tap1')
self.execute.assert_called_once_with('', 'link',
('add', 'tap0', 'type', 'veth',
- 'peer', 'name', 'tap1'),
+ 'peer', 'name', 'tap1'),
'sudo', None)
def test_add_veth_with_namespaces(self):
[call(None,
{'args':
{'devices': ['fake_device']},
- 'method': 'security_group_rules_for_devices',
- 'namespace': None},
+ 'method': 'security_group_rules_for_devices',
+ 'namespace': None},
version=sg_rpc.SG_RPC_VERSION,
topic='fake_topic')])