# We have fully rendered the element; return it
return rootelem
- def render(self, parent, obj, patches=[], nsmap=None):
+ def render(self, parent, obj, patches=None, nsmap=None):
"""Render an object.
Renders an object against this template node. Returns a list
the etree.Element instances.
"""
+ patches = patches or []
# First, get the datum we're rendering
data = None if obj is None else self.selector(obj)
# License for the specific language governing permissions and limitations
# under the License.
+import re
"""
Guidelines for writing new hacking checks
yield(0, "N319 Don't translate debug level logs")
+def no_mutable_default_args(logical_line):
+ msg = "N322: Method's default argument shouldn't be mutable!"
+ mutable_default_args = re.compile(r"^\s*def .+\((.+=\{\}|.+=\[\])")
+ if mutable_default_args.match(logical_line):
+ yield (0, msg)
+
+
def factory(register):
register(no_translate_debug_logs)
+ register(no_mutable_default_args)
"""Check if the specified host passes the filters."""
raise NotImplementedError(_("Must implement host_passes_filters"))
- def find_retype_host(self, context, request_spec, filter_properties={},
+ def find_retype_host(self, context, request_spec, filter_properties=None,
migration_policy='never'):
"""Find a host that can accept the volume with its new type."""
raise NotImplementedError(_("Must implement find_retype_host"))
% {'id': request_spec['volume_id'], 'host': host})
raise exception.NoValidHost(reason=msg)
- def find_retype_host(self, context, request_spec, filter_properties={},
+ def find_retype_host(self, context, request_spec, filter_properties=None,
migration_policy='never'):
"""Find a host that can accept the volume with its new type."""
+ filter_properties = filter_properties or {}
current_host = request_spec['volume_properties']['host']
# The volume already exists on this host, and so we shouldn't check if
def stub_volume_get_all_by_project(self, context, marker, limit, sort_key,
- sort_dir, filters={},
+ sort_dir, filters=None,
viewable_admin_meta=False):
+ filters = filters or {}
return [stub_volume_get(self, context, '1')]
'FixedIntervalLoopingCall',
FakeFixedIntervalLoopingCall)
- def _mock_path_exists(self, aoe_path, mock_values=[]):
+ def _mock_path_exists(self, aoe_path, mock_values=None):
+ mock_values = mock_values or []
self.mox.StubOutWithMock(os.path, 'exists')
for value in mock_values:
os.path.exists(aoe_path).AndReturn(value)
self.driver = coraid.CoraidDriver(configuration=configuration)
self.driver.do_setup({})
- def mock_volume_types(self, repositories=[]):
+ def mock_volume_types(self, repositories=None):
if not repositories:
repositories = [fake_repository_name]
self.mox.StubOutWithMock(volume_types, 'get_volume_type_extra_specs')
class FakeManagedObjectReference(object):
- def __init__(self, lis=[]):
- self.ManagedObjectReference = lis
+ def __init__(self, lis=None):
+ self.ManagedObjectReference = lis or []
class FakeDatastoreSummary(object):
self.volume_size = 0
self.context = context.RequestContext(self.user_id, self.project_id)
- def _create_volume(self, params={}):
+ def _create_volume(self, params=None):
"""Create a test volume."""
+ params = params or {}
vol = {}
vol['snapshot_id'] = self.snapshot_id
vol['user_id'] = self.user_id
LOG.error(msg)
self.db.transfer_destroy(context, transfer_id)
- def get_all(self, context, filters={}):
+ def get_all(self, context, filters=None):
+ filters = filters or {}
volume_api.check_policy(context, 'get_all_transfers')
if context.is_admin and 'all_tenants' in filters:
transfers = self.db.transfer_get_all(context)
return specs
-def check_apis_on_cluster(na_server, api_list=[]):
+def check_apis_on_cluster(na_server, api_list=None):
"""Checks api availability and permissions on cluster.
Checks api availability and permissions for executing user.
Returns a list of failed apis.
"""
+ api_list = api_list or []
failed_apis = []
if api_list:
api_version = na_server.get_api_version()
profile_id = profile.uniqueId
return profile_id
- def _create_backing(self, volume, host, create_params={}):
+ def _create_backing(self, volume, host, create_params=None):
"""Create volume backing under the given host.
:param volume: Volume object
backing VM creation
:return: Reference to the created backing
"""
+ create_params = create_params or {}
# Get datastores and resource pool of the host
(datastores, resource_pool) = self.volumeops.get_dss_rp(host)
# Pick a folder and datastore to create the volume backing on
LOG.error(msg)
raise error_util.VimException(msg)
- def _create_backing_in_inventory(self, volume, create_params={}):
+ def _create_backing_in_inventory(self, volume, create_params=None):
"""Creates backing under any suitable host.
The method tries to pick datastore that can fit the volume under
backing VM creation
:return: Reference to the created backing
"""
-
+ create_params = create_params or {}
retrv_result = self.volumeops.get_hosts()
while retrv_result:
hosts = retrv_result.objects
type_id=None)
-def get_all_specs(context, inactive=False, search_opts={}):
+def get_all_specs(context, inactive=False, search_opts=None):
"""Get all non-deleted qos specs.
Pass inactive=True as argument and deleted volume types would return
as well.
"""
+ search_opts = search_opts or {}
qos_specs = db.qos_specs_get_all(context, inactive)
if search_opts:
LOG = logging.getLogger(__name__)
-def create(context, name, extra_specs={}):
+def create(context, name, extra_specs=None):
"""Creates volume types."""
+ extra_specs = extra_specs or {}
try:
type_ref = db.volume_type_create(context,
dict(name=name,
db.volume_type_destroy(context, id)
-def get_all_types(context, inactive=0, search_opts={}):
+def get_all_types(context, inactive=0, search_opts=None):
"""Get all non-deleted volume_types.
Pass true as argument if you want deleted volume types returned also.
"""
+ search_opts = search_opts or {}
vol_types = db.volume_type_get_all(context, inactive)
if search_opts: