self.assertEqual(res.status_int, 202)
def test_initialize_connection(self):
- def fake_initialize_connection(*args, **kwargs):
- return {}
- self.stubs.Set(volume.API, 'initialize_connection',
- fake_initialize_connection)
-
- body = {'os-initialize_connection': {'connector': 'fake'}}
- req = webob.Request.blank('/v2/fake/volumes/1/action')
- req.method = "POST"
- req.body = jsonutils.dumps(body)
- req.headers["content-type"] = "application/json"
+ with mock.patch.object(volume_api.API,
+ 'initialize_connection') as init_conn:
+ init_conn.return_value = {}
+ body = {'os-initialize_connection': {'connector': 'fake'}}
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = "POST"
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
- res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 200)
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 200)
def test_initialize_connection_without_connector(self):
- def fake_initialize_connection(*args, **kwargs):
- return {}
- self.stubs.Set(volume.API, 'initialize_connection',
- fake_initialize_connection)
-
- body = {'os-initialize_connection': {}}
- req = webob.Request.blank('/v2/fake/volumes/1/action')
- req.method = "POST"
- req.body = jsonutils.dumps(body)
- req.headers["content-type"] = "application/json"
-
- res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 400)
+ with mock.patch.object(volume_api.API,
+ 'initialize_connection') as init_conn:
+ init_conn.return_value = {}
+ body = {'os-initialize_connection': {}}
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = "POST"
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
+
+ def test_initialize_connection_exception(self):
+ with mock.patch.object(volume_api.API,
+ 'initialize_connection') as init_conn:
+ init_conn.side_effect = \
+ exception.VolumeBackendAPIException(data=None)
+ body = {'os-initialize_connection': {'connector': 'fake'}}
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = "POST"
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 500)
def test_terminate_connection(self):
- def fake_terminate_connection(*args, **kwargs):
- return {}
- self.stubs.Set(volume.API, 'terminate_connection',
- fake_terminate_connection)
-
- body = {'os-terminate_connection': {'connector': 'fake'}}
- req = webob.Request.blank('/v2/fake/volumes/1/action')
- req.method = "POST"
- req.body = jsonutils.dumps(body)
- req.headers["content-type"] = "application/json"
+ with mock.patch.object(volume_api.API,
+ 'terminate_connection') as terminate_conn:
+ terminate_conn.return_value = {}
+ body = {'os-terminate_connection': {'connector': 'fake'}}
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = "POST"
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
- res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 202)
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 202)
def test_terminate_connection_without_connector(self):
- def fake_terminate_connection(*args, **kwargs):
- return {}
- self.stubs.Set(volume.API, 'terminate_connection',
- fake_terminate_connection)
+ with mock.patch.object(volume_api.API,
+ 'terminate_connection') as terminate_conn:
+ terminate_conn.return_value = {}
+ body = {'os-terminate_connection': {}}
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = "POST"
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
- body = {'os-terminate_connection': {}}
- req = webob.Request.blank('/v2/fake/volumes/1/action')
- req.method = "POST"
- req.body = jsonutils.dumps(body)
- req.headers["content-type"] = "application/json"
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 400)
- res = req.get_response(fakes.wsgi_app())
- self.assertEqual(res.status_int, 400)
+ def test_terminate_connection_with_exception(self):
+ with mock.patch.object(volume_api.API,
+ 'terminate_connection') as terminate_conn:
+ terminate_conn.side_effect = \
+ exception.VolumeBackendAPIException(data=None)
+ body = {'os-terminate_connection': {'connector': 'fake'}}
+ req = webob.Request.blank('/v2/fake/volumes/1/action')
+ req.method = "POST"
+ req.body = jsonutils.dumps(body)
+ req.headers["content-type"] = "application/json"
+
+ res = req.get_response(fakes.wsgi_app())
+ self.assertEqual(res.status_int, 500)
def test_attach_to_instance(self):
body = {'os-attach': {'instance_uuid': 'fake',