def update_lswitch(cluster, lswitch_id, display_name,
tenant_id=None, **kwargs):
uri = nsxlib._build_uri_path(LSWITCH_RESOURCE, resource_id=lswitch_id)
- lswitch_obj = {"display_name": utils.check_and_truncate(display_name),
- "tags": utils.get_tags(os_tid=tenant_id)}
- if "tags" in kwargs:
- lswitch_obj["tags"].extend(kwargs["tags"])
+ lswitch_obj = {"display_name": utils.check_and_truncate(display_name)}
+ # NOTE: tag update will not 'merge' existing tags with new ones.
+ tags = []
+ if tenant_id:
+ tags = utils.get_tags(os_tid=tenant_id)
+ # The 'tags' kwarg might existing and be None
+ tags.extend(kwargs.get('tags') or [])
+ if tags:
+ lswitch_obj['tags'] = tags
try:
return nsxlib.do_request(HTTP_PUT, uri, json.dumps(lswitch_obj),
cluster=cluster)
self._process_network_queue_mapping(context, net, net_queue_id)
self._process_l3_update(context, net, network['network'])
self._extend_network_dict_provider(context, net)
+ # If provided, update port name on backend; treat backend failures as
+ # not critical (log error, but do not raise)
+ if 'name' in network['network']:
+ # in case of chained switches update name only for the first one
+ nsx_switch_ids = nsx_utils.get_nsx_switch_ids(
+ context.session, self.cluster, id)
+ if not nsx_switch_ids or len(nsx_switch_ids) < 1:
+ LOG.warn(_("Unable to find NSX mappings for neutron "
+ "network:%s"), id)
+ try:
+ switchlib.update_lswitch(self.cluster,
+ nsx_switch_ids[0],
+ network['network']['name'])
+ except api_exc.NsxApiException as e:
+ LOG.warn(_("Logical switch update on NSX backend failed. "
+ "Neutron network id:%(net_id)s; "
+ "NSX lswitch id:%(lswitch_id)s;"
+ "Error:%(error)s"),
+ {'net_id': id, 'lswitch_id': nsx_switch_ids[0],
+ 'error': e})
+
return net
def create_port(self, context, port):
self.assertEqual(second_ls_tags['quantum_net_id'],
network_id)
- def test_update_lswitch(self):
- new_name = 'new-name'
- new_tags = [{'scope': 'new_tag', 'tag': 'xxx'}]
+ def _test_update_lswitch(self, tenant_id, name, tags):
transport_zones_config = [{'zone_uuid': _uuid(),
'transport_type': 'stt'}]
lswitch = switchlib.create_lswitch(self.fake_cluster,
'fake-switch',
transport_zones_config)
switchlib.update_lswitch(self.fake_cluster, lswitch['uuid'],
- new_name, tags=new_tags)
+ name, tenant_id=tenant_id, tags=tags)
res_lswitch = switchlib.get_lswitches(self.fake_cluster,
lswitch['uuid'])
self.assertEqual(len(res_lswitch), 1)
- self.assertEqual(res_lswitch[0]['display_name'], new_name)
+ self.assertEqual(res_lswitch[0]['display_name'], name)
+ if not tags:
+ # no need to validate tags
+ return
switch_tags = self._build_tag_dict(res_lswitch[0]['tags'])
- self.assertIn('new_tag', switch_tags)
- self.assertEqual(switch_tags['new_tag'], 'xxx')
+ for tag in tags:
+ self.assertIn(tag['scope'], switch_tags)
+ self.assertEqual(tag['tag'], switch_tags[tag['scope']])
+
+ def test_update_lswitch(self):
+ self._test_update_lswitch(None, 'new-name',
+ [{'scope': 'new_tag', 'tag': 'xxx'}])
+
+ def test_update_lswitch_no_tags(self):
+ self._test_update_lswitch(None, 'new-name', None)
+
+ def test_update_lswitch_tenant_id(self):
+ self._test_update_lswitch('whatever', 'new-name', None)
def test_update_non_existing_lswitch_raises(self):
self.assertRaises(exceptions.NetworkNotFound,
context.get_admin_context(),
net['network']['id'], data)
+ def test_update_network_with_name_calls_nsx(self):
+ with mock.patch.object(
+ nsxlib.switch, 'update_lswitch') as update_lswitch_mock:
+ # don't worry about deleting this network, do not use
+ # context manager
+ ctx = context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+ net = plugin.create_network(
+ ctx, {'network': {'name': 'xxx',
+ 'admin_state_up': True,
+ 'shared': False,
+ 'port_security_enabled': True}})
+ plugin.update_network(ctx, net['id'],
+ {'network': {'name': 'yyy'}})
+ update_lswitch_mock.assert_called_once_with(
+ mock.ANY, mock.ANY, 'yyy')
+
class SecurityGroupsTestCase(ext_sg.SecurityGroupDBTestCase):