from sqlalchemy.orm import scoped_session
from neutron.api.v2 import attributes
+from neutron.callbacks import events
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
+from neutron.callbacks import resources
from neutron.common import constants
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
a given tenant if it does not exist.
"""
s = security_group['security_group']
+ kwargs = {
+ 'context': context,
+ 'security_group': s,
+ 'is_default': default_sg,
+ }
+ # NOTE(armax): a callback exception here will prevent the request
+ # from being processed. This is a hook point for backend's validation;
+ # we raise to propagate the reason for the failure.
+ try:
+ registry.notify(
+ resources.SECURITY_GROUP, events.BEFORE_CREATE, self,
+ **kwargs)
+ except exceptions.CallbackFailure as e:
+ raise ext_sg.SecurityGroupConflict(reason=e)
+
tenant_id = self._get_tenant_id_for_create(context, s)
if not default_sg:
ethertype=ethertype)
context.session.add(egress_rule)
- return self._make_security_group_dict(security_group_db)
+ secgroup_dict = self._make_security_group_dict(security_group_db)
+
+ kwargs['security_group'] = secgroup_dict
+ registry.notify(resources.SECURITY_GROUP, events.AFTER_CREATE, self,
+ **kwargs)
+ return secgroup_dict
def get_security_groups(self, context, filters=None, fields=None,
sorts=None, limit=None,
if sg['name'] == 'default' and not context.is_admin:
raise ext_sg.SecurityGroupCannotRemoveDefault()
+ kwargs = {
+ 'context': context,
+ 'security_group_id': id,
+ 'security_group': sg,
+ }
+ # NOTE(armax): a callback exception here will prevent the request
+ # from being processed. This is a hook point for backend's validation;
+ # we raise to propagate the reason for the failure.
+ try:
+ registry.notify(
+ resources.SECURITY_GROUP, events.BEFORE_DELETE, self,
+ **kwargs)
+ except exceptions.CallbackFailure as e:
+ reason = _('cannot be deleted due to %s') % e
+ raise ext_sg.SecurityGroupInUse(id=id, reason=reason)
+
with context.session.begin(subtransactions=True):
context.session.delete(sg)
+ kwargs.pop('security_group')
+ registry.notify(resources.SECURITY_GROUP, events.AFTER_DELETE, self,
+ **kwargs)
+
def update_security_group(self, context, id, security_group):
s = security_group['security_group']
+
+ kwargs = {
+ 'context': context,
+ 'security_group_id': id,
+ 'security_group': s,
+ }
+ # NOTE(armax): a callback exception here will prevent the request
+ # from being processed. This is a hook point for backend's validation;
+ # we raise to propagate the reason for the failure.
+ try:
+ registry.notify(
+ resources.SECURITY_GROUP, events.BEFORE_UPDATE, self,
+ **kwargs)
+ except exceptions.CallbackFailure as e:
+ raise ext_sg.SecurityGroupConflict(reason=e)
+
with context.session.begin(subtransactions=True):
sg = self._get_security_group(context, id)
if sg['name'] == 'default' and 'name' in s:
raise ext_sg.SecurityGroupCannotUpdateDefault()
sg.update(s)
- return self._make_security_group_dict(sg)
+ sg_dict = self._make_security_group_dict(sg)
+
+ kwargs['security_group'] = sg_dict
+ registry.notify(resources.SECURITY_GROUP, events.AFTER_UPDATE, self,
+ **kwargs)
+ return sg_dict
def _make_security_group_dict(self, security_group, fields=None):
res = {'id': security_group['id'],
return ret
def create_security_group_rule(self, context, security_group_rule):
+ kwargs = {
+ 'context': context,
+ 'security_group_rule': security_group_rule,
+ }
+ # NOTE(armax): a callback exception here will prevent the request
+ # from being processed. This is a hook point for backend's validation;
+ # we raise to propagate the reason for the failure.
+ try:
+ registry.notify(
+ resources.SECURITY_GROUP_RULE, events.BEFORE_CREATE, self,
+ **kwargs)
+ except exceptions.CallbackFailure as e:
+ raise ext_sg.SecurityGroupConflict(reason=e)
+
bulk_rule = {'security_group_rules': [security_group_rule]}
- return self.create_security_group_rule_bulk_native(context,
- bulk_rule)[0]
+ sg_rule_dict = self.create_security_group_rule_bulk_native(
+ context, bulk_rule)[0]
+
+ kwargs['security_group_rule'] = sg_rule_dict
+ registry.notify(
+ resources.SECURITY_GROUP_RULE, events.AFTER_CREATE, self,
+ **kwargs)
+ return sg_rule_dict
def _get_ip_proto_number(self, protocol):
if protocol is None:
return sgr
def delete_security_group_rule(self, context, id):
+ kwargs = {
+ 'context': context,
+ 'security_group_rule_id': id
+ }
+ # NOTE(armax): a callback exception here will prevent the request
+ # from being processed. This is a hook point for backend's validation;
+ # we raise to propagate the reason for the failure.
+ try:
+ registry.notify(
+ resources.SECURITY_GROUP_RULE, events.BEFORE_DELETE, self,
+ **kwargs)
+ except exceptions.CallbackFailure as e:
+ reason = _('cannot be deleted due to %s') % e
+ raise ext_sg.SecurityGroupRuleInUse(id=id, reason=reason)
+
with context.session.begin(subtransactions=True):
query = self._model_query(context, SecurityGroupRule)
if query.filter(SecurityGroupRule.id == id).delete() == 0:
raise ext_sg.SecurityGroupRuleNotFound(id=id)
+ registry.notify(
+ resources.SECURITY_GROUP_RULE, events.AFTER_DELETE, self,
+ **kwargs)
+
def _extend_port_dict_security_group(self, port_res, port_db):
# Security group bindings will be retrieved from the sqlalchemy
# model. As they're loaded eagerly with ports because of the
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
+import mock
import testtools
+from neutron.callbacks import exceptions
+from neutron.callbacks import registry
from neutron import context
from neutron.db import common_db_mixin
from neutron.db import securitygroups_db
self.ctx = context.get_admin_context()
self.mixin = SecurityGroupDbMixinImpl()
+ def test_create_security_group_conflict(self):
+ with mock.patch.object(registry, "notify") as mock_notify:
+ mock_notify.side_effect = exceptions.CallbackFailure(Exception())
+ secgroup = {'security_group': mock.ANY}
+ with testtools.ExpectedException(
+ securitygroup.SecurityGroupConflict):
+ self.mixin.create_security_group(self.ctx, secgroup)
+
+ def test_delete_security_group_in_use(self):
+ with contextlib.nested(
+ mock.patch.object(self.mixin, '_get_port_security_group_bindings'),
+ mock.patch.object(self.mixin, '_get_security_group'),
+ mock.patch.object(registry, "notify"),
+ ) as (_, _, mock_notify):
+ mock_notify.side_effect = exceptions.CallbackFailure(Exception())
+ with testtools.ExpectedException(
+ securitygroup.SecurityGroupInUse):
+ self.mixin.delete_security_group(self.ctx, mock.ANY)
+
+ def test_update_security_group_conflict(self):
+ with mock.patch.object(registry, "notify") as mock_notify:
+ mock_notify.side_effect = exceptions.CallbackFailure(Exception())
+ secgroup = {'security_group': mock.ANY}
+ with testtools.ExpectedException(
+ securitygroup.SecurityGroupConflict):
+ self.mixin.update_security_group(self.ctx, 'foo_id', secgroup)
+
+ def test_create_security_group_rule_conflict(self):
+ with mock.patch.object(registry, "notify") as mock_notify:
+ mock_notify.side_effect = exceptions.CallbackFailure(Exception())
+ with testtools.ExpectedException(
+ securitygroup.SecurityGroupConflict):
+ self.mixin.create_security_group_rule(self.ctx, mock.ANY)
+
+ def test_delete_security_group_rule_in_use(self):
+ with mock.patch.object(registry, "notify") as mock_notify:
+ mock_notify.side_effect = exceptions.CallbackFailure(Exception())
+ with testtools.ExpectedException(
+ securitygroup.SecurityGroupRuleInUse):
+ self.mixin.delete_security_group_rule(self.ctx, mock.ANY)
+
def test_delete_security_group_rule_raise_error_on_not_found(self):
with testtools.ExpectedException(
securitygroup.SecurityGroupRuleNotFound):