class VolumeLimitExceeded(QuotaError):
- message = _("Maximum number of volumes allowed (%(allowed)d) exceeded")
+ message = _("Maximum number of volumes allowed (%(allowed)d) exceeded for "
+ "quota '%(name)s'.")
+
+ def __init__(self, message=None, **kwargs):
+ kwargs.setdefault('name', 'volumes')
+ super(VolumeLimitExceeded, self).__init__(message, **kwargs)
class SnapshotLimitExceeded(QuotaError):
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
- self.assertEqual(res.status_int, 413)
- self.assertEqual(res_dict['overLimit']['code'], 413)
- self.assertEqual(res_dict['overLimit']['message'],
- 'Maximum number of volumes allowed (1) exceeded')
+ self.assertEqual(413, res.status_int)
+ self.assertEqual(413, res_dict['overLimit']['code'])
+ self.assertEqual("Maximum number of volumes allowed (1) exceeded for"
+ " quota 'volumes'.", res_dict['overLimit']['message'])
def test_restore_backup_to_undersized_volume(self):
backup_size = 10
self.assertEqual(413, res.status_int)
self.assertEqual(413, res_dict['overLimit']['code'])
- self.assertEqual('VolumeLimitExceeded: Maximum number of volumes '
- 'allowed (1) exceeded',
+ self.assertEqual("VolumeLimitExceeded: Maximum number of volumes "
+ "allowed (1) exceeded for quota 'volumes'.",
res_dict['overLimit']['message'])
import mock
from oslo_config import cfg
from oslo_utils import timeutils
+import six
from cinder import backup
from cinder import context
for _i in range(CONF.quota_volumes):
vol_ref = self._create_volume()
volume_ids.append(vol_ref['id'])
- self.assertRaises(exception.VolumeLimitExceeded,
- volume.API().create,
- self.context, 1, '', '',
- volume_type=self.volume_type)
+ ex = self.assertRaises(exception.VolumeLimitExceeded,
+ volume.API().create,
+ self.context, 1, '', '',
+ volume_type=self.volume_type)
+ msg = ("Maximum number of volumes allowed (%d) exceeded for"
+ " quota 'volumes'." % CONF.quota_volumes)
+ self.assertEqual(msg, six.text_type(ex))
for volume_id in volume_ids:
db.volume_destroy(self.context, volume_id)
}
self.flags(**flag_args)
vol_ref = self._create_volume()
- self.assertRaises(exception.VolumeLimitExceeded,
- volume.API().create,
- self.context, 1, '', '',
- volume_type=self.volume_type)
+ ex = self.assertRaises(exception.VolumeLimitExceeded,
+ volume.API().create,
+ self.context, 1, '', '',
+ volume_type=self.volume_type)
+ msg = ("Maximum number of volumes allowed (1) exceeded for"
+ " quota '%s'." % resource)
+ self.assertEqual(msg, six.text_type(ex))
db.volume_destroy(self.context, vol_ref['id'])
def test_too_many_snapshots_of_type(self):
return None
over_name = _get_over('gigabytes')
+ exceeded_vol_limit_name = _get_over('volumes')
if over_name:
msg = _LW("Quota exceeded for %(s_pid)s, tried to create "
"%(s_size)sG volume (%(d_consumed)dG "
requested=size,
consumed=_consumed(over_name),
quota=quotas[over_name])
- elif _get_over('volumes'):
- msg = _LW("Quota exceeded for %(s_pid)s, tried to create "
- "volume (%(d_consumed)d volumes "
- "already consumed)")
- LOG.warning(msg, {'s_pid': context.project_id,
- 'd_consumed': _consumed('volumes')})
- raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
+ elif exceeded_vol_limit_name:
+ msg = _LW("Quota %(s_name)s exceeded for %(s_pid)s, tried "
+ "to create volume (%(d_consumed)d volume(s) "
+ "already consumed).")
+ LOG.warning(msg,
+ {'s_name': exceeded_vol_limit_name,
+ 's_pid': context.project_id,
+ 'd_consumed':
+ _consumed(exceeded_vol_limit_name)})
+ raise exception.VolumeLimitExceeded(
+ allowed=quotas[exceeded_vol_limit_name],
+ name=exceeded_vol_limit_name)
else:
# If nothing was reraised, ensure we reraise the initial error
raise