def stub_snapshot_update(self, context, *args, **param):
pass
+
+
+def stub_service_get_all_by_topic(context, topic):
+ return [{'availability_zone': "zone1:host1"}]
from cinder.openstack.common import jsonutils
from cinder import test
from cinder.tests.api import fakes
+from cinder.tests.api.v1 import stubs
CONF = cfg.CONF
self.stubs.Set(cinder.db, 'volume_get', return_volume)
self.stubs.Set(cinder.db, 'volume_metadata_get',
return_volume_metadata)
+ self.stubs.Set(cinder.db, 'service_get_all_by_topic',
+ stubs.stub_service_get_all_by_topic)
self.stubs.Set(self.volume_api, 'update_volume_metadata',
fake_update_volume_metadata)
self.stubs.Set(db, 'volume_get_all', stubs.stub_volume_get_all)
self.stubs.Set(db, 'volume_get_all_by_project',
stubs.stub_volume_get_all_by_project)
+ self.stubs.Set(db, 'service_get_all_by_topic',
+ stubs.stub_service_get_all_by_topic)
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
self.stubs.Set(volume_api.API, 'delete', stubs.stub_volume_delete)
req,
body)
+ def test_volume_creation_fails_with_bad_availability_zone(self):
+ vol = {"size": '1',
+ "name": "Volume Test Name",
+ "description": "Volume Test Desc",
+ "availability_zone": "zonen:hostn"}
+ body = {"volume": vol}
+ req = fakes.HTTPRequest.blank('/v2/volumes')
+ self.assertRaises(exception.InvalidInput,
+ self.controller.create,
+ req, body)
+
def test_volume_create_with_image_id(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
def stub_snapshot_update(self, context, *args, **param):
pass
+
+
+def stub_service_get_all_by_topic(context, topic):
+ return [{'availability_zone': "zone1:host1"}]
stubs.stub_volume_get_all_by_project)
self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)
self.stubs.Set(volume_api.API, 'delete', stubs.stub_volume_delete)
+ self.stubs.Set(db, 'service_get_all_by_topic',
+ stubs.stub_service_get_all_by_topic)
self.maxDiff = None
def test_volume_create(self):
req,
body)
+ def test_volume_creation_fails_with_bad_availability_zone(self):
+ vol = {"size": '1',
+ "name": "Volume Test Name",
+ "description": "Volume Test Desc",
+ "availability_zone": "zonen:hostn"}
+ body = {"volume": vol}
+ req = fakes.HTTPRequest.blank('/v2/volumes')
+ self.assertRaises(exception.InvalidInput,
+ self.controller.create,
+ req, body)
+
def test_volume_create_with_image_id(self):
self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)
self.ext_mgr.extensions = {'os-image-create': 'fake'}
"""Creates a volume in availability_zone."""
# Create volume
- availability_zone = 'zone1:host1'
+ availability_zone = 'nova'
created_volume = self.api.post_volume(
{'volume': {'size': 1,
'availability_zone': availability_zone}})
from oslo.config import cfg
+from cinder import context
from cinder.db import base
from cinder import exception
from cinder import flags
glance.get_default_image_service())
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
+ self.availability_zones = set()
super(API, self).__init__(db_driver)
def create(self, context, size, name, description, snapshot=None,
if availability_zone is None:
availability_zone = FLAGS.storage_availability_zone
+ else:
+ self._check_availabilty_zone(availability_zone)
if not volume_type and not source_volume:
volume_type = volume_types.get_default_volume_type()
request_spec=request_spec,
filter_properties=filter_properties)
+ def _check_availabilty_zone(self, availability_zone):
+ if availability_zone in self.availability_zones:
+ return
+
+ ctxt = context.get_admin_context()
+ topic = FLAGS.volume_topic
+ volume_services = self.db.service_get_all_by_topic(ctxt, topic)
+
+ # NOTE(haomai): In case of volume services isn't init or
+ # availability_zones is updated in the backend
+ self.availability_zones = set()
+ for service in volume_services:
+ self.availability_zones.add(service['availability_zone'])
+
+ if availability_zone not in self.availability_zones:
+ msg = _("Availability zone is invalid")
+ LOG.warn(msg)
+ raise exception.InvalidInput(reason=msg)
+
@wrap_check_policy
def delete(self, context, volume, force=False):
if context.is_admin and context.project_id != volume['project_id']: