super(SubnetInUse, self).__init__(**kwargs)
+class SubnetPoolInUse(InUse):
+ message = _("Unable to complete operation on subnet pool "
+ "%(subnet_pool_id)s. %(reason)s.")
+
+ def __init__(self, **kwargs):
+ if 'reason' not in kwargs:
+ kwargs['reason'] = _("Two or more concurrent subnets allocated.")
+ super(SubnetPoolInUse, self).__init__(**kwargs)
+
+
class PortInUse(InUse):
message = _("Unable to complete operation on port %(port_id)s "
"for network %(net_id)s. Port already has an attached "
id=self._subnetpool['id'], hash=current_hash)
count = query.update({'hash': new_hash})
if not count:
- raise db_exc.RetryRequest()
+ raise db_exc.RetryRequest(n_exc.SubnetPoolInUse(
+ subnet_pool_id=self._subnetpool['id']))
def _get_allocated_cidrs(self):
query = self._context.session.query(models_v2.Subnet)
# License for the specific language governing permissions and limitations
# under the License.
+import mock
import netaddr
from oslo_config import cfg
+from oslo_db import exception as db_exc
from oslo_utils import uuidutils
from neutron.api.v2 import attributes
self.assertRaises(n_exc.SubnetPoolQuotaExceeded,
sa.allocate_subnet,
req)
+
+ def test_subnetpool_concurrent_allocation_exception(self):
+ sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
+ ['fe80::/48'],
+ 48, 6, default_quota=1)
+ sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
+ sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
+ req = ipam_req.SpecificSubnetRequest(self._tenant_id,
+ uuidutils.generate_uuid(),
+ 'fe80::/63')
+ with mock.patch("sqlalchemy.orm.query.Query.update", return_value=0):
+ self.assertRaises(db_exc.RetryRequest, sa.allocate_subnet, req)