* QosPolicy: directly maps to the conceptual policy resource.
* QosNetworkPolicyBinding, QosPortPolicyBinding: defines attachment between a
Neutron resource and a QoS policy.
-* QosRule: defines common rule fields for all supported rule types.
-* QosBandwidthLimitRule: defines rule fields that are specific to
- bandwidth_limit type (the only type supported by the service as of time of
- writing).
+* QosBandwidthLimitRule: defines the only rule type available at the moment.
-There is a one-to-one relationship between QosRule and type specific
-Qos<type>Rule database objects. We represent the single object with two tables
-to avoid duplication of common fields. (That introduces some complexity in
-neutron objects for rule resources, but see below).
All database models are defined under:
the database if the field is not inspected by consumers.
For Qos<type>Rule objects, an extendable approach was taken to allow easy
-addition of objects for new rule types. To accomodate this, all the methods
-that access the database were implemented in a base class called QosRule that
-is then inherited into type-specific rule implementations that, ideally, only
-define additional fields and some other minor things.
+addition of objects for new rule types. To accomodate this, fields common to
+all types are put into a base class called QosRule that is then inherited into
+type-specific rule implementations that, ideally, only define additional fields
+and some other minor things.
Note that the QosRule base class is not registered with oslo.versionedobjects
registry, because it's not expected that 'generic' rules should be
message = _("User does not have admin privileges: %(reason)s")
+class ObjectNotFound(NotFound):
+ message = _("Object %(id)s not found.")
+
+
class NetworkNotFound(NotFound):
message = _("Network %(net_id)s could not be found")
from sqlalchemy import exc
from sqlalchemy import orm
+from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin
return db_obj.__dict__
+def _safe_get_object(context, model, id):
+ db_obj = get_object(context, model, id=id)
+ if db_obj is None:
+ raise n_exc.ObjectNotFound(id=id)
+ return db_obj
+
+
def update_object(context, model, id, values):
with context.session.begin(subtransactions=True):
- db_obj = get_object(context, model, id=id)
+ db_obj = _safe_get_object(context, model, id)
db_obj.update(values)
db_obj.save(session=context.session)
return db_obj.__dict__
def delete_object(context, model, id):
with context.session.begin(subtransactions=True):
- db_obj = get_object(context, model, id=id)
+ db_obj = _safe_get_object(context, model, id)
context.session.delete(db_obj)
nullable=False, unique=True))
op.create_table(
- 'qos_rules',
+ 'qos_bandwidth_limit_rules',
sa.Column('id', sa.String(length=36), primary_key=True),
sa.Column('qos_policy_id', sa.String(length=36),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
- sa.Column('type', sa.String(length=255)))
-
- op.create_table(
- 'qos_bandwidth_limit_rules',
- sa.Column('id', sa.String(length=36),
- sa.ForeignKey('qos_rules.id', ondelete='CASCADE'),
- nullable=False,
- primary_key=True),
sa.Column('max_kbps', sa.Integer()),
sa.Column('max_burst_kbps', sa.Integer()))
cascade='delete', lazy='joined'))
-class QosRule(model_base.BASEV2, models_v2.HasId):
- __tablename__ = 'qos_rules'
- type = sa.Column(sa.String(255))
- qos_policy_id = sa.Column(sa.String(36),
- sa.ForeignKey('qos_policies.id',
- ondelete='CASCADE'),
- nullable=False)
+class QosRuleColumns(models_v2.HasId):
+ qos_policy_id = sa.Column(sa.String(36), nullable=False)
+ __table_args__ = (
+ sa.ForeignKeyConstraint(['qos_policy_id'], ['qos_policies.id']),
+ model_base.BASEV2.__table_args__
+ )
-class QosBandwidthLimitRule(model_base.BASEV2):
+
+class QosBandwidthLimitRule(QosRuleColumns, model_base.BASEV2):
__tablename__ = 'qos_bandwidth_limit_rules'
max_kbps = sa.Column(sa.Integer)
max_burst_kbps = sa.Column(sa.Integer)
- id = sa.Column(sa.String(36),
- sa.ForeignKey('qos_rules.id',
- ondelete='CASCADE'),
- nullable=False,
- primary_key=True)
from oslo_versionedobjects import base as obj_base
import six
+from neutron.common import exceptions
from neutron.db import api as db_api
+class NeutronObjectUpdateForbidden(exceptions.NeutronException):
+ message = _("Unable to update the following object fields: %(fields)s")
+
+
+def get_updatable_fields(cls, fields):
+ fields = fields.copy()
+ for field in cls.fields_no_update:
+ if field in fields:
+ del fields[field]
+ return fields
+
+
@six.add_metaclass(abc.ABCMeta)
class NeutronObject(obj_base.VersionedObject,
obj_base.VersionedObjectDictCompat,
# should be overridden for all persistent objects
db_model = None
- # fields that are not allowed to update
- fields_no_update = []
-
synthetic_fields = []
+ fields_no_update = []
+
def from_db_object(self, *objs):
for field in self.fields:
for db_obj in objs:
del fields[field]
return fields
+ def _validate_changed_fields(self, fields):
+ fields = fields.copy()
+ # We won't allow id update anyway, so let's pop it out not to trigger
+ # update on id field touched by the consumer
+ fields.pop('id', None)
+
+ forbidden_updates = set(self.fields_no_update) & set(fields.keys())
+ if forbidden_updates:
+ raise NeutronObjectUpdateForbidden(fields=forbidden_updates)
+
+ return fields
+
def create(self):
fields = self._get_changed_persistent_fields()
db_obj = db_api.create_object(self._context, self.db_model, fields)
def update(self):
updates = self._get_changed_persistent_fields()
+ updates = self._validate_changed_fields(updates)
+
if updates:
db_obj = db_api.update_object(self._context, self.db_model,
self.id, updates)
action='obj_load_attr', reason='unable to load %s' % attrname)
rule_cls = getattr(rule_obj_impl, self.rule_fields[attrname])
- rules = rule_cls.get_rules_by_policy(self._context, self.id)
+ rules = rule_cls.get_objects(self._context, qos_policy_id=self.id)
setattr(self, attrname, rules)
self.obj_reset_changes([attrname])
return cls._get_object_policy(context, cls.port_binding_model,
port_id=port_id)
+ # TODO(QoS): Consider extending base to trigger registered methods for us
def create(self):
with db_api.autonested_transaction(self._context.session):
super(QosPolicy, self).create()
from oslo_versionedobjects import fields as obj_fields
import six
-from neutron.db import api as db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
-from neutron.services.qos import qos_consts
@six.add_metaclass(abc.ABCMeta)
class QosRule(base.NeutronDbObject):
- base_db_model = qos_db_model.QosRule
-
fields = {
'id': obj_fields.UUIDField(),
- #TODO(QoS): We ought to kill the `type' attribute
- 'type': obj_fields.StringField(),
'qos_policy_id': obj_fields.UUIDField()
}
- fields_no_update = ['id', 'tenant_id', 'qos_policy_id']
-
- # each rule subclass should redefine it
- rule_type = None
-
- _core_fields = list(fields.keys())
-
- _common_fields = ['id']
-
- @classmethod
- def _is_common_field(cls, field):
- return field in cls._common_fields
-
- @classmethod
- def _is_core_field(cls, field):
- return field in cls._core_fields
-
- @classmethod
- def _is_addn_field(cls, field):
- return not cls._is_core_field(field) or cls._is_common_field(field)
-
- @staticmethod
- def _filter_fields(fields, func):
- return {
- key: val for key, val in fields.items()
- if func(key)
- }
-
- def _get_changed_core_fields(self):
- fields = self.obj_get_changes()
- return self._filter_fields(fields, self._is_core_field)
-
- def _get_changed_addn_fields(self):
- fields = self.obj_get_changes()
- return self._filter_fields(fields, self._is_addn_field)
-
- def _copy_common_fields(self, from_, to_):
- for field in self._common_fields:
- to_[field] = from_[field]
-
- @classmethod
- def get_objects(cls, context, **kwargs):
- # TODO(QoS): support searching for subtype fields
- db_objs = db_api.get_objects(context, cls.base_db_model, **kwargs)
- return [cls.get_by_id(context, db_obj['id']) for db_obj in db_objs]
-
- @classmethod
- def get_by_id(cls, context, id):
- obj = super(QosRule, cls).get_by_id(context, id)
-
- if obj:
- # the object above does not contain fields from base QosRule yet,
- # so fetch it and mix its fields into the object
- base_db_obj = db_api.get_object(context, cls.base_db_model, id=id)
- for field in cls._core_fields:
- setattr(obj, field, base_db_obj[field])
-
- obj.obj_reset_changes()
- return obj
-
- # TODO(QoS): Test that create is in single transaction
- def create(self):
-
- # TODO(QoS): enforce that type field value is bound to specific class
- self.type = self.rule_type
-
- # create base qos_rule
- core_fields = self._get_changed_core_fields()
-
- with db_api.autonested_transaction(self._context.session):
- base_db_obj = db_api.create_object(
- self._context, self.base_db_model, core_fields)
-
- # create type specific qos_..._rule
- addn_fields = self._get_changed_addn_fields()
- self._copy_common_fields(core_fields, addn_fields)
- addn_db_obj = db_api.create_object(
- self._context, self.db_model, addn_fields)
-
- # merge two db objects into single neutron one
- self.from_db_object(base_db_obj, addn_db_obj)
-
- # TODO(QoS): Test that update is in single transaction
- def update(self):
- updated_db_objs = []
-
- # TODO(QoS): enforce that type field cannot be changed
-
- # update base qos_rule, if needed
- core_fields = self._get_changed_core_fields()
-
- with db_api.autonested_transaction(self._context.session):
- if core_fields:
- base_db_obj = db_api.update_object(
- self._context, self.base_db_model, self.id, core_fields)
- updated_db_objs.append(base_db_obj)
-
- addn_fields = self._get_changed_addn_fields()
- if addn_fields:
- addn_db_obj = db_api.update_object(
- self._context, self.db_model, self.id, addn_fields)
- updated_db_objs.append(addn_db_obj)
-
- # update neutron object with values from both database objects
- self.from_db_object(*updated_db_objs)
-
- # delete is the same, additional rule object cleanup is done thru cascading
-
- @classmethod
- def get_rules_by_policy(cls, context, policy_id):
- return cls.get_objects(context, qos_policy_id=policy_id)
+ fields_no_update = ['id', 'qos_policy_id']
@obj_base.VersionedObjectRegistry.register
db_model = qos_db_model.QosBandwidthLimitRule
- rule_type = qos_consts.RULE_TYPE_BANDWIDTH_LIMIT
-
fields = {
'max_kbps': obj_fields.IntegerField(nullable=True),
'max_burst_kbps': obj_fields.IntegerField(nullable=True)
def setUp(self):
super(QosPolicyObjectTestCase, self).setUp()
- self.db_qos_rules = [self.get_random_fields(rule.QosRule)
- for _ in range(3)]
-
- # Tie qos rules with policies
- self.db_qos_rules[0]['qos_policy_id'] = self.db_objs[0]['id']
- self.db_qos_rules[1]['qos_policy_id'] = self.db_objs[0]['id']
- self.db_qos_rules[2]['qos_policy_id'] = self.db_objs[1]['id']
-
+ # qos_policy_ids will be incorrect, but we don't care in this test
self.db_qos_bandwidth_rules = [
self.get_random_fields(rule.QosBandwidthLimitRule)
for _ in range(3)]
- # Tie qos rules with qos bandwidth limit rules
- for i, qos_rule in enumerate(self.db_qos_rules):
- self.db_qos_bandwidth_rules[i]['id'] = qos_rule['id']
-
self.model_map = {
self._test_class.db_model: self.db_objs,
- rule.QosRule.base_db_model: self.db_qos_rules,
rule.QosBandwidthLimitRule.db_model: self.db_qos_bandwidth_rules}
- def fake_get_objects(self, context, model, qos_policy_id=None):
- objs = self.model_map[model]
- if model is rule.QosRule.base_db_model and qos_policy_id:
- return [obj for obj in objs
- if obj['qos_policy_id'] == qos_policy_id]
- return objs
+ def fake_get_objects(self, context, model, **kwargs):
+ return self.model_map[model]
def fake_get_object(self, context, model, id):
objects = self.model_map[model]
objs = self._test_class.get_objects(self.context)
context_mock.assert_called_once_with()
- get_objects_mock.assert_any_call(
- admin_context, self._test_class.db_model)
+ get_objects_mock.assert_any_call(
+ admin_context, self._test_class.db_model)
self._validate_objects(self.db_objs, objs)
def test_get_by_id(self):
# License for the specific language governing permissions and limitations
# under the License.
-import mock
-
-from neutron.db import api as db_api
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests.unit.objects import test_base
_test_class = rule.QosBandwidthLimitRule
- @classmethod
- def get_random_fields(cls):
- # object middleware should not allow random types, so override it with
- # proper type
- fields = (super(QosBandwidthLimitRuleObjectTestCase, cls)
- .get_random_fields())
- fields['type'] = cls._test_class.rule_type
- return fields
-
- def _filter_db_object(self, func):
- return {
- field: self.db_obj[field]
- for field in self._test_class.fields
- if func(field)
- }
-
- def _get_core_db_obj(self):
- return self._filter_db_object(
- lambda field: self._test_class._is_core_field(field))
-
- def _get_addn_db_obj(self):
- return self._filter_db_object(
- lambda field: self._test_class._is_addn_field(field))
-
- def test_get_by_id(self):
- with mock.patch.object(db_api, 'get_object',
- return_value=self.db_obj) as get_object_mock:
- obj = self._test_class.get_by_id(self.context, id='fake_id')
- self.assertTrue(self._is_test_class(obj))
- self.assertEqual(self.db_obj, test_base.get_obj_db_fields(obj))
- get_object_mock.assert_has_calls([
- mock.call(self.context, model, id='fake_id')
- for model in (self._test_class.db_model,
- self._test_class.base_db_model)
- ], any_order=True)
-
- def test_get_objects(self):
- with mock.patch.object(db_api, 'get_objects',
- return_value=self.db_objs):
-
- @classmethod
- def _get_by_id(cls, context, id):
- for db_obj in self.db_objs:
- if db_obj['id'] == id:
- return self._test_class(context, **db_obj)
-
- with mock.patch.object(rule.QosRule, 'get_by_id', new=_get_by_id):
- objs = self._test_class.get_objects(self.context)
- self.assertFalse(
- filter(lambda obj: not self._is_test_class(obj), objs))
- self.assertEqual(
- sorted(self.db_objs),
- sorted(test_base.get_obj_db_fields(obj) for obj in objs))
-
- def test_create(self):
- with mock.patch.object(db_api, 'create_object',
- return_value=self.db_obj) as create_mock:
- test_class = self._test_class
- obj = test_class(self.context, **self.db_obj)
- self._check_equal(obj, self.db_obj)
- obj.create()
- self._check_equal(obj, self.db_obj)
-
- core_db_obj = self._get_core_db_obj()
- addn_db_obj = self._get_addn_db_obj()
- create_mock.assert_has_calls(
- [mock.call(self.context, self._test_class.base_db_model,
- core_db_obj),
- mock.call(self.context, self._test_class.db_model,
- addn_db_obj)]
- )
-
- def test_update_changes(self):
- with mock.patch.object(db_api, 'update_object',
- return_value=self.db_obj) as update_mock:
- obj = self._test_class(self.context, **self.db_obj)
- self._check_equal(obj, self.db_obj)
- obj.update()
- self._check_equal(obj, self.db_obj)
-
- core_db_obj = self._get_core_db_obj()
- update_mock.assert_any_call(
- self.context, self._test_class.base_db_model, obj.id,
- core_db_obj)
-
- addn_db_obj = self._get_addn_db_obj()
- update_mock.assert_any_call(
- self.context, self._test_class.db_model, obj.id,
- addn_db_obj)
-
class QosBandwidthLimitRuleDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
+from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import api as db_api
from neutron.objects import base
'field2': obj_fields.StringField()
}
+ fields_no_update = ['id']
+
def _random_string(n=10):
return ''.join(random.choice(string.ascii_lowercase) for _ in range(n))
fields[field] = generator()
return fields
+ def get_updatable_fields(self, fields):
+ return base.get_updatable_fields(self._test_class, fields)
+
@classmethod
def _is_test_class(cls, obj):
return isinstance(obj, cls._test_class)
obj.create()
self._check_equal(obj, self.db_obj)
- def test_update_no_changes(self):
- with mock.patch.object(db_api, 'update_object',
- return_value=self.db_obj) as update_mock:
- obj = self._test_class(self.context, **self.db_obj)
- self._check_equal(obj, self.db_obj)
+ @mock.patch.object(db_api, 'update_object')
+ def test_update_no_changes(self, update_mock):
+ with mock.patch.object(base.NeutronDbObject,
+ '_get_changed_persistent_fields',
+ return_value={}):
+ obj = self._test_class(self.context)
obj.update()
- self.assertTrue(update_mock.called)
-
- # consequent call to update does not try to update database
- update_mock.reset_mock()
- obj.update()
- self._check_equal(obj, self.db_obj)
self.assertFalse(update_mock.called)
- def test_update_changes(self):
- with mock.patch.object(db_api, 'update_object',
- return_value=self.db_obj) as update_mock:
+ @mock.patch.object(db_api, 'update_object')
+ def test_update_changes(self, update_mock):
+ fields_to_update = self.get_updatable_fields(self.db_obj)
+ with mock.patch.object(base.NeutronDbObject,
+ '_get_changed_persistent_fields',
+ return_value=fields_to_update):
obj = self._test_class(self.context, **self.db_obj)
- self._check_equal(obj, self.db_obj)
obj.update()
- self._check_equal(obj, self.db_obj)
update_mock.assert_called_once_with(
self.context, self._test_class.db_model,
- self.db_obj['id'], self.db_obj)
+ self.db_obj['id'], fields_to_update)
+
+ @mock.patch.object(base.NeutronDbObject,
+ '_get_changed_persistent_fields',
+ return_value={'a': 'a', 'b': 'b', 'c': 'c'})
+ def test_update_changes_forbidden(self, *mocks):
+ with mock.patch.object(
+ self._test_class,
+ 'fields_no_update',
+ new_callable=mock.PropertyMock(return_value=['a', 'c']),
+ create=True):
+ obj = self._test_class(self.context, **self.db_obj)
+ self.assertRaises(base.NeutronObjectUpdateForbidden, obj.update)
def test_update_updates_from_db_object(self):
with mock.patch.object(db_api, 'update_object',
return_value=self.db_obj):
obj = self._test_class(self.context, **self.db_objs[1])
- self._check_equal(obj, self.db_objs[1])
- obj.update()
+ fields_to_update = self.get_updatable_fields(self.db_objs[1])
+ with mock.patch.object(base.NeutronDbObject,
+ '_get_changed_persistent_fields',
+ return_value=fields_to_update):
+ obj.update()
self._check_equal(obj, self.db_obj)
@mock.patch.object(db_api, 'delete_object')
self.assertEqual(obj, new)
obj = new
- for key, val in self.db_objs[1].items():
- if key not in self._test_class.fields_no_update:
- setattr(obj, key, val)
+
+ for key, val in self.get_updatable_fields(self.db_objs[1]).items():
+ setattr(obj, key, val)
obj.update()
new = self._test_class.get_by_id(self.context, id=obj.id)
new = self._test_class.get_by_id(self.context, id=obj.id)
self.assertIsNone(new)
+
+ def test_update_non_existent_object_raises_not_found(self):
+ obj = self._test_class(self.context, **self.db_obj)
+ obj.obj_reset_changes()
+
+ for key, val in self.get_updatable_fields(self.db_obj).items():
+ setattr(obj, key, val)
+
+ self.assertRaises(n_exc.ObjectNotFound, obj.update)
+
+ def test_delete_non_existent_object_raises_not_found(self):
+ obj = self._test_class(self.context, **self.db_obj)
+ self.assertRaises(n_exc.ObjectNotFound, obj.delete)
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
+from neutron.objects import base as base_object
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.plugins.common import constants
self.assertFalse(self.registry_m.called)
def test_update_policy(self):
+ fields = base_object.get_updatable_fields(
+ policy_object.QosPolicy, self.policy_data['policy'])
self.qos_plugin.update_policy(
- self.ctxt, self.policy.id, self.policy_data)
+ self.ctxt, self.policy.id, {'policy': fields})
self._validate_registry_params(events.UPDATED)
def test_delete_policy(self):