resource.
In addition to the fields that belong to QoS policy database object itself,
-synthetic fields were added to the object that represent lists of rules,
-per-type, that belong to the policy. For example, to get a list of all
-bandwidth_limit rules for a specific policy, a consumer of the object can just
-access corresponding attribute via:
+synthetic fields were added to the object that represent lists of rules that
+belong to the policy. To get a list of all rules for a specific policy, a
+consumer of the object can just access the corresponding attribute via:
-* policy.bandwidth_limit_rules
+* policy.rules
Implementation is done in a way that will allow adding a new rule list field
with little or no modifications in the policy object itself. This is achieved
by smart introspection of existing available rule object definitions and
automatic definition of those fields on the policy class.
-Note that synthetic fields are lazily loaded, meaning there is no hit into
-the database if the field is not inspected by consumers.
+Note that rules are loaded in a non lazy way, meaning they are all fetched from
+the database on policy fetch.
For Qos<type>Rule objects, an extendable approach was taken to allow easy
addition of objects for new rule types. To accomodate this, fields common to
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_visible': True},
- 'bandwidth_limit_rules': {'allow_post': False, 'allow_put': False,
- 'is_visible': True},
+ 'rules': {'allow_post': False, 'allow_put': False, 'is_visible': True},
},
'rule_types': {
'type': {'allow_post': False, 'allow_put': False,
# License for the specific language governing permissions and limitations
# under the License.
-import abc
-
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
-import six
from neutron.common import exceptions
-from neutron.common import utils
from neutron.db import api as db_api
from neutron.db.qos import api as qos_db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
from neutron.objects.qos import rule as rule_obj_impl
-from neutron.services.qos import qos_consts
-
-
-class QosRulesExtenderMeta(abc.ABCMeta):
-
- def __new__(mcs, name, bases, dct):
- cls = super(QosRulesExtenderMeta, mcs).__new__(mcs, name, bases, dct)
-
- cls.rule_fields = {}
- for rule in qos_consts.VALID_RULE_TYPES:
- rule_cls_name = 'Qos%sRule' % utils.camelize(rule)
- field = '%s_rules' % rule
- cls.fields[field] = obj_fields.ListOfObjectsField(rule_cls_name)
- cls.rule_fields[field] = rule_cls_name
-
- cls.synthetic_fields = list(cls.rule_fields.keys())
-
- return cls
@obj_base.VersionedObjectRegistry.register
-@six.add_metaclass(QosRulesExtenderMeta)
class QosPolicy(base.NeutronDbObject):
db_model = qos_db_model.QosPolicy
'tenant_id': obj_fields.UUIDField(),
'name': obj_fields.StringField(),
'description': obj_fields.StringField(),
- 'shared': obj_fields.BooleanField(default=False)
+ 'shared': obj_fields.BooleanField(default=False),
+ 'rules': obj_fields.ListOfObjectsField('QosRule', subclasses=True),
}
fields_no_update = ['id', 'tenant_id']
+ synthetic_fields = ['rules']
+
def to_dict(self):
dict_ = super(QosPolicy, self).to_dict()
- for field in self.rule_fields:
- if field in dict_:
- dict_[field] = [rule.to_dict() for rule in dict_[field]]
+ if 'rules' in dict_:
+ dict_['rules'] = [rule.to_dict() for rule in dict_['rules']]
return dict_
def obj_load_attr(self, attrname):
- if attrname not in self.rule_fields:
+ if attrname != 'rules':
raise exceptions.ObjectActionError(
action='obj_load_attr', reason='unable to load %s' % attrname)
- rule_cls = getattr(rule_obj_impl, self.rule_fields[attrname])
- rules = rule_cls.get_objects(self._context, qos_policy_id=self.id)
+ rules = rule_obj_impl.get_rules(self._context, self.id)
setattr(self, attrname, rules)
self.obj_reset_changes([attrname])
def _load_rules(self):
- for attr in self.rule_fields:
- self.obj_load_attr(attr)
+ self.obj_load_attr('rules')
@staticmethod
def _is_policy_accessible(context, db_obj):
# under the License.
import abc
+import sys
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
import six
+from neutron.common import utils
+from neutron.db import api as db_api
from neutron.db.qos import models as qos_db_model
from neutron.objects import base
+from neutron.services.qos import qos_consts
+
+
+def get_rules(context, qos_policy_id):
+ all_rules = []
+ with db_api.autonested_transaction(context.session):
+ for rule_type in qos_consts.VALID_RULE_TYPES:
+ rule_cls_name = 'Qos%sRule' % utils.camelize(rule_type)
+ rule_cls = getattr(sys.modules[__name__], rule_cls_name)
+
+ rules = rule_cls.get_objects(context, qos_policy_id=qos_policy_id)
+ all_rules.extend(rules)
+ return all_rules
@six.add_metaclass(abc.ABCMeta)
fields_no_update = ['id', 'qos_policy_id']
+ # should be redefined in subclasses
+ rule_type = None
+
+ def to_dict(self):
+ dict_ = super(QosRule, self).to_dict()
+ dict_['type'] = self.rule_type
+ return dict_
+
@obj_base.VersionedObjectRegistry.register
class QosBandwidthLimitRule(QosRule):
'max_kbps': obj_fields.IntegerField(nullable=True),
'max_burst_kbps': obj_fields.IntegerField(nullable=True)
}
+
+ rule_type = qos_consts.RULE_TYPE_BANDWIDTH_LIMIT
retrieved_policy = retrieved_policy['policy']
self.assertEqual('test policy desc', retrieved_policy['description'])
self.assertTrue(retrieved_policy['shared'])
- self.assertEqual([], retrieved_policy['bandwidth_limit_rules'])
+ self.assertEqual([], retrieved_policy['rules'])
@test.attr(type='smoke')
@test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
# Test 'show policy'
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
- policy_rules = retrieved_policy['policy']['bandwidth_limit_rules']
+ policy_rules = retrieved_policy['policy']['rules']
self.assertEqual(1, len(policy_rules))
self.assertEqual(rule['id'], policy_rules[0]['id'])
+ self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
+ policy_rules[0]['type'])
@test.attr(type='smoke')
@test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
def test_synthetic_rule_fields(self):
policy_obj, rule_obj = self._create_test_policy_with_rule()
policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
- self.assertEqual([rule_obj], policy_obj.bandwidth_limit_rules)
+ self.assertEqual([rule_obj], policy_obj.rules)
def test_create_is_in_single_transaction(self):
obj = self._test_class(self.context, **self.db_obj)
with mock.patch('sqlalchemy.engine.'
- 'Transaction.commit') as mock_commit,\
+ 'Connection._commit_impl') as mock_commit,\
mock.patch.object(obj._context.session, 'add'):
obj.create()
self.assertEqual(1, mock_commit.call_count)
policy_obj = policy.QosPolicy.get_by_id(self.context, policy_obj.id)
primitive = policy_obj.obj_to_primitive()
- self.assertNotEqual([], (primitive['versioned_object.data']
- ['bandwidth_limit_rules']))
+ self.assertNotEqual([], (primitive['versioned_object.data']['rules']))
def test_to_dict_returns_rules_as_dicts(self):
policy_obj, rule_obj = self._create_test_policy_with_rule()
for obj in (rule_dict, obj_dict):
self.assertIsInstance(obj, dict)
- self.assertEqual(rule_dict,
- obj_dict['bandwidth_limit_rules'][0])
+ self.assertEqual(rule_dict, obj_dict['rules'][0])
def test_shared_default(self):
self.db_obj.pop('shared')
from neutron.objects.qos import policy
from neutron.objects.qos import rule
+from neutron.services.qos import qos_consts
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
_test_class = rule.QosBandwidthLimitRule
+ def test_to_dict_returns_type(self):
+ obj = rule.QosBandwidthLimitRule(self.context, **self.db_obj)
+ dict_ = obj.to_dict()
+ self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT, dict_['type'])
+
class QosBandwidthLimitRuleDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):