"""
+import contextlib
import datetime
-import mock
import os
import re
import shutil
import socket
import tempfile
+import eventlet
+import mock
import mox
from oslo.config import cfg
from taskflow.engines.action_engine import engine
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
-import eventlet
QUOTAS = quota.QUOTAS
image_id='fake_id',
source_volume='fake_id')
+ @mock.patch.object(db, 'volume_get')
+ @mock.patch.object(db, 'volume_admin_metadata_get')
+ def test_initialize_connection_fetchqos(self,
+ _mock_volume_admin_metadata_get,
+ _mock_volume_get):
+ """Make sure initialize_connection returns correct information."""
+ _mock_volume_get.return_value = {'volume_type_id': 'fake_type_id',
+ 'volume_admin_metadata': {}}
+ _mock_volume_admin_metadata_get.return_value = {}
+ connector = {'ip': 'IP', 'initiator': 'INITIATOR'}
+ qos_values = {'consumer': 'front-end',
+ 'specs': {
+ 'key1': 'value1',
+ 'key2': 'value2'}
+ }
+
+ with contextlib.nested(
+ mock.patch.object(cinder.volume.volume_types,
+ 'get_volume_type_qos_specs'),
+ mock.patch.object(cinder.tests.fake_driver.FakeISCSIDriver,
+ 'initialize_connection')
+ ) as (type_qos, driver_init):
+ type_qos.return_value = dict(qos_specs=qos_values)
+ driver_init.return_value = {'data': {}}
+ qos_specs_expected = {'key1': 'value1',
+ 'key2': 'value2'}
+ # initialize_connection() passes qos_specs that is designated to
+ # be consumed by front-end or both front-end and back-end
+ conn_info = self.volume.initialize_connection(self.context,
+ 'fake_volume_id',
+ connector)
+ self.assertDictMatch(qos_specs_expected,
+ conn_info['data']['qos_specs'])
+
+ qos_values.update({'consumer': 'both'})
+ conn_info = self.volume.initialize_connection(self.context,
+ 'fake_volume_id',
+ connector)
+ self.assertDictMatch(qos_specs_expected,
+ conn_info['data']['qos_specs'])
+ # initialize_connection() skips qos_specs that is designated to be
+ # consumed by back-end only
+ qos_values.update({'consumer': 'back-end'})
+ type_qos.return_value = dict(qos_specs=qos_values)
+ conn_info = self.volume.initialize_connection(self.context,
+ 'fake_volume_id',
+ connector)
+ self.assertEqual(None, conn_info['data']['qos_specs'])
+
def test_run_attach_detach_volume_for_instance(self):
"""Make sure volume can be attached and detached from instance."""
mountpoint = "/dev/sdf"
# Add qos_specs to connection info
typeid = volume['volume_type_id']
- specs = {}
+ specs = None
if typeid:
res = volume_types.get_volume_type_qos_specs(typeid)
- specs = res['qos_specs']
-
- # Don't pass qos_spec as empty dict
- qos_spec = dict(qos_spec=specs if specs else None)
+ qos = res['qos_specs']
+ # only pass qos_specs that is designated to be consumed by
+ # front-end, or both front-end and back-end.
+ if qos and qos.get('consumer') in ['front-end', 'both']:
+ specs = qos.get('specs')
+ qos_spec = dict(qos_specs=specs)
conn_info['data'].update(qos_spec)
# Add access_mode to connection info