# License for the specific language governing permissions and limitations
# under the License.
-import random
-
import netaddr
from oslo.config import cfg
+from oslo.db import exception as db_exc
from oslo.utils import excutils
from sqlalchemy import and_
from sqlalchemy import event
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
+from neutron.common import utils
from neutron import context as ctx
from neutron.db import common_db_mixin
from neutron.db import models_v2
return context.session.query(models_v2.Subnet).all()
@staticmethod
- def _generate_mac(context, network_id):
- base_mac = cfg.CONF.base_mac.split(':')
- max_retries = cfg.CONF.mac_generation_retries
- for i in range(max_retries):
- mac = [int(base_mac[0], 16), int(base_mac[1], 16),
- int(base_mac[2], 16), random.randint(0x00, 0xff),
- random.randint(0x00, 0xff), random.randint(0x00, 0xff)]
- if base_mac[3] != '00':
- mac[3] = int(base_mac[3], 16)
- mac_address = ':'.join(map(lambda x: "%02x" % x, mac))
- if NeutronDbPluginV2._check_unique_mac(context, network_id,
- mac_address):
- LOG.debug("Generated mac for network %(network_id)s "
- "is %(mac_address)s",
- {'network_id': network_id,
- 'mac_address': mac_address})
- return mac_address
- else:
- LOG.debug("Generated mac %(mac_address)s exists. Remaining "
- "attempts %(max_retries)s.",
- {'mac_address': mac_address,
- 'max_retries': max_retries - (i + 1)})
- LOG.error(_LE("Unable to generate mac address after %s attempts"),
- max_retries)
- raise n_exc.MacAddressGenerationFailure(net_id=network_id)
-
- @staticmethod
- def _check_unique_mac(context, network_id, mac_address):
- mac_qry = context.session.query(models_v2.Port)
- try:
- mac_qry.filter_by(network_id=network_id,
- mac_address=mac_address).one()
- except exc.NoResultFound:
- return True
- return False
+ def _generate_mac():
+ return utils.get_random_mac(cfg.CONF.base_mac.split(':'))
@staticmethod
def _delete_ip_allocation(context, network_id, subnet_id, ip_address):
def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)
+ def _create_port_with_mac(self, context, network_id, port_data,
+ mac_address, nested=False):
+ try:
+ with context.session.begin(subtransactions=True, nested=nested):
+ db_port = models_v2.Port(mac_address=mac_address, **port_data)
+ context.session.add(db_port)
+ return db_port
+ except db_exc.DBDuplicateEntry:
+ raise n_exc.MacAddressInUse(net_id=network_id, mac=mac_address)
+
+ def _create_port(self, context, network_id, port_data):
+ max_retries = cfg.CONF.mac_generation_retries
+ for i in range(max_retries):
+ mac = self._generate_mac()
+ try:
+ # nested = True frames an operation that may potentially fail
+ # within a transaction, so that it can be rolled back to the
+ # point before its failure while maintaining the enclosing
+ # transaction
+ return self._create_port_with_mac(
+ context, network_id, port_data, mac, nested=True)
+ except n_exc.MacAddressInUse:
+ LOG.debug('Generated mac %(mac_address)s exists on '
+ 'network %(network_id)s',
+ {'mac_address': mac, 'network_id': network_id})
+
+ LOG.error(_LE("Unable to generate mac address after %s attempts"),
+ max_retries)
+ raise n_exc.MacAddressGenerationFailure(net_id=network_id)
+
def create_port(self, context, port):
p = port['port']
port_id = p.get('id') or uuidutils.generate_uuid()
self._enforce_device_owner_not_router_intf_or_device_id(context, p,
tenant_id)
+ port_data = dict(tenant_id=tenant_id,
+ name=p['name'],
+ id=port_id,
+ network_id=network_id,
+ admin_state_up=p['admin_state_up'],
+ status=p.get('status', constants.PORT_STATUS_ACTIVE),
+ device_id=p['device_id'],
+ device_owner=p['device_owner'])
+
with context.session.begin(subtransactions=True):
# Ensure that the network exists.
self._get_network(context, network_id)
- # Ensure that a MAC address is defined and it is unique on the
- # network
+ # Create the port
if p['mac_address'] is attributes.ATTR_NOT_SPECIFIED:
- #Note(scollins) Add the generated mac_address to the port,
- #since _allocate_ips_for_port will need the mac when
- #calculating an EUI-64 address for a v6 subnet
- p['mac_address'] = NeutronDbPluginV2._generate_mac(context,
- network_id)
- else:
- # Ensure that the mac on the network is unique
- if not NeutronDbPluginV2._check_unique_mac(context,
- network_id,
- p['mac_address']):
- raise n_exc.MacAddressInUse(net_id=network_id,
- mac=p['mac_address'])
-
- if 'status' not in p:
- status = constants.PORT_STATUS_ACTIVE
+ db_port = self._create_port(context, network_id, port_data)
+ p['mac_address'] = db_port['mac_address']
else:
- status = p['status']
-
- db_port = models_v2.Port(tenant_id=tenant_id,
- name=p['name'],
- id=port_id,
- network_id=network_id,
- mac_address=p['mac_address'],
- admin_state_up=p['admin_state_up'],
- status=status,
- device_id=p['device_id'],
- device_owner=p['device_owner'])
- context.session.add(db_port)
+ db_port = self._create_port_with_mac(
+ context, network_id, port_data, p['mac_address'])
# Update the IP's for the port
ips = self._allocate_ips_for_port(context, port)
# limitations under the License.
from neutron import context
+from neutron.db import db_base_plugin_v2
from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron.openstack.common import uuidutils
self.ctx.session.add(models_v2.Network(id=network_id))
def _setup_neutron_port(self, network_id, port_id):
+ mac_address = db_base_plugin_v2.NeutronDbPluginV2._generate_mac()
with self.ctx.session.begin(subtransactions=True):
port = models_v2.Port(id=port_id,
network_id=network_id,
- mac_address='foo_mac_address',
+ mac_address=mac_address,
admin_state_up=True,
status='DOWN',
device_id='',
device_owner='')
self.ctx.session.add(port)
+ return port
def _setup_neutron_portbinding(self, port_id, vif_type, host):
with self.ctx.session.begin(subtransactions=True):
def test_get_port_from_device_mac(self):
network_id = 'foo-network-id'
port_id = 'foo-port-id'
- mac_address = 'foo_mac_address'
self._setup_neutron_network(network_id)
- self._setup_neutron_port(network_id, port_id)
+ port = self._setup_neutron_port(network_id, port_id)
- port = ml2_db.get_port_from_device_mac(mac_address)
- self.assertEqual(port_id, port.id)
+ observed_port = ml2_db.get_port_from_device_mac(port['mac_address'])
+ self.assertEqual(port_id, observed_port.id)
def test_get_locked_port_and_binding(self):
network_id = 'foo-network-id'
def test_mac_exhaustion(self):
# rather than actually consuming all MAC (would take a LONG time)
- # we just raise the exception that would result.
- @staticmethod
- def fake_gen_mac(context, net_id):
- raise n_exc.MacAddressGenerationFailure(net_id=net_id)
-
- with mock.patch.object(neutron.db.db_base_plugin_v2.NeutronDbPluginV2,
- '_generate_mac', new=fake_gen_mac):
- res = self._create_network(fmt=self.fmt, name='net1',
- admin_state_up=True)
- network = self.deserialize(self.fmt, res)
- net_id = network['network']['id']
+ # we try to allocate an already allocated mac address
+ cfg.CONF.set_override('mac_generation_retries', 3)
+
+ res = self._create_network(fmt=self.fmt, name='net1',
+ admin_state_up=True)
+ network = self.deserialize(self.fmt, res)
+ net_id = network['network']['id']
+
+ error = n_exc.MacAddressInUse(net_id=net_id, mac='00:11:22:33:44:55')
+ with mock.patch.object(
+ neutron.db.db_base_plugin_v2.NeutronDbPluginV2,
+ '_create_port_with_mac', side_effect=error) as create_mock:
res = self._create_port(self.fmt, net_id=net_id)
self.assertEqual(res.status_int,
webob.exc.HTTPServiceUnavailable.code)
+ self.assertEqual(3, create_mock.call_count)
def test_requested_duplicate_ip(self):
with self.subnet() as subnet: