"get_port:binding:host_id": "rule:admin_only",
"get_port:binding:profile": "rule:admin_only",
"update_port": "rule:admin_or_owner or rule:context_is_advsvc",
+ "update_port:mac_address": "rule:admin_only or rule:context_is_advsvc",
"update_port:fixed_ips": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:port_security_enabled": "rule:admin_or_network_owner or rule:context_is_advsvc",
"update_port:binding:host_id": "rule:admin_only",
'default': True,
'convert_to': convert_to_boolean,
'is_visible': True},
- 'mac_address': {'allow_post': True, 'allow_put': False,
+ 'mac_address': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED,
'validate': {'type:mac_address': None},
'enforce_policy': True,
"device %(device_id)s.")
+class PortBound(InUse):
+ message = _("Unable to complete operation on port %(port_id)s, "
+ "port is already bound, port type: %(vif_type)s, "
+ "old_mac %(old_mac)s, new_mac %(new_mac)s")
+
+
class MacAddressInUse(InUse):
message = _("Unable to complete operation for network %(net_id)s. "
"The mac address %(mac)s is in use.")
message = _("The allocation pool %(pool)s is not valid.")
+class UnsupportedPortDeviceOwner(Conflict):
+ message = _("Operation %(op)s is not supported for device_owner "
+ "%(device_owner)s on port %(port_id)s.")
+
+
class OverlappingAllocationPools(Conflict):
message = _("Found overlapping allocation pools:"
"%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.")
return self._get_collection_count(context, models_v2.Subnet,
filters=filters)
+ def _check_mac_addr_update(self, context, port, new_mac, device_owner):
+ if (device_owner and device_owner.startswith('network:')):
+ raise n_exc.UnsupportedPortDeviceOwner(
+ op=_("mac address update"), port_id=id,
+ device_owner=device_owner)
+
def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)
self._enforce_device_owner_not_router_intf_or_device_id(
context, p, port['tenant_id'], port)
+ new_mac = p.get('mac_address')
+ if new_mac and new_mac != port['mac_address']:
+ self._check_mac_addr_update(
+ context, port, new_mac, current_device_owner)
+
# Check if the IPs need to be updated
+ network_id = port['network_id']
if 'fixed_ips' in p:
changed_ips = True
original = self._make_port_dict(port, process_extensions=False)
added_ips, prev_ips = self._update_ips_for_port(
- context, port["network_id"], id,
+ context, network_id, id,
original["fixed_ips"], p['fixed_ips'],
original['mac_address'], port['device_owner'])
# Update ips if necessary
for ip in added_ips:
NeutronDbPluginV2._store_ip_allocation(
- context, ip['ip_address'], port['network_id'],
+ context, ip['ip_address'], network_id,
ip['subnet_id'], port.id)
- # Remove all attributes in p which are not in the port DB model
- # and then update the port
- port.update(self._filter_non_model_columns(p, models_v2.Port))
+ # Remove all attributes in p which are not in the port DB model
+ # and then update the port
+ try:
+ port.update(self._filter_non_model_columns(p, models_v2.Port))
+ context.session.flush()
+ except db_exc.DBDuplicateEntry:
+ raise n_exc.MacAddressInUse(net_id=network_id, mac=new_mac)
result = self._make_port_dict(port)
# Keep up with fields that changed
"""
need_notify = False
if (original_port['fixed_ips'] != updated_port['fixed_ips'] or
+ original_port['mac_address'] != updated_port['mac_address'] or
not utils.compare_elements(
original_port.get(ext_sg.SECURITYGROUPS),
updated_port.get(ext_sg.SECURITYGROUPS))):
"""Implentation of Brocade ML2 Mechanism driver for ML2 Plugin."""
from oslo.config import cfg
+from oslo.utils import excutils
from oslo.utils import importutils
from neutron.i18n import _LE, _LI
context = mech_context._plugin_context
- network = brocade_db.get_network(context, network_id)
- vlan_id = network['vlan']
-
- interface_mac = port['mac_address']
-
- # convert mac format: xx:xx:xx:xx:xx:xx -> xxxx.xxxx.xxxx
- mac = self.mac_reformat_62to34(interface_mac)
- try:
- self._driver.associate_mac_to_network(self._switch['address'],
- self._switch['username'],
- self._switch['password'],
- vlan_id,
- mac)
- except Exception:
- LOG.exception(
- _LE("Brocade NOS driver: failed to associate mac %s"),
- interface_mac)
- raise Exception(
- _("Brocade switch exception: create_port_postcommit failed"))
+ self._associate_mac_to_net(context, network_id, port['mac_address'],
+ "create_port_postcommit")
LOG.info(
_LI("created port (postcommit): port_id=%(port_id)s"
context = mech_context._plugin_context
- network = brocade_db.get_network(context, network_id)
- vlan_id = network['vlan']
-
- interface_mac = port['mac_address']
-
- # convert mac format: xx:xx:xx:xx:xx:xx -> xxxx.xxxx.xxxx
- mac = self.mac_reformat_62to34(interface_mac)
- try:
- self._driver.dissociate_mac_from_network(
- self._switch['address'],
- self._switch['username'],
- self._switch['password'],
- vlan_id,
- mac)
- except Exception:
- LOG.exception(
- _LE("Brocade NOS driver: failed to dissociate MAC %s"),
- interface_mac)
- raise Exception(
- _("Brocade switch exception, delete_port_postcommit failed"))
+ self._dissociate_mac_from_net(context, network_id, port['mac_address'],
+ "delete_port_postcommit")
LOG.info(
_LI("delete port (postcommit): port_id=%(port_id)s"
LOG.debug("update_port_precommit(self: called")
def update_port_postcommit(self, mech_context):
- """Noop now, it is left here for future."""
+ """If mac changes, update association to network."""
+
LOG.debug("update_port_postcommit: called")
+ port = mech_context.current
+ old_port = mech_context.original
+ if port['mac_address'] == old_port['mac_address']:
+ return
+ port_id = port['id']
+ network_id = port['network_id']
+ tenant_id = port['tenant_id']
+
+ context = mech_context._plugin_context
+
+ self._dissociate_mac_from_net(context, network_id,
+ old_port['mac_address'],
+ "update_port_postcommit")
+ self._associate_mac_to_net(context, network_id, port['mac_address'],
+ "update_port_postcommit")
+
+ LOG.info(
+ _LI("update port (postcommit): port_id=%(port_id)s"
+ " network_id=%(network_id)s tenant_id=%(tenant_id)s."),
+ {'port_id': port_id,
+ 'network_id': network_id, 'tenant_id': tenant_id})
def create_subnet_precommit(self, mech_context):
"""Noop now, it is left here for future."""
"""Noop now, it is left here for future."""
LOG.debug("update_subnet_postcommit: called")
+ def _associate_mac_to_net(self, context, network_id, interface_mac, op):
+ network = brocade_db.get_network(context, network_id)
+ vlan_id = network['vlan']
+
+ # convert mac format: xx:xx:xx:xx:xx:xx -> xxxx.xxxx.xxxx
+ mac = self.mac_reformat_62to34(interface_mac)
+ try:
+ self._driver.associate_mac_to_network(self._switch['address'],
+ self._switch['username'],
+ self._switch['password'],
+ vlan_id,
+ mac)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(
+ _LE("Brocade NOS driver: failed to associate mac %s"),
+ interface_mac)
+
+ def _dissociate_mac_from_net(self, context, network_id, interface_mac, op):
+
+ network = brocade_db.get_network(context, network_id)
+ vlan_id = network['vlan']
+
+ # convert mac format: xx:xx:xx:xx:xx:xx -> xxxx.xxxx.xxxx
+ mac = self.mac_reformat_62to34(interface_mac)
+ try:
+ self._driver.dissociate_mac_from_network(
+ self._switch['address'],
+ self._switch['username'],
+ self._switch['password'],
+ vlan_id,
+ mac)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ LOG.exception(
+ _LE("Brocade NOS driver: failed to dissociate MAC %s"),
+ interface_mac)
+
@staticmethod
def mac_reformat_62to34(interface_mac):
"""Transform MAC address format.
from neutron.db import api as db_api
from neutron.i18n import _LW
from neutron.openstack.common import log as logging
+from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers.l2pop import config # noqa
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
port = context.current
orig = context.original
+ if (orig['mac_address'] != port['mac_address'] and
+ context.status == const.PORT_STATUS_ACTIVE):
+ LOG.warning(_LW("unable to modify mac_address of ACTIVE port "
+ "%s"), port['id'])
+ raise ml2_exc.MechansimDriverError(method='update_port_postcommit')
diff_ips = self._get_diff_ips(orig, port)
if diff_ips:
self._fixed_ips_changed(context, orig, port, diff_ips)
if (attributes.is_attr_set(host) and binding.host != host):
return mech_context.current
+ def _check_mac_update_allowed(self, orig_port, port, binding):
+ unplugged_types = (portbindings.VIF_TYPE_BINDING_FAILED,
+ portbindings.VIF_TYPE_UNBOUND)
+ new_mac = port.get('mac_address')
+ mac_change = (new_mac is not None and
+ orig_port['mac_address'] != new_mac)
+ if (mac_change and binding.vif_type not in unplugged_types):
+ raise exc.PortBound(port_id=orig_port['id'],
+ vif_type=binding.vif_type,
+ old_mac=orig_port['mac_address'],
+ new_mac=port['mac_address'])
+ return mac_change
+
def _process_port_binding(self, mech_context, attrs):
binding = mech_context._binding
port = mech_context.current
def update_port(self, context, id, port):
attrs = port['port']
need_port_update_notify = False
+ l3plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ is_dvr_enabled = utils.is_extension_supported(
+ l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
session = context.session
port_db, binding = db.get_locked_port_and_binding(session, id)
if not port_db:
raise exc.PortNotFound(port_id=id)
+ mac_address_updated = self._check_mac_update_allowed(
+ port_db, port, binding)
+ need_port_update_notify |= mac_address_updated
original_port = self._make_port_dict(port_db)
updated_port = super(Ml2Plugin, self).update_port(context, id,
port)
mech_context, attrs)
self.mechanism_manager.update_port_precommit(mech_context)
- # Notification must be sent after the above transaction is complete
+ # Notifications must be sent after the above transaction is complete
+ if mac_address_updated and l3plugin and is_dvr_enabled:
+ # NOTE: "add" actually does a 'replace' operation
+ l3plugin.dvr_vmarp_table_update(context, updated_port, "add")
self._notify_l3_agent_new_port(context, new_host_port)
# TODO(apech) - handle errors raised by update_port, potentially
self._assertExpectedHTTP(result.status_int,
c_exc.NexusMissingRequiredFields)
+ def test_update_port_mac(self):
+ # REVISIT: test passes, but is back-end OK?
+ host_arg = {
+ portbindings.HOST_ID: COMP_HOST_NAME,
+ 'device_id': DEVICE_ID_1,
+ }
+ arg_list = (portbindings.HOST_ID, 'device_id',)
+ self.check_update_port_mac(host_arg=host_arg, arg_list=arg_list)
+
class TestCiscoNetworksV2(CiscoML2MechanismTestCase,
test_ml2_plugin.TestMl2NetworksV2):
pnet.SEGMENTATION_ID,),
**net_arg)
+ net_arg = {pnet.NETWORK_TYPE: 'flat',
+ pnet.PHYSICAL_NETWORK: 'noagent'}
+ self._network3 = self._make_network(self.fmt, 'net3', True,
+ arg_list=(pnet.NETWORK_TYPE,
+ pnet.PHYSICAL_NETWORK,),
+ **net_arg)
+
notifier_patch = mock.patch(NOTIFIER)
notifier_patch.start()
self.mock_fanout.assert_any_call(
mock.ANY, 'remove_fdb_entries', expected)
+ def test_mac_addr_changed(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST + '_5'}
+ with self.port(subnet=subnet,
+ device_owner=DEVICE_OWNER_COMPUTE,
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ p1 = port1['port']
+
+ self.mock_fanout.reset_mock()
+ device = 'tap' + p1['id']
+
+ old_mac = p1['mac_address']
+ mac = old_mac.split(':')
+ mac[5] = '01' if mac[5] != '01' else '00'
+ new_mac = ':'.join(mac)
+ data = {'port': {'mac_address': new_mac,
+ portbindings.HOST_ID: HOST}}
+ req = self.new_update_request('ports', data, p1['id'])
+ res = self.deserialize(self.fmt, req.get_response(self.api))
+ self.assertIn('port', res)
+ self.assertEqual(new_mac, res['port']['mac_address'])
+
+ # port was not bound before, so no fdb call expected yet
+ self.assertFalse(self.mock_fanout.called)
+
+ self.callbacks.update_device_up(self.adminContext,
+ agent_id=HOST,
+ device=device)
+
+ self.assertEqual(1, self.mock_fanout.call_count)
+ add_expected = {
+ p1['network_id']: {
+ 'segment_id': 1,
+ 'network_type': 'vxlan',
+ 'ports': {
+ '20.0.0.1': [
+ l2pop_rpc.PortInfo('00:00:00:00:00:00',
+ '0.0.0.0'),
+ l2pop_rpc.PortInfo(new_mac, '10.0.0.2')
+ ]
+ }
+ }
+ }
+ self.mock_fanout.assert_called_with(
+ mock.ANY, 'add_fdb_entries', add_expected)
+
def test_fixed_ips_changed(self):
self._register_ml2_agents()
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
+DEVICE_OWNER_COMPUTE = 'compute:None'
+HOST = 'fake_host'
+
class Ml2PluginConf(object):
"""Plugin configuration shared across the unit and functional tests.
self.assertEqual('DOWN', port['port']['status'])
self.assertEqual('DOWN', self.port_create_status)
+ def test_update_port_mac(self):
+ self.check_update_port_mac(
+ host_arg={portbindings.HOST_ID: HOST},
+ arg_list=(portbindings.HOST_ID,))
+
def test_update_non_existent_port(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
plugin.notifier = mock.Mock()
plugin._get_host_port_if_changed = mock.Mock(
return_value=new_host_port)
+ plugin._check_mac_update_allowed = mock.Mock(return_value=True)
plugin._notify_l3_agent_new_port = mock.Mock()
plugin._notify_l3_agent_new_port.side_effect = (
with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
- ) as (init, super_update_port):
+ mock.patch.object(manager.NeutronManager, 'get_service_plugins'),
+ ) as (init, super_update_port, get_service_plugins):
init.return_value = None
+ l3plugin = mock.Mock()
+ l3plugin.supported_extension_aliases = [
+ constants.L3_DISTRIBUTED_EXT_ALIAS,
+ ]
+ get_service_plugins.return_value = {
+ service_constants.L3_ROUTER_NAT: l3plugin,
+ }
new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin._notify_l3_agent_new_port.assert_called_once_with(
self.context, new_host_port)
+ l3plugin.dvr_vmarp_table_update.assert_called_once_with(
+ self.context, mock.ANY, "add")
def test_vmarp_table_update_outside_of_delete_transaction(self):
l3plugin = mock.Mock()
self.skipTest("This method tests private method of "
"which contrail isn't using")
+ def test_update_port_mac_bad_owner(self):
+ self.check_update_port_mac(
+ device_owner='network:router',
+ expected_status=webob.exc.HTTPConflict.code,
+ expected_error='ContrailConflictError')
+
+ def test_update_port_mac_used(self):
+ self.check_update_port_mac_used(expected_error='ContrailConflictError')
+
class TestContrailSecurityGroups(test_sg.TestSecurityGroups,
ContrailPluginTestCase):
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
+DEVICE_OWNER_COMPUTE = 'compute:None'
+DEVICE_OWNER_NOT_COMPUTE = constants.DEVICE_OWNER_DHCP
+
def optional_ctx(obj, fallback):
if not obj:
self.assertEqual(res['port']['admin_state_up'],
data['port']['admin_state_up'])
+ def update_port_mac(self, port, updated_fixed_ips=None):
+ orig_mac = port['mac_address']
+ mac = orig_mac.split(':')
+ mac[5] = '01' if mac[5] != '01' else '00'
+ new_mac = ':'.join(mac)
+ data = {'port': {'mac_address': new_mac}}
+ if updated_fixed_ips:
+ data['port']['fixed_ips'] = updated_fixed_ips
+ req = self.new_update_request('ports', data, port['id'])
+ return req.get_response(self.api), new_mac
+
+ def _check_v6_auto_address_address(self, port, subnet):
+ if ipv6_utils.is_auto_address_subnet(subnet['subnet']):
+ port_mac = port['port']['mac_address']
+ subnet_cidr = subnet['subnet']['cidr']
+ eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(subnet_cidr,
+ port_mac))
+ self.assertEqual(port['port']['fixed_ips'][0]['ip_address'],
+ eui_addr)
+
+ def check_update_port_mac(
+ self, expected_status=webob.exc.HTTPOk.code,
+ expected_error='StateInvalid', subnet=None,
+ device_owner=DEVICE_OWNER_COMPUTE, updated_fixed_ips=None,
+ host_arg={}, arg_list=[]):
+ with self.port(device_owner=device_owner, subnet=subnet,
+ arg_list=arg_list, **host_arg) as port:
+ self.assertIn('mac_address', port['port'])
+ res, new_mac = self.update_port_mac(
+ port['port'], updated_fixed_ips=updated_fixed_ips)
+ self.assertEqual(expected_status, res.status_int)
+ if expected_status == webob.exc.HTTPOk.code:
+ result = self.deserialize(self.fmt, res)
+ self.assertIn('port', result)
+ self.assertEqual(new_mac, result['port']['mac_address'])
+ if subnet and subnet['subnet']['ip_version'] == 6:
+ self._check_v6_auto_address_address(port, subnet)
+ else:
+ error = self.deserialize(self.fmt, res)
+ self.assertEqual(expected_error,
+ error['NeutronError']['type'])
+
+ def test_update_port_mac(self):
+ self.check_update_port_mac()
+ # sub-classes for plugins/drivers that support mac address update
+ # override this method
+
+ def test_update_port_mac_ip(self):
+ with self.subnet() as subnet:
+ updated_fixed_ips = [{'subnet_id': subnet['subnet']['id'],
+ 'ip_address': '10.0.0.3'}]
+ self.check_update_port_mac(subnet=subnet,
+ updated_fixed_ips=updated_fixed_ips)
+
+ def test_update_port_mac_v6_slaac(self):
+ with self.subnet(gateway_ip='fe80::1',
+ cidr='2607:f0d0:1002:51::/64',
+ ip_version=6,
+ ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
+ self.assertTrue(
+ ipv6_utils.is_auto_address_subnet(subnet['subnet']))
+ self.check_update_port_mac(subnet=subnet)
+
+ def test_update_port_mac_bad_owner(self):
+ self.check_update_port_mac(
+ device_owner=DEVICE_OWNER_NOT_COMPUTE,
+ expected_status=webob.exc.HTTPConflict.code,
+ expected_error='UnsupportedPortDeviceOwner')
+
+ def check_update_port_mac_used(self, expected_error='MacAddressInUse'):
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ with self.port(subnet=subnet) as port2:
+ self.assertIn('mac_address', port['port'])
+ new_mac = port2['port']['mac_address']
+ data = {'port': {'mac_address': new_mac}}
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+ res = req.get_response(self.api)
+ self.assertEqual(webob.exc.HTTPConflict.code,
+ res.status_int)
+ error = self.deserialize(self.fmt, res)
+ self.assertEqual(expected_error,
+ error['NeutronError']['type'])
+
+ def test_update_port_mac_used(self):
+ self.check_update_port_mac_used()
+
def test_update_port_not_admin(self):
res = self._create_network(self.fmt, 'net1', True,
tenant_id='not_admin',