- [C306] timeutils.strtime() must not be used (deprecated).
- [C307] LOG.warn is deprecated. Enforce use of LOG.warning.
- [C308] timeutils.isotime() must not be used (deprecated).
+- [C309] Unit tests should not perform logging.
General
-------
r"(.)*LOG\.(exception|error)\(\s*(_\(|'|\")")
log_translation_LW = re.compile(
r"(.)*LOG\.(warning|warn)\(\s*(_\(|'|\")")
+logging_instance = re.compile(
+ r"(.)*LOG\.(warning|info|debug|error|exception)\(")
class BaseASTChecker(ast.NodeVisitor):
yield(0, msg)
+def no_test_log(logical_line, filename, noqa):
+ if "cinder/tests" not in filename or noqa:
+ return
+ # Skip the "integrated" tests for now
+ if "cinder/tests/unit/integrated" in filename:
+ return
+ msg = "C309: Unit tests should not perform logging."
+ if logging_instance.match(logical_line):
+ yield (0, msg)
+
+
def factory(register):
register(no_vi_headers)
register(no_translate_debug_logs)
register(check_no_contextlib_nested)
register(no_log_warn)
register(dict_constructor_with_list_copy)
+ register(no_test_log)
from xml.dom import minidom
import mock
-from oslo_log import log as logging
import webob
from cinder.consistencygroup import api as consistencygroupAPI
import cinder.volume
-LOG = logging.getLogger(__name__)
-
-
class CgsnapshotsAPITestCase(test.TestCase):
"""Test Case for cgsnapshots API."""
consistencygroup_id)['id']
cgsnapshot_id = self._create_cgsnapshot(
consistencygroup_id=consistencygroup_id)
- LOG.debug('Created cgsnapshot with id %s' % cgsnapshot_id)
req = webob.Request.blank('/v2/fake/cgsnapshots/%s' %
cgsnapshot_id)
req.method = 'GET'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
- LOG.info(res_dict)
self.assertEqual(res.status_int, 202)
self.assertIn('id', res_dict['cgsnapshot'])
import datetime
from lxml import etree
-from oslo_log import log as logging
from oslo_utils import timeutils
import webob.exc
from cinder import test
-LOG = logging.getLogger(__name__)
created_time = datetime.datetime(2012, 11, 14, 1, 20, 41, 95099)
curr_time = datetime.datetime(2013, 7, 3, 0, 0, 1)
import json
from xml.dom import minidom
-from oslo_log import log as logging
import webob
from cinder.api.contrib import volume_transfer
import cinder.transfer
-LOG = logging.getLogger(__name__)
-
-
class VolumeTransferAPITestCase(test.TestCase):
"""Test Case for transfers API."""
def test_show_transfer(self):
volume_id = self._create_volume(size=5)
transfer = self._create_transfer(volume_id)
- LOG.debug('Created transfer with id %s' % transfer)
req = webob.Request.blank('/v2/fake/os-volume-transfer/%s' %
transfer['id'])
req.method = 'GET'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
- LOG.info(res_dict)
self.assertEqual(res.status_int, 202)
self.assertIn('id', res_dict['transfer'])
# License for the specific language governing permissions and limitations
# under the License.
-from oslo_log import log as logging
-
from cinder.api.openstack import wsgi
from cinder.api.v1 import router
from cinder.api.v1 import snapshots
from cinder.tests.unit.api import fakes
-LOG = logging.getLogger(__name__)
-
-
class FakeController(object):
def __init__(self, ext_mgr=None):
self.ext_mgr = ext_mgr
from lxml import etree
import mock
-from oslo_log import log as logging
from oslo_utils import timeutils
import webob
from cinder import volume
-LOG = logging.getLogger(__name__)
-
UUID = '00000000-0000-0000-0000-000000000001'
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
from lxml import etree
import mock
-from oslo_log import log as logging
from oslo_utils import timeutils
import webob
from cinder import volume
-LOG = logging.getLogger(__name__)
-
UUID = '00000000-0000-0000-0000-000000000001'
INVALID_UUID = '00000000-0000-0000-0000-000000000002'
# License for the specific language governing permissions and limitations
# under the License.
-from oslo_log import log as logging
-
-
-LOG = logging.getLogger(__name__)
-
class FakeBrickLVM(object):
"""Logs and records calls, for unit tests."""
# under the License.
from mox3 import mox
from oslo_concurrency import processutils
-from oslo_log import log as logging
from cinder.brick.local_dev import lvm as brick
from cinder import exception
from cinder import test
from cinder.volume import configuration as conf
-LOG = logging.getLogger(__name__)
-
def create_configuration():
configuration = mox.MockObject(conf.Configuration)
import datetime
import uuid
-from oslo_log import log as logging
from oslo_utils import timeutils
from cinder import context
from oslo_db.sqlalchemy import utils as sqlalchemyutils
-LOG = logging.getLogger(__name__)
-
-
class PurgeDeletedTest(test.TestCase):
def setUp(self):
import time
-from oslo_log import log as logging
-
from cinder import context
from cinder import db
from cinder import exception
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
-
def fake_qos_specs_get_by_name(context, name, session=None, inactive=False):
pass
"""Tests for transfers table."""
-from oslo_log import log as logging
-
from cinder import context
from cinder import db
from cinder import exception
from cinder.tests.unit import utils
-LOG = logging.getLogger(__name__)
-
-
class TransfersTableTestCase(test.TestCase):
"""Test case for transfers model."""
# License for the specific language governing permissions and limitations
# under the License.
-from oslo_log import log as logging
-
-from cinder.i18n import _LE
from cinder.tests.unit.brick import fake_lvm
from cinder.volume import driver
from cinder.volume.drivers import lvm
from cinder.zonemanager import utils as fczm_utils
-LOG = logging.getLogger(__name__)
-
-
class FakeISCSIDriver(lvm.LVMISCSIDriver):
"""Logs calls instead of executing."""
def __init__(self, *args, **kwargs):
@staticmethod
def fake_execute(cmd, *_args, **_kwargs):
"""Execute that simply logs the command."""
- LOG.debug("FAKE ISCSI: %s", cmd)
return (None, None)
@staticmethod
def fake_execute(cmd, *_args, **_kwargs):
"""Execute that simply logs the command."""
- LOG.debug("FAKE ISER: %s", cmd)
return (None, None)
self.log_action('clear_volume', volume)
def local_path(self, volume):
- LOG.error(_LE("local_path not implemented"))
raise NotImplementedError()
def ensure_export(self, context, volume):
@staticmethod
def log_action(action, parameters):
"""Logs the command."""
- LOG.debug("LoggingVolumeDriver: %s" % (action))
log_dictionary = {}
if parameters:
log_dictionary = dict(parameters)
log_dictionary['action'] = action
- LOG.debug("LoggingVolumeDriver: %s" % (log_dictionary))
LoggingVolumeDriver._LOGS.append(log_dictionary)
@staticmethod
import re
from eventlet import greenthread
-from oslo_concurrency import processutils
-from oslo_log import log as logging
import six
from cinder import utils
-LOG = logging.getLogger(__name__)
-
_fake_execute_repliers = []
_fake_execute_log = []
run_as_root = kwargs.get('run_as_root', False)
cmd_str = ' '.join(str(part) for part in cmd_parts)
- LOG.debug("Faking execution of cmd (subprocess): %s", cmd_str)
_fake_execute_log.append(cmd_str)
reply_handler = fake_execute_default_reply_handler
for fake_replier in _fake_execute_repliers:
if re.match(fake_replier[0], cmd_str):
reply_handler = fake_replier[1]
- LOG.debug('Faked command matched %s' % fake_replier[0])
break
if isinstance(reply_handler, six.string_types):
# If the reply handler is a string, return it as stdout
reply = reply_handler, ''
else:
- try:
- # Alternative is a function, so call it
- reply = reply_handler(cmd_parts,
- process_input=process_input,
- delay_on_retry=delay_on_retry,
- attempts=attempts,
- run_as_root=run_as_root,
- check_exit_code=check_exit_code)
- except processutils.ProcessExecutionError as e:
- LOG.debug('Faked command raised an exception %s', e)
- raise
-
- LOG.debug("Reply to faked command is stdout='%(stdout)s' "
- "stderr='%(stderr)s'" % {'stdout': reply[0],
- 'stderr': reply[1]})
+ # Alternative is a function, so call it
+ reply = reply_handler(cmd_parts,
+ process_input=process_input,
+ delay_on_retry=delay_on_retry,
+ attempts=attempts,
+ run_as_root=run_as_root,
+ check_exit_code=check_exit_code)
# Replicate the sleep call in the real function
greenthread.sleep(0)
import datetime
import uuid
-from oslo_log import log as logging
-
from cinder import exception
import cinder.image.glance
-LOG = logging.getLogger(__name__)
-
-
class _FakeImageService(object):
"""Mock (fake) image service for unit testing."""
image = self.images.get(str(image_id))
if image:
return copy.deepcopy(image)
- LOG.warning('Unable to find image id %s. Have images: %s',
- image_id, self.images)
raise exception.ImageNotFound(image_id=image_id)
def create(self, context, metadata, data=None):
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
import mock
from oslo_config import cfg
-from oslo_log import log as logging
from cinder import context
from cinder import db
'schedule_create_consistencygroup') as mock_cg:
original_driver = self.manager.driver
self.manager.driver = filter_scheduler.FilterScheduler
- LOG = logging.getLogger('cinder.scheduler.manager')
- self.stubs.Set(LOG, 'error', mock.Mock())
- self.stubs.Set(LOG, 'exception', mock.Mock())
+ LOG = self.mock_object(manager, 'LOG')
self.stubs.Set(db, 'consistencygroup_update', mock.Mock())
ex = exception.CinderException('test')
from unittest import mock
except ImportError:
import mock
-from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import units
import six
from cinder.volume import configuration as conf
import cinder.volume.drivers.blockbridge as bb
-LOG = logging.getLogger(__name__)
-
DEFAULT_POOL_NAME = "OpenStack"
DEFAULT_POOL_QUERY = "+openstack"
# under the License.
import mock
-from oslo_log import log as logging
from cinder import context
from cinder import exception
from cinder.volume.drivers.dell import dell_storagecenter_fc
-LOG = logging.getLogger(__name__)
-
# We patch these here as they are used by every test to keep
# from trying to contact a Dell Storage Center.
-
-
@mock.patch.object(dell_storagecenter_api.StorageCenterApi,
'__init__',
return_value=None)
# License for the specific language governing permissions and limitations
# under the License.
-from oslo_log import log as logging
+import uuid
+
+import mock
from cinder import context
from cinder import exception
from cinder.volume.drivers.dell import dell_storagecenter_iscsi
from cinder.volume import volume_types
-import mock
-
-import uuid
-
-
-LOG = logging.getLogger(__name__)
# We patch these here as they are used by every test to keep
# from trying to contact a Dell Storage Center.
-
-
@mock.patch.object(dell_storagecenter_api.StorageCenterApi,
'__init__',
return_value=None)
# under the License.
import ddt
-from oslo_log import log as logging
+import mock
+from requests import models
+import uuid
from cinder import context
from cinder import exception
from cinder import test
from cinder.volume.drivers.dell import dell_storagecenter_api
-import mock
-from requests import models
-
-import uuid
-
-LOG = logging.getLogger(__name__)
# We patch these here as they are used by every test to keep
# from trying to contact a Dell Storage Center.
-
-
@ddt.ddt
@mock.patch.object(dell_storagecenter_api.StorageCenterApi,
'__init__',
import collections
import mock
-from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import timeutils
from cinder.volume.drivers import drbdmanagedrv
-LOG = logging.getLogger(__name__)
-
-
def create_configuration(object):
configuration = mock.MockObject(conf.Configuration)
configuration.san_is_local = False
from xml.dom import minidom
import mock
-from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import units
import six
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
CINDER_EMC_CONFIG_DIR = '/etc/cinder/'
import mock
-from oslo_log import log as logging
-import six
from cinder import exception
from cinder import test
from cinder.volume.drivers.emc import xtremio
-LOG = logging.getLogger(__name__)
-
typ2id = {'volumes': 'vol-id',
'snapshots': 'vol-id',
'initiators': 'initiator-id',
del xms_data[object_type][data['index']]
del xms_data[object_type][data[typ2id[object_type]][1]]
else:
- LOG.error('Trying to delete a missing object %s',
- six.text_type(obj_key))
raise exception.NotFound()
elif request_typ == 'PUT':
if obj_key in xms_data[object_type]:
if key:
xms_data[object_type][data[key]] = obj
else:
- LOG.error('Trying to update a missing object %s',
- six.text_type(obj_key))
raise exception.NotFound()
from eventlet import greenthread
import mock
from oslo_concurrency import processutils
-from oslo_log import log as logging
import paramiko
import six
from cinder.volume import configuration as conf
from cinder.volume.drivers import eqlx
-LOG = logging.getLogger(__name__)
-
class DellEQLSanISCSIDriverTestCase(test.TestCase):
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
-from oslo_log import log as logging
from oslo_utils import units
from cinder import context
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
CONF = cfg.CONF
# License for the specific language governing permissions and limitations
# under the License.
+import ddt
import textwrap
import mock
from cinder import test
+@ddt.ddt
class HackingTestCase(test.TestCase):
"""This class tests the hacking checks in cinder.hacking.checks
self.assertEqual(0, len(list(checks.dict_constructor_with_list_copy(
" self._render_dict(xml, data_el, data.__dict__)"))))
+
+ @ddt.unpack
+ @ddt.data(
+ (1, 'LOG.info', "cinder/tests/unit/fake.py", False),
+ (1, 'LOG.warning', "cinder/tests/fake.py", False),
+ (1, 'LOG.error', "cinder/tests/fake.py", False),
+ (1, 'LOG.exception', "cinder/tests/fake.py", False),
+ (1, 'LOG.debug', "cinder/tests/fake.py", False),
+ (0, 'LOG.info.assert_called_once_with', "cinder/tests/fake.py", False),
+ (0, 'some.LOG.error.call', "cinder/tests/fake.py", False),
+ (0, 'LOG.warning', "cinder/tests/unit/fake.py", True),
+ (0, 'LOG.warning', "cinder/tests/unit/integrated/fake.py", False))
+ def test_no_test_log(self, first, second, third, fourth):
+ self.assertEqual(first, len(list(checks.no_test_log(
+ "%s('arg')" % second, third, fourth))))
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
-from oslo_log import log as logging
from cinder import test
from cinder import utils
CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-
HNAS_RESULT1 = "\n\
FS ID FS Label FS Permanent ID EVS ID EVS Label\n\
----- ----------- ------------------ ------ ---------\n\
def m_run_cmd(*args, **kargs):
- LOG.debug(args)
- LOG.debug(HNAS_CMDS.get(args))
return HNAS_CMDS.get(args)
import tempfile
import mock
-from oslo_log import log as logging
import six
from cinder import exception
from cinder.volume.drivers.hitachi import hnas_iscsi as iscsi
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
HNASCONF = """<?xml version="1.0" encoding="UTF-8" ?>
<config>
<hnas_cmd>ssc</hnas_cmd>
self.connections = []
def deleteVolume(self, name):
- LOG.info("delVolume: name %s", name)
-
volume = self.getVolume(name)
if volume:
- LOG.info("deleteVolume: deleted name %s provider %s",
- volume['name'], volume['provider_location'])
self.volumes.remove(volume)
return True
else:
return False
def deleteVolumebyProvider(self, provider):
- LOG.info("delVolumeP: provider %s", provider)
-
volume = self.getVolumebyProvider(provider)
if volume:
- LOG.info("deleteVolumeP: deleted name %s provider %s",
- volume['name'], volume['provider_location'])
self.volumes.remove(volume)
return True
else:
return self.volumes
def getVolume(self, name):
- LOG.info("getVolume: find by name %s", name)
-
if self.volumes:
for volume in self.volumes:
if str(volume['name']) == name:
- LOG.info("getVolume: found name %s provider %s",
- volume['name'], volume['provider_location'])
return volume
- else:
- LOG.info("getVolume: no volumes")
-
- LOG.info("getVolume: not found")
return None
def getVolumebyProvider(self, provider):
- LOG.info("getVolumeP: find by provider %s", provider)
-
if self.volumes:
for volume in self.volumes:
if str(volume['provider_location']) == provider:
- LOG.info("getVolumeP: found name %s provider %s",
- volume['name'], volume['provider_location'])
return volume
- else:
- LOG.info("getVolumeP: no volumes")
-
- LOG.info("getVolumeP: not found")
return None
def createVolume(self, name, provider, sizeMiB, comment):
- LOG.info("createVolume: name %s provider %s comment %s",
- name, provider, comment)
-
new_vol = {'additionalStates': [],
'adminSpace': {'freeMiB': 0,
'rawReservedMiB': 384,
def delete_lu(self, cmd, ip0, user, pw, hdp, lun):
_out = ""
id = "myID"
- LOG.info("Delete_Lu: check lun %s id %s", lun, id)
- if self.deleteVolumebyProvider(id + '.' + str(lun)):
- LOG.warning("Delete_Lu: failed to delete lun %s id %s", lun, id)
+ self.deleteVolumebyProvider(id + '.' + str(lun))
return _out
def create_dup(self, cmd, ip0, user, pw, src_lun, hdp, size, name):
(self.start_lun, size))
id = name
- LOG.info("HNAS Create_Dup: %d", self.start_lun)
self.createVolume(name, id + '.' + str(self.start_lun), size,
"create-dup")
self.start_lun += 1
self.init_index += 1
self.target_index += 1
self.hlun += 1
- LOG.debug("Created connection %d", self.init_index)
self.connections.append(conn)
return _out
_out = ("LUN: %s successfully extended to %s MB" % (lu, size))
id = name
self.out = _out
- LOG.info("extend_vol: lu: %s %d -> %s", lu, int(size), self.out)
v = self.getVolumebyProvider(id + '.' + str(lu))
if v:
v['sizeMiB'] = size
- LOG.info("extend_vol: out %s %s", self.out, self)
return _out
def get_luns(self):
import tempfile
import mock
-from oslo_log import log as logging
import six
from cinder import exception
from cinder.volume.drivers.hitachi import hnas_nfs as nfs
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
SHARESCONF = """172.17.39.132:/cinder
172.17.39.133:/cinder"""
def file_clone(self, cmd, ip0, user, pw, fslabel, source_path,
target_path):
- _out = ""
- LOG.info("Clone: %s -> %s" % (source_path, target_path))
- return _out
+ return ""
def get_version(self, ver, cmd, ip0, user, pw):
self.out = "Array_ID: 18-48-A5-A1-80-13 (3080-G2) " \
import ast
from oslo_config import cfg
-from oslo_log import log as logging
from oslo_utils import units
from cinder import context
hpexceptions = hp3parclient.hpexceptions
-LOG = logging.getLogger(__name__)
-
CONF = cfg.CONF
HP3PAR_CPG = 'OpenStackCPG'
"""Unit tests for OpenStack Cinder volume drivers."""
import mock
-from oslo_log import log as logging
from oslo_utils import units
import six
hpexceptions = hplefthandclient.hpexceptions
-LOG = logging.getLogger(__name__)
-
GOODNESS_FUNCTION = \
"capabilities.capacity_utilization < 0.6? 100 : 25"
FILTER_FUNCTION = \
from xml.dom import minidom
import mock
-from oslo_log import log as logging
from cinder import exception
from cinder import test
from cinder.volume.drivers.huawei import huawei_18000
from cinder.volume.drivers.huawei import rest_common
-LOG = logging.getLogger(__name__)
test_volume = {'name': 'volume-21ec7341-9256-497b-97d9-ef48edcf0635',
'size': 2,
import mock
from oslo_concurrency import processutils
-from oslo_log import log as logging
-from oslo_utils import excutils
from oslo_utils import units
import six
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
class FlashSystemManagementSimulator(object):
def __init__(self):
self.fake_storage = fake
def _ssh(self, cmd, check_exit_code=True):
- try:
- LOG.debug('Run CLI command: %s' % cmd)
- utils.check_ssh_injection(cmd)
- ret = self.fake_storage.execute_command(cmd, check_exit_code)
- (stdout, stderr) = ret
- LOG.debug('CLI output:\n stdout: %(stdout)s\n stderr: '
- '%(stderr)s' % {'stdout': stdout, 'stderr': stderr})
-
- except processutils.ProcessExecutionError as e:
- with excutils.save_and_reraise_exception():
- LOG.debug('CLI Exception output:\n stdout: %(out)s\n '
- 'stderr: %(err)s' % {'out': e.stdout,
- 'err': e.stderr})
+ utils.check_ssh_injection(cmd)
+ ret = self.fake_storage.execute_command(cmd, check_exit_code)
return ret
"""
import mock
-from oslo_concurrency import processutils
-from oslo_log import log as logging
-from oslo_utils import excutils
import six
import random
from cinder.volume.drivers.ibm import flashsystem_iscsi
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
class FlashSystemManagementSimulator(fscommon.FlashSystemManagementSimulator):
def __init__(self):
self.fake_storage = fake
def _ssh(self, cmd, check_exit_code=True):
- ret = None
- try:
- LOG.debug('Run CLI command: %s', cmd)
- utils.check_ssh_injection(cmd)
- ret = self.fake_storage.execute_command(cmd, check_exit_code)
- (stdout, stderr) = ret
- LOG.debug('CLI output:\n stdout: %(stdout)s\n stderr: '
- '%(stderr)s', {'stdout': stdout, 'stderr': stderr})
-
- except processutils.ProcessExecutionError as e:
- with excutils.save_and_reraise_exception():
- LOG.debug('CLI Exception output:\n stdout: %(out)s\n '
- 'stderr: %(err)s', {'out': e.stdout,
- 'err': e.stderr})
+ utils.check_ssh_injection(cmd)
+ ret = self.fake_storage.execute_command(cmd, check_exit_code)
return ret
import mock
from oslo_config import cfg
-from oslo_log import log as logging
from oslo_utils import units
from cinder import context
from cinder.volume import configuration as conf
from cinder.volume.drivers.ibm import ibmnas
-LOG = logging.getLogger(__name__)
-
CONF = cfg.CONF
from lxml import etree
import mock
-from oslo_log import log as logging
import six
from six.moves import BaseHTTPServer
from six.moves import http_client
from cinder.volume.drivers.netapp import utils
-LOG = logging.getLogger("cinder.volume.driver")
-
-
def create_configuration():
configuration = conf.Configuration(None)
configuration.append_config_values(options.netapp_connection_opts)
import re
import mock
-from oslo_log import log as logging
import requests
from six.moves import urllib
import cinder.volume.drivers.netapp.utils as na_utils
-LOG = logging.getLogger(__name__)
-
-
def create_configuration():
configuration = conf.Configuration(None)
configuration.append_config_values(options.netapp_basicauth_opts)
from lxml import etree
import mock
from mox3 import mox as mox_lib
-from oslo_log import log as logging
import six
from cinder import exception
-from cinder.i18n import _LW
from cinder.image import image_utils
from cinder import test
from cinder import utils as cinder_utils
from oslo_config import cfg
CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-
CONNECTION_INFO = {'hostname': 'fake_host',
'transport_type': 'https',
if (share == 'testshare' and file_name == 'img-cache-id'):
pass
else:
- LOG.warning(_LW("Share %(share)s and file name %(file_name)s")
- % {'share': share, 'file_name': file_name})
self.fail('Return result is unexpected')
def test_find_old_cache_files_notexists(self):
import mock
from oslo_config import cfg
-from oslo_log import log as logging
from cinder import exception
from cinder import test
NIMBLE_CLIENT = 'cinder.volume.drivers.nimble.client'
NIMBLE_URLLIB2 = 'six.moves.urllib.request'
NIMBLE_RANDOM = 'cinder.volume.drivers.nimble.random'
-LOG = logging.getLogger(__name__)
FAKE_ENUM_STRING = """
<simpleType name="SmErrorType">
import time
from oslo_db import exception as db_exc
-from oslo_log import log as logging
from cinder import context
from cinder import db
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
-
def fake_db_qos_specs_create(context, values):
if values['name'] == 'DupQoSName':
raise exception.QoSSpecsExists(specs_id=values['name'])
import tempfile
import mock
-from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import units
from cinder.volume.flows.manager import create_volume
-LOG = logging.getLogger(__name__)
-
-
# This is used to collect raised exceptions so that tests may check what was
# raised.
# NOTE: this must be initialised in test setUp().
import mock
from oslo_concurrency import processutils
-from oslo_log import log as logging
from oslo_utils import units
from cinder import context
from cinder.volume import configuration as conf
from cinder.volume.drivers import srb
-LOG = logging.getLogger(__name__)
-
class SRBLvmTestCase(test_brick_lvm.BrickLvmTestCase):
def setUp(self):
import mock
from oslo_concurrency import processutils
-from oslo_log import log as logging
-from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import units
from cinder.volume import qos_specs
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
class StorwizeSVCManagementSimulator(object):
def __init__(self, pool_name):
self.fake_storage = fake
def _run_ssh(self, cmd, check_exit_code=True, attempts=1):
- try:
- LOG.debug('Run CLI command: %s' % cmd)
- utils.check_ssh_injection(cmd)
- ret = self.fake_storage.execute_command(cmd, check_exit_code)
- (stdout, stderr) = ret
- LOG.debug('CLI output:\n stdout: %(stdout)s\n stderr: '
- '%(stderr)s' % {'stdout': stdout, 'stderr': stderr})
-
- except processutils.ProcessExecutionError as e:
- with excutils.save_and_reraise_exception():
- LOG.debug('CLI Exception output:\n stdout: %(out)s\n '
- 'stderr: %(err)s' % {'out': e.stdout,
- 'err': e.stderr})
+ utils.check_ssh_injection(cmd)
+ ret = self.fake_storage.execute_command(cmd, check_exit_code)
return ret
import mock
-from oslo_log import log as logging
from oslo_utils import units
from cinder import context
from cinder.volume.drivers.tintri import TClient
from cinder.volume.drivers.tintri import TintriDriver
-LOG = logging.getLogger(__name__)
-
class FakeImage(object):
def __init__(self):
from oslo_config import cfg
-from oslo_log import log as logging
from cinder import test
from cinder.volume import configuration
-LOG = logging.getLogger(__name__)
-
-
volume_opts = [
cfg.StrOpt('str_opt', default='STR_OPT'),
cfg.BoolOpt('bool_opt', default=False)
import datetime
-
import mock
-from oslo_log import log as logging
from cinder import context
from cinder import db
from cinder.transfer import api as transfer_api
-LOG = logging.getLogger(__name__)
-
-
class VolumeTransferTestCase(test.TestCase):
"""Test cases for volume transfer code."""
def setUp(self):
import time
from oslo_config import cfg
-from oslo_log import log as logging
from cinder import context
from cinder import db
from cinder.db.sqlalchemy import api as db_api
from cinder.db.sqlalchemy import models
from cinder import exception
-from cinder.i18n import _
from cinder import test
from cinder.tests.unit import conf_fixture
from cinder.volume import qos_specs
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
-
class VolumeTypeTestCase(test.TestCase):
"""Test cases for volume type code."""
def setUp(self):
new = volume_types.get_volume_type_by_name(self.ctxt,
self.vol_type1_name)
- LOG.info(_("Given data: %s"), self.vol_type1_specs)
- LOG.info(_("Result data: %s"), new)
-
self.assertEqual(self.vol_type1_description, new['description'])
for k, v in self.vol_type1_specs.items():
vol_types = volume_types.get_all_types(
self.ctxt,
search_opts={'extra_specs': {"key1": "val1"}})
- LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 1)
self.assertIn("type1", vol_types.keys())
self.assertEqual(vol_types['type1']['extra_specs'],
vol_types = volume_types.get_all_types(
self.ctxt,
search_opts={'extra_specs': {"key2": "val2"}})
- LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 2)
self.assertIn("type1", vol_types.keys())
self.assertIn("type2", vol_types.keys())
vol_types = volume_types.get_all_types(
self.ctxt,
search_opts={'extra_specs': {"key3": "val3"}})
- LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 1)
self.assertIn("type2", vol_types.keys())
self.ctxt,
search_opts={'extra_specs': {"key1": "val1",
"key3": "val3"}})
- LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 2)
self.assertIn("type1", vol_types.keys())
self.assertIn("type3", vol_types.keys())
from oslo_concurrency import processutils
from oslo_config import cfg
-from oslo_log import log as logging
from cinder import exception
from cinder import test
from cinder.volume import utils as volume_utils
-LOG = logging.getLogger(__name__)
-
CONF = cfg.CONF
import json
import mock
-from oslo_log import log as logging
from oslo_utils import units
from cinder import test
from cinder.volume.drivers.zfssa import zfssarest as rest
-LOG = logging.getLogger(__name__)
-
nfs_logbias = 'latency'
nfs_compression = 'off'
import mock
from os_brick.remotefs import remotefs as remotefs_brick
-from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import units
from cinder.volume import utils as volume_utils
-LOG = logging.getLogger(__name__)
-
-
class NetAppCmodeNfsDriverTestCase(test.TestCase):
def setUp(self):
super(NetAppCmodeNfsDriverTestCase, self).setUp()
# under the License.
import mock
-from oslo_log import log as logging
from cinder import context
from cinder import exception
from cinder.volume.drivers import datera
-LOG = logging.getLogger(__name__)
-
-
class DateraVolumeTestCase(test.TestCase):
def setUp(self):
super(DateraVolumeTestCase, self).setUp()
import mock
from oslo_concurrency import processutils
-from oslo_log import log as logging
from cinder import context
from cinder import exception
from cinder.volume import volume_types
-LOG = logging.getLogger(__name__)
-
-
class HGSTTestCase(test.TestCase):
# Need to mock these since we use them on driver creation
import mock
from oslo_config import cfg
-from oslo_log import log as logging
import paramiko
from cinder import exception
as brcd_lookup
from cinder.zonemanager.drivers.brocade import fc_zone_constants
-LOG = logging.getLogger(__name__)
nsshow = '20:1a:00:05:1e:e8:e3:29'
switch_data = [' N 011a00;2,3;20:1a:00:05:1e:e8:e3:29;\
import mock
from oslo_concurrency import processutils
-from oslo_log import log as logging
from cinder import exception
from cinder import test
import brcd_fc_zone_client_cli as client_cli
import cinder.zonemanager.drivers.brocade.fc_zone_constants as ZoneConstant
-LOG = logging.getLogger(__name__)
nsshow = '20:1a:00:05:1e:e8:e3:29'
switch_data = [' N 011a00;2,3;20:1a:00:05:1e:e8:e3:29;\
import mock
from oslo_config import cfg
-from oslo_log import log as logging
from oslo_utils import importutils
import paramiko
from cinder import exception
-from cinder.i18n import _LI
from cinder import test
from cinder.volume import configuration as conf
from cinder.zonemanager.drivers.brocade import brcd_fc_zone_driver as driver
-LOG = logging.getLogger(__name__)
-
_active_cfg_before_add = {}
_active_cfg_before_delete = {
'zones': {
"""Normal flow for i-t mode."""
GlobalVars._is_normal_test = True
GlobalVars._zone_state = []
- LOG.info(_LI("In Add GlobalVars._is_normal_test: "
- "%s"), GlobalVars._is_normal_test)
- LOG.info(_LI("In Add GlobalVars._zone_state:"
- " %s"), GlobalVars._zone_state)
get_active_zs_mock.return_value = _active_cfg_before_add
self.driver.add_connection('BRCD_FAB_1', _initiator_target_map)
self.assertTrue(_zone_name in GlobalVars._zone_state)
class FakeBrcdFCZoneClientCLI(object):
def __init__(self, ipaddress, username, password, port):
- LOG.info(_LI("User: %s"), username)
- LOG.info(_LI("_zone_state: %s"), GlobalVars._zone_state)
self.firmware_supported = True
if not GlobalVars._is_normal_test:
raise paramiko.SSHException("Unable to connect to fabric")
def get_active_zone_set(self):
- LOG.debug("Inside get_active_zone_set %s", GlobalVars._active_cfg)
return GlobalVars._active_cfg
def add_zones(self, zones, isActivate, active_zone_set):
"""Unit tests for fc san lookup service."""
-from oslo_log import log as logging
-
from cinder import exception
from cinder import test
from cinder.volume import configuration as conf
from cinder.zonemanager import fc_san_lookup_service as san_service
-LOG = logging.getLogger(__name__)
-
_target_ns_map = {'100000051e55a100': ['20240002ac000a50']}
_initiator_ns_map = {'100000051e55a100': ['10008c7cff523b01']}
_device_map_to_verify = {