from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as nexception
+from oslo.config import cfg
+
+allowed_address_pair_opts = [
+ #TODO(limao): use quota framework when it support quota for attributes
+ cfg.IntOpt('max_allowed_address_pair', default=10,
+ help=_("Maximum number of allowed address pairs")),
+]
+
+cfg.CONF.register_opts(allowed_address_pair_opts)
class AllowedAddressPairsMissingIP(nexception.InvalidInput):
"mac_address %(mac_address)s ip_address %(ip_address)s.")
+class AllowedAddressPairExhausted(nexception.BadRequest):
+ message = _("The number of allowed address pair "
+ "exceeds the maximum %(quota)s.")
+
+
def _validate_allowed_address_pairs(address_pairs, valid_values=None):
unique_check = {}
+ if len(address_pairs) > cfg.CONF.max_allowed_address_pair:
+ raise AllowedAddressPairExhausted(
+ quota=cfg.CONF.max_allowed_address_pair)
+
for address_pair in address_pairs:
# mac_address is optional, if not set we use the mac on the port
if 'mac_address' in address_pair:
from neutron.extensions import portsecurity as psec
from neutron import manager
from neutron.tests.unit import test_db_plugin
+from oslo.config import cfg
+
DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_allowedaddresspairs.'
'AllowedAddressPairTestPlugin')
'ip_address': '10.0.0.1'}]
self._create_port_with_address_pairs(address_pairs, 400)
+ def test_more_than_max_allowed_address_pair(self):
+ cfg.CONF.set_default('max_allowed_address_pair', 3)
+ address_pairs = [{'mac_address': '00:00:00:00:00:01',
+ 'ip_address': '10.0.0.1'},
+ {'mac_address': '00:00:00:00:00:02',
+ 'ip_address': '10.0.0.2'},
+ {'mac_address': '00:00:00:00:00:03',
+ 'ip_address': '10.0.0.3'},
+ {'mac_address': '00:00:00:00:00:04',
+ 'ip_address': '10.0.0.4'}]
+ self._create_port_with_address_pairs(address_pairs, 400)
+
+ def test_equal_to_max_allowed_address_pair(self):
+ cfg.CONF.set_default('max_allowed_address_pair', 3)
+ address_pairs = [{'mac_address': '00:00:00:00:00:01',
+ 'ip_address': '10.0.0.1'},
+ {'mac_address': '00:00:00:00:00:02',
+ 'ip_address': '10.0.0.2'},
+ {'mac_address': '00:00:00:00:00:03',
+ 'ip_address': '10.0.0.3'}]
+ self._create_port_with_address_pairs(address_pairs, 201)
+
def test_create_overlap_with_fixed_ip(self):
address_pairs = [{'mac_address': '00:00:00:00:00:01',
'ip_address': '10.0.0.2'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_pair.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
- self.deserialize(self.fmt, res)
+ port = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, ret_code)
+ if ret_code == 201:
+ self._delete('ports', port['port']['id'])
def test_update_add_address_pairs(self):
with self.network() as net: