class NaServer(object):
- """ Encapsulates server connection logic"""
+ """Encapsulates server connection logic."""
TRANSPORT_TYPE_HTTP = 'http'
TRANSPORT_TYPE_HTTPS = 'https'
def set_transport_type(self, transport_type):
"""Set the transport type protocol for api.
- Supports http and https transport types.
+
+ Supports http and https transport types.
"""
if transport_type.lower() not in (
NaServer.TRANSPORT_TYPE_HTTP,
def set_style(self, style):
"""Set the authorization style for communicating with the server.
- Supports basic_auth for now.
- Certificate_auth mode to be done.
+
+ Supports basic_auth for now. Certificate_auth mode to be done.
"""
if style.lower() not in (NaServer.STYLE_LOGIN_PASSWORD,
NaServer.STYLE_CERTIFICATE):
def set_server_type(self, server_type):
"""Set the target server type.
- Supports filer and dfm server types.
+
+ Supports filer and dfm server types.
"""
if server_type.lower() not in (NaServer.SERVER_TYPE_FILER,
NaServer.SERVER_TYPE_DFM):
return self._port
def set_timeout(self, seconds):
- """Sets the timeout in seconds"""
+ """Sets the timeout in seconds."""
try:
self._timeout = int(seconds)
except ValueError:
return None
def get_vfiler(self):
- """Get the vfiler tunneling."""
+ """Get the vfiler to use in tunneling."""
return self._vfiler
def set_vfiler(self, vfiler):
- """Set the vfiler tunneling."""
+ """Set the vfiler to use if tunneling gets enabled."""
self._vfiler = vfiler
def get_vserver(self):
- """Get the vserver for tunneling."""
+ """Get the vserver to use in tunneling."""
return self._vserver
def set_vserver(self, vserver):
- """Set the vserver for tunneling."""
+ """Set the vserver to use if tunneling gets enabled."""
self._vserver = vserver
def set_username(self, username):
- """Set the username for authentication."""
+ """Set the user name for authentication."""
self._username = username
self._refresh_conn = True
self._password = password
self._refresh_conn = True
- def invoke_elem(self, na_element):
+ def invoke_elem(self, na_element, enable_tunneling=False):
"""Invoke the api on the server."""
if na_element and not isinstance(na_element, NaElement):
ValueError('NaElement must be supplied to invoke api')
- request = self._create_request(na_element)
+ request = self._create_request(na_element, enable_tunneling)
if not hasattr(self, '_opener') or not self._opener \
or self._refresh_conn:
self._build_opener()
xml = response.read()
return self._get_result(xml)
- def invoke_successfully(self, na_element):
- """Invokes api and checks execution status as success."""
- result = self.invoke_elem(na_element)
+ def invoke_successfully(self, na_element, enable_tunneling=False):
+ """Invokes api and checks execution status as success.
+
+ Need to set enable_tunneling to True explicitly to achieve it.
+ This helps to use same connection instance to enable or disable
+ tunneling. The vserver or vfiler should be set before this call
+ otherwise tunneling remains disabled.
+ """
+ result = self.invoke_elem(na_element, enable_tunneling)
if result.has_attr('status') and result.get_attr('status') == 'passed':
return result
code = result.get_attr('errno')\
or 'Execution status is failed due to unknown reason'
raise NaApiError(code, msg)
- def _create_request(self, na_element):
+ def _create_request(self, na_element, enable_tunneling=False):
"""Creates request in the desired format."""
netapp_elem = NaElement('netapp')
netapp_elem.add_attr('xmlns', self._ns)
if hasattr(self, '_api_version'):
netapp_elem.add_attr('version', self._api_version)
+ if enable_tunneling:
+ self._enable_tunnel_request(netapp_elem)
+ netapp_elem.add_child_elem(na_element)
+ request_d = netapp_elem.to_string()
+ request = urllib2.Request(
+ self._get_url(), data=request_d,
+ headers={'Content-Type': 'text/xml', 'charset': 'utf-8'})
+ return request
+
+ def _enable_tunnel_request(self, netapp_elem):
+ """Enables vserver or vfiler tunneling."""
if hasattr(self, '_vfiler') and self._vfiler:
if hasattr(self, '_api_major_version') and \
hasattr(self, '_api_minor_version') and \
else:
raise ValueError('ontapi version has to be atleast 1.15'
' to send request to vserver')
- netapp_elem.add_child_elem(na_element)
- request_d = netapp_elem.to_string()
- request = urllib2.Request(
- self._get_url(), data=request_d,
- headers={'Content-Type': 'text/xml', 'charset': 'utf-8'})
- return request
def _parse_response(self, response):
"""Get the NaElement for the response."""
def add_new_child(self, name, content, convert=False):
"""Add child with tag name and context.
- Convert replaces entity refs to chars.
- """
+
+ Convert replaces entity refs to chars."""
child = NaElement(name)
if convert:
content = NaElement._convert_entity_refs(content)
@staticmethod
def _convert_entity_refs(text):
- """Converts entity refs to chars
- neccessary to handle etree auto conversions.
- """
+ """Converts entity refs to chars to handle etree auto conversions."""
text = text.replace("<", "<")
text = text.replace(">", ">")
return text
self.add_child_elem(parent)
def to_string(self, pretty=False, method='xml', encoding='UTF-8'):
- """Prints the element to string"""
+ """Prints the element to string."""
return etree.tostring(self._element, method=method, encoding=encoding,
pretty_print=pretty)
class NaApiError(Exception):
"""Base exception class for NetApp api errors."""
+
def __init__(self, code='unknown', message='unknown'):
self.code = code
self.message = message
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, run update the stats first.
+ """
if refresh:
self._update_volume_status()
"""
host_filer = kwargs['hostname']
LOG.debug(_('Using NetApp filer: %s') % host_filer)
- # Do not use client directly
- # Use _invoke_successfully instead to make sure
- # we use the right api i.e. cluster or vserver api
- # and not the connection from previous call
self.client = NaServer(host=host_filer,
server_type=NaServer.SERVER_TYPE_FILER,
transport_type=kwargs['transport_type'],
'lun-destroy',
**{'path': metadata['Path'],
'force': 'true'})
- self._invoke_successfully(lun_destroy, True)
+ self.client.invoke_successfully(lun_destroy, True)
LOG.debug(_("Destroyed LUN %s") % name)
self.lun_table.pop(name)
def _get_ontapi_version(self):
"""Gets the supported ontapi version."""
ontapi_version = NaElement('system-get-ontapi-version')
- res = self._invoke_successfully(ontapi_version, False)
+ res = self.client.invoke_successfully(ontapi_version, False)
major = res.get_child_content('major-version')
minor = res.get_child_content('minor-version')
return (major, minor)
'ostype': metadata['OsType'],
'space-reservation-enabled':
metadata['SpaceReserved']})
- self._invoke_successfully(lun_create, True)
+ self.client.invoke_successfully(lun_create, True)
metadata['Path'] = '/vol/%s/%s' % (volume['name'], name)
metadata['Volume'] = volume['name']
metadata['Qtree'] = None
def _extract_and_populate_luns(self, api_luns):
"""Extracts the luns from api.
- Populates in the lun table.
+
+ Populates in the lun table.
"""
for lun in api_luns:
meta_dict = self._create_lun_meta(lun)
size, meta_dict)
self._add_lun_to_table(discovered_lun)
- def _invoke_successfully(self, na_element, do_tunneling=False):
- """Invoke the api for successful result.
- do_tunneling sets flag for tunneling.
- """
- self._is_naelement(na_element)
- self._configure_tunneling(do_tunneling)
- result = self.client.invoke_successfully(na_element)
- return result
-
- def _configure_tunneling(self, do_tunneling=False):
- """Configures tunneling based on system type."""
- raise NotImplementedError()
-
def _is_naelement(self, elem):
"""Checks if element is NetApp element."""
if not isinstance(elem, NaElement):
raise ValueError('Expects NaElement')
def _map_lun(self, name, initiator, initiator_type='iscsi', lun_id=None):
- """Maps lun to the initiator.
- Returns lun id assigned.
- """
+ """Maps lun to the initiator and returns lun id assigned."""
metadata = self._get_lun_attr(name, 'metadata')
os = metadata['OsType']
path = metadata['Path']
if lun_id:
lun_map.add_new_child('lun-id', lun_id)
try:
- result = self._invoke_successfully(lun_map, True)
+ result = self.client.invoke_successfully(lun_map, True)
return result.get_child_content('lun-id-assigned')
except NaApiError as e:
code = e.code
**{'path': path,
'initiator-group': igroup_name})
try:
- self._invoke_successfully(lun_unmap, True)
+ self.client.invoke_successfully(lun_unmap, True)
except NaApiError as e:
msg = _("Error unmapping lun. Code :%(code)s, Message:%(message)s")
code = e.code
def _get_or_create_igroup(self, initiator, initiator_type='iscsi',
os='default'):
"""Checks for an igroup for an initiator.
- Creates igroup if not found.
+
+ Creates igroup if not found.
"""
igroups = self._get_igroup_by_initiator(initiator=initiator)
igroup_name = None
**{'initiator-group-name': igroup,
'initiator-group-type': igroup_type,
'os-type': os_type})
- self._invoke_successfully(igroup_create, True)
+ self.client.invoke_successfully(igroup_create, True)
def _add_igroup_initiator(self, igroup, initiator):
"""Adds initiators to the specified igroup."""
'igroup-add',
**{'initiator-group-name': igroup,
'initiator': initiator})
- self._invoke_successfully(igroup_add, True)
+ self.client.invoke_successfully(igroup_add, True)
def _get_qos_type(self, volume):
"""Get the storage service type for a volume."""
def _do_custom_setup(self):
"""Does custom setup for ontap cluster."""
self.vserver = self.configuration.netapp_vserver
+ # We set vserver in client permanently.
+ # To use tunneling enable_tunneling while invoking api
+ self.client.set_vserver(self.vserver)
# Default values to run first api
self.client.set_api_version(1, 15)
(major, minor) = self._get_ontapi_version()
tag = None
while True:
vol_request = self._create_avl_vol_request(self.vserver, tag)
- res = self._invoke_successfully(vol_request)
+ res = self.client.invoke_successfully(vol_request)
tag = res.get_child_content('next-tag')
attr_list = res.get_child_by_name('attributes-list')
vols = attr_list.get_children()
def _get_target_details(self):
"""Gets the target portal details."""
iscsi_if_iter = NaElement('iscsi-interface-get-iter')
- result = self._invoke_successfully(iscsi_if_iter, True)
+ result = self.client.invoke_successfully(iscsi_if_iter, True)
tgt_list = []
if result.get_child_content('num-records')\
and int(result.get_child_content('num-records')) >= 1:
def _get_iscsi_service_details(self):
"""Returns iscsi iqn."""
iscsi_service_iter = NaElement('iscsi-service-get-iter')
- result = self._invoke_successfully(iscsi_service_iter, True)
+ result = self.client.invoke_successfully(iscsi_service_iter, True)
if result.get_child_content('num-records') and\
int(result.get_child_content('num-records')) >= 1:
attr_list = result.get_child_by_name('attributes-list')
return '%s:%s' % (self.vserver, metadata['Path'])
def _get_lun_list(self):
- """Gets the list of luns on filer."""
- """Gets the luns from cluster with vserver."""
+ """Gets the list of luns on filer.
+
+ Gets the luns from cluster with vserver.
+ """
tag = None
while True:
api = NaElement('lun-get-iter')
query = NaElement('query')
query.add_child_elem(lun_info)
api.add_child_elem(query)
- result = self._invoke_successfully(api)
+ result = self.client.invoke_successfully(api)
if result.get_child_by_name('num-records') and\
int(result.get_child_content('num-records')) >= 1:
attr_list = result.get_child_by_name('attributes-list')
query = NaElement('query')
lun_map_iter.add_child_elem(query)
query.add_node_with_children('lun-map-info', **{'path': path})
- result = self._invoke_successfully(lun_map_iter, True)
+ result = self.client.invoke_successfully(lun_map_iter, True)
tag = result.get_child_content('next-tag')
if result.get_child_content('num-records') and \
int(result.get_child_content('num-records')) >= 1:
des_ig_info.add_new_child('initiator-group-type', None)
des_ig_info.add_new_child('initiator-group-os-type', None)
igroup_iter.add_child_elem(des_attrs)
- result = self._invoke_successfully(igroup_iter, None)
+ result = self.client.invoke_successfully(igroup_iter, False)
tag = result.get_child_content('next-tag')
if result.get_child_content('num-records') and\
int(result.get_child_content('num-records')) > 0:
**{'volume': volume, 'source-path': name,
'destination-path': new_name,
'space-reserve': space_reserved})
- self._invoke_successfully(clone_create, True)
+ self.client.invoke_successfully(clone_create, True)
LOG.debug(_("Cloned LUN with new name %s") % new_name)
lun = self._get_lun_by_args(vserver=self.vserver, path='/vol/%s/%s'
% (volume, new_name))
query = NaElement('query')
lun_iter.add_child_elem(query)
query.add_node_with_children('lun-info', **args)
- luns = self._invoke_successfully(lun_iter)
+ luns = self.client.invoke_successfully(lun_iter)
attr_list = luns.get_child_by_name('attributes-list')
return attr_list.get_children()
if self.vfiler:
(major, minor) = self._get_ontapi_version()
self.client.set_api_version(major, minor)
+ self.client.set_vfiler(self.vfiler)
def _get_avl_volume_by_size(self, size):
"""Get the available volume by size."""
vol_request = NaElement('volume-list-info')
- res = self._invoke_successfully(vol_request, True)
+ res = self.client.invoke_successfully(vol_request, True)
volumes = res.get_child_by_name('volumes')
vols = volumes.get_children()
for vol in vols:
"""Checks if a volume is not root."""
vol_options = NaElement.create_node_with_children(
'volume-options-list-info', **{'volume': vol['name']})
- result = self._invoke_successfully(vol_options, True)
+ result = self.client.invoke_successfully(vol_options, True)
options = result.get_child_by_name('options')
ops = options.get_children()
for op in ops:
def _get_igroup_by_initiator(self, initiator):
"""Get igroups by initiator."""
igroup_list = NaElement('igroup-list-info')
- result = self._invoke_successfully(igroup_list, True)
+ result = self.client.invoke_successfully(igroup_list, True)
igroups = []
igs = result.get_child_by_name('initiator-groups')
if igs:
def _get_target_details(self):
"""Gets the target portal details."""
iscsi_if_iter = NaElement('iscsi-portal-list-info')
- result = self._invoke_successfully(iscsi_if_iter, True)
+ result = self.client.invoke_successfully(iscsi_if_iter, True)
tgt_list = []
portal_list_entries = result.get_child_by_name(
'iscsi-portal-list-entries')
def _get_iscsi_service_details(self):
"""Returns iscsi iqn."""
iscsi_service_iter = NaElement('iscsi-node-get-name')
- result = self._invoke_successfully(iscsi_service_iter, True)
+ result = self.client.invoke_successfully(iscsi_service_iter, True)
return result.get_child_content('node-name')
def _create_lun_handle(self, metadata):
api = NaElement('lun-list-info')
if vol_name:
api.add_new_child('volume-name', vol_name)
- result = self._invoke_successfully(api, True)
+ result = self.client.invoke_successfully(api, True)
luns = result.get_child_by_name('luns')
return luns.get_children()
lun_map_list = NaElement.create_node_with_children(
'lun-map-list-info',
**{'path': path})
- result = self._invoke_successfully(lun_map_list, True)
+ result = self.client.invoke_successfully(lun_map_list, True)
igroups = result.get_child_by_name('initiator-groups')
if igroups:
igroup = None
'clone-start',
**{'source-path': path, 'destination-path': clone_path,
'no-snap': 'true'})
- result = self._invoke_successfully(clone_start, True)
+ result = self.client.invoke_successfully(clone_start, True)
clone_id_el = result.get_child_by_name('clone-id')
cl_id_info = clone_id_el.get_child_by_name('clone-id-info')
vol_uuid = cl_id_info.get_child_content('volume-uuid')
space_res = NaElement.create_node_with_children(
'lun-set-space-reservation-info',
**{'path': path, 'enable': enable})
- self._invoke_successfully(space_res, True)
+ self.client.invoke_successfully(space_res, True)
def _check_clone_status(self, clone_id, vol_uuid, name, new_name):
"""Checks for the job till completed."""
running = True
clone_ops_info = None
while running:
- result = self._invoke_successfully(clone_status, True)
+ result = self.client.invoke_successfully(clone_status, True)
status = result.get_child_by_name('status')
ops_info = status.get_children()
if ops_info:
def _get_lun_by_args(self, **args):
"""Retrives lun with specified args."""
lun_info = NaElement.create_node_with_children('lun-list-info', **args)
- result = self._invoke_successfully(lun_info, True)
+ result = self.client.invoke_successfully(lun_info, True)
luns = result.get_child_by_name('luns')
if luns:
infos = luns.get_children()
'is-space-reservation-enabled')
return meta_dict
- def _configure_tunneling(self, do_tunneling=False):
- """Configures tunneling for 7 mode."""
- if do_tunneling:
- self.client.set_vfiler(self.vfiler)
- else:
- self.client.set_vfiler(None)
-
def _update_volume_status(self):
"""Retrieve status info from volume group."""
Volume driver for NetApp NFS storage.
"""
+import copy
import os
import time
self._client = self._get_client()
def check_for_setup_error(self):
- """Returns an error if prerequisites aren't met"""
+ """Returns an error if prerequisites aren't met."""
self._check_dfm_flags()
super(NetAppNFSDriver, self).check_for_setup_error()
return (nfs_server_ip + ':' + export_path)
def _clone_volume(self, volume_name, clone_name, volume_id):
- """Clones mounted volume with OnCommand proxy API"""
+ """Clones mounted volume with OnCommand proxy API."""
host_id = self._get_host_id(volume_id)
export_path = self._get_full_export_path(volume_id, host_id)
return volume.provider_location
def _get_host_ip(self, volume_id):
- """Returns IP address for the given volume"""
+ """Returns IP address for the given volume."""
return self._get_provider_location(volume_id).split(':')[0]
def _get_export_path(self, volume_id):
- """Returns NFS export path for the given volume"""
+ """Returns NFS export path for the given volume."""
return self._get_provider_location(volume_id).split(':')[1]
def _get_host_id(self, volume_id):
- """Returns ID of the ONTAP-7 host"""
+ """Returns ID of the ONTAP-7 host."""
host_ip = self._get_host_ip(volume_id)
server = self._client.service
server.HostListInfoIterEnd(Tag=tag)
def _get_full_export_path(self, volume_id, host_id):
- """Returns full path to the NFS share, e.g. /vol/vol0/home"""
+ """Returns full path to the NFS share, e.g. /vol/vol0/home."""
export_path = self._get_export_path(volume_id)
command_args = '<pathname>%s</pathname>'
raise exception.CinderException(resp.Reason)
def _volume_not_present(self, nfs_mount, volume_name):
- """
- Check if volume exists
- """
+ """Check if volume exists."""
try:
self._try_execute('ls', self._get_volume_path(nfs_mount,
volume_name))
class NetAppCmodeNfsDriver (NetAppNFSDriver):
- """Executes commands related to volumes on c mode"""
+ """Executes commands related to volumes on c mode."""
+
def __init__(self, *args, **kwargs):
super(NetAppCmodeNfsDriver, self).__init__(*args, **kwargs)
self._client = self._get_client()
def check_for_setup_error(self):
- """Returns an error if prerequisites aren't met"""
+ """Returns an error if prerequisites aren't met."""
self._check_flags()
def _clone_volume(self, volume_name, clone_name, volume_id):
- """Clones mounted volume with NetApp Cloud Services"""
+ """Clones mounted volume with NetApp Cloud Services."""
host_ip = self._get_host_ip(volume_id)
export_path = self._get_export_path(volume_id)
LOG.debug(_("""Cloning with params ip %(host_ip)s, exp_path
class NetAppDirectNfsDriver (NetAppNFSDriver):
- """Executes commands related to volumes on NetApp filer"""
+ """Executes commands related to volumes on NetApp filer."""
+
def __init__(self, *args, **kwargs):
super(NetAppDirectNfsDriver, self).__init__(*args, **kwargs)
self._do_custom_setup(self._client)
def check_for_setup_error(self):
- """Returns an error if prerequisites aren't met"""
+ """Returns an error if prerequisites aren't met."""
self._check_flags()
def _clone_volume(self, volume_name, clone_name, volume_id):
- """Clones mounted volume on NetApp filer"""
+ """Clones mounted volume on NetApp filer."""
raise NotImplementedError()
def _check_flags(self):
return client
def _do_custom_setup(self, client):
- """Do the customized set up on client if any for different types"""
+ """Do the customized set up on client if any for different types."""
raise NotImplementedError()
def _is_naelement(self, elem):
- """Checks if element is NetApp element"""
+ """Checks if element is NetApp element."""
if not isinstance(elem, NaElement):
raise ValueError('Expects NaElement')
def _invoke_successfully(self, na_element, vserver=None):
"""Invoke the api for successful result.
- Vserver implies vserver api else filer/Cluster api.
+
+ If vserver is present then invokes vserver/vfiler api
+ else filer/Cluster api.
+ :param vserver: vserver/vfiler name.
"""
self._is_naelement(na_element)
+ server = copy.copy(self._client)
if vserver:
- self._client.set_vserver(vserver)
+ server.set_vserver(vserver)
else:
- self._client.set_vserver(None)
- result = self._client.invoke_successfully(na_element)
+ server.set_vserver(None)
+ result = server.invoke_successfully(na_element, True)
return result
def _get_ontapi_version(self):
class NetAppDirectCmodeNfsDriver (NetAppDirectNfsDriver):
- """Executes commands related to volumes on c mode"""
+ """Executes commands related to volumes on c mode."""
+
def __init__(self, *args, **kwargs):
super(NetAppDirectCmodeNfsDriver, self).__init__(*args, **kwargs)
def _do_custom_setup(self, client):
- """Do the customized set up on client for cluster mode"""
+ """Do the customized set up on client for cluster mode."""
# Default values to run first api
client.set_api_version(1, 15)
(major, minor) = self._get_ontapi_version()
client.set_api_version(major, minor)
def _clone_volume(self, volume_name, clone_name, volume_id):
- """Clones mounted volume on NetApp Cluster"""
+ """Clones mounted volume on NetApp Cluster."""
host_ip = self._get_host_ip(volume_id)
export_path = self._get_export_path(volume_id)
ifs = self._get_if_info_by_ip(host_ip)
% (ip))
def _get_vol_by_junc_vserver(self, vserver, junction):
- """Gets the volume by junction path and vserver"""
+ """Gets the volume by junction path and vserver."""
vol_iter = NaElement('volume-get-iter')
vol_iter.add_new_child('max-records', '10')
query = NaElement('query')
""") % locals())
def _clone_file(self, volume, src_path, dest_path, vserver=None):
- """Clones file on vserver"""
+ """Clones file on vserver."""
LOG.debug(_("""Cloning with params volume %(volume)s,src %(src_path)s,
dest %(dest_path)s, vserver %(vserver)s""")
% locals())
class NetAppDirect7modeNfsDriver (NetAppDirectNfsDriver):
- """Executes commands related to volumes on 7 mode"""
+ """Executes commands related to volumes on 7 mode."""
+
def __init__(self, *args, **kwargs):
super(NetAppDirect7modeNfsDriver, self).__init__(*args, **kwargs)
def _do_custom_setup(self, client):
- """Do the customized set up on client if any for 7 mode"""
+ """Do the customized set up on client if any for 7 mode."""
(major, minor) = self._get_ontapi_version()
client.set_api_version(major, minor)
def _clone_volume(self, volume_name, clone_name, volume_id):
- """Clones mounted volume with NetApp filer"""
+ """Clones mounted volume with NetApp filer."""
export_path = self._get_export_path(volume_id)
storage_path = self._get_actual_path_for_export(export_path)
target_path = '%s/%s' % (storage_path, clone_name)
raise e
def _get_actual_path_for_export(self, export_path):
- """Gets the actual path on the filer for export path"""
+ """Gets the actual path on the filer for export path."""
storage_path = NaElement.create_node_with_children(
'nfs-exportfs-storage-path', **{'pathname': export_path})
result = self._invoke_successfully(storage_path, None)
def _start_clone(self, src_path, dest_path):
"""Starts the clone operation.
- Returns the clone-id
+
+ :returns: clone-id
"""
LOG.debug(_("""Cloning with src %(src_path)s, dest %(dest_path)s""")
% locals())
return (clone_id, vol_uuid)
def _wait_for_clone_finish(self, clone_op_id, vol_uuid):
- """
- Waits till a clone operation is complete or errored out.
- """
+ """Waits till a clone operation is complete or errored out."""
clone_ls_st = NaElement('clone-list-status')
clone_id = NaElement('clone-id')
clone_ls_st.add_child_elem(clone_id)
def _clear_clone(self, clone_id):
"""Clear the clone information.
- Invoke this in case of failed clone.
+
+ Invoke this in case of failed clone.
"""
clone_clear = NaElement.create_node_with_children(
'clone-clear',