ValueError: webob.exc.HTTPBadRequest,
}
-QUOTAS = quota.QUOTAS
-
def _fields(request):
"""
plugin=self._plugin)
try:
tenant_id = item[self._resource]['tenant_id']
- count = QUOTAS.count(request.context, self._resource,
- self._plugin, self._collection,
- tenant_id)
+ count = quota.QUOTAS.count(request.context, self._resource,
+ self._plugin, self._collection,
+ tenant_id)
if bulk:
delta = deltas.get(tenant_id, 0) + 1
deltas[tenant_id] = delta
# We don't want to quota this resource
LOG.debug(e)
else:
- QUOTAS.limit_check(request.context,
- item[self._resource]['tenant_id'],
- **kwargs)
+ quota.QUOTAS.limit_check(request.context,
+ item[self._resource]['tenant_id'],
+ **kwargs)
def notify(create_result):
notifier_api.notify(request.context,
quotas = DbQuotaDriver.get_tenant_quotas(
context, sub_resources, context.tenant_id)
- return dict((k, v['limit']) for k, v in quotas.items())
+ return dict((k, v) for k, v in quotas.items())
def limit_check(self, context, tenant_id, resources, values):
"""Check simple quota limits.
return len(obj_list) if obj_list else 0
-resources = []
-for resource_item in cfg.CONF.QUOTAS.quota_items:
- resources.append(CountableResource(resource_item, _count_resource,
- 'quota_' + resource_item))
+def register_resources_from_config():
+ resources = []
+ for resource_item in cfg.CONF.QUOTAS.quota_items:
+ resources.append(CountableResource(resource_item, _count_resource,
+ 'quota_' + resource_item))
+ QUOTAS.register_resources(resources)
-QUOTAS.register_resources(resources)
+
+register_resources_from_config()
-import unittest
+import unittest2 as unittest
import webtest
import mock
from quantum.api import extensions
from quantum.api.v2 import attributes
from quantum.common import config
+from quantum.common import exceptions
from quantum import context
from quantum.db import api as db
from quantum import manager
'quota_items',
['network', 'subnet', 'port', 'extra1'],
group='QUOTAS')
-
+ quota.QUOTAS = quota.QuotaEngine()
+ quota.register_resources_from_config()
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
self.plugin = self._plugin_patcher.start()
self.plugin.return_value.supported_extension_aliases = ['quotas']
self.assertEqual(404, res.status_int)
except Exception:
pass
+
+ def test_quotas_limit_check(self):
+ tenant_id = 'tenant_id1'
+ env = {'quantum.context': context.Context('', tenant_id,
+ is_admin=True)}
+ quotas = {'quota': {'network': 5}}
+ res = self.api.put_json(_get_path('quotas', id=tenant_id, fmt='json'),
+ quotas, extra_environ=env)
+ self.assertEqual(200, res.status_int)
+ quota.QUOTAS.limit_check(context.Context('', tenant_id),
+ tenant_id,
+ network=4)
+
+ def test_quotas_limit_check_with_over_quota(self):
+ tenant_id = 'tenant_id1'
+ env = {'quantum.context': context.Context('', tenant_id,
+ is_admin=True)}
+ quotas = {'quota': {'network': 5}}
+ res = self.api.put_json(_get_path('quotas', id=tenant_id, fmt='json'),
+ quotas, extra_environ=env)
+ self.assertEqual(200, res.status_int)
+ with self.assertRaises(exceptions.OverQuota):
+ quota.QUOTAS.limit_check(context.Context('', tenant_id),
+ tenant_id,
+ network=6)
+
+ def test_quotas_limit_check_with_invalid_quota_value(self):
+ tenant_id = 'tenant_id1'
+ with self.assertRaises(exceptions.InvalidQuotaValue):
+ quota.QUOTAS.limit_check(context.Context('', tenant_id),
+ tenant_id,
+ network=-1)