# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
+
from oslo_config import cfg
from oslo_db.sqlalchemy import session
+from sqlalchemy import exc
+
_FACADE = None
facade = _create_facade_lazily()
return facade.get_session(autocommit=autocommit,
expire_on_commit=expire_on_commit)
+
+
+@contextlib.contextmanager
+def autonested_transaction(sess):
+ """This is a convenience method to not bother with 'nested' parameter."""
+ try:
+ session_context = sess.begin_nested()
+ except exc.InvalidRequestError:
+ session_context = sess.begin(subtransactions=True)
+ finally:
+ with session_context as tx:
+ yield tx
from neutron.api.v2 import attributes
from neutron.common import constants
+from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import model_base
from neutron.db import models_v2
tenant_id = self._get_tenant_id_for_create(context, s)
if not default_sg:
- try:
- self._ensure_default_security_group(context, tenant_id)
- except exception.DBDuplicateEntry as ex:
- LOG.debug("Duplicate default security group %s was not"
- " created", ex.value)
+ self._ensure_default_security_group(context, tenant_id)
with context.session.begin(subtransactions=True):
security_group_db = SecurityGroup(id=s.get('id') or (
:returns: the default security group id.
"""
query = self._model_query(context, DefaultSecurityGroup)
- try:
- default_group = query.filter(
- DefaultSecurityGroup.tenant_id == tenant_id).one()
- except exc.NoResultFound:
- security_group = {
- 'security_group': {'name': 'default',
- 'tenant_id': tenant_id,
- 'description': _('Default security group')}
- }
- ret = self.create_security_group(context, security_group,
- default_sg=True)
- return ret['id']
- else:
- return default_group['security_group_id']
+ # the next loop should do 2 iterations at max
+ while True:
+ with db_api.autonested_transaction(context.session):
+ try:
+ default_group = query.filter_by(tenant_id=tenant_id).one()
+ except exc.NoResultFound:
+ security_group = {
+ 'security_group':
+ {'name': 'default',
+ 'tenant_id': tenant_id,
+ 'description': _('Default security group')}
+ }
+ try:
+ ret = self.create_security_group(
+ context, security_group, default_sg=True)
+ except exception.DBDuplicateEntry as ex:
+ LOG.debug("Duplicate default security group %s was "
+ "not created", ex.value)
+ continue
+ else:
+ return ret['id']
+ else:
+ return default_group['security_group_id']
def _get_security_groups_on_port(self, context, port):
"""Check that all security groups on port belong to tenant.
self._assert_sg_rule_has_kvs(v6_rule, expected)
def test_skip_duplicate_default_sg_error(self):
+ # can't always raise, or create_security_group will hang
with mock.patch.object(SecurityGroupTestPlugin,
- '_ensure_default_security_group',
- side_effect=exc.DBDuplicateEntry()):
- self._make_security_group(self.fmt, 'test_sg', 'test_desc')
+ 'create_security_group',
+ side_effect=[exc.DBDuplicateEntry(),
+ {'id': 'foo'}]):
+ self.plugin.create_network(
+ context.get_admin_context(),
+ {'network': {'name': 'foo',
+ 'admin_state_up': True,
+ 'shared': False,
+ 'tenant_id': 'bar'}})
def test_update_security_group(self):
with self.security_group() as sg: