host=review.openstack.org
port=29418
project=openstack/neutron.git
-defaultbranch=stable/icehouse
'default': True,
'convert_to': convert_to_boolean,
'is_visible': True},
- # NOTE: The following two attributes will be made visible once IPv6
- # will be fully supported
- 'ipv6_ra_mode': {'allow_post': False, 'allow_put': False,
+ 'ipv6_ra_mode': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
'validate': {'type:values': constants.IPV6_MODES},
- 'is_visible': False},
- 'ipv6_address_mode': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'ipv6_address_mode': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
'validate': {'type:values':
constants.IPV6_MODES},
- 'is_visible': False},
+ 'is_visible': True},
SHARED: {'allow_post': False,
'allow_put': False,
'default': False,
'gateway_ip': s['gateway_ip'],
'shared': network.shared}
if s['ip_version'] == 6 and s['enable_dhcp']:
- if attributes.is_attr_set(s.get('ipv6_ra_mode')):
+ if attributes.is_attr_set(s['ipv6_ra_mode']):
args['ipv6_ra_mode'] = s['ipv6_ra_mode']
- if attributes.is_attr_set(s.get('ipv6_address_mode')):
+ if attributes.is_attr_set(s['ipv6_address_mode']):
args['ipv6_address_mode'] = s['ipv6_address_mode']
subnet = models_v2.Subnet(**args)
return {"name": name, "value": str(value)}
-def _get_opts(name, value):
- return {"name": name, "value": str(value)}
-
-
def lsn_port_dhcp_configure(
cluster, lsn_id, lsn_port_id, is_enabled=True, dhcp_options=None):
dhcp_options = dhcp_options or {}
self.assertFalse(connection.forced_down)
self.assertEqual(2, self.admin_state.call_count)
- def test_update_connection_admin_up(self):
- """Connection updated to admin up state - record."""
- # Make existing service, and connection that was admin down
- conn_data = {u'id': '1', u'status': constants.DOWN,
- u'admin_state_up': False,
- u'cisco': {u'site_conn_id': u'Tunnel0'}}
- service_data = {u'id': u'123',
- u'status': constants.DOWN,
- u'external_ip': u'1.1.1.1',
- u'admin_state_up': True,
- u'ipsec_conns': [conn_data]}
- self.driver.update_service(self.context, service_data)
- self.driver.mark_existing_connections_as_dirty()
- # Now simulate that the notification shows the connection admin up
- conn_data[u'admin_state_up'] = True
- conn_data[u'status'] = constants.DOWN
-
- connection = self.driver.update_connection(self.context,
- u'123', conn_data)
- self.assertFalse(connection.is_dirty)
- self.assertFalse(connection.forced_down)
- self.assertEqual(u'Tunnel0', connection.tunnel)
- self.assertEqual(constants.DOWN, connection.last_status)
- self.assertEqual(1, self.conn_create.call_count)
-
def test_update_for_vpn_service_create(self):
"""Creation of new IPSec connection on new VPN service - create.
of a service that is in the admin down state. Structures will be
created, but forced down.
"""
- conn_data = {u'id': u'1', u'status': constants.ACTIVE,
- u'admin_state_up': True,
- u'cisco': {u'site_conn_id': u'Tunnel0'}}
service_data = {u'id': u'123',
u'status': constants.DOWN,
u'external_ip': u'1.1.1.1',
import mock
from oslo.config import cfg
from testtools import matchers
-from testtools import testcase
import webob.exc
import neutron
res = subnet_req.get_response(self.api)
self.assertEqual(res.status_int, webob.exc.HTTPClientError.code)
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_create_subnet_ipv6_attributes(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
ipv6_ra_mode=mode,
ipv6_address_mode=mode)
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_create_subnet_ipv6_attributes_no_dhcp_enabled(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
self.assertEqual(ctx_manager.exception.code,
webob.exc.HTTPClientError.code)
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_create_subnet_ipv6_single_attribute_set(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::/80'
self.assertEqual(res.status_int,
webob.exc.HTTPConflict.code)
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_update_subnet_ipv6_attributes(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
self.assertEqual(res['subnet']['ipv6_address_mode'],
data['subnet']['ipv6_address_mode'])
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_update_subnet_ipv6_inconsistent_ra_attribute(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_update_subnet_ipv6_inconsistent_address_attribute(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
- @testcase.skip("Skipped until bug 1304093 is fixed")
def test_update_subnet_ipv6_inconsistent_enable_dhcp(self):
with self.subnet(ip_version=6, cidr='fe80::/80',
ipv6_ra_mode=constants.IPV6_SLAAC,
self.assertEqual(1, len(nat_rules_delta))
self._verify_snat_rules(nat_rules_delta, router)
- def test_process_ipv6_only_gw(self):
- agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- router = self._prepare_router_data(ip_version=6)
- # Get NAT rules without the gw_port
- gw_port = router['gw_port']
- router['gw_port'] = None
- ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
- self.conf.use_namespaces, router=router)
- agent.external_gateway_added = mock.Mock()
- agent.process_router(ri)
- orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
-
- # Get NAT rules with the gw_port
- router['gw_port'] = gw_port
- ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
- self.conf.use_namespaces, router=router)
- with mock.patch.object(
- agent,
- 'external_gateway_nat_rules') as external_gateway_nat_rules:
- agent.process_router(ri)
- new_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
-
- # There should be no change with the NAT rules
- self.assertFalse(external_gateway_nat_rules.called)
- self.assertEqual(orig_nat_rules, new_nat_rules)
-
- def test_process_router_ipv6_interface_added(self):
- agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- router = self._prepare_router_data()
- ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
- self.conf.use_namespaces, router=router)
- agent.external_gateway_added = mock.Mock()
- # Process with NAT
- agent.process_router(ri)
- orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
- # Add an IPv6 interface and reprocess
- router[l3_constants.INTERFACE_KEY].append(
- {'id': _uuid(),
- 'network_id': _uuid(),
- 'admin_state_up': True,
- 'fixed_ips': [{'ip_address': 'fd00::2',
- 'subnet_id': _uuid()}],
- 'mac_address': 'ca:fe:de:ad:be:ef',
- 'subnet': {'cidr': 'fd00::/64',
- 'gateway_ip': 'fd00::1'}})
- # Reassign the router object to RouterInfo
- ri.router = router
- agent.process_router(ri)
- # For some reason set logic does not work well with
- # IpTablesRule instances
- nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
- if r not in orig_nat_rules]
- self.assertFalse(nat_rules_delta)
-
- def test_process_router_ipv6v4_interface_added(self):
- agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- router = self._prepare_router_data()
- ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
- self.conf.use_namespaces, router=router)
- agent.external_gateway_added = mock.Mock()
- # Process with NAT
- agent.process_router(ri)
- orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
- # Add an IPv4 and IPv6 interface and reprocess
- router[l3_constants.INTERFACE_KEY].append(
- {'id': _uuid(),
- 'network_id': _uuid(),
- 'admin_state_up': True,
- 'fixed_ips': [{'ip_address': '35.4.1.4',
- 'subnet_id': _uuid()}],
- 'mac_address': 'ca:fe:de:ad:be:ef',
- 'subnet': {'cidr': '35.4.1.0/24',
- 'gateway_ip': '35.4.1.1'}})
-
- router[l3_constants.INTERFACE_KEY].append(
- {'id': _uuid(),
- 'network_id': _uuid(),
- 'admin_state_up': True,
- 'fixed_ips': [{'ip_address': 'fd00::2',
- 'subnet_id': _uuid()}],
- 'mac_address': 'ca:fe:de:ad:be:ef',
- 'subnet': {'cidr': 'fd00::/64',
- 'gateway_ip': 'fd00::1'}})
- # Reassign the router object to RouterInfo
- ri.router = router
- agent.process_router(ri)
- # For some reason set logic does not work well with
- # IpTablesRule instances
- nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
- if r not in orig_nat_rules]
- self.assertEqual(1, len(nat_rules_delta))
- self._verify_snat_rules(nat_rules_delta, router)
-
def test_process_router_interface_removed(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data(num_internal_ports=2)
'NoopFirewallDriver')
-class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
- def test_init_firewall_with_none_driver(self):
- cfg.CONF.set_override(
- 'enable_security_group', False,
- group='SECURITYGROUP')
- agent = sg_rpc.SecurityGroupAgentRpcMixin()
- agent.init_firewall()
- self.assertEqual(agent.firewall.__class__.__name__,
- 'NoopFirewallDriver')
-
-
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp()