# under the License.
import netaddr
+from oslo_db import exception
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import securitygroup as ext_sg
+from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
+LOG = logging.getLogger(__name__)
IP_PROTOCOL_MAP = {constants.PROTO_NAME_TCP: constants.PROTO_NUM_TCP,
constants.PROTO_NAME_UDP: constants.PROTO_NUM_UDP,
tenant_id = self._get_tenant_id_for_create(context, s)
if not default_sg:
- self._ensure_default_security_group(context, tenant_id)
+ try:
+ self._ensure_default_security_group(context, tenant_id)
+ except exception.DBDuplicateEntry as ex:
+ LOG.debug("Duplicate default security group %s was not"
+ " created", ex.value)
with context.session.begin(subtransactions=True):
security_group_db = SecurityGroup(id=s.get('id') or (
import contextlib
import mock
+import oslo_db.exception as exc
import webob.exc
from neutron.api.v2 import attributes as attr
'port_range_min': None}
self._assert_sg_rule_has_kvs(v6_rule, expected)
+ def test_skip_duplicate_default_sg_error(self):
+ with mock.patch.object(SecurityGroupTestPlugin,
+ '_ensure_default_security_group',
+ side_effect=exc.DBDuplicateEntry()):
+ self._make_security_group(self.fmt, 'test_sg', 'test_desc')
+
def test_update_security_group(self):
with self.security_group() as sg:
data = {'security_group': {'name': 'new_name',