Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / unit / extensions / test_l3.py
1 # Copyright 2012 VMware, Inc.
2 # All rights reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16
17 import contextlib
18 import copy
19
20 import mock
21 import netaddr
22 from oslo_config import cfg
23 from oslo_utils import importutils
24 from oslo_utils import uuidutils
25 from webob import exc
26
27 from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
28 from neutron.api.rpc.handlers import l3_rpc
29 from neutron.api.v2 import attributes
30 from neutron.callbacks import events
31 from neutron.callbacks import exceptions
32 from neutron.callbacks import registry
33 from neutron.callbacks import resources
34 from neutron.common import constants as l3_constants
35 from neutron.common import exceptions as n_exc
36 from neutron import context
37 from neutron.db import common_db_mixin
38 from neutron.db import db_base_plugin_v2
39 from neutron.db import external_net_db
40 from neutron.db import l3_agentschedulers_db
41 from neutron.db import l3_attrs_db
42 from neutron.db import l3_db
43 from neutron.db import l3_dvr_db
44 from neutron.db import l3_dvrscheduler_db
45 from neutron.extensions import external_net
46 from neutron.extensions import l3
47 from neutron.extensions import portbindings
48 from neutron import manager
49 from neutron.plugins.common import constants as service_constants
50 from neutron.tests import base
51 from neutron.tests.common import helpers
52 from neutron.tests import fake_notifier
53 from neutron.tests.unit.api import test_extensions
54 from neutron.tests.unit.api.v2 import test_base
55 from neutron.tests.unit.db import test_db_base_plugin_v2
56 from neutron.tests.unit.extensions import base as test_extensions_base
57 from neutron.tests.unit.extensions import test_agent
58 from neutron.tests.unit.plugins.ml2 import base as ml2_base
59
60
61 _uuid = uuidutils.generate_uuid
62 _get_path = test_base._get_path
63
64
65 DEVICE_OWNER_COMPUTE = l3_constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
66
67
68 class L3TestExtensionManager(object):
69
70     def get_resources(self):
71         # Add the resources to the global attribute map
72         # This is done here as the setup process won't
73         # initialize the main API router which extends
74         # the global attribute map
75         attributes.RESOURCE_ATTRIBUTE_MAP.update(
76             l3.RESOURCE_ATTRIBUTE_MAP)
77         return l3.L3.get_resources()
78
79     def get_actions(self):
80         return []
81
82     def get_request_extensions(self):
83         return []
84
85
86 class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
87     fmt = 'json'
88
89     def setUp(self):
90         super(L3NatExtensionTestCase, self).setUp()
91         self._setUpExtension(
92             'neutron.extensions.l3.RouterPluginBase', None,
93             l3.RESOURCE_ATTRIBUTE_MAP, l3.L3, '',
94             allow_pagination=True, allow_sorting=True,
95             supported_extension_aliases=['router'],
96             use_quota=True)
97
98     def test_router_create(self):
99         router_id = _uuid()
100         data = {'router': {'name': 'router1', 'admin_state_up': True,
101                            'tenant_id': _uuid(),
102                            'external_gateway_info': None}}
103         return_value = copy.deepcopy(data['router'])
104         return_value.update({'status': "ACTIVE", 'id': router_id})
105
106         instance = self.plugin.return_value
107         instance.create_router.return_value = return_value
108         instance.get_routers_count.return_value = 0
109         res = self.api.post(_get_path('routers', fmt=self.fmt),
110                             self.serialize(data),
111                             content_type='application/%s' % self.fmt)
112         instance.create_router.assert_called_with(mock.ANY,
113                                                   router=data)
114         self.assertEqual(res.status_int, exc.HTTPCreated.code)
115         res = self.deserialize(res)
116         self.assertIn('router', res)
117         router = res['router']
118         self.assertEqual(router['id'], router_id)
119         self.assertEqual(router['status'], "ACTIVE")
120         self.assertTrue(router['admin_state_up'])
121
122     def test_router_list(self):
123         router_id = _uuid()
124         return_value = [{'name': 'router1', 'admin_state_up': True,
125                          'tenant_id': _uuid(), 'id': router_id}]
126
127         instance = self.plugin.return_value
128         instance.get_routers.return_value = return_value
129
130         res = self.api.get(_get_path('routers', fmt=self.fmt))
131
132         instance.get_routers.assert_called_with(mock.ANY, fields=mock.ANY,
133                                                 filters=mock.ANY,
134                                                 sorts=mock.ANY,
135                                                 limit=mock.ANY,
136                                                 marker=mock.ANY,
137                                                 page_reverse=mock.ANY)
138         self.assertEqual(res.status_int, exc.HTTPOk.code)
139         res = self.deserialize(res)
140         self.assertIn('routers', res)
141         self.assertEqual(1, len(res['routers']))
142         self.assertEqual(router_id, res['routers'][0]['id'])
143
144     def test_router_update(self):
145         router_id = _uuid()
146         update_data = {'router': {'admin_state_up': False}}
147         return_value = {'name': 'router1', 'admin_state_up': False,
148                         'tenant_id': _uuid(),
149                         'status': "ACTIVE", 'id': router_id}
150
151         instance = self.plugin.return_value
152         instance.update_router.return_value = return_value
153
154         res = self.api.put(_get_path('routers', id=router_id,
155                                      fmt=self.fmt),
156                            self.serialize(update_data))
157
158         instance.update_router.assert_called_with(mock.ANY, router_id,
159                                                   router=update_data)
160         self.assertEqual(res.status_int, exc.HTTPOk.code)
161         res = self.deserialize(res)
162         self.assertIn('router', res)
163         router = res['router']
164         self.assertEqual(router['id'], router_id)
165         self.assertEqual(router['status'], "ACTIVE")
166         self.assertFalse(router['admin_state_up'])
167
168     def test_router_get(self):
169         router_id = _uuid()
170         return_value = {'name': 'router1', 'admin_state_up': False,
171                         'tenant_id': _uuid(),
172                         'status': "ACTIVE", 'id': router_id}
173
174         instance = self.plugin.return_value
175         instance.get_router.return_value = return_value
176
177         res = self.api.get(_get_path('routers', id=router_id,
178                                      fmt=self.fmt))
179
180         instance.get_router.assert_called_with(mock.ANY, router_id,
181                                                fields=mock.ANY)
182         self.assertEqual(res.status_int, exc.HTTPOk.code)
183         res = self.deserialize(res)
184         self.assertIn('router', res)
185         router = res['router']
186         self.assertEqual(router['id'], router_id)
187         self.assertEqual(router['status'], "ACTIVE")
188         self.assertFalse(router['admin_state_up'])
189
190     def test_router_delete(self):
191         router_id = _uuid()
192
193         res = self.api.delete(_get_path('routers', id=router_id))
194
195         instance = self.plugin.return_value
196         instance.delete_router.assert_called_with(mock.ANY, router_id)
197         self.assertEqual(res.status_int, exc.HTTPNoContent.code)
198
199     def test_router_add_interface(self):
200         router_id = _uuid()
201         subnet_id = _uuid()
202         port_id = _uuid()
203
204         interface_data = {'subnet_id': subnet_id}
205         return_value = copy.deepcopy(interface_data)
206         return_value['port_id'] = port_id
207
208         instance = self.plugin.return_value
209         instance.add_router_interface.return_value = return_value
210
211         path = _get_path('routers', id=router_id,
212                          action="add_router_interface",
213                          fmt=self.fmt)
214         res = self.api.put(path, self.serialize(interface_data))
215
216         instance.add_router_interface.assert_called_with(mock.ANY, router_id,
217                                                          interface_data)
218         self.assertEqual(res.status_int, exc.HTTPOk.code)
219         res = self.deserialize(res)
220         self.assertIn('port_id', res)
221         self.assertEqual(res['port_id'], port_id)
222         self.assertEqual(res['subnet_id'], subnet_id)
223
224
225 # This base plugin class is for tests.
226 class TestL3NatBasePlugin(db_base_plugin_v2.NeutronDbPluginV2,
227                           external_net_db.External_net_db_mixin):
228
229     __native_pagination_support = True
230     __native_sorting_support = True
231
232     def create_network(self, context, network):
233         session = context.session
234         with session.begin(subtransactions=True):
235             net = super(TestL3NatBasePlugin, self).create_network(context,
236                                                                   network)
237             self._process_l3_create(context, net, network['network'])
238         return net
239
240     def update_network(self, context, id, network):
241
242         session = context.session
243         with session.begin(subtransactions=True):
244             net = super(TestL3NatBasePlugin, self).update_network(context, id,
245                                                                   network)
246             self._process_l3_update(context, net, network['network'])
247         return net
248
249     def delete_network(self, context, id):
250         with context.session.begin(subtransactions=True):
251             self._process_l3_delete(context, id)
252             super(TestL3NatBasePlugin, self).delete_network(context, id)
253
254     def delete_port(self, context, id, l3_port_check=True):
255         plugin = manager.NeutronManager.get_service_plugins().get(
256             service_constants.L3_ROUTER_NAT)
257         if plugin:
258             if l3_port_check:
259                 plugin.prevent_l3_port_deletion(context, id)
260             plugin.disassociate_floatingips(context, id)
261         return super(TestL3NatBasePlugin, self).delete_port(context, id)
262
263
264 # This plugin class is for tests with plugin that integrates L3.
265 class TestL3NatIntPlugin(TestL3NatBasePlugin,
266                          l3_db.L3_NAT_db_mixin):
267
268     supported_extension_aliases = ["external-net", "router"]
269
270
271 # This plugin class is for tests with plugin that integrates L3 and L3 agent
272 # scheduling.
273 class TestL3NatIntAgentSchedulingPlugin(TestL3NatIntPlugin,
274                                         l3_agentschedulers_db.
275                                         L3AgentSchedulerDbMixin):
276
277     supported_extension_aliases = ["external-net", "router",
278                                    "l3_agent_scheduler"]
279     router_scheduler = importutils.import_object(
280         cfg.CONF.router_scheduler_driver)
281
282
283 # This plugin class is for tests with plugin not supporting L3.
284 class TestNoL3NatPlugin(TestL3NatBasePlugin):
285
286     __native_pagination_support = True
287     __native_sorting_support = True
288
289     supported_extension_aliases = ["external-net"]
290
291
292 # A L3 routing service plugin class for tests with plugins that
293 # delegate away L3 routing functionality
294 class TestL3NatServicePlugin(common_db_mixin.CommonDbMixin,
295                              l3_dvr_db.L3_NAT_with_dvr_db_mixin,
296                              l3_db.L3_NAT_db_mixin):
297
298     supported_extension_aliases = ["router"]
299
300     def get_plugin_type(self):
301         return service_constants.L3_ROUTER_NAT
302
303     def get_plugin_description(self):
304         return "L3 Routing Service Plugin for testing"
305
306
307 # A L3 routing with L3 agent scheduling service plugin class for tests with
308 # plugins that delegate away L3 routing functionality
309 class TestL3NatAgentSchedulingServicePlugin(TestL3NatServicePlugin,
310                                             l3_dvrscheduler_db.
311                                             L3_DVRsch_db_mixin):
312
313     supported_extension_aliases = ["router", "l3_agent_scheduler"]
314
315     def __init__(self):
316         super(TestL3NatAgentSchedulingServicePlugin, self).__init__()
317         self.router_scheduler = importutils.import_object(
318             cfg.CONF.router_scheduler_driver)
319         self.agent_notifiers.update(
320             {l3_constants.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
321
322
323 class L3NatTestCaseMixin(object):
324
325     def _create_router(self, fmt, tenant_id, name=None,
326                        admin_state_up=None, set_context=False,
327                        arg_list=None, **kwargs):
328         data = {'router': {'tenant_id': tenant_id}}
329         if name:
330             data['router']['name'] = name
331         if admin_state_up:
332             data['router']['admin_state_up'] = admin_state_up
333         for arg in (('admin_state_up', 'tenant_id', 'availability_zone_hints')
334                     + (arg_list or ())):
335             # Arg must be present and not empty
336             if arg in kwargs:
337                 data['router'][arg] = kwargs[arg]
338         router_req = self.new_create_request('routers', data, fmt)
339         if set_context and tenant_id:
340             # create a specific auth context for this request
341             router_req.environ['neutron.context'] = context.Context(
342                 '', tenant_id)
343
344         return router_req.get_response(self.ext_api)
345
346     def _make_router(self, fmt, tenant_id, name=None, admin_state_up=None,
347                      external_gateway_info=None, set_context=False,
348                      arg_list=None, **kwargs):
349         if external_gateway_info:
350             arg_list = ('external_gateway_info', ) + (arg_list or ())
351         res = self._create_router(fmt, tenant_id, name,
352                                   admin_state_up, set_context,
353                                   arg_list=arg_list,
354                                   external_gateway_info=external_gateway_info,
355                                   **kwargs)
356         return self.deserialize(fmt, res)
357
358     def _add_external_gateway_to_router(self, router_id, network_id,
359                                         expected_code=exc.HTTPOk.code,
360                                         neutron_context=None, ext_ips=None):
361         ext_ips = ext_ips or []
362         body = {'router':
363                 {'external_gateway_info': {'network_id': network_id}}}
364         if ext_ips:
365             body['router']['external_gateway_info'][
366                 'external_fixed_ips'] = ext_ips
367         return self._update('routers', router_id, body,
368                             expected_code=expected_code,
369                             neutron_context=neutron_context)
370
371     def _remove_external_gateway_from_router(self, router_id, network_id,
372                                              expected_code=exc.HTTPOk.code,
373                                              external_gw_info=None):
374         return self._update('routers', router_id,
375                             {'router': {'external_gateway_info':
376                                         external_gw_info}},
377                             expected_code=expected_code)
378
379     def _router_interface_action(self, action, router_id, subnet_id, port_id,
380                                  expected_code=exc.HTTPOk.code,
381                                  expected_body=None,
382                                  tenant_id=None,
383                                  msg=None):
384         interface_data = {}
385         if subnet_id:
386             interface_data.update({'subnet_id': subnet_id})
387         if port_id:
388             interface_data.update({'port_id': port_id})
389
390         req = self.new_action_request('routers', interface_data, router_id,
391                                       "%s_router_interface" % action)
392         # if tenant_id was specified, create a tenant context for this request
393         if tenant_id:
394             req.environ['neutron.context'] = context.Context(
395                 '', tenant_id)
396         res = req.get_response(self.ext_api)
397         self.assertEqual(res.status_int, expected_code, msg)
398         response = self.deserialize(self.fmt, res)
399         if expected_body:
400             self.assertEqual(response, expected_body, msg)
401         return response
402
403     @contextlib.contextmanager
404     def router(self, name='router1', admin_state_up=True,
405                fmt=None, tenant_id=_uuid(),
406                external_gateway_info=None, set_context=False,
407                **kwargs):
408         router = self._make_router(fmt or self.fmt, tenant_id, name,
409                                    admin_state_up, external_gateway_info,
410                                    set_context, **kwargs)
411         yield router
412
413     def _set_net_external(self, net_id):
414         self._update('networks', net_id,
415                      {'network': {external_net.EXTERNAL: True}})
416
417     def _create_floatingip(self, fmt, network_id, port_id=None,
418                            fixed_ip=None, set_context=False,
419                            floating_ip=None, subnet_id=False):
420         data = {'floatingip': {'floating_network_id': network_id,
421                                'tenant_id': self._tenant_id}}
422         if port_id:
423             data['floatingip']['port_id'] = port_id
424             if fixed_ip:
425                 data['floatingip']['fixed_ip_address'] = fixed_ip
426
427         if floating_ip:
428             data['floatingip']['floating_ip_address'] = floating_ip
429
430         if subnet_id:
431             data['floatingip']['subnet_id'] = subnet_id
432         floatingip_req = self.new_create_request('floatingips', data, fmt)
433         if set_context and self._tenant_id:
434             # create a specific auth context for this request
435             floatingip_req.environ['neutron.context'] = context.Context(
436                 '', self._tenant_id)
437         return floatingip_req.get_response(self.ext_api)
438
439     def _make_floatingip(self, fmt, network_id, port_id=None,
440                          fixed_ip=None, set_context=False, floating_ip=None,
441                          http_status=exc.HTTPCreated.code):
442         res = self._create_floatingip(fmt, network_id, port_id,
443                                       fixed_ip, set_context, floating_ip)
444         self.assertEqual(res.status_int, http_status)
445         return self.deserialize(fmt, res)
446
447     def _validate_floating_ip(self, fip):
448         body = self._list('floatingips')
449         self.assertEqual(len(body['floatingips']), 1)
450         self.assertEqual(body['floatingips'][0]['id'],
451                          fip['floatingip']['id'])
452
453         body = self._show('floatingips', fip['floatingip']['id'])
454         self.assertEqual(body['floatingip']['id'],
455                          fip['floatingip']['id'])
456
457     @contextlib.contextmanager
458     def floatingip_with_assoc(self, port_id=None, fmt=None, fixed_ip=None,
459                               set_context=False):
460         with self.subnet(cidr='11.0.0.0/24') as public_sub:
461             self._set_net_external(public_sub['subnet']['network_id'])
462             private_port = None
463             if port_id:
464                 private_port = self._show('ports', port_id)
465             with test_db_base_plugin_v2.optional_ctx(private_port,
466                                              self.port) as private_port:
467                 with self.router() as r:
468                     sid = private_port['port']['fixed_ips'][0]['subnet_id']
469                     private_sub = {'subnet': {'id': sid}}
470                     floatingip = None
471
472                     self._add_external_gateway_to_router(
473                         r['router']['id'],
474                         public_sub['subnet']['network_id'])
475                     self._router_interface_action(
476                         'add', r['router']['id'],
477                         private_sub['subnet']['id'], None)
478
479                     floatingip = self._make_floatingip(
480                         fmt or self.fmt,
481                         public_sub['subnet']['network_id'],
482                         port_id=private_port['port']['id'],
483                         fixed_ip=fixed_ip,
484                         set_context=set_context)
485                     yield floatingip
486
487                     if floatingip:
488                         self._delete('floatingips',
489                                      floatingip['floatingip']['id'])
490
491     @contextlib.contextmanager
492     def floatingip_no_assoc_with_public_sub(
493         self, private_sub, fmt=None, set_context=False, public_sub=None):
494         self._set_net_external(public_sub['subnet']['network_id'])
495         with self.router() as r:
496             floatingip = None
497
498             self._add_external_gateway_to_router(
499                 r['router']['id'],
500                 public_sub['subnet']['network_id'])
501             self._router_interface_action('add', r['router']['id'],
502                                           private_sub['subnet']['id'],
503                                           None)
504
505             floatingip = self._make_floatingip(
506                 fmt or self.fmt,
507                 public_sub['subnet']['network_id'],
508                 set_context=set_context)
509             yield floatingip, r
510
511             if floatingip:
512                 self._delete('floatingips',
513                              floatingip['floatingip']['id'])
514
515     @contextlib.contextmanager
516     def floatingip_no_assoc(self, private_sub, fmt=None, set_context=False):
517         with self.subnet(cidr='12.0.0.0/24') as public_sub:
518             with self.floatingip_no_assoc_with_public_sub(
519                 private_sub, fmt, set_context, public_sub) as (f, r):
520                 # Yield only the floating ip object
521                 yield f
522
523
524 class ExtraAttributesMixinTestCase(base.BaseTestCase):
525
526     def setUp(self):
527         super(ExtraAttributesMixinTestCase, self).setUp()
528         self.mixin = l3_attrs_db.ExtraAttributesMixin()
529
530     def _test__extend_extra_router_dict(
531         self, extra_attributes, attributes, expected_attributes):
532         self.mixin._extend_extra_router_dict(
533             attributes, {'extra_attributes': extra_attributes})
534         self.assertEqual(expected_attributes, attributes)
535
536     def test__extend_extra_router_dict_string_default(self):
537         self.mixin.extra_attributes = [{
538             'name': "foo_key",
539             'default': 'foo_default'
540         }]
541         extension_attributes = {'foo_key': 'my_fancy_value'}
542         self._test__extend_extra_router_dict(
543             extension_attributes, {}, extension_attributes)
544
545     def test__extend_extra_router_dict_booleans_false_default(self):
546         self.mixin.extra_attributes = [{
547             'name': "foo_key",
548             'default': False
549         }]
550         extension_attributes = {'foo_key': True}
551         self._test__extend_extra_router_dict(
552             extension_attributes, {}, extension_attributes)
553
554     def test__extend_extra_router_dict_booleans_true_default(self):
555         self.mixin.extra_attributes = [{
556             'name': "foo_key",
557             'default': True
558         }]
559         # Test that the default is overridden
560         extension_attributes = {'foo_key': False}
561         self._test__extend_extra_router_dict(
562             extension_attributes, {}, extension_attributes)
563
564     def test__extend_extra_router_dict_no_extension_attributes(self):
565         self.mixin.extra_attributes = [{
566             'name': "foo_key",
567             'default': 'foo_value'
568         }]
569         self._test__extend_extra_router_dict({}, {}, {'foo_key': 'foo_value'})
570
571     def test__extend_extra_router_dict_none_extension_attributes(self):
572         self._test__extend_extra_router_dict(None, {}, {})
573
574
575 class L3NatTestCaseBase(L3NatTestCaseMixin):
576
577     def test_router_create(self):
578         name = 'router1'
579         tenant_id = _uuid()
580         expected_value = [('name', name), ('tenant_id', tenant_id),
581                           ('admin_state_up', True), ('status', 'ACTIVE'),
582                           ('external_gateway_info', None)]
583         with self.router(name='router1', admin_state_up=True,
584                          tenant_id=tenant_id) as router:
585             for k, v in expected_value:
586                 self.assertEqual(router['router'][k], v)
587
588     def test_router_create_call_extensions(self):
589         self.extension_called = False
590
591         def _extend_router_dict_test_attr(*args, **kwargs):
592             self.extension_called = True
593
594         db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
595             l3.ROUTERS, [_extend_router_dict_test_attr])
596         self.assertFalse(self.extension_called)
597         with self.router():
598             self.assertTrue(self.extension_called)
599
600     def test_router_create_with_gwinfo(self):
601         with self.subnet() as s:
602             self._set_net_external(s['subnet']['network_id'])
603             data = {'router': {'tenant_id': _uuid()}}
604             data['router']['name'] = 'router1'
605             data['router']['external_gateway_info'] = {
606                 'network_id': s['subnet']['network_id']}
607             router_req = self.new_create_request('routers', data, self.fmt)
608             res = router_req.get_response(self.ext_api)
609             router = self.deserialize(self.fmt, res)
610             self.assertEqual(
611                 s['subnet']['network_id'],
612                 router['router']['external_gateway_info']['network_id'])
613
614     def test_router_create_with_gwinfo_ext_ip(self):
615         with self.subnet() as s:
616             self._set_net_external(s['subnet']['network_id'])
617             ext_info = {
618                 'network_id': s['subnet']['network_id'],
619                 'external_fixed_ips': [{'ip_address': '10.0.0.99'}]
620             }
621             res = self._create_router(
622                 self.fmt, _uuid(), arg_list=('external_gateway_info',),
623                 external_gateway_info=ext_info
624             )
625             router = self.deserialize(self.fmt, res)
626             self.assertEqual(
627                 [{'ip_address': '10.0.0.99', 'subnet_id': s['subnet']['id']}],
628                 router['router']['external_gateway_info'][
629                     'external_fixed_ips'])
630
631     def test_router_create_with_gwinfo_ext_ip_subnet(self):
632         with self.network() as n:
633             with self.subnet(network=n) as v1,\
634                     self.subnet(network=n, cidr='1.0.0.0/24') as v2,\
635                     self.subnet(network=n, cidr='2.0.0.0/24') as v3:
636                 subnets = (v1, v2, v3)
637                 self._set_net_external(n['network']['id'])
638                 for s in subnets:
639                     ext_info = {
640                         'network_id': n['network']['id'],
641                         'external_fixed_ips': [
642                             {'subnet_id': s['subnet']['id']}]
643                     }
644                     res = self._create_router(
645                         self.fmt, _uuid(), arg_list=('external_gateway_info',),
646                         external_gateway_info=ext_info
647                     )
648                     router = self.deserialize(self.fmt, res)
649                     ext_ips = router['router']['external_gateway_info'][
650                         'external_fixed_ips']
651
652                     self.assertEqual(
653                         [{'subnet_id': s['subnet']['id'],
654                           'ip_address': mock.ANY}], ext_ips)
655
656     def test_router_create_with_gwinfo_ext_ip_non_admin(self):
657         with self.subnet() as s:
658             self._set_net_external(s['subnet']['network_id'])
659             ext_info = {
660                 'network_id': s['subnet']['network_id'],
661                 'external_fixed_ips': [{'ip_address': '10.0.0.99'}]
662             }
663             res = self._create_router(
664                 self.fmt, _uuid(), arg_list=('external_gateway_info',),
665                 set_context=True, external_gateway_info=ext_info
666             )
667             self.assertEqual(res.status_int, exc.HTTPForbidden.code)
668
669     def test_router_list(self):
670         with self.router() as v1, self.router() as v2, self.router() as v3:
671             routers = (v1, v2, v3)
672             self._test_list_resources('router', routers)
673
674     def test_router_list_with_parameters(self):
675         with self.router(name='router1') as router1,\
676                 self.router(name='router2') as router2:
677             query_params = 'name=router1'
678             self._test_list_resources('router', [router1],
679                                       query_params=query_params)
680             query_params = 'name=router2'
681             self._test_list_resources('router', [router2],
682                                       query_params=query_params)
683             query_params = 'name=router3'
684             self._test_list_resources('router', [],
685                                       query_params=query_params)
686
687     def test_router_list_with_sort(self):
688         with self.router(name='router1') as router1,\
689                 self.router(name='router2') as router2,\
690                 self.router(name='router3') as router3:
691             self._test_list_with_sort('router', (router3, router2, router1),
692                                       [('name', 'desc')])
693
694     def test_router_list_with_pagination(self):
695         with self.router(name='router1') as router1,\
696                 self.router(name='router2') as router2,\
697                 self.router(name='router3') as router3:
698             self._test_list_with_pagination('router',
699                                             (router1, router2, router3),
700                                             ('name', 'asc'), 2, 2)
701
702     def test_router_list_with_pagination_reverse(self):
703         with self.router(name='router1') as router1,\
704                 self.router(name='router2') as router2,\
705                 self.router(name='router3') as router3:
706             self._test_list_with_pagination_reverse('router',
707                                                     (router1, router2,
708                                                      router3),
709                                                     ('name', 'asc'), 2, 2)
710
711     def test_router_update(self):
712         rname1 = "yourrouter"
713         rname2 = "nachorouter"
714         with self.router(name=rname1) as r:
715             body = self._show('routers', r['router']['id'])
716             self.assertEqual(body['router']['name'], rname1)
717
718             body = self._update('routers', r['router']['id'],
719                                 {'router': {'name': rname2}})
720
721             body = self._show('routers', r['router']['id'])
722             self.assertEqual(body['router']['name'], rname2)
723
724     def test_router_update_gateway(self):
725         with self.router() as r:
726             with self.subnet() as s1:
727                 with self.subnet() as s2:
728                     self._set_net_external(s1['subnet']['network_id'])
729                     self._add_external_gateway_to_router(
730                         r['router']['id'],
731                         s1['subnet']['network_id'])
732                     body = self._show('routers', r['router']['id'])
733                     net_id = (body['router']
734                               ['external_gateway_info']['network_id'])
735                     self.assertEqual(net_id, s1['subnet']['network_id'])
736                     self._set_net_external(s2['subnet']['network_id'])
737                     self._add_external_gateway_to_router(
738                         r['router']['id'],
739                         s2['subnet']['network_id'])
740                     body = self._show('routers', r['router']['id'])
741                     net_id = (body['router']
742                               ['external_gateway_info']['network_id'])
743                     self.assertEqual(net_id, s2['subnet']['network_id'])
744                     # Validate that we can clear the gateway with
745                     # an empty dict, in any other case, we fall back
746                     # on None as default value
747                     self._remove_external_gateway_from_router(
748                         r['router']['id'],
749                         s2['subnet']['network_id'],
750                         external_gw_info={})
751
752     def test_router_update_gateway_with_external_ip_used_by_gw(self):
753         with self.router() as r:
754             with self.subnet() as s:
755                 self._set_net_external(s['subnet']['network_id'])
756                 self._add_external_gateway_to_router(
757                     r['router']['id'],
758                     s['subnet']['network_id'],
759                     ext_ips=[{'ip_address': s['subnet']['gateway_ip']}],
760                     expected_code=exc.HTTPBadRequest.code)
761
762     def test_router_update_gateway_with_invalid_external_ip(self):
763         with self.router() as r:
764             with self.subnet() as s:
765                 self._set_net_external(s['subnet']['network_id'])
766                 self._add_external_gateway_to_router(
767                     r['router']['id'],
768                     s['subnet']['network_id'],
769                     ext_ips=[{'ip_address': '99.99.99.99'}],
770                     expected_code=exc.HTTPBadRequest.code)
771
772     def test_router_update_gateway_with_invalid_external_subnet(self):
773         with self.subnet() as s1,\
774                 self.subnet(cidr='1.0.0.0/24') as s2,\
775                 self.router() as r:
776             self._set_net_external(s1['subnet']['network_id'])
777             self._add_external_gateway_to_router(
778                 r['router']['id'],
779                 s1['subnet']['network_id'],
780                 # this subnet is not on the same network so this should fail
781                 ext_ips=[{'subnet_id': s2['subnet']['id']}],
782                 expected_code=exc.HTTPBadRequest.code)
783
784     def test_router_update_gateway_with_different_external_subnet(self):
785         with self.network() as n:
786             with self.subnet(network=n) as s1,\
787                     self.subnet(network=n, cidr='1.0.0.0/24') as s2,\
788                     self.router() as r:
789                 self._set_net_external(n['network']['id'])
790                 res1 = self._add_external_gateway_to_router(
791                     r['router']['id'],
792                     n['network']['id'],
793                     ext_ips=[{'subnet_id': s1['subnet']['id']}])
794                 res2 = self._add_external_gateway_to_router(
795                     r['router']['id'],
796                     n['network']['id'],
797                     ext_ips=[{'subnet_id': s2['subnet']['id']}])
798         fip1 = res1['router']['external_gateway_info']['external_fixed_ips'][0]
799         fip2 = res2['router']['external_gateway_info']['external_fixed_ips'][0]
800         self.assertEqual(s1['subnet']['id'], fip1['subnet_id'])
801         self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
802         self.assertNotEqual(fip1['subnet_id'], fip2['subnet_id'])
803         self.assertNotEqual(fip1['ip_address'], fip2['ip_address'])
804
805     def test_router_update_gateway_with_existed_floatingip(self):
806         with self.subnet() as subnet:
807             self._set_net_external(subnet['subnet']['network_id'])
808             with self.floatingip_with_assoc() as fip:
809                 self._add_external_gateway_to_router(
810                     fip['floatingip']['router_id'],
811                     subnet['subnet']['network_id'],
812                     expected_code=exc.HTTPConflict.code)
813
814     def test_router_update_gateway_to_empty_with_existed_floatingip(self):
815         with self.floatingip_with_assoc() as fip:
816             self._remove_external_gateway_from_router(
817                 fip['floatingip']['router_id'], None,
818                 expected_code=exc.HTTPConflict.code)
819
820     def test_router_update_gateway_add_multiple_prefixes_ipv6(self):
821         with self.network() as n:
822             with self.subnet(network=n) as s1, \
823                 self.subnet(network=n, ip_version=6, cidr='2001:db8::/32') \
824                 as s2, (self.router()) as r:
825                 self._set_net_external(n['network']['id'])
826                 res1 = self._add_external_gateway_to_router(
827                         r['router']['id'],
828                         n['network']['id'],
829                         ext_ips=[{'subnet_id': s1['subnet']['id']}])
830                 fip1 = (res1['router']['external_gateway_info']
831                         ['external_fixed_ips'][0])
832                 self.assertEqual(s1['subnet']['id'], fip1['subnet_id'])
833                 res2 = self._add_external_gateway_to_router(
834                         r['router']['id'],
835                         n['network']['id'],
836                         ext_ips=[{'ip_address': fip1['ip_address'],
837                                   'subnet_id': s1['subnet']['id']},
838                                  {'subnet_id': s2['subnet']['id']}])
839                 self.assertEqual(fip1, res2['router']['external_gateway_info']
840                                            ['external_fixed_ips'][0])
841                 fip2 = (res2['router']['external_gateway_info']
842                         ['external_fixed_ips'][1])
843                 self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
844                 self.assertNotEqual(fip1['subnet_id'],
845                                     fip2['subnet_id'])
846                 self.assertNotEqual(fip1['ip_address'],
847                                     fip2['ip_address'])
848
849     def test_router_update_gateway_upon_subnet_create_ipv6(self):
850         with self.network() as n:
851             with self.subnet(network=n) as s1, self.router() as r:
852                 self._set_net_external(n['network']['id'])
853                 res1 = self._add_external_gateway_to_router(
854                           r['router']['id'],
855                           n['network']['id'],
856                           ext_ips=[{'subnet_id': s1['subnet']['id']}])
857                 fip1 = (res1['router']['external_gateway_info']
858                         ['external_fixed_ips'][0])
859                 sres = self._create_subnet(self.fmt, net_id=n['network']['id'],
860                                          ip_version=6, cidr='2001:db8::/32',
861                                          expected_res_status=(
862                                              exc.HTTPCreated.code))
863                 s2 = self.deserialize(self.fmt, sres)
864                 res2 = self._show('routers', r['router']['id'])
865                 self.assertEqual(fip1, res2['router']['external_gateway_info']
866                                            ['external_fixed_ips'][0])
867                 fip2 = (res2['router']['external_gateway_info']
868                         ['external_fixed_ips'][1])
869                 self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
870                 self.assertNotEqual(fip1['subnet_id'], fip2['subnet_id'])
871                 self.assertNotEqual(fip1['ip_address'], fip2['ip_address'])
872
873     def test_router_update_gateway_upon_subnet_create_max_ips_ipv6(self):
874         """Create subnet should not cause excess fixed IPs on router gw
875
876         If a router gateway port has the maximum of one IPv4 and one IPv6
877         fixed, create subnet should not add any more IP addresses to the port
878         (unless this is the subnet is a SLAAC/DHCPv6-stateless subnet in which
879         case the addresses are added automatically)
880
881         """
882         with self.router() as r, self.network() as n:
883             with self.subnet(cidr='10.0.0.0/24', network=n) as s1, (
884                     self.subnet(ip_version=6, cidr='2001:db8::/64',
885                         network=n)) as s2:
886                 self._set_net_external(n['network']['id'])
887                 self._add_external_gateway_to_router(
888                         r['router']['id'],
889                         n['network']['id'],
890                         ext_ips=[{'subnet_id': s1['subnet']['id']},
891                                  {'subnet_id': s2['subnet']['id']}],
892                         expected_code=exc.HTTPOk.code)
893                 res1 = self._show('routers', r['router']['id'])
894                 original_fips = (res1['router']['external_gateway_info']
895                                  ['external_fixed_ips'])
896                 # Add another IPv4 subnet - a fip SHOULD NOT be added
897                 # to the external gateway port as it already has a v4 address
898                 self._create_subnet(self.fmt, net_id=n['network']['id'],
899                                     cidr='10.0.1.0/24')
900                 res2 = self._show('routers', r['router']['id'])
901                 self.assertEqual(original_fips,
902                                  res2['router']['external_gateway_info']
903                                  ['external_fixed_ips'])
904                 # Add a SLAAC subnet - a fip from this subnet SHOULD be added
905                 # to the external gateway port
906                 s3 = self.deserialize(self.fmt,
907                         self._create_subnet(self.fmt,
908                             net_id=n['network']['id'],
909                             ip_version=6, cidr='2001:db8:1::/64',
910                             ipv6_ra_mode=l3_constants.IPV6_SLAAC,
911                             ipv6_address_mode=l3_constants.IPV6_SLAAC))
912                 res3 = self._show('routers', r['router']['id'])
913                 fips = (res3['router']['external_gateway_info']
914                         ['external_fixed_ips'])
915                 fip_subnet_ids = [fip['subnet_id'] for fip in fips]
916                 self.assertIn(s1['subnet']['id'], fip_subnet_ids)
917                 self.assertIn(s2['subnet']['id'], fip_subnet_ids)
918                 self.assertIn(s3['subnet']['id'], fip_subnet_ids)
919                 self._remove_external_gateway_from_router(
920                     r['router']['id'],
921                     n['network']['id'])
922
923     def _test_router_add_interface_subnet(self, router, subnet, msg=None):
924         exp_notifications = ['router.create.start',
925                              'router.create.end',
926                              'network.create.start',
927                              'network.create.end',
928                              'subnet.create.start',
929                              'subnet.create.end',
930                              'router.interface.create',
931                              'router.interface.delete']
932         body = self._router_interface_action('add',
933                                              router['router']['id'],
934                                              subnet['subnet']['id'],
935                                              None)
936         self.assertIn('port_id', body, msg)
937
938         # fetch port and confirm device_id
939         r_port_id = body['port_id']
940         port = self._show('ports', r_port_id)
941         self.assertEqual(port['port']['device_id'],
942                          router['router']['id'], msg)
943
944         self._router_interface_action('remove',
945                                       router['router']['id'],
946                                       subnet['subnet']['id'],
947                                       None)
948         self._show('ports', r_port_id,
949                    expected_code=exc.HTTPNotFound.code)
950
951         self.assertEqual(
952             set(exp_notifications),
953             set(n['event_type'] for n in fake_notifier.NOTIFICATIONS), msg)
954
955         for n in fake_notifier.NOTIFICATIONS:
956             if n['event_type'].startswith('router.interface.'):
957                 payload = n['payload']['router_interface']
958                 self.assertIn('id', payload)
959                 self.assertEqual(payload['id'], router['router']['id'])
960                 self.assertIn('tenant_id', payload)
961                 stid = subnet['subnet']['tenant_id']
962                 # tolerate subnet tenant deliberately set to '' in the
963                 # nsx metadata access case
964                 self.assertIn(payload['tenant_id'], [stid, ''], msg)
965
966     def test_router_add_interface_subnet(self):
967         fake_notifier.reset()
968         with self.router() as r:
969             with self.network() as n:
970                 with self.subnet(network=n) as s:
971                     self._test_router_add_interface_subnet(r, s)
972
973     def test_router_add_interface_ipv6_subnet(self):
974         """Test router-interface-add for valid ipv6 subnets.
975
976         Verify the valid use-cases of an IPv6 subnet where we
977         are allowed to associate to the Neutron Router are successful.
978         """
979         slaac = l3_constants.IPV6_SLAAC
980         stateful = l3_constants.DHCPV6_STATEFUL
981         stateless = l3_constants.DHCPV6_STATELESS
982         use_cases = [{'msg': 'IPv6 Subnet Modes (slaac, none)',
983                       'ra_mode': slaac, 'address_mode': None},
984                      {'msg': 'IPv6 Subnet Modes (none, none)',
985                       'ra_mode': None, 'address_mode': None},
986                      {'msg': 'IPv6 Subnet Modes (dhcpv6-stateful, none)',
987                       'ra_mode': stateful, 'address_mode': None},
988                      {'msg': 'IPv6 Subnet Modes (dhcpv6-stateless, none)',
989                       'ra_mode': stateless, 'address_mode': None},
990                      {'msg': 'IPv6 Subnet Modes (slaac, slaac)',
991                       'ra_mode': slaac, 'address_mode': slaac},
992                      {'msg': 'IPv6 Subnet Modes (dhcpv6-stateful,'
993                       'dhcpv6-stateful)', 'ra_mode': stateful,
994                       'address_mode': stateful},
995                      {'msg': 'IPv6 Subnet Modes (dhcpv6-stateless,'
996                       'dhcpv6-stateless)', 'ra_mode': stateless,
997                       'address_mode': stateless}]
998         for uc in use_cases:
999             fake_notifier.reset()
1000             with self.router() as r, self.network() as n:
1001                 with self.subnet(network=n, cidr='fd00::1/64',
1002                                  gateway_ip='fd00::1', ip_version=6,
1003                                  ipv6_ra_mode=uc['ra_mode'],
1004                                  ipv6_address_mode=uc['address_mode']) as s:
1005                     self._test_router_add_interface_subnet(r, s, uc['msg'])
1006
1007     def test_router_add_interface_multiple_ipv4_subnets(self):
1008         """Test router-interface-add for multiple ipv4 subnets.
1009
1010         Verify that adding multiple ipv4 subnets from the same network
1011         to a router places them all on different router interfaces.
1012         """
1013         with self.router() as r, self.network() as n:
1014             with self.subnet(network=n, cidr='10.0.0.0/24') as s1, (
1015                  self.subnet(network=n, cidr='10.0.1.0/24')) as s2:
1016                     body = self._router_interface_action('add',
1017                                                          r['router']['id'],
1018                                                          s1['subnet']['id'],
1019                                                          None)
1020                     pid1 = body['port_id']
1021                     body = self._router_interface_action('add',
1022                                                          r['router']['id'],
1023                                                          s2['subnet']['id'],
1024                                                          None)
1025                     pid2 = body['port_id']
1026                     self.assertNotEqual(pid1, pid2)
1027                     self._router_interface_action('remove', r['router']['id'],
1028                                                   s1['subnet']['id'], None)
1029                     self._router_interface_action('remove', r['router']['id'],
1030                                                   s2['subnet']['id'], None)
1031
1032     def test_router_add_interface_multiple_ipv6_subnets_same_net(self):
1033         """Test router-interface-add for multiple ipv6 subnets on a network.
1034
1035         Verify that adding multiple ipv6 subnets from the same network
1036         to a router places them all on the same router interface.
1037         """
1038         with self.router() as r, self.network() as n:
1039             with (self.subnet(network=n, cidr='fd00::1/64', ip_version=6)
1040                   ) as s1, self.subnet(network=n, cidr='fd01::1/64',
1041                                        ip_version=6) as s2:
1042                     body = self._router_interface_action('add',
1043                                                          r['router']['id'],
1044                                                          s1['subnet']['id'],
1045                                                          None)
1046                     pid1 = body['port_id']
1047                     body = self._router_interface_action('add',
1048                                                          r['router']['id'],
1049                                                          s2['subnet']['id'],
1050                                                          None)
1051                     pid2 = body['port_id']
1052                     self.assertEqual(pid1, pid2)
1053                     port = self._show('ports', pid1)
1054                     self.assertEqual(2, len(port['port']['fixed_ips']))
1055                     port_subnet_ids = [fip['subnet_id'] for fip in
1056                                        port['port']['fixed_ips']]
1057                     self.assertIn(s1['subnet']['id'], port_subnet_ids)
1058                     self.assertIn(s2['subnet']['id'], port_subnet_ids)
1059                     self._router_interface_action('remove', r['router']['id'],
1060                                                   s1['subnet']['id'], None)
1061                     self._router_interface_action('remove', r['router']['id'],
1062                                                   s2['subnet']['id'], None)
1063
1064     def test_router_add_interface_multiple_ipv6_subnets_different_net(self):
1065         """Test router-interface-add for ipv6 subnets on different networks.
1066
1067         Verify that adding multiple ipv6 subnets from different networks
1068         to a router places them on different router interfaces.
1069         """
1070         with self.router() as r, self.network() as n1, self.network() as n2:
1071             with (self.subnet(network=n1, cidr='fd00::1/64', ip_version=6)
1072                   ) as s1, self.subnet(network=n2, cidr='fd01::1/64',
1073                                        ip_version=6) as s2:
1074                     body = self._router_interface_action('add',
1075                                                          r['router']['id'],
1076                                                          s1['subnet']['id'],
1077                                                          None)
1078                     pid1 = body['port_id']
1079                     body = self._router_interface_action('add',
1080                                                          r['router']['id'],
1081                                                          s2['subnet']['id'],
1082                                                          None)
1083                     pid2 = body['port_id']
1084                     self.assertNotEqual(pid1, pid2)
1085                     self._router_interface_action('remove', r['router']['id'],
1086                                                   s1['subnet']['id'], None)
1087                     self._router_interface_action('remove', r['router']['id'],
1088                                                   s2['subnet']['id'], None)
1089
1090     def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
1091         """Test router-interface-add for in-valid ipv6 subnets.
1092
1093         Verify that an appropriate error message is displayed when
1094         an IPv6 subnet configured to use an external_router for Router
1095         Advertisements (i.e., ipv6_ra_mode is None and ipv6_address_mode
1096         is not None) is attempted to associate with a Neutron Router.
1097         """
1098         use_cases = [{'msg': 'IPv6 Subnet Modes (none, slaac)',
1099                       'ra_mode': None,
1100                       'address_mode': l3_constants.IPV6_SLAAC},
1101                      {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateful)',
1102                       'ra_mode': None,
1103                       'address_mode': l3_constants.DHCPV6_STATEFUL},
1104                      {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateless)',
1105                       'ra_mode': None,
1106                       'address_mode': l3_constants.DHCPV6_STATELESS}]
1107         for uc in use_cases:
1108             with self.router() as r, self.network() as n:
1109                 with self.subnet(network=n, cidr='fd00::1/64',
1110                                  gateway_ip='fd00::1', ip_version=6,
1111                                  ipv6_ra_mode=uc['ra_mode'],
1112                                  ipv6_address_mode=uc['address_mode']) as s:
1113                     exp_code = exc.HTTPBadRequest.code
1114                     self._router_interface_action('add',
1115                                                   r['router']['id'],
1116                                                   s['subnet']['id'],
1117                                                   None,
1118                                                   expected_code=exp_code,
1119                                                   msg=uc['msg'])
1120
1121     def test_router_add_interface_ipv6_subnet_without_gateway_ip(self):
1122         with self.router() as r:
1123             with self.subnet(ip_version=6, cidr='fe80::/64',
1124                              gateway_ip=None) as s:
1125                 error_code = exc.HTTPBadRequest.code
1126                 self._router_interface_action('add',
1127                                               r['router']['id'],
1128                                               s['subnet']['id'],
1129                                               None,
1130                                               expected_code=error_code)
1131
1132     def test_router_add_interface_subnet_with_bad_tenant_returns_404(self):
1133         tenant_id = _uuid()
1134         with self.router(tenant_id=tenant_id, set_context=True) as r:
1135             with self.network(tenant_id=tenant_id, set_context=True) as n:
1136                 with self.subnet(network=n, set_context=True) as s:
1137                     err_code = exc.HTTPNotFound.code
1138                     self._router_interface_action('add',
1139                                                   r['router']['id'],
1140                                                   s['subnet']['id'],
1141                                                   None,
1142                                                   expected_code=err_code,
1143                                                   tenant_id='bad_tenant')
1144                     body = self._router_interface_action('add',
1145                                                          r['router']['id'],
1146                                                          s['subnet']['id'],
1147                                                          None)
1148                     self.assertIn('port_id', body)
1149                     self._router_interface_action('remove',
1150                                                   r['router']['id'],
1151                                                   s['subnet']['id'],
1152                                                   None,
1153                                                   expected_code=err_code,
1154                                                   tenant_id='bad_tenant')
1155
1156     def test_router_add_interface_subnet_with_port_from_other_tenant(self):
1157         tenant_id = _uuid()
1158         other_tenant_id = _uuid()
1159         with self.router(tenant_id=tenant_id) as r,\
1160                 self.network(tenant_id=tenant_id) as n1,\
1161                 self.network(tenant_id=other_tenant_id) as n2:
1162             with self.subnet(network=n1, cidr='10.0.0.0/24') as s1,\
1163                     self.subnet(network=n2, cidr='10.1.0.0/24') as s2:
1164                 body = self._router_interface_action(
1165                     'add',
1166                     r['router']['id'],
1167                     s2['subnet']['id'],
1168                     None)
1169                 self.assertIn('port_id', body)
1170                 self._router_interface_action(
1171                     'add',
1172                     r['router']['id'],
1173                     s1['subnet']['id'],
1174                     None,
1175                     tenant_id=tenant_id)
1176                 self.assertIn('port_id', body)
1177
1178     def test_router_add_interface_port(self):
1179         orig_update_port = self.plugin.update_port
1180         with self.router() as r, (
1181             self.port()) as p, (
1182                 mock.patch.object(self.plugin, 'update_port')) as update_port:
1183             update_port.side_effect = orig_update_port
1184             body = self._router_interface_action('add',
1185                                                  r['router']['id'],
1186                                                  None,
1187                                                  p['port']['id'])
1188             self.assertIn('port_id', body)
1189             self.assertEqual(p['port']['id'], body['port_id'])
1190             expected_port_update = {
1191                 'device_owner': l3_constants.DEVICE_OWNER_ROUTER_INTF,
1192                 'device_id': r['router']['id']}
1193             update_port.assert_called_with(
1194                 mock.ANY, p['port']['id'], {'port': expected_port_update})
1195             # fetch port and confirm device_id
1196             body = self._show('ports', p['port']['id'])
1197             self.assertEqual(r['router']['id'], body['port']['device_id'])
1198
1199             # clean-up
1200             self._router_interface_action('remove',
1201                                           r['router']['id'],
1202                                           None,
1203                                           p['port']['id'])
1204
1205     def test_router_add_interface_multiple_ipv4_subnet_port_returns_400(self):
1206         """Test adding router port with multiple IPv4 subnets fails.
1207
1208         Multiple IPv4 subnets are not allowed on a single router port.
1209         Ensure that adding a port with multiple IPv4 subnets to a router fails.
1210         """
1211         with self.network() as n, self.router() as r:
1212             with self.subnet(network=n, cidr='10.0.0.0/24') as s1, (
1213                  self.subnet(network=n, cidr='10.0.1.0/24')) as s2:
1214                 fixed_ips = [{'subnet_id': s1['subnet']['id']},
1215                              {'subnet_id': s2['subnet']['id']}]
1216                 with self.port(subnet=s1, fixed_ips=fixed_ips) as p:
1217                     exp_code = exc.HTTPBadRequest.code
1218                     self._router_interface_action('add',
1219                                                   r['router']['id'],
1220                                                   None,
1221                                                   p['port']['id'],
1222                                                   expected_code=exp_code)
1223
1224     def test_router_add_interface_ipv6_port_existing_network_returns_400(self):
1225         """Ensure unique IPv6 router ports per network id.
1226
1227         Adding a router port containing one or more IPv6 subnets with the same
1228         network id as an existing router port should fail. This is so
1229         there is no ambiguity regarding on which port to add an IPv6 subnet
1230         when executing router-interface-add with a subnet and no port.
1231         """
1232         with self.network() as n, self.router() as r:
1233             with self.subnet(network=n, cidr='fd00::/64',
1234                              ip_version=6) as s1, (
1235                  self.subnet(network=n, cidr='fd01::/64',
1236                              ip_version=6)) as s2:
1237                 with self.port(subnet=s1) as p:
1238                     self._router_interface_action('add',
1239                                                   r['router']['id'],
1240                                                   s2['subnet']['id'],
1241                                                   None)
1242                     exp_code = exc.HTTPBadRequest.code
1243                     self._router_interface_action('add',
1244                                                   r['router']['id'],
1245                                                   None,
1246                                                   p['port']['id'],
1247                                                   expected_code=exp_code)
1248                     self._router_interface_action('remove',
1249                                                   r['router']['id'],
1250                                                   s2['subnet']['id'],
1251                                                   None)
1252
1253     def test_router_add_interface_multiple_ipv6_subnet_port(self):
1254         """A port with multiple IPv6 subnets can be added to a router
1255
1256         Create a port with multiple associated IPv6 subnets and attach
1257         it to a router. The action should succeed.
1258         """
1259         with self.network() as n, self.router() as r:
1260             with self.subnet(network=n, cidr='fd00::/64',
1261                              ip_version=6) as s1, (
1262                  self.subnet(network=n, cidr='fd01::/64',
1263                              ip_version=6)) as s2:
1264                 fixed_ips = [{'subnet_id': s1['subnet']['id']},
1265                              {'subnet_id': s2['subnet']['id']}]
1266                 with self.port(subnet=s1, fixed_ips=fixed_ips) as p:
1267                     self._router_interface_action('add',
1268                                                   r['router']['id'],
1269                                                   None,
1270                                                   p['port']['id'])
1271                     self._router_interface_action('remove',
1272                                                   r['router']['id'],
1273                                                   None,
1274                                                   p['port']['id'])
1275
1276     def test_router_add_interface_empty_port_and_subnet_ids(self):
1277         with self.router() as r:
1278             self._router_interface_action('add', r['router']['id'],
1279                                           None, None,
1280                                           expected_code=exc.
1281                                           HTTPBadRequest.code)
1282
1283     def test_router_add_interface_port_bad_tenant_returns_404(self):
1284         tenant_id = _uuid()
1285         with self.router(tenant_id=tenant_id, set_context=True) as r:
1286             with self.network(tenant_id=tenant_id, set_context=True) as n:
1287                 with self.subnet(tenant_id=tenant_id, network=n,
1288                                  set_context=True) as s:
1289                     with self.port(tenant_id=tenant_id, subnet=s,
1290                                    set_context=True) as p:
1291                         err_code = exc.HTTPNotFound.code
1292                         self._router_interface_action('add',
1293                                                     r['router']['id'],
1294                                                     None,
1295                                                     p['port']['id'],
1296                                                     expected_code=err_code,
1297                                                     tenant_id='bad_tenant')
1298                         self._router_interface_action('add',
1299                                                     r['router']['id'],
1300                                                     None,
1301                                                     p['port']['id'],
1302                                                     tenant_id=tenant_id)
1303
1304                         # clean-up should fail as well
1305                         self._router_interface_action('remove',
1306                                                     r['router']['id'],
1307                                                     None,
1308                                                     p['port']['id'],
1309                                                     expected_code=err_code,
1310                                                     tenant_id='bad_tenant')
1311
1312     def test_router_add_interface_port_without_ips(self):
1313         with self.network() as network, self.router() as r:
1314             # Create a router port without ips
1315             p = self._make_port(self.fmt, network['network']['id'],
1316                 device_owner=l3_constants.DEVICE_OWNER_ROUTER_INTF)
1317             err_code = exc.HTTPBadRequest.code
1318             self._router_interface_action('add',
1319                                           r['router']['id'],
1320                                           None,
1321                                           p['port']['id'],
1322                                           expected_code=err_code)
1323
1324     def test_router_add_interface_dup_subnet1_returns_400(self):
1325         with self.router() as r:
1326             with self.subnet() as s:
1327                 self._router_interface_action('add',
1328                                               r['router']['id'],
1329                                               s['subnet']['id'],
1330                                               None)
1331                 self._router_interface_action('add',
1332                                               r['router']['id'],
1333                                               s['subnet']['id'],
1334                                               None,
1335                                               expected_code=exc.
1336                                               HTTPBadRequest.code)
1337
1338     def test_router_add_interface_dup_subnet2_returns_400(self):
1339         with self.router() as r:
1340             with self.subnet() as s:
1341                 with self.port(subnet=s) as p1:
1342                     with self.port(subnet=s) as p2:
1343                         self._router_interface_action('add',
1344                                                       r['router']['id'],
1345                                                       None,
1346                                                       p1['port']['id'])
1347                         self._router_interface_action('add',
1348                                                       r['router']['id'],
1349                                                       None,
1350                                                       p2['port']['id'],
1351                                                       expected_code=exc.
1352                                                       HTTPBadRequest.code)
1353
1354     def test_router_add_interface_overlapped_cidr_returns_400(self):
1355         with self.router() as r:
1356             with self.subnet(cidr='10.0.1.0/24') as s1:
1357                 self._router_interface_action('add',
1358                                               r['router']['id'],
1359                                               s1['subnet']['id'],
1360                                               None)
1361
1362                 def try_overlapped_cidr(cidr):
1363                     with self.subnet(cidr=cidr) as s2:
1364                         self._router_interface_action('add',
1365                                                       r['router']['id'],
1366                                                       s2['subnet']['id'],
1367                                                       None,
1368                                                       expected_code=exc.
1369                                                       HTTPBadRequest.code)
1370                 # another subnet with same cidr
1371                 try_overlapped_cidr('10.0.1.0/24')
1372                 # another subnet with overlapped cidr including s1
1373                 try_overlapped_cidr('10.0.0.0/16')
1374
1375     def test_router_add_interface_no_data_returns_400(self):
1376         with self.router() as r:
1377             self._router_interface_action('add',
1378                                           r['router']['id'],
1379                                           None,
1380                                           None,
1381                                           expected_code=exc.
1382                                           HTTPBadRequest.code)
1383
1384     def test_router_add_interface_with_both_ids_returns_400(self):
1385         with self.router() as r:
1386             with self.subnet() as s:
1387                 with self.port(subnet=s) as p:
1388                     self._router_interface_action('add',
1389                                                   r['router']['id'],
1390                                                   s['subnet']['id'],
1391                                                   p['port']['id'],
1392                                                   expected_code=exc.
1393                                                   HTTPBadRequest.code)
1394
1395     def test_router_add_gateway_dup_subnet1_returns_400(self):
1396         with self.router() as r:
1397             with self.subnet() as s:
1398                 self._router_interface_action('add',
1399                                               r['router']['id'],
1400                                               s['subnet']['id'],
1401                                               None)
1402                 self._set_net_external(s['subnet']['network_id'])
1403                 self._add_external_gateway_to_router(
1404                     r['router']['id'],
1405                     s['subnet']['network_id'],
1406                     expected_code=exc.HTTPBadRequest.code)
1407
1408     def test_router_add_gateway_dup_subnet2_returns_400(self):
1409         with self.router() as r:
1410             with self.subnet() as s:
1411                 self._set_net_external(s['subnet']['network_id'])
1412                 self._add_external_gateway_to_router(
1413                     r['router']['id'],
1414                     s['subnet']['network_id'])
1415                 self._router_interface_action('add',
1416                                               r['router']['id'],
1417                                               s['subnet']['id'],
1418                                               None,
1419                                               expected_code=exc.
1420                                               HTTPBadRequest.code)
1421
1422     def test_router_add_gateway_multiple_subnets_ipv6(self):
1423         """Ensure external gateway set doesn't add excess IPs on router gw
1424
1425         Setting the gateway of a router to an external network with more than
1426         one IPv4 and one IPv6 subnet should only add an address from the first
1427         IPv4 subnet, an address from the first IPv6-stateful subnet, and an
1428         address from each IPv6-stateless (SLAAC and DHCPv6-stateless) subnet
1429
1430         """
1431         with self.router() as r, self.network() as n:
1432             with self.subnet(
1433                     cidr='10.0.0.0/24', network=n) as s1, (
1434                  self.subnet(
1435                     cidr='10.0.1.0/24', network=n)) as s2, (
1436                  self.subnet(
1437                     cidr='2001:db8::/64', network=n,
1438                     ip_version=6,
1439                     ipv6_ra_mode=l3_constants.IPV6_SLAAC,
1440                     ipv6_address_mode=l3_constants.IPV6_SLAAC)) as s3, (
1441                  self.subnet(
1442                     cidr='2001:db8:1::/64', network=n,
1443                     ip_version=6,
1444                     ipv6_ra_mode=l3_constants.DHCPV6_STATEFUL,
1445                     ipv6_address_mode=l3_constants.DHCPV6_STATEFUL)) as s4, (
1446                  self.subnet(
1447                     cidr='2001:db8:2::/64', network=n,
1448                     ip_version=6,
1449                     ipv6_ra_mode=l3_constants.DHCPV6_STATELESS,
1450                     ipv6_address_mode=l3_constants.DHCPV6_STATELESS)) as s5:
1451                 self._set_net_external(n['network']['id'])
1452                 self._add_external_gateway_to_router(
1453                         r['router']['id'],
1454                         n['network']['id'])
1455                 res = self._show('routers', r['router']['id'])
1456                 fips = (res['router']['external_gateway_info']
1457                         ['external_fixed_ips'])
1458                 fip_subnet_ids = {fip['subnet_id'] for fip in fips}
1459                 # one of s1 or s2 should be in the list.
1460                 if s1['subnet']['id'] in fip_subnet_ids:
1461                     self.assertEqual({s1['subnet']['id'],
1462                                       s3['subnet']['id'],
1463                                       s4['subnet']['id'],
1464                                       s5['subnet']['id']},
1465                                      fip_subnet_ids)
1466                 else:
1467                     self.assertEqual({s2['subnet']['id'],
1468                                       s3['subnet']['id'],
1469                                       s4['subnet']['id'],
1470                                       s5['subnet']['id']},
1471                                      fip_subnet_ids)
1472                 self._remove_external_gateway_from_router(
1473                     r['router']['id'],
1474                     n['network']['id'])
1475
1476     def test_router_add_and_remove_gateway(self):
1477         with self.router() as r:
1478             with self.subnet() as s:
1479                 self._set_net_external(s['subnet']['network_id'])
1480                 self._add_external_gateway_to_router(
1481                     r['router']['id'],
1482                     s['subnet']['network_id'])
1483                 body = self._show('routers', r['router']['id'])
1484                 net_id = body['router']['external_gateway_info']['network_id']
1485                 self.assertEqual(net_id, s['subnet']['network_id'])
1486                 self._remove_external_gateway_from_router(
1487                     r['router']['id'],
1488                     s['subnet']['network_id'])
1489                 body = self._show('routers', r['router']['id'])
1490                 gw_info = body['router']['external_gateway_info']
1491                 self.assertIsNone(gw_info)
1492
1493     def test_router_add_and_remove_gateway_tenant_ctx(self):
1494         with self.router(tenant_id='noadmin',
1495                          set_context=True) as r:
1496             with self.subnet() as s:
1497                 self._set_net_external(s['subnet']['network_id'])
1498                 ctx = context.Context('', 'noadmin')
1499                 self._add_external_gateway_to_router(
1500                     r['router']['id'],
1501                     s['subnet']['network_id'],
1502                     neutron_context=ctx)
1503                 body = self._show('routers', r['router']['id'])
1504                 net_id = body['router']['external_gateway_info']['network_id']
1505                 self.assertEqual(net_id, s['subnet']['network_id'])
1506                 self._remove_external_gateway_from_router(
1507                     r['router']['id'],
1508                     s['subnet']['network_id'])
1509                 body = self._show('routers', r['router']['id'])
1510                 gw_info = body['router']['external_gateway_info']
1511                 self.assertIsNone(gw_info)
1512
1513     def test_create_router_port_with_device_id_of_other_teants_router(self):
1514         with self.router() as admin_router:
1515             with self.network(tenant_id='tenant_a',
1516                               set_context=True) as n:
1517                 with self.subnet(network=n):
1518                     for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
1519                         self._create_port(
1520                             self.fmt, n['network']['id'],
1521                             tenant_id='tenant_a',
1522                             device_id=admin_router['router']['id'],
1523                             device_owner=device_owner,
1524                             set_context=True,
1525                             expected_res_status=exc.HTTPConflict.code)
1526
1527     def test_create_non_router_port_device_id_of_other_teants_router_update(
1528         self):
1529         # This tests that HTTPConflict is raised if we create a non-router
1530         # port that matches the device_id of another tenants router and then
1531         # we change the device_owner to be network:router_interface.
1532         with self.router() as admin_router:
1533             with self.network(tenant_id='tenant_a',
1534                               set_context=True) as n:
1535                 with self.subnet(network=n):
1536                     for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
1537                         port_res = self._create_port(
1538                             self.fmt, n['network']['id'],
1539                             tenant_id='tenant_a',
1540                             device_id=admin_router['router']['id'],
1541                             set_context=True)
1542                         port = self.deserialize(self.fmt, port_res)
1543                         neutron_context = context.Context('', 'tenant_a')
1544                         data = {'port': {'device_owner': device_owner}}
1545                         self._update('ports', port['port']['id'], data,
1546                                      neutron_context=neutron_context,
1547                                      expected_code=exc.HTTPConflict.code)
1548
1549     def test_update_port_device_id_to_different_tenants_router(self):
1550         with self.router() as admin_router:
1551             with self.router(tenant_id='tenant_a',
1552                              set_context=True) as tenant_router:
1553                 with self.network(tenant_id='tenant_a',
1554                                   set_context=True) as n:
1555                     with self.subnet(network=n) as s:
1556                         port = self._router_interface_action(
1557                             'add', tenant_router['router']['id'],
1558                             s['subnet']['id'], None, tenant_id='tenant_a')
1559                         neutron_context = context.Context('', 'tenant_a')
1560                         data = {'port':
1561                                 {'device_id': admin_router['router']['id']}}
1562                         self._update('ports', port['port_id'], data,
1563                                      neutron_context=neutron_context,
1564                                      expected_code=exc.HTTPConflict.code)
1565
1566     def test_router_add_gateway_invalid_network_returns_400(self):
1567         with self.router() as r:
1568             self._add_external_gateway_to_router(
1569                 r['router']['id'],
1570                 "foobar", expected_code=exc.HTTPBadRequest.code)
1571
1572     def test_router_add_gateway_non_existent_network_returns_404(self):
1573         with self.router() as r:
1574             self._add_external_gateway_to_router(
1575                 r['router']['id'],
1576                 _uuid(), expected_code=exc.HTTPNotFound.code)
1577
1578     def test_router_add_gateway_net_not_external_returns_400(self):
1579         with self.router() as r:
1580             with self.subnet() as s:
1581                 # intentionally do not set net as external
1582                 self._add_external_gateway_to_router(
1583                     r['router']['id'],
1584                     s['subnet']['network_id'],
1585                     expected_code=exc.HTTPBadRequest.code)
1586
1587     def test_router_add_gateway_no_subnet(self):
1588         with self.router() as r:
1589             with self.network() as n:
1590                 self._set_net_external(n['network']['id'])
1591                 self._add_external_gateway_to_router(
1592                     r['router']['id'],
1593                     n['network']['id'])
1594                 body = self._show('routers', r['router']['id'])
1595                 net_id = body['router']['external_gateway_info']['network_id']
1596                 self.assertEqual(net_id, n['network']['id'])
1597                 self._remove_external_gateway_from_router(
1598                     r['router']['id'],
1599                     n['network']['id'])
1600                 body = self._show('routers', r['router']['id'])
1601                 gw_info = body['router']['external_gateway_info']
1602                 self.assertIsNone(gw_info)
1603
1604     def test_router_remove_interface_inuse_returns_409(self):
1605         with self.router() as r:
1606             with self.subnet() as s:
1607                 self._router_interface_action('add',
1608                                               r['router']['id'],
1609                                               s['subnet']['id'],
1610                                               None)
1611                 self._delete('routers', r['router']['id'],
1612                              expected_code=exc.HTTPConflict.code)
1613
1614     def test_router_remove_interface_callback_failure_returns_409(self):
1615         with self.router() as r,\
1616                 self.subnet() as s,\
1617                 mock.patch.object(registry, 'notify') as notify:
1618             errors = [
1619                 exceptions.NotificationError(
1620                     'foo_callback_id', n_exc.InUse()),
1621             ]
1622             # we fail the first time, but not the second, when
1623             # the clean-up takes place
1624             notify.side_effect = [
1625                 exceptions.CallbackFailure(errors=errors), None
1626             ]
1627             self._router_interface_action('add',
1628                                           r['router']['id'],
1629                                           s['subnet']['id'],
1630                                           None)
1631             self._router_interface_action(
1632                 'remove',
1633                 r['router']['id'],
1634                 s['subnet']['id'],
1635                 None,
1636                 exc.HTTPConflict.code)
1637
1638     def test_router_clear_gateway_callback_failure_returns_409(self):
1639         with self.router() as r,\
1640                 self.subnet() as s,\
1641                 mock.patch.object(registry, 'notify') as notify:
1642             errors = [
1643                 exceptions.NotificationError(
1644                     'foo_callback_id', n_exc.InUse()),
1645             ]
1646             notify.side_effect = exceptions.CallbackFailure(errors=errors)
1647             self._set_net_external(s['subnet']['network_id'])
1648             self._add_external_gateway_to_router(
1649                     r['router']['id'],
1650                     s['subnet']['network_id'])
1651             self._remove_external_gateway_from_router(
1652                 r['router']['id'],
1653                 s['subnet']['network_id'],
1654                 external_gw_info={},
1655                 expected_code=exc.HTTPConflict.code)
1656
1657     def test_router_remove_interface_wrong_subnet_returns_400(self):
1658         with self.router() as r:
1659             with self.subnet() as s:
1660                 with self.port() as p:
1661                     self._router_interface_action('add',
1662                                                   r['router']['id'],
1663                                                   None,
1664                                                   p['port']['id'])
1665                     self._router_interface_action('remove',
1666                                                   r['router']['id'],
1667                                                   s['subnet']['id'],
1668                                                   p['port']['id'],
1669                                                   exc.HTTPBadRequest.code)
1670
1671     def test_router_remove_interface_nothing_returns_400(self):
1672         with self.router() as r:
1673             with self.subnet() as s:
1674                 with self.port(subnet=s) as p:
1675                     self._router_interface_action('add',
1676                                                   r['router']['id'],
1677                                                   None,
1678                                                   p['port']['id'])
1679                     self._router_interface_action('remove',
1680                                                   r['router']['id'],
1681                                                   None,
1682                                                   None,
1683                                                   exc.HTTPBadRequest.code)
1684                     #remove properly to clean-up
1685                     self._router_interface_action('remove',
1686                                                   r['router']['id'],
1687                                                   None,
1688                                                   p['port']['id'])
1689
1690     def test_router_remove_interface_returns_200(self):
1691         with self.router() as r:
1692             with self.port() as p:
1693                 body = self._router_interface_action('add',
1694                                                      r['router']['id'],
1695                                                      None,
1696                                                      p['port']['id'])
1697                 self._router_interface_action('remove',
1698                                               r['router']['id'],
1699                                               None,
1700                                               p['port']['id'],
1701                                               expected_body=body)
1702
1703     def test_router_remove_interface_with_both_ids_returns_200(self):
1704         with self.router() as r:
1705             with self.subnet() as s:
1706                 with self.port(subnet=s) as p:
1707                     self._router_interface_action('add',
1708                                                   r['router']['id'],
1709                                                   None,
1710                                                   p['port']['id'])
1711                     self._router_interface_action('remove',
1712                                                   r['router']['id'],
1713                                                   s['subnet']['id'],
1714                                                   p['port']['id'])
1715
1716     def test_router_remove_interface_wrong_port_returns_404(self):
1717         with self.router() as r:
1718             with self.subnet():
1719                 with self.port() as p:
1720                     self._router_interface_action('add',
1721                                                   r['router']['id'],
1722                                                   None,
1723                                                   p['port']['id'])
1724                     # create another port for testing failure case
1725                     res = self._create_port(self.fmt, p['port']['network_id'])
1726                     p2 = self.deserialize(self.fmt, res)
1727                     self._router_interface_action('remove',
1728                                                   r['router']['id'],
1729                                                   None,
1730                                                   p2['port']['id'],
1731                                                   exc.HTTPNotFound.code)
1732
1733     def test_router_remove_ipv6_subnet_from_interface(self):
1734         """Delete a subnet from a router interface
1735
1736         Verify that deleting a subnet with router-interface-delete removes
1737         that subnet when there are multiple subnets on the interface and
1738         removes the interface when it is the last subnet on the interface.
1739         """
1740         with self.router() as r, self.network() as n:
1741             with (self.subnet(network=n, cidr='fd00::1/64', ip_version=6)
1742                   ) as s1, self.subnet(network=n, cidr='fd01::1/64',
1743                                        ip_version=6) as s2:
1744                 body = self._router_interface_action('add', r['router']['id'],
1745                                                      s1['subnet']['id'],
1746                                                      None)
1747                 self._router_interface_action('add', r['router']['id'],
1748                                               s2['subnet']['id'], None)
1749                 port = self._show('ports', body['port_id'])
1750                 self.assertEqual(2, len(port['port']['fixed_ips']))
1751                 self._router_interface_action('remove', r['router']['id'],
1752                                               s1['subnet']['id'], None)
1753                 port = self._show('ports', body['port_id'])
1754                 self.assertEqual(1, len(port['port']['fixed_ips']))
1755                 self._router_interface_action('remove', r['router']['id'],
1756                                               s2['subnet']['id'], None)
1757                 exp_code = exc.HTTPNotFound.code
1758                 port = self._show('ports', body['port_id'],
1759                                   expected_code=exp_code)
1760
1761     def test_router_delete(self):
1762         with self.router() as router:
1763             router_id = router['router']['id']
1764         req = self.new_show_request('router', router_id)
1765         res = req.get_response(self._api_for_resource('router'))
1766         self.assertEqual(res.status_int, 404)
1767
1768     def test_router_delete_with_port_existed_returns_409(self):
1769         with self.subnet() as subnet:
1770             res = self._create_router(self.fmt, _uuid())
1771             router = self.deserialize(self.fmt, res)
1772             self._router_interface_action('add',
1773                                           router['router']['id'],
1774                                           subnet['subnet']['id'],
1775                                           None)
1776             self._delete('routers', router['router']['id'],
1777                          exc.HTTPConflict.code)
1778
1779     def test_router_delete_with_floatingip_existed_returns_409(self):
1780         with self.port() as p:
1781             private_sub = {'subnet': {'id':
1782                                       p['port']['fixed_ips'][0]['subnet_id']}}
1783             with self.subnet(cidr='12.0.0.0/24') as public_sub:
1784                 self._set_net_external(public_sub['subnet']['network_id'])
1785                 res = self._create_router(self.fmt, _uuid())
1786                 r = self.deserialize(self.fmt, res)
1787                 self._add_external_gateway_to_router(
1788                     r['router']['id'],
1789                     public_sub['subnet']['network_id'])
1790                 self._router_interface_action('add', r['router']['id'],
1791                                               private_sub['subnet']['id'],
1792                                               None)
1793                 res = self._create_floatingip(
1794                     self.fmt, public_sub['subnet']['network_id'],
1795                     port_id=p['port']['id'])
1796                 self.assertEqual(res.status_int, exc.HTTPCreated.code)
1797                 self._delete('routers', r['router']['id'],
1798                              expected_code=exc.HTTPConflict.code)
1799
1800     def test_router_show(self):
1801         name = 'router1'
1802         tenant_id = _uuid()
1803         expected_value = [('name', name), ('tenant_id', tenant_id),
1804                           ('admin_state_up', True), ('status', 'ACTIVE'),
1805                           ('external_gateway_info', None)]
1806         with self.router(name='router1', admin_state_up=True,
1807                          tenant_id=tenant_id) as router:
1808             res = self._show('routers', router['router']['id'])
1809             for k, v in expected_value:
1810                 self.assertEqual(res['router'][k], v)
1811
1812     def test_network_update_external_failure(self):
1813         with self.router() as r:
1814             with self.subnet() as s1:
1815                 self._set_net_external(s1['subnet']['network_id'])
1816                 self._add_external_gateway_to_router(
1817                     r['router']['id'],
1818                     s1['subnet']['network_id'])
1819                 self._update('networks', s1['subnet']['network_id'],
1820                              {'network': {external_net.EXTERNAL: False}},
1821                              expected_code=exc.HTTPConflict.code)
1822
1823     def test_network_update_external(self):
1824         with self.router() as r:
1825             with self.network('test_net') as testnet:
1826                 self._set_net_external(testnet['network']['id'])
1827                 with self.subnet() as s1:
1828                     self._set_net_external(s1['subnet']['network_id'])
1829                     self._add_external_gateway_to_router(
1830                         r['router']['id'],
1831                         s1['subnet']['network_id'])
1832                     self._update('networks', testnet['network']['id'],
1833                                  {'network': {external_net.EXTERNAL: False}})
1834
1835     def test_floatingip_crd_ops(self):
1836         with self.floatingip_with_assoc() as fip:
1837             self._validate_floating_ip(fip)
1838
1839         # post-delete, check that it is really gone
1840         body = self._list('floatingips')
1841         self.assertEqual(len(body['floatingips']), 0)
1842
1843         self._show('floatingips', fip['floatingip']['id'],
1844                    expected_code=exc.HTTPNotFound.code)
1845
1846     def _test_floatingip_with_assoc_fails(self, plugin_method):
1847         with self.subnet(cidr='200.0.0.0/24') as public_sub:
1848             self._set_net_external(public_sub['subnet']['network_id'])
1849             with self.port() as private_port:
1850                 with self.router() as r:
1851                     sid = private_port['port']['fixed_ips'][0]['subnet_id']
1852                     private_sub = {'subnet': {'id': sid}}
1853                     self._add_external_gateway_to_router(
1854                         r['router']['id'],
1855                         public_sub['subnet']['network_id'])
1856                     self._router_interface_action('add', r['router']['id'],
1857                                                   private_sub['subnet']['id'],
1858                                                   None)
1859                     with mock.patch(plugin_method) as pl:
1860                         pl.side_effect = n_exc.BadRequest(
1861                             resource='floatingip',
1862                             msg='fake_error')
1863                         res = self._create_floatingip(
1864                             self.fmt,
1865                             public_sub['subnet']['network_id'],
1866                             port_id=private_port['port']['id'])
1867                         self.assertEqual(res.status_int, 400)
1868                     for p in self._list('ports')['ports']:
1869                         if (p['device_owner'] ==
1870                             l3_constants.DEVICE_OWNER_FLOATINGIP):
1871                             self.fail('garbage port is not deleted')
1872
1873     def test_floatingip_with_assoc_fails(self):
1874         self._test_floatingip_with_assoc_fails(
1875             'neutron.db.l3_db.L3_NAT_db_mixin._check_and_get_fip_assoc')
1876
1877     def test_create_floatingip_with_assoc(
1878         self, expected_status=l3_constants.FLOATINGIP_STATUS_ACTIVE):
1879         with self.floatingip_with_assoc() as fip:
1880             body = self._show('floatingips', fip['floatingip']['id'])
1881             self.assertEqual(body['floatingip']['id'],
1882                              fip['floatingip']['id'])
1883             self.assertEqual(body['floatingip']['port_id'],
1884                              fip['floatingip']['port_id'])
1885             self.assertEqual(expected_status, body['floatingip']['status'])
1886             self.assertIsNotNone(body['floatingip']['fixed_ip_address'])
1887             self.assertIsNotNone(body['floatingip']['router_id'])
1888
1889     def test_create_floatingip_non_admin_context_agent_notification(self):
1890         plugin = manager.NeutronManager.get_service_plugins()[
1891             service_constants.L3_ROUTER_NAT]
1892         if not hasattr(plugin, 'l3_rpc_notifier'):
1893             self.skipTest("Plugin does not support l3_rpc_notifier")
1894
1895         with self.subnet(cidr='11.0.0.0/24') as public_sub,\
1896                 self.port() as private_port,\
1897                 self.router() as r:
1898             self._set_net_external(public_sub['subnet']['network_id'])
1899             subnet_id = private_port['port']['fixed_ips'][0]['subnet_id']
1900             private_sub = {'subnet': {'id': subnet_id}}
1901
1902             self._add_external_gateway_to_router(
1903                 r['router']['id'],
1904                 public_sub['subnet']['network_id'])
1905             self._router_interface_action(
1906                 'add', r['router']['id'],
1907                 private_sub['subnet']['id'], None)
1908
1909             with mock.patch.object(plugin.l3_rpc_notifier,
1910                                    'routers_updated') as agent_notification:
1911                 self._make_floatingip(
1912                     self.fmt,
1913                     public_sub['subnet']['network_id'],
1914                     port_id=private_port['port']['id'],
1915                     set_context=True)
1916                 self.assertTrue(agent_notification.called)
1917
1918     def test_floating_port_status_not_applicable(self):
1919         with self.floatingip_with_assoc():
1920             port_body = self._list('ports',
1921                query_params='device_owner=network:floatingip')['ports'][0]
1922             self.assertEqual(l3_constants.PORT_STATUS_NOTAPPLICABLE,
1923                              port_body['status'])
1924
1925     def test_floatingip_update(
1926         self, expected_status=l3_constants.FLOATINGIP_STATUS_ACTIVE):
1927         with self.port() as p:
1928             private_sub = {'subnet': {'id':
1929                                       p['port']['fixed_ips'][0]['subnet_id']}}
1930             with self.floatingip_no_assoc(private_sub) as fip:
1931                 body = self._show('floatingips', fip['floatingip']['id'])
1932                 self.assertIsNone(body['floatingip']['port_id'])
1933                 self.assertIsNone(body['floatingip']['fixed_ip_address'])
1934                 self.assertEqual(body['floatingip']['status'], expected_status)
1935
1936                 port_id = p['port']['id']
1937                 ip_address = p['port']['fixed_ips'][0]['ip_address']
1938                 body = self._update('floatingips', fip['floatingip']['id'],
1939                                     {'floatingip': {'port_id': port_id}})
1940                 self.assertEqual(body['floatingip']['port_id'], port_id)
1941                 self.assertEqual(body['floatingip']['fixed_ip_address'],
1942                                  ip_address)
1943
1944     def test_floatingip_create_different_fixed_ip_same_port(self):
1945         '''This tests that it is possible to delete a port that has
1946         multiple floating ip addresses associated with it (each floating
1947         address associated with a unique fixed address).
1948         '''
1949
1950         with self.router() as r:
1951             with self.subnet(cidr='11.0.0.0/24') as public_sub:
1952                 self._set_net_external(public_sub['subnet']['network_id'])
1953                 self._add_external_gateway_to_router(
1954                     r['router']['id'],
1955                     public_sub['subnet']['network_id'])
1956
1957                 with self.subnet() as private_sub:
1958                     ip_range = list(netaddr.IPNetwork(
1959                         private_sub['subnet']['cidr']))
1960                     fixed_ips = [{'ip_address': str(ip_range[-3])},
1961                                  {'ip_address': str(ip_range[-2])}]
1962
1963                     self._router_interface_action(
1964                         'add', r['router']['id'],
1965                         private_sub['subnet']['id'], None)
1966
1967                     with self.port(subnet=private_sub,
1968                                    fixed_ips=fixed_ips) as p:
1969
1970                         fip1 = self._make_floatingip(
1971                             self.fmt,
1972                             public_sub['subnet']['network_id'],
1973                             p['port']['id'],
1974                             fixed_ip=str(ip_range[-2]))
1975                         fip2 = self._make_floatingip(
1976                             self.fmt,
1977                             public_sub['subnet']['network_id'],
1978                             p['port']['id'],
1979                             fixed_ip=str(ip_range[-3]))
1980
1981                         # Test that floating ips are assigned successfully.
1982                         body = self._show('floatingips',
1983                                           fip1['floatingip']['id'])
1984                         self.assertEqual(
1985                             body['floatingip']['port_id'],
1986                             fip1['floatingip']['port_id'])
1987
1988                         body = self._show('floatingips',
1989                                           fip2['floatingip']['id'])
1990                         self.assertEqual(
1991                             body['floatingip']['port_id'],
1992                             fip2['floatingip']['port_id'])
1993                     self._delete('ports', p['port']['id'])
1994                     # Test that port has been successfully deleted.
1995                     body = self._show('ports', p['port']['id'],
1996                                       expected_code=exc.HTTPNotFound.code)
1997
1998     def test_floatingip_update_different_fixed_ip_same_port(self):
1999         with self.subnet() as s:
2000             ip_range = list(netaddr.IPNetwork(s['subnet']['cidr']))
2001             fixed_ips = [{'ip_address': str(ip_range[-3])},
2002                          {'ip_address': str(ip_range[-2])}]
2003             with self.port(subnet=s, fixed_ips=fixed_ips) as p:
2004                 with self.floatingip_with_assoc(
2005                     port_id=p['port']['id'],
2006                     fixed_ip=str(ip_range[-3])) as fip:
2007                     body = self._show('floatingips', fip['floatingip']['id'])
2008                     self.assertEqual(fip['floatingip']['id'],
2009                                      body['floatingip']['id'])
2010                     self.assertEqual(fip['floatingip']['port_id'],
2011                                      body['floatingip']['port_id'])
2012                     self.assertEqual(str(ip_range[-3]),
2013                                      body['floatingip']['fixed_ip_address'])
2014                     self.assertIsNotNone(body['floatingip']['router_id'])
2015                     body_2 = self._update(
2016                         'floatingips', fip['floatingip']['id'],
2017                         {'floatingip': {'port_id': p['port']['id'],
2018                                         'fixed_ip_address': str(ip_range[-2])}
2019                          })
2020                     self.assertEqual(fip['floatingip']['port_id'],
2021                                      body_2['floatingip']['port_id'])
2022                     self.assertEqual(str(ip_range[-2]),
2023                                      body_2['floatingip']['fixed_ip_address'])
2024
2025     def test_floatingip_update_different_router(self):
2026         # Create subnet with different CIDRs to account for plugins which
2027         # do not support overlapping IPs
2028         with self.subnet(cidr='10.0.0.0/24') as s1,\
2029                 self.subnet(cidr='10.0.1.0/24') as s2:
2030             with self.port(subnet=s1) as p1, self.port(subnet=s2) as p2:
2031                 private_sub1 = {'subnet':
2032                                 {'id':
2033                                  p1['port']['fixed_ips'][0]['subnet_id']}}
2034                 private_sub2 = {'subnet':
2035                                 {'id':
2036                                  p2['port']['fixed_ips'][0]['subnet_id']}}
2037                 with self.subnet(cidr='12.0.0.0/24') as public_sub:
2038                     with self.floatingip_no_assoc_with_public_sub(
2039                         private_sub1,
2040                         public_sub=public_sub) as (fip1, r1),\
2041                             self.floatingip_no_assoc_with_public_sub(
2042                                 private_sub2,
2043                                 public_sub=public_sub) as (fip2, r2):
2044
2045                         def assert_no_assoc(fip):
2046                             body = self._show('floatingips',
2047                                               fip['floatingip']['id'])
2048                             self.assertIsNone(body['floatingip']['port_id'])
2049                             self.assertIsNone(
2050                                 body['floatingip']['fixed_ip_address'])
2051
2052                         assert_no_assoc(fip1)
2053                         assert_no_assoc(fip2)
2054
2055                         def associate_and_assert(fip, port):
2056                             port_id = port['port']['id']
2057                             ip_address = (port['port']['fixed_ips']
2058                                           [0]['ip_address'])
2059                             body = self._update(
2060                                 'floatingips', fip['floatingip']['id'],
2061                                 {'floatingip': {'port_id': port_id}})
2062                             self.assertEqual(body['floatingip']['port_id'],
2063                                              port_id)
2064                             self.assertEqual(
2065                                 body['floatingip']['fixed_ip_address'],
2066                                 ip_address)
2067                             return body['floatingip']['router_id']
2068
2069                         fip1_r1_res = associate_and_assert(fip1, p1)
2070                         self.assertEqual(fip1_r1_res, r1['router']['id'])
2071                         # The following operation will associate the floating
2072                         # ip to a different router
2073                         fip1_r2_res = associate_and_assert(fip1, p2)
2074                         self.assertEqual(fip1_r2_res, r2['router']['id'])
2075                         fip2_r1_res = associate_and_assert(fip2, p1)
2076                         self.assertEqual(fip2_r1_res, r1['router']['id'])
2077                         # disassociate fip1
2078                         self._update(
2079                             'floatingips', fip1['floatingip']['id'],
2080                             {'floatingip': {'port_id': None}})
2081                         fip2_r2_res = associate_and_assert(fip2, p2)
2082                         self.assertEqual(fip2_r2_res, r2['router']['id'])
2083
2084     def test_floatingip_port_delete(self):
2085         with self.subnet() as private_sub:
2086             with self.floatingip_no_assoc(private_sub) as fip:
2087                 with self.port(subnet=private_sub) as p:
2088                     body = self._update('floatingips', fip['floatingip']['id'],
2089                                         {'floatingip':
2090                                          {'port_id': p['port']['id']}})
2091                 # note: once this port goes out of scope, the port will be
2092                 # deleted, which is what we want to test. We want to confirm
2093                 # that the fields are set back to None
2094                 self._delete('ports', p['port']['id'])
2095                 body = self._show('floatingips', fip['floatingip']['id'])
2096                 self.assertEqual(body['floatingip']['id'],
2097                                  fip['floatingip']['id'])
2098                 self.assertIsNone(body['floatingip']['port_id'])
2099                 self.assertIsNone(body['floatingip']['fixed_ip_address'])
2100                 self.assertIsNone(body['floatingip']['router_id'])
2101
2102     def test_two_fips_one_port_invalid_return_409(self):
2103         with self.floatingip_with_assoc() as fip1:
2104             res = self._create_floatingip(
2105                 self.fmt,
2106                 fip1['floatingip']['floating_network_id'],
2107                 fip1['floatingip']['port_id'])
2108             self.assertEqual(res.status_int, exc.HTTPConflict.code)
2109
2110     def test_floating_ip_direct_port_delete_returns_409(self):
2111         found = False
2112         with self.floatingip_with_assoc():
2113             for p in self._list('ports')['ports']:
2114                 if p['device_owner'] == l3_constants.DEVICE_OWNER_FLOATINGIP:
2115                     self._delete('ports', p['id'],
2116                                  expected_code=exc.HTTPConflict.code)
2117                     found = True
2118         self.assertTrue(found)
2119
2120     def _test_floatingip_with_invalid_create_port(self, plugin_class):
2121         with self.port() as p:
2122             private_sub = {'subnet': {'id':
2123                                       p['port']['fixed_ips'][0]['subnet_id']}}
2124             with self.subnet(cidr='12.0.0.0/24') as public_sub:
2125                 self._set_net_external(public_sub['subnet']['network_id'])
2126                 res = self._create_router(self.fmt, _uuid())
2127                 r = self.deserialize(self.fmt, res)
2128                 self._add_external_gateway_to_router(
2129                     r['router']['id'],
2130                     public_sub['subnet']['network_id'])
2131                 self._router_interface_action(
2132                     'add', r['router']['id'],
2133                     private_sub['subnet']['id'],
2134                     None)
2135
2136                 with mock.patch(plugin_class + '.create_port') as createport:
2137                     createport.return_value = {'fixed_ips': []}
2138                     res = self._create_floatingip(
2139                         self.fmt, public_sub['subnet']['network_id'],
2140                         port_id=p['port']['id'])
2141                     self.assertEqual(res.status_int,
2142                                      exc.HTTPBadRequest.code)
2143
2144     def test_floatingip_with_invalid_create_port(self):
2145         self._test_floatingip_with_invalid_create_port(
2146             'neutron.db.db_base_plugin_v2.NeutronDbPluginV2')
2147
2148     def test_create_floatingip_with_subnet_id_non_admin(self):
2149         with self.subnet() as public_sub:
2150             self._set_net_external(public_sub['subnet']['network_id'])
2151             with self.router():
2152                 res = self._create_floatingip(
2153                     self.fmt,
2154                     public_sub['subnet']['network_id'],
2155                     subnet_id=public_sub['subnet']['id'],
2156                     set_context=True)
2157         self.assertEqual(res.status_int, exc.HTTPCreated.code)
2158
2159     def test_create_floatingip_with_multisubnet_id(self):
2160         with self.network() as network:
2161             self._set_net_external(network['network']['id'])
2162             with self.subnet(network, cidr='10.0.12.0/24') as subnet1:
2163                 with self.subnet(network, cidr='10.0.13.0/24') as subnet2:
2164                     with self.router():
2165                         res = self._create_floatingip(
2166                             self.fmt,
2167                             subnet1['subnet']['network_id'],
2168                             subnet_id=subnet1['subnet']['id'])
2169                         fip1 = self.deserialize(self.fmt, res)
2170                         res = self._create_floatingip(
2171                             self.fmt,
2172                             subnet1['subnet']['network_id'],
2173                             subnet_id=subnet2['subnet']['id'])
2174                         fip2 = self.deserialize(self.fmt, res)
2175         self.assertTrue(
2176             fip1['floatingip']['floating_ip_address'].startswith('10.0.12'))
2177         self.assertTrue(
2178             fip2['floatingip']['floating_ip_address'].startswith('10.0.13'))
2179
2180     def test_create_floatingip_with_wrong_subnet_id(self):
2181         with self.network() as network1:
2182             self._set_net_external(network1['network']['id'])
2183             with self.subnet(network1, cidr='10.0.12.0/24') as subnet1:
2184                 with self.network() as network2:
2185                     self._set_net_external(network2['network']['id'])
2186                     with self.subnet(network2, cidr='10.0.13.0/24') as subnet2:
2187                         with self.router():
2188                             res = self._create_floatingip(
2189                                 self.fmt,
2190                                 subnet1['subnet']['network_id'],
2191                                 subnet_id=subnet2['subnet']['id'])
2192         self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
2193
2194     def test_create_floatingip_no_ext_gateway_return_404(self):
2195         with self.subnet() as public_sub:
2196             self._set_net_external(public_sub['subnet']['network_id'])
2197             with self.port() as private_port:
2198                 with self.router():
2199                     res = self._create_floatingip(
2200                         self.fmt,
2201                         public_sub['subnet']['network_id'],
2202                         port_id=private_port['port']['id'])
2203                     # this should be some kind of error
2204                     self.assertEqual(res.status_int, exc.HTTPNotFound.code)
2205
2206     def test_create_floating_non_ext_network_returns_400(self):
2207         with self.subnet() as public_sub:
2208             # normally we would set the network of public_sub to be
2209             # external, but the point of this test is to handle when
2210             # that is not the case
2211             with self.router():
2212                 res = self._create_floatingip(
2213                     self.fmt,
2214                     public_sub['subnet']['network_id'])
2215                 self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
2216
2217     def test_create_floatingip_no_public_subnet_returns_400(self):
2218         with self.network() as public_network:
2219             with self.port() as private_port:
2220                 with self.router() as r:
2221                     sid = private_port['port']['fixed_ips'][0]['subnet_id']
2222                     private_sub = {'subnet': {'id': sid}}
2223                     self._router_interface_action('add', r['router']['id'],
2224                                                   private_sub['subnet']['id'],
2225                                                   None)
2226
2227                     res = self._create_floatingip(
2228                         self.fmt,
2229                         public_network['network']['id'],
2230                         port_id=private_port['port']['id'])
2231                     self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
2232
2233     def test_create_floatingip_invalid_floating_network_id_returns_400(self):
2234         # API-level test - no need to create all objects for l3 plugin
2235         res = self._create_floatingip(self.fmt, 'iamnotanuuid',
2236                                       uuidutils.generate_uuid(), '192.168.0.1')
2237         self.assertEqual(res.status_int, 400)
2238
2239     def test_create_floatingip_invalid_floating_port_id_returns_400(self):
2240         # API-level test - no need to create all objects for l3 plugin
2241         res = self._create_floatingip(self.fmt, uuidutils.generate_uuid(),
2242                                       'iamnotanuuid', '192.168.0.1')
2243         self.assertEqual(res.status_int, 400)
2244
2245     def test_create_floatingip_invalid_fixed_ip_address_returns_400(self):
2246         # API-level test - no need to create all objects for l3 plugin
2247         res = self._create_floatingip(self.fmt, uuidutils.generate_uuid(),
2248                                       uuidutils.generate_uuid(), 'iamnotnanip')
2249         self.assertEqual(res.status_int, 400)
2250
2251     def test_floatingip_list_with_sort(self):
2252         with self.subnet(cidr="10.0.0.0/24") as s1,\
2253                 self.subnet(cidr="11.0.0.0/24") as s2,\
2254                 self.subnet(cidr="12.0.0.0/24") as s3:
2255             network_id1 = s1['subnet']['network_id']
2256             network_id2 = s2['subnet']['network_id']
2257             network_id3 = s3['subnet']['network_id']
2258             self._set_net_external(network_id1)
2259             self._set_net_external(network_id2)
2260             self._set_net_external(network_id3)
2261             fp1 = self._make_floatingip(self.fmt, network_id1)
2262             fp2 = self._make_floatingip(self.fmt, network_id2)
2263             fp3 = self._make_floatingip(self.fmt, network_id3)
2264             self._test_list_with_sort('floatingip', (fp3, fp2, fp1),
2265                                       [('floating_ip_address', 'desc')])
2266
2267     def test_floatingip_list_with_port_id(self):
2268         with self.floatingip_with_assoc() as fip:
2269             port_id = fip['floatingip']['port_id']
2270             res = self._list('floatingips',
2271                              query_params="port_id=%s" % port_id)
2272             self.assertEqual(len(res['floatingips']), 1)
2273             res = self._list('floatingips', query_params="port_id=aaa")
2274             self.assertEqual(len(res['floatingips']), 0)
2275
2276     def test_floatingip_list_with_pagination(self):
2277         with self.subnet(cidr="10.0.0.0/24") as s1,\
2278                 self.subnet(cidr="11.0.0.0/24") as s2,\
2279                 self.subnet(cidr="12.0.0.0/24") as s3:
2280             network_id1 = s1['subnet']['network_id']
2281             network_id2 = s2['subnet']['network_id']
2282             network_id3 = s3['subnet']['network_id']
2283             self._set_net_external(network_id1)
2284             self._set_net_external(network_id2)
2285             self._set_net_external(network_id3)
2286             fp1 = self._make_floatingip(self.fmt, network_id1)
2287             fp2 = self._make_floatingip(self.fmt, network_id2)
2288             fp3 = self._make_floatingip(self.fmt, network_id3)
2289             self._test_list_with_pagination(
2290                 'floatingip', (fp1, fp2, fp3),
2291                 ('floating_ip_address', 'asc'), 2, 2)
2292
2293     def test_floatingip_list_with_pagination_reverse(self):
2294         with self.subnet(cidr="10.0.0.0/24") as s1,\
2295                 self.subnet(cidr="11.0.0.0/24") as s2,\
2296                 self.subnet(cidr="12.0.0.0/24") as s3:
2297             network_id1 = s1['subnet']['network_id']
2298             network_id2 = s2['subnet']['network_id']
2299             network_id3 = s3['subnet']['network_id']
2300             self._set_net_external(network_id1)
2301             self._set_net_external(network_id2)
2302             self._set_net_external(network_id3)
2303             fp1 = self._make_floatingip(self.fmt, network_id1)
2304             fp2 = self._make_floatingip(self.fmt, network_id2)
2305             fp3 = self._make_floatingip(self.fmt, network_id3)
2306             self._test_list_with_pagination_reverse(
2307                 'floatingip', (fp1, fp2, fp3),
2308                 ('floating_ip_address', 'asc'), 2, 2)
2309
2310     def test_floatingip_multi_external_one_internal(self):
2311         with self.subnet(cidr="10.0.0.0/24") as exs1,\
2312                 self.subnet(cidr="11.0.0.0/24") as exs2,\
2313                 self.subnet(cidr="12.0.0.0/24") as ins1:
2314             network_ex_id1 = exs1['subnet']['network_id']
2315             network_ex_id2 = exs2['subnet']['network_id']
2316             self._set_net_external(network_ex_id1)
2317             self._set_net_external(network_ex_id2)
2318
2319             r2i_fixed_ips = [{'ip_address': '12.0.0.2'}]
2320             with self.router() as r1,\
2321                     self.router() as r2,\
2322                     self.port(subnet=ins1,
2323                               fixed_ips=r2i_fixed_ips) as r2i_port:
2324                 self._add_external_gateway_to_router(
2325                     r1['router']['id'],
2326                     network_ex_id1)
2327                 self._router_interface_action('add', r1['router']['id'],
2328                                               ins1['subnet']['id'],
2329                                               None)
2330                 self._add_external_gateway_to_router(
2331                     r2['router']['id'],
2332                     network_ex_id2)
2333                 self._router_interface_action('add', r2['router']['id'],
2334                                               None,
2335                                               r2i_port['port']['id'])
2336
2337                 with self.port(subnet=ins1,
2338                                fixed_ips=[{'ip_address': '12.0.0.3'}]
2339                                ) as private_port:
2340
2341                     fp1 = self._make_floatingip(self.fmt, network_ex_id1,
2342                                             private_port['port']['id'],
2343                                             floating_ip='10.0.0.3')
2344                     fp2 = self._make_floatingip(self.fmt, network_ex_id2,
2345                                             private_port['port']['id'],
2346                                             floating_ip='11.0.0.3')
2347                     self.assertEqual(fp1['floatingip']['router_id'],
2348                                      r1['router']['id'])
2349                     self.assertEqual(fp2['floatingip']['router_id'],
2350                                      r2['router']['id'])
2351
2352     def test_floatingip_same_external_and_internal(self):
2353         # Select router with subnet's gateway_ip for floatingip when
2354         # routers connected to same subnet and external network.
2355         with self.subnet(cidr="10.0.0.0/24") as exs,\
2356                 self.subnet(cidr="12.0.0.0/24", gateway_ip="12.0.0.50") as ins:
2357             network_ex_id = exs['subnet']['network_id']
2358             self._set_net_external(network_ex_id)
2359
2360             r2i_fixed_ips = [{'ip_address': '12.0.0.2'}]
2361             with self.router() as r1,\
2362                     self.router() as r2,\
2363                     self.port(subnet=ins,
2364                               fixed_ips=r2i_fixed_ips) as r2i_port:
2365                 self._add_external_gateway_to_router(
2366                     r1['router']['id'],
2367                     network_ex_id)
2368                 self._router_interface_action('add', r2['router']['id'],
2369                                               None,
2370                                               r2i_port['port']['id'])
2371                 self._router_interface_action('add', r1['router']['id'],
2372                                               ins['subnet']['id'],
2373                                               None)
2374                 self._add_external_gateway_to_router(
2375                     r2['router']['id'],
2376                     network_ex_id)
2377
2378                 with self.port(subnet=ins,
2379                                fixed_ips=[{'ip_address': '12.0.0.8'}]
2380                                ) as private_port:
2381
2382                     fp = self._make_floatingip(self.fmt, network_ex_id,
2383                                             private_port['port']['id'],
2384                                             floating_ip='10.0.0.8')
2385                     self.assertEqual(r1['router']['id'],
2386                                      fp['floatingip']['router_id'])
2387
2388     def test_floatingip_delete_router_intf_with_subnet_id_returns_409(self):
2389         found = False
2390         with self.floatingip_with_assoc():
2391             for p in self._list('ports')['ports']:
2392                 if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
2393                     subnet_id = p['fixed_ips'][0]['subnet_id']
2394                     router_id = p['device_id']
2395                     self._router_interface_action(
2396                         'remove', router_id, subnet_id, None,
2397                         expected_code=exc.HTTPConflict.code)
2398                     found = True
2399                     break
2400         self.assertTrue(found)
2401
2402     def test_floatingip_delete_router_intf_with_port_id_returns_409(self):
2403         found = False
2404         with self.floatingip_with_assoc():
2405             for p in self._list('ports')['ports']:
2406                 if p['device_owner'] == l3_constants.DEVICE_OWNER_ROUTER_INTF:
2407                     router_id = p['device_id']
2408                     self._router_interface_action(
2409                         'remove', router_id, None, p['id'],
2410                         expected_code=exc.HTTPConflict.code)
2411                     found = True
2412                     break
2413         self.assertTrue(found)
2414
2415     def _test_router_delete_subnet_inuse_returns_409(self, router, subnet):
2416         r, s = router, subnet
2417         self._router_interface_action('add',
2418                                       r['router']['id'],
2419                                       s['subnet']['id'],
2420                                       None)
2421         # subnet cannot be deleted as it's attached to a router
2422         self._delete('subnets', s['subnet']['id'],
2423                      expected_code=exc.HTTPConflict.code)
2424
2425     def _ipv6_subnet(self, mode):
2426         return self.subnet(cidr='fd00::1/64', gateway_ip='fd00::1',
2427                            ip_version=6,
2428                            ipv6_ra_mode=mode,
2429                            ipv6_address_mode=mode)
2430
2431     def test_router_delete_subnet_inuse_returns_409(self):
2432         with self.router() as r:
2433             with self.subnet() as s:
2434                 self._test_router_delete_subnet_inuse_returns_409(r, s)
2435
2436     def test_router_delete_ipv6_slaac_subnet_inuse_returns_409(self):
2437         with self.router() as r:
2438             with self._ipv6_subnet(l3_constants.IPV6_SLAAC) as s:
2439                 self._test_router_delete_subnet_inuse_returns_409(r, s)
2440
2441     def test_router_delete_dhcpv6_stateless_subnet_inuse_returns_409(self):
2442         with self.router() as r:
2443             with self._ipv6_subnet(l3_constants.DHCPV6_STATELESS) as s:
2444                 self._test_router_delete_subnet_inuse_returns_409(r, s)
2445
2446     def test_delete_ext_net_with_disassociated_floating_ips(self):
2447         with self.network() as net:
2448             net_id = net['network']['id']
2449             self._set_net_external(net_id)
2450             with self.subnet(network=net):
2451                 self._make_floatingip(self.fmt, net_id)
2452
2453     def test_create_floatingip_with_specific_ip(self):
2454         with self.subnet(cidr='10.0.0.0/24') as s:
2455             network_id = s['subnet']['network_id']
2456             self._set_net_external(network_id)
2457             fp = self._make_floatingip(self.fmt, network_id,
2458                                        floating_ip='10.0.0.10')
2459             self.assertEqual(fp['floatingip']['floating_ip_address'],
2460                              '10.0.0.10')
2461
2462     def test_create_floatingip_with_specific_ip_out_of_allocation(self):
2463         with self.subnet(cidr='10.0.0.0/24',
2464                          allocation_pools=[
2465                              {'start': '10.0.0.10', 'end': '10.0.0.20'}]
2466                          ) as s:
2467             network_id = s['subnet']['network_id']
2468             self._set_net_external(network_id)
2469             fp = self._make_floatingip(self.fmt, network_id,
2470                                        floating_ip='10.0.0.30')
2471             self.assertEqual(fp['floatingip']['floating_ip_address'],
2472                              '10.0.0.30')
2473
2474     def test_create_floatingip_with_specific_ip_non_admin(self):
2475         ctx = context.Context('user_id', 'tenant_id')
2476
2477         with self.subnet(cidr='10.0.0.0/24') as s:
2478             network_id = s['subnet']['network_id']
2479             self._set_net_external(network_id)
2480             self._make_floatingip(self.fmt, network_id,
2481                                   set_context=ctx,
2482                                   floating_ip='10.0.0.10',
2483                                   http_status=exc.HTTPForbidden.code)
2484
2485     def test_create_floatingip_with_specific_ip_out_of_subnet(self):
2486
2487         with self.subnet(cidr='10.0.0.0/24') as s:
2488             network_id = s['subnet']['network_id']
2489             self._set_net_external(network_id)
2490             self._make_floatingip(self.fmt, network_id,
2491                                   floating_ip='10.0.1.10',
2492                                   http_status=exc.HTTPBadRequest.code)
2493
2494     def test_create_floatingip_with_duplicated_specific_ip(self):
2495
2496         with self.subnet(cidr='10.0.0.0/24') as s:
2497             network_id = s['subnet']['network_id']
2498             self._set_net_external(network_id)
2499             self._make_floatingip(self.fmt, network_id,
2500                                   floating_ip='10.0.0.10')
2501
2502             self._make_floatingip(self.fmt, network_id,
2503                                   floating_ip='10.0.0.10',
2504                                   http_status=exc.HTTPConflict.code)
2505
2506     def test_router_specify_id_backend(self):
2507         plugin = manager.NeutronManager.get_service_plugins()[
2508                     service_constants.L3_ROUTER_NAT]
2509         router_req = {'router': {'id': _uuid(), 'name': 'router',
2510                                  'tenant_id': 'foo',
2511                                  'admin_state_up': True}}
2512         result = plugin.create_router(context.Context('', 'foo'), router_req)
2513         self.assertEqual(result['id'], router_req['router']['id'])
2514
2515     def test_create_floatingip_ipv6_only_network_returns_400(self):
2516         with self.subnet(cidr="2001:db8::/48", ip_version=6) as public_sub:
2517             self._set_net_external(public_sub['subnet']['network_id'])
2518             res = self._create_floatingip(
2519                 self.fmt,
2520                 public_sub['subnet']['network_id'])
2521             self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
2522
2523     def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self):
2524         with self.network() as n,\
2525                 self.subnet(cidr="2001:db8::/48", ip_version=6, network=n),\
2526                 self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n):
2527             self._set_net_external(n['network']['id'])
2528             fip = self._make_floatingip(self.fmt, n['network']['id'])
2529             self.assertEqual(fip['floatingip']['floating_ip_address'],
2530                              '192.168.1.2')
2531
2532     def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
2533         with self.subnet() as public_sub:
2534             self._set_net_external(public_sub['subnet']['network_id'])
2535             with self.subnet(cidr="2001:db8::/48",
2536                              ip_version=6) as private_sub:
2537                 with self.port(subnet=private_sub) as private_port:
2538                     res = self._create_floatingip(
2539                         self.fmt,
2540                         public_sub['subnet']['network_id'],
2541                         port_id=private_port['port']['id'])
2542                     self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
2543
2544     def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self):
2545         with self.network() as n,\
2546                 self.subnet(cidr='10.0.0.0/24', network=n) as s4,\
2547                 self.subnet(cidr='2001:db8::/64', ip_version=6, network=n),\
2548                 self.port(subnet=s4) as p:
2549             self.assertEqual(len(p['port']['fixed_ips']), 2)
2550             ipv4_address = next(i['ip_address'] for i in
2551                     p['port']['fixed_ips'] if
2552                     netaddr.IPAddress(i['ip_address']).version == 4)
2553             with self.floatingip_with_assoc(port_id=p['port']['id']) as fip:
2554                 self.assertEqual(fip['floatingip']['fixed_ip_address'],
2555                                  ipv4_address)
2556                 floating_ip = netaddr.IPAddress(
2557                         fip['floatingip']['floating_ip_address'])
2558                 self.assertEqual(floating_ip.version, 4)
2559
2560     def test_update_subnet_gateway_for_external_net(self):
2561         """Test to make sure notification to routers occurs when the gateway
2562             ip address of a subnet of the external network is changed.
2563         """
2564         plugin = manager.NeutronManager.get_service_plugins()[
2565             service_constants.L3_ROUTER_NAT]
2566         if not hasattr(plugin, 'l3_rpc_notifier'):
2567             self.skipTest("Plugin does not support l3_rpc_notifier")
2568         # make sure the callback is registered.
2569         registry.subscribe(
2570             l3_db._notify_subnet_gateway_ip_update, resources.SUBNET_GATEWAY,
2571             events.AFTER_UPDATE)
2572         with mock.patch.object(plugin.l3_rpc_notifier,
2573                                'routers_updated') as chk_method:
2574             with self.network() as network:
2575                 allocation_pools = [{'start': '120.0.0.3',
2576                                      'end': '120.0.0.254'}]
2577                 with self.subnet(network=network,
2578                                  gateway_ip='120.0.0.1',
2579                                  allocation_pools=allocation_pools,
2580                                  cidr='120.0.0.0/24') as subnet:
2581                     kwargs = {
2582                         'device_owner': l3_constants.DEVICE_OWNER_ROUTER_GW,
2583                         'device_id': 'fake_device'}
2584                     with self.port(subnet=subnet, **kwargs):
2585                         data = {'subnet': {'gateway_ip': '120.0.0.2'}}
2586                         req = self.new_update_request('subnets', data,
2587                                                       subnet['subnet']['id'])
2588                         res = self.deserialize(self.fmt,
2589                                                req.get_response(self.api))
2590                         self.assertEqual(res['subnet']['gateway_ip'],
2591                                          data['subnet']['gateway_ip'])
2592                         chk_method.assert_called_with(mock.ANY,
2593                                                       ['fake_device'], None)
2594
2595
2596 class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
2597
2598     """Unit tests for methods called by the L3 agent."""
2599
2600     def test_l3_agent_routers_query_interfaces(self):
2601         with self.router() as r:
2602             with self.port() as p:
2603                 self._router_interface_action('add',
2604                                               r['router']['id'],
2605                                               None,
2606                                               p['port']['id'])
2607
2608                 routers = self.plugin.get_sync_data(
2609                     context.get_admin_context(), None)
2610                 self.assertEqual(1, len(routers))
2611                 interfaces = routers[0][l3_constants.INTERFACE_KEY]
2612                 self.assertEqual(1, len(interfaces))
2613                 subnets = interfaces[0]['subnets']
2614                 self.assertEqual(1, len(subnets))
2615                 subnet_id = subnets[0]['id']
2616                 wanted_subnetid = p['port']['fixed_ips'][0]['subnet_id']
2617                 self.assertEqual(wanted_subnetid, subnet_id)
2618
2619     def test_l3_agent_sync_interfaces(self):
2620         """Test L3 interfaces query return valid result"""
2621         with self.router() as router1, self.router() as router2:
2622             with self.port() as port1, self.port() as port2:
2623                 self._router_interface_action('add',
2624                                               router1['router']['id'],
2625                                               None,
2626                                               port1['port']['id'])
2627                 self._router_interface_action('add',
2628                                               router2['router']['id'],
2629                                               None,
2630                                               port2['port']['id'])
2631                 admin_ctx = context.get_admin_context()
2632                 router1_id = router1['router']['id']
2633                 router2_id = router2['router']['id']
2634
2635                 # Verify if router1 pass in, return only interface from router1
2636                 ifaces = self.plugin._get_sync_interfaces(admin_ctx,
2637                                                           [router1_id])
2638                 self.assertEqual(1, len(ifaces))
2639                 self.assertEqual(router1_id,
2640                                  ifaces[0]['device_id'])
2641
2642                 # Verify if router1 and router2 pass in, return both interfaces
2643                 ifaces = self.plugin._get_sync_interfaces(admin_ctx,
2644                                                           [router1_id,
2645                                                            router2_id])
2646                 self.assertEqual(2, len(ifaces))
2647                 device_list = [i['device_id'] for i in ifaces]
2648                 self.assertIn(router1_id, device_list)
2649                 self.assertIn(router2_id, device_list)
2650
2651                 #Verify if no router pass in, return empty list
2652                 ifaces = self.plugin._get_sync_interfaces(admin_ctx, None)
2653                 self.assertEqual(0, len(ifaces))
2654
2655     def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
2656         with self.router() as r:
2657             with self.subnet(cidr='9.0.1.0/24') as subnet:
2658                 with self.port(subnet=subnet,
2659                                fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
2660                     self._router_interface_action('add',
2661                                                   r['router']['id'],
2662                                                   None,
2663                                                   p['port']['id'])
2664                     port = {'port': {'fixed_ips':
2665                                      [{'ip_address': '9.0.1.4',
2666                                        'subnet_id': subnet['subnet']['id']},
2667                                       {'ip_address': '9.0.1.5',
2668                                        'subnet_id': subnet['subnet']['id']}]}}
2669                     ctx = context.get_admin_context()
2670                     self.core_plugin.update_port(ctx, p['port']['id'], port)
2671                     routers = self.plugin.get_sync_data(ctx, None)
2672                     self.assertEqual(1, len(routers))
2673                     interfaces = routers[0].get(l3_constants.INTERFACE_KEY, [])
2674                     self.assertEqual(1, len(interfaces))
2675
2676     def test_l3_agent_routers_query_gateway(self):
2677         with self.router() as r:
2678             with self.subnet() as s:
2679                 self._set_net_external(s['subnet']['network_id'])
2680                 self._add_external_gateway_to_router(
2681                     r['router']['id'],
2682                     s['subnet']['network_id'])
2683                 routers = self.plugin.get_sync_data(
2684                     context.get_admin_context(), [r['router']['id']])
2685                 self.assertEqual(1, len(routers))
2686                 gw_port = routers[0]['gw_port']
2687                 subnets = gw_port.get('subnets')
2688                 self.assertEqual(1, len(subnets))
2689                 self.assertEqual(s['subnet']['id'], subnets[0]['id'])
2690                 self._remove_external_gateway_from_router(
2691                     r['router']['id'],
2692                     s['subnet']['network_id'])
2693
2694     def test_l3_agent_routers_query_floatingips(self):
2695         with self.floatingip_with_assoc() as fip:
2696             routers = self.plugin.get_sync_data(
2697                 context.get_admin_context(), [fip['floatingip']['router_id']])
2698             self.assertEqual(1, len(routers))
2699             floatingips = routers[0][l3_constants.FLOATINGIP_KEY]
2700             self.assertEqual(1, len(floatingips))
2701             self.assertEqual(floatingips[0]['id'],
2702                              fip['floatingip']['id'])
2703             self.assertEqual(floatingips[0]['port_id'],
2704                              fip['floatingip']['port_id'])
2705             self.assertIsNotNone(floatingips[0]['fixed_ip_address'])
2706             self.assertIsNotNone(floatingips[0]['router_id'])
2707
2708     def _test_notify_op_agent(self, target_func, *args):
2709         l3_rpc_agent_api_str = (
2710             'neutron.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI')
2711         with mock.patch(l3_rpc_agent_api_str):
2712             plugin = manager.NeutronManager.get_service_plugins()[
2713                 service_constants.L3_ROUTER_NAT]
2714             notifyApi = plugin.l3_rpc_notifier
2715             kargs = [item for item in args]
2716             kargs.append(notifyApi)
2717             target_func(*kargs)
2718
2719     def _test_router_gateway_op_agent(self, notifyApi):
2720         with self.router() as r:
2721             with self.subnet() as s:
2722                 self._set_net_external(s['subnet']['network_id'])
2723                 self._add_external_gateway_to_router(
2724                     r['router']['id'],
2725                     s['subnet']['network_id'])
2726                 self._remove_external_gateway_from_router(
2727                     r['router']['id'],
2728                     s['subnet']['network_id'])
2729                 self.assertEqual(
2730                     2, notifyApi.routers_updated.call_count)
2731
2732     def test_router_gateway_op_agent(self):
2733         self._test_notify_op_agent(self._test_router_gateway_op_agent)
2734
2735     def _test_interfaces_op_agent(self, r, notifyApi):
2736         with self.port() as p:
2737             self._router_interface_action('add',
2738                                           r['router']['id'],
2739                                           None,
2740                                           p['port']['id'])
2741             # clean-up
2742             self._router_interface_action('remove',
2743                                           r['router']['id'],
2744                                           None,
2745                                           p['port']['id'])
2746         self.assertEqual(2, notifyApi.routers_updated.call_count)
2747
2748     def test_interfaces_op_agent(self):
2749         with self.router() as r:
2750             self._test_notify_op_agent(
2751                 self._test_interfaces_op_agent, r)
2752
2753     def _test_floatingips_op_agent(self, notifyApi):
2754         with self.floatingip_with_assoc():
2755             pass
2756         # add gateway, add interface, associate, deletion of floatingip
2757         self.assertEqual(4, notifyApi.routers_updated.call_count)
2758
2759     def test_floatingips_op_agent(self):
2760         self._test_notify_op_agent(self._test_floatingips_op_agent)
2761
2762
2763 class L3BaseForIntTests(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
2764
2765     mock_rescheduling = True
2766
2767     def setUp(self, plugin=None, ext_mgr=None, service_plugins=None):
2768         if not plugin:
2769             plugin = 'neutron.tests.unit.extensions.test_l3.TestL3NatIntPlugin'
2770         # for these tests we need to enable overlapping ips
2771         cfg.CONF.set_default('allow_overlapping_ips', True)
2772         ext_mgr = ext_mgr or L3TestExtensionManager()
2773
2774         if self.mock_rescheduling:
2775             mock.patch('%s._check_router_needs_rescheduling' % plugin,
2776                        new=lambda *a: False).start()
2777
2778         super(L3BaseForIntTests, self).setUp(plugin=plugin, ext_mgr=ext_mgr,
2779                                              service_plugins=service_plugins)
2780
2781         self.setup_notification_driver()
2782
2783
2784 class L3BaseForSepTests(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
2785
2786     def setUp(self, plugin=None, ext_mgr=None):
2787         # the plugin without L3 support
2788         if not plugin:
2789             plugin = 'neutron.tests.unit.extensions.test_l3.TestNoL3NatPlugin'
2790         # the L3 service plugin
2791         l3_plugin = ('neutron.tests.unit.extensions.test_l3.'
2792                      'TestL3NatServicePlugin')
2793         service_plugins = {'l3_plugin_name': l3_plugin}
2794
2795         # for these tests we need to enable overlapping ips
2796         cfg.CONF.set_default('allow_overlapping_ips', True)
2797         if not ext_mgr:
2798             ext_mgr = L3TestExtensionManager()
2799         super(L3BaseForSepTests, self).setUp(plugin=plugin, ext_mgr=ext_mgr,
2800                                              service_plugins=service_plugins)
2801
2802         self.setup_notification_driver()
2803
2804
2805 class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
2806                                         L3NatTestCaseMixin,
2807                                         test_agent.
2808                                         AgentDBTestMixIn):
2809
2810     """Unit tests for core plugin with L3 routing and scheduling integrated."""
2811
2812     def setUp(self, plugin='neutron.tests.unit.extensions.test_l3.'
2813                            'TestL3NatIntAgentSchedulingPlugin',
2814               ext_mgr=None, service_plugins=None):
2815         self.mock_rescheduling = False
2816         super(L3NatDBIntAgentSchedulingTestCase, self).setUp(
2817             plugin, ext_mgr, service_plugins)
2818         self.adminContext = context.get_admin_context()
2819
2820     def _assert_router_on_agent(self, router_id, agent_host):
2821         plugin = manager.NeutronManager.get_service_plugins().get(
2822             service_constants.L3_ROUTER_NAT)
2823         agents = plugin.list_l3_agents_hosting_router(
2824             self.adminContext, router_id)['agents']
2825         self.assertEqual(len(agents), 1)
2826         self.assertEqual(agents[0]['host'], agent_host)
2827
2828     def test_update_gateway_agent_exists_supporting_network(self):
2829         with self.router() as r, self.subnet() as s1, self.subnet() as s2:
2830             self._set_net_external(s1['subnet']['network_id'])
2831             l3_rpc_cb = l3_rpc.L3RpcCallback()
2832             helpers.register_l3_agent(
2833                 host='host1',
2834                 ext_net_id=s1['subnet']['network_id'])
2835             helpers.register_l3_agent(
2836                 host='host2', internal_only=False,
2837                 ext_net_id=s2['subnet']['network_id'])
2838             l3_rpc_cb.sync_routers(self.adminContext,
2839                                    host='host1')
2840             self._assert_router_on_agent(r['router']['id'], 'host1')
2841
2842             self._add_external_gateway_to_router(
2843                 r['router']['id'],
2844                 s1['subnet']['network_id'])
2845             self._assert_router_on_agent(r['router']['id'], 'host1')
2846
2847             self._set_net_external(s2['subnet']['network_id'])
2848             self._add_external_gateway_to_router(
2849                 r['router']['id'],
2850                 s2['subnet']['network_id'])
2851             self._assert_router_on_agent(r['router']['id'], 'host2')
2852
2853     def test_update_gateway_agent_exists_supporting_multiple_network(self):
2854         with self.router() as r, self.subnet() as s1, self.subnet() as s2:
2855             self._set_net_external(s1['subnet']['network_id'])
2856             l3_rpc_cb = l3_rpc.L3RpcCallback()
2857             helpers.register_l3_agent(
2858                 host='host1',
2859                 ext_net_id=s1['subnet']['network_id'])
2860             helpers.register_l3_agent(
2861                 host='host2', internal_only=False,
2862                 ext_net_id='', ext_bridge='')
2863             l3_rpc_cb.sync_routers(self.adminContext,
2864                                    host='host1')
2865             self._assert_router_on_agent(r['router']['id'], 'host1')
2866
2867             self._add_external_gateway_to_router(
2868                 r['router']['id'],
2869                 s1['subnet']['network_id'])
2870             self._assert_router_on_agent(r['router']['id'], 'host1')
2871
2872             self._set_net_external(s2['subnet']['network_id'])
2873             self._add_external_gateway_to_router(
2874                 r['router']['id'],
2875                 s2['subnet']['network_id'])
2876             self._assert_router_on_agent(r['router']['id'], 'host2')
2877
2878     def test_router_update_gateway_no_eligible_l3_agent(self):
2879         with self.router() as r:
2880             with self.subnet() as s1:
2881                 with self.subnet() as s2:
2882                     self._set_net_external(s1['subnet']['network_id'])
2883                     self._set_net_external(s2['subnet']['network_id'])
2884                     self._add_external_gateway_to_router(
2885                         r['router']['id'],
2886                         s1['subnet']['network_id'],
2887                         expected_code=exc.HTTPBadRequest.code)
2888
2889
2890 class L3RpcCallbackTestCase(base.BaseTestCase):
2891
2892     def setUp(self):
2893         super(L3RpcCallbackTestCase, self).setUp()
2894         self.mock_plugin = mock.patch.object(
2895             l3_rpc.L3RpcCallback,
2896             'plugin', new_callable=mock.PropertyMock).start()
2897         self.mock_l3plugin = mock.patch.object(
2898             l3_rpc.L3RpcCallback,
2899             'l3plugin', new_callable=mock.PropertyMock).start()
2900         self.l3_rpc_cb = l3_rpc.L3RpcCallback()
2901
2902     def test__ensure_host_set_on_port_host_id_none(self):
2903         port = {'id': 'id', portbindings.HOST_ID: 'somehost'}
2904         self.l3_rpc_cb._ensure_host_set_on_port(None, None, port)
2905         self.assertFalse(self.l3_rpc_cb.plugin.update_port.called)
2906
2907     def test__ensure_host_set_on_port_update_on_concurrent_delete(self):
2908         port_id = 'foo_port_id'
2909         port = {
2910             'id': port_id,
2911             'device_owner': DEVICE_OWNER_COMPUTE,
2912             portbindings.HOST_ID: '',
2913             portbindings.VIF_TYPE: portbindings.VIF_TYPE_BINDING_FAILED
2914         }
2915         router_id = 'foo_router_id'
2916         self.l3_rpc_cb.plugin.update_port.side_effect = n_exc.PortNotFound(
2917             port_id=port_id)
2918         with mock.patch.object(l3_rpc.LOG, 'debug') as mock_log:
2919             self.l3_rpc_cb._ensure_host_set_on_port(
2920                 mock.ANY, mock.ANY, port, router_id)
2921         self.l3_rpc_cb.plugin.update_port.assert_called_once_with(
2922             mock.ANY, port_id, {'port': {portbindings.HOST_ID: mock.ANY}})
2923         self.assertTrue(mock_log.call_count)
2924         expected_message = ('Port foo_port_id not found while updating '
2925                             'agent binding for router foo_router_id.')
2926         actual_message = mock_log.call_args[0][0] % mock_log.call_args[0][1]
2927         self.assertEqual(expected_message, actual_message)
2928
2929
2930 class L3AgentDbIntTestCase(L3BaseForIntTests, L3AgentDbTestCaseBase):
2931
2932     """Unit tests for methods called by the L3 agent for
2933     the case where core plugin implements L3 routing.
2934     """
2935
2936     def setUp(self):
2937         super(L3AgentDbIntTestCase, self).setUp()
2938         self.core_plugin = TestL3NatIntPlugin()
2939         self.plugin = self.core_plugin
2940
2941
2942 class L3AgentDbSepTestCase(L3BaseForSepTests, L3AgentDbTestCaseBase):
2943
2944     """Unit tests for methods called by the L3 agent for the
2945     case where separate service plugin implements L3 routing.
2946     """
2947
2948     def setUp(self):
2949         super(L3AgentDbSepTestCase, self).setUp()
2950         self.core_plugin = TestNoL3NatPlugin()
2951         self.plugin = TestL3NatServicePlugin()
2952
2953
2954 class TestL3DbOperationBounds(test_db_base_plugin_v2.DbOperationBoundMixin,
2955                               L3NatTestCaseMixin,
2956                               ml2_base.ML2TestFramework):
2957     def setUp(self):
2958         super(TestL3DbOperationBounds, self).setUp()
2959         ext_mgr = L3TestExtensionManager()
2960         self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
2961
2962     def test_router_list_queries_constant(self):
2963         with self.subnet() as s:
2964             self._set_net_external(s['subnet']['network_id'])
2965
2966             def router_maker():
2967                 ext_info = {'network_id': s['subnet']['network_id']}
2968                 self._create_router(self.fmt, _uuid(),
2969                                     arg_list=('external_gateway_info',),
2970                                     external_gateway_info=ext_info)
2971
2972             self._assert_object_list_queries_constant(router_maker, 'routers')
2973
2974     def test_floatingip_list_queries_constant(self):
2975         with self.floatingip_with_assoc() as flip:
2976             internal_port = self._show('ports', flip['floatingip']['port_id'])
2977             internal_net_id = internal_port['port']['network_id']
2978
2979             def float_maker():
2980                 port = self._make_port(self.fmt, internal_net_id)
2981                 self._make_floatingip(
2982                     self.fmt, flip['floatingip']['floating_network_id'],
2983                     port_id=port['port']['id'])
2984
2985             self._assert_object_list_queries_constant(float_maker,
2986                                                       'floatingips')
2987
2988
2989 class L3NatDBTestCaseMixin(object):
2990     """L3_NAT_dbonly_mixin specific test cases."""
2991
2992     def setUp(self):
2993         super(L3NatDBTestCaseMixin, self).setUp()
2994         plugin = manager.NeutronManager.get_service_plugins()[
2995             service_constants.L3_ROUTER_NAT]
2996         if not isinstance(plugin, l3_db.L3_NAT_dbonly_mixin):
2997             self.skipTest("Plugin is not L3_NAT_dbonly_mixin")
2998
2999     def test_create_router_gateway_fails(self):
3000         """Force _update_router_gw_info failure and see
3001         the exception is propagated.
3002         """
3003
3004         plugin = manager.NeutronManager.get_service_plugins()[
3005             service_constants.L3_ROUTER_NAT]
3006         ctx = context.Context('', 'foo')
3007
3008         class MyException(Exception):
3009             pass
3010
3011         mock.patch.object(plugin, '_update_router_gw_info',
3012                           side_effect=MyException).start()
3013         with self.network() as n:
3014             data = {'router': {
3015                 'name': 'router1', 'admin_state_up': True,
3016                 'tenant_id': ctx.tenant_id,
3017                 'external_gateway_info': {'network_id': n['network']['id']}}}
3018
3019             self.assertRaises(MyException, plugin.create_router, ctx, data)
3020             # Verify router doesn't persist on failure
3021             routers = plugin.get_routers(ctx)
3022             self.assertEqual(0, len(routers))
3023
3024
3025 class L3NatDBIntTestCase(L3BaseForIntTests, L3NatTestCaseBase,
3026                          L3NatDBTestCaseMixin):
3027
3028     """Unit tests for core plugin with L3 routing integrated."""
3029     pass
3030
3031
3032 class L3NatDBSepTestCase(L3BaseForSepTests, L3NatTestCaseBase,
3033                          L3NatDBTestCaseMixin):
3034
3035     """Unit tests for a separate L3 routing service plugin."""
3036
3037     def test_port_deletion_prevention_handles_missing_port(self):
3038         pl = manager.NeutronManager.get_service_plugins().get(
3039             service_constants.L3_ROUTER_NAT)
3040         self.assertIsNone(
3041             pl.prevent_l3_port_deletion(context.get_admin_context(), 'fakeid')
3042         )