]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
NetApp fix for vsadmin role failure for ssc
authorNavneet Singh <singn@netapp.com>
Sun, 17 Nov 2013 23:16:31 +0000 (04:46 +0530)
committerNavneet Singh <singn@netapp.com>
Tue, 19 Nov 2013 06:55:28 +0000 (12:25 +0530)
The ssc feature does not work with vserver admin
role. If the user is a vsadmin then he does cannot
use the ssc feature fully. This fix checks the access
and issues apis appropriately and also handles error
conditions.

Closes-Bug: #1253657

Change-Id: Ibc4409e5d5e6e36bba41e5f59dd94c05b87b1cde

cinder/tests/test_netapp_nfs.py
cinder/volume/drivers/netapp/api.py
cinder/volume/drivers/netapp/iscsi.py
cinder/volume/drivers/netapp/nfs.py
cinder/volume/drivers/netapp/ssc_utils.py
cinder/volume/drivers/netapp/utils.py

index 61cccf3935c5a3327f80c8de7ae3afb16b0a12d6..d7d7b02189603ee4517787ee1b5350d55e9452b7 100644 (file)
@@ -31,6 +31,7 @@ from cinder import test
 from cinder.volume import configuration as conf
 from cinder.volume.drivers.netapp import api
 from cinder.volume.drivers.netapp import nfs as netapp_nfs
+from cinder.volume.drivers.netapp import ssc_utils
 
 
 from oslo.config import cfg
@@ -138,11 +139,12 @@ class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
 
         mox.StubOutWithMock(drv, '_get_provider_location')
         mox.StubOutWithMock(drv, '_volume_not_present')
+        mox.StubOutWithMock(drv, '_post_prov_deprov_in_ssc')
 
         if snapshot_exists:
             mox.StubOutWithMock(drv, '_execute')
             mox.StubOutWithMock(drv, '_get_volume_path')
-
+        drv._get_provider_location(IgnoreArg())
         drv._get_provider_location(IgnoreArg())
         drv._volume_not_present(IgnoreArg(), IgnoreArg())\
             .AndReturn(not snapshot_exists)
@@ -151,6 +153,8 @@ class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
             drv._get_volume_path(IgnoreArg(), IgnoreArg())
             drv._execute('rm', None, run_as_root=True)
 
+        drv._post_prov_deprov_in_ssc(IgnoreArg())
+
         mox.ReplayAll()
 
         return mox
@@ -197,7 +201,11 @@ class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
         # set required flags
         for flag in required_flags:
             setattr(drv.configuration, flag, 'val')
+        setattr(drv, 'ssc_enabled', False)
+
+        mox.StubOutWithMock(netapp_nfs.NetAppDirectNfsDriver, '_check_flags')
 
+        netapp_nfs.NetAppDirectNfsDriver._check_flags()
         mox.ReplayAll()
 
         drv.check_for_setup_error()
@@ -238,6 +246,7 @@ class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
         mox.StubOutWithMock(drv, '_get_if_info_by_ip')
         mox.StubOutWithMock(drv, '_get_vol_by_junc_vserver')
         mox.StubOutWithMock(drv, '_clone_file')
+        mox.StubOutWithMock(drv, '_post_prov_deprov_in_ssc')
 
         drv._get_host_ip(IgnoreArg()).AndReturn('127.0.0.1')
         drv._get_export_path(IgnoreArg()).AndReturn('/nfs')
@@ -246,6 +255,7 @@ class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
         drv._get_vol_by_junc_vserver('openstack', '/nfs').AndReturn('nfsvol')
         drv._clone_file('nfsvol', 'volume_name', 'clone_name',
                         'openstack')
+        drv._post_prov_deprov_in_ssc(IgnoreArg())
         return mox
 
     def _prepare_info_by_ip_response(self):
@@ -287,8 +297,9 @@ class NetappDirectCmodeNfsDriverTestCase(test.TestCase):
         volume_name = 'volume_name'
         clone_name = 'clone_name'
         volume_id = volume_name + str(hash(volume_name))
+        share = 'ip:/share'
 
-        drv._clone_volume(volume_name, clone_name, volume_id)
+        drv._clone_volume(volume_name, clone_name, volume_id, share)
 
         mox.VerifyAll()
 
@@ -792,6 +803,29 @@ class NetappDirect7modeNfsDriverTestCase(NetappDirectCmodeNfsDriverTestCase):
         self._driver = netapp_nfs.NetAppDirect7modeNfsDriver(
             configuration=create_configuration())
 
+    def _prepare_delete_snapshot_mock(self, snapshot_exists):
+        drv = self._driver
+        mox = self.mox
+
+        mox.StubOutWithMock(drv, '_get_provider_location')
+        mox.StubOutWithMock(drv, '_volume_not_present')
+
+        if snapshot_exists:
+            mox.StubOutWithMock(drv, '_execute')
+            mox.StubOutWithMock(drv, '_get_volume_path')
+
+        drv._get_provider_location(IgnoreArg())
+        drv._volume_not_present(IgnoreArg(), IgnoreArg())\
+            .AndReturn(not snapshot_exists)
+
+        if snapshot_exists:
+            drv._get_volume_path(IgnoreArg(), IgnoreArg())
+            drv._execute('rm', None, run_as_root=True)
+
+        mox.ReplayAll()
+
+        return mox
+
     def test_check_for_setup_error_version(self):
         drv = self._driver
         drv._client = api.NaServer("127.0.0.1")
index 4503488d95a1128d215e90ff2279a862e5423434..368b7f821955aaea088012bdbed14519720f8eeb 100644 (file)
@@ -479,3 +479,8 @@ class NaApiError(Exception):
 
     def __str__(self, *args, **kwargs):
         return 'NetApp api failed. Reason - %s:%s' % (self.code, self.message)
+
+
+NaErrors = {'API_NOT_FOUND': NaApiError('13005', 'Unable to find API'),
+            'INSUFFICIENT_PRIVS': NaApiError('13003',
+                                             'Insufficient privileges')}
index be5135503ed2353f558203948ecf546aa575f4f7..e28adf45a7c1fc6d75485c2d84202223f0c02d37 100644 (file)
@@ -782,6 +782,11 @@ class NetAppDirectCmodeISCSIDriver(NetAppDirectISCSIDriver):
         self.ssc_vols = None
         self.stale_vols = set()
 
+    def check_for_setup_error(self):
+        """Check that the driver is working and can communicate."""
+        ssc_utils.check_ssc_api_permissions(self.client)
+        super(NetAppDirectCmodeISCSIDriver, self).check_for_setup_error()
+
     def _create_lun_on_eligible_vol(self, name, size, metadata,
                                     extra_specs=None):
         """Creates an actual lun on filer."""
@@ -1108,6 +1113,17 @@ class NetAppDirectCmodeISCSIDriver(NetAppDirectISCSIDriver):
         """Refreshes ssc_vols with latest entries."""
         self.ssc_vols = vols
 
+    def delete_volume(self, volume):
+        """Driver entry point for destroying existing volumes."""
+        lun = self.lun_table.get(volume['name'])
+        netapp_vol = None
+        if lun:
+            netapp_vol = lun.get_metadata_property('Volume')
+        super(NetAppDirectCmodeISCSIDriver, self).delete_volume(volume)
+        if netapp_vol:
+            self._update_stale_vols(
+                volume=ssc_utils.NetAppVolume(netapp_vol, self.vserver))
+
 
 class NetAppDirect7modeISCSIDriver(NetAppDirectISCSIDriver):
     """NetApp 7-mode iSCSI volume driver."""
index 6f826979e53cac342050daca10d28ad35c9722ca..0b5621abb7c4e3662fadc0a7aa706b477b0c5f82 100644 (file)
@@ -718,6 +718,12 @@ class NetAppDirectCmodeNfsDriver (NetAppDirectNfsDriver):
             self.ssc_enabled = False
             LOG.warn(_("No vserver set in config. SSC will be disabled."))
 
+    def check_for_setup_error(self):
+        """Check that the driver is working and can communicate."""
+        super(NetAppDirectCmodeNfsDriver, self).check_for_setup_error()
+        if self.ssc_enabled:
+            ssc_utils.check_ssc_api_permissions(self._client)
+
     def _invoke_successfully(self, na_element, vserver=None):
         """Invoke the api for successful result.
 
@@ -786,6 +792,8 @@ class NetAppDirectCmodeNfsDriver (NetAppDirectNfsDriver):
         """Clones mounted volume on NetApp Cluster."""
         (vserver, exp_volume) = self._get_vserver_and_exp_vol(volume_id, share)
         self._clone_file(exp_volume, volume_name, clone_name, vserver)
+        share = share if share else self._get_provider_location(volume_id)
+        self._post_prov_deprov_in_ssc(share)
 
     def _get_vserver_and_exp_vol(self, volume_id=None, share=None):
         """Gets the vserver and export volume for share."""
@@ -1037,6 +1045,24 @@ class NetAppDirectCmodeNfsDriver (NetAppDirectNfsDriver):
         vols = ssc_utils.get_volumes_for_specs(self.ssc_vols, extra_specs)
         return netapp_vol in vols
 
+    def delete_volume(self, volume):
+        """Deletes a logical volume."""
+        share = volume['provider_location']
+        super(NetAppDirectCmodeNfsDriver, self).delete_volume(volume)
+        self._post_prov_deprov_in_ssc(share)
+
+    def delete_snapshot(self, snapshot):
+        """Deletes a snapshot."""
+        share = self._get_provider_location(snapshot.volume_id)
+        super(NetAppDirectCmodeNfsDriver, self).delete_snapshot(snapshot)
+        self._post_prov_deprov_in_ssc(share)
+
+    def _post_prov_deprov_in_ssc(self, share):
+        if self.ssc_enabled and share:
+            netapp_vol = self._get_vol_for_share(share)
+            if netapp_vol:
+                self._update_stale_vols(volume=netapp_vol)
+
 
 class NetAppDirect7modeNfsDriver (NetAppDirectNfsDriver):
     """Executes commands related to volumes on 7 mode."""
index 1343d4f4f713b51253848b2c89e0e683f9225626..3f8c3eeddf7bf348b34aee34f61a82e8a8788f0a 100644 (file)
@@ -109,31 +109,36 @@ def get_cluster_vols_with_ssc(na_server, vserver, volume=None):
                 aggr_attrs = aggrs[aggr_name]
             else:
                 aggr_attrs = query_aggr_options(na_server, aggr_name)
-                eff_disk_type = query_aggr_storage_disk(na_server, aggr_name)
-                aggr_attrs['disk_type'] = eff_disk_type
+                if aggr_attrs:
+                    eff_disk_type = query_aggr_storage_disk(na_server,
+                                                            aggr_name)
+                    aggr_attrs['disk_type'] = eff_disk_type
                 aggrs[aggr_name] = aggr_attrs
             vol.aggr['raid_type'] = aggr_attrs.get('raid_type')
             vol.aggr['ha_policy'] = aggr_attrs.get('ha_policy')
             vol.aggr['disk_type'] = aggr_attrs.get('disk_type')
-        if vol.id['name'] in sis_vols:
-            vol.sis['dedup'] = sis_vols[vol.id['name']]['dedup']
-            vol.sis['compression'] = sis_vols[vol.id['name']]['compression']
-        else:
-            vol.sis['dedup'] = False
-            vol.sis['compression'] = False
+        if sis_vols:
+            if vol.id['name'] in sis_vols:
+                vol.sis['dedup'] = sis_vols[vol.id['name']]['dedup']
+                vol.sis['compression'] =\
+                    sis_vols[vol.id['name']]['compression']
+            else:
+                vol.sis['dedup'] = False
+                vol.sis['compression'] = False
         if (vol.space['space-guarantee-enabled'] and
                 (vol.space['space-guarantee'] == 'file' or
                  vol.space['space-guarantee'] == 'volume')):
             vol.space['thin_provisioned'] = False
         else:
             vol.space['thin_provisioned'] = True
-        vol.mirror['mirrored'] = False
-        if vol.id['name'] in mirrored_vols:
-            for mirr_attrs in mirrored_vols[vol.id['name']]:
-                if (mirr_attrs['rel_type'] == 'data_protection' and
-                        mirr_attrs['mirr_state'] == 'snapmirrored'):
-                    vol.mirror['mirrored'] = True
-                    break
+        if mirrored_vols:
+            vol.mirror['mirrored'] = False
+            if vol.id['name'] in mirrored_vols:
+                for mirr_attrs in mirrored_vols[vol.id['name']]:
+                    if (mirr_attrs['rel_type'] == 'data_protection' and
+                            mirr_attrs['mirr_state'] == 'snapmirrored'):
+                        vol.mirror['mirrored'] = True
+                        break
     return volumes
 
 
@@ -158,10 +163,11 @@ def query_cluster_vols_for_ssc(na_server, vserver, volume=None):
     for res in result:
         records = res.get_child_content('num-records')
         if records > 0:
-            attr_list = res['attributes-list']
-            vol_attrs = attr_list.get_children()
-            vols_found = create_vol_list(vol_attrs)
-            vols.update(vols_found)
+            attr_list = res.get_child_by_name('attributes-list')
+            if attr_list:
+                vol_attrs = attr_list.get_children()
+                vols_found = create_vol_list(vol_attrs)
+                vols.update(vols_found)
     return vols
 
 
@@ -247,22 +253,25 @@ def query_aggr_options(na_server, aggr_name):
     """
 
     add_elems = {'aggregate': aggr_name}
-    result = na_utils.invoke_api(na_server,
-                                 api_name='aggr-options-list-info',
-                                 api_family='cm', query=None,
-                                 des_result=None,
-                                 additional_elems=add_elems,
-                                 is_iter=False)
     attrs = {}
-    for res in result:
-        options = res.get_child_by_name('options')
-        if options:
-            op_list = options.get_children()
-            for op in op_list:
-                if op.get_child_content('name') == 'ha_policy':
-                    attrs['ha_policy'] = op.get_child_content('value')
-                if op.get_child_content('name') == 'raidtype':
-                    attrs['raid_type'] = op.get_child_content('value')
+    try:
+        result = na_utils.invoke_api(na_server,
+                                     api_name='aggr-options-list-info',
+                                     api_family='cm', query=None,
+                                     des_result=None,
+                                     additional_elems=add_elems,
+                                     is_iter=False)
+        for res in result:
+            options = res.get_child_by_name('options')
+            if options:
+                op_list = options.get_children()
+                for op in op_list:
+                    if op.get_child_content('name') == 'ha_policy':
+                        attrs['ha_policy'] = op.get_child_content('value')
+                    if op.get_child_content('name') == 'raidtype':
+                        attrs['raid_type'] = op.get_child_content('value')
+    except Exception as e:
+        LOG.debug(_("Exception querying aggr options. %s"), e)
     return attrs
 
 
@@ -279,28 +288,31 @@ def get_sis_vol_dict(na_server, vserver, volume=None):
         vol_path = '/vol/%s' % (volume)
         query_attr['path'] = vol_path
     query = {'sis-status-info': query_attr}
-    result = na_utils.invoke_api(na_server,
-                                 api_name='sis-get-iter',
-                                 api_family='cm',
-                                 query=query,
-                                 is_iter=True)
-    for res in result:
-        attr_list = res.get_child_by_name('attributes-list')
-        if attr_list:
-            sis_status = attr_list.get_children()
-            for sis in sis_status:
-                path = sis.get_child_content('path')
-                if not path:
-                    continue
-                (___, __, vol) = path.rpartition('/')
-                if not vol:
-                    continue
-                v_sis = {}
-                v_sis['compression'] = na_utils.to_bool(
-                    sis.get_child_content('is-compression-enabled'))
-                v_sis['dedup'] = na_utils.to_bool(
-                    sis.get_child_content('state'))
-                sis_vols[vol] = v_sis
+    try:
+        result = na_utils.invoke_api(na_server,
+                                     api_name='sis-get-iter',
+                                     api_family='cm',
+                                     query=query,
+                                     is_iter=True)
+        for res in result:
+            attr_list = res.get_child_by_name('attributes-list')
+            if attr_list:
+                sis_status = attr_list.get_children()
+                for sis in sis_status:
+                    path = sis.get_child_content('path')
+                    if not path:
+                        continue
+                    (___, __, vol) = path.rpartition('/')
+                    if not vol:
+                        continue
+                    v_sis = {}
+                    v_sis['compression'] = na_utils.to_bool(
+                        sis.get_child_content('is-compression-enabled'))
+                    v_sis['dedup'] = na_utils.to_bool(
+                        sis.get_child_content('state'))
+                    sis_vols[vol] = v_sis
+    except Exception as e:
+        LOG.debug(_("Exception querying sis information. %s"), e)
     return sis_vols
 
 
@@ -311,54 +323,62 @@ def get_snapmirror_vol_dict(na_server, vserver, volume=None):
     if volume:
         query_attr['source-volume'] = volume
     query = {'snapmirror-info': query_attr}
-    result = na_utils.invoke_api(na_server, api_name='snapmirror-get-iter',
-                                 api_family='cm', query=query, is_iter=True)
-    for res in result:
-        attr_list = res.get_child_by_name('attributes-list')
-        if attr_list:
-            snap_info = attr_list.get_children()
-            for snap in snap_info:
-                src_volume = snap.get_child_content('source-volume')
-                v_snap = {}
-                v_snap['dest_loc'] =\
-                    snap.get_child_content('destination-location')
-                v_snap['rel_type'] =\
-                    snap.get_child_content('relationship-type')
-                v_snap['mirr_state'] =\
-                    snap.get_child_content('mirror-state')
-                if mirrored_vols.get(src_volume):
-                    mirrored_vols.get(src_volume).append(v_snap)
-                else:
-                    mirrored_vols[src_volume] = [v_snap]
+    try:
+        result = na_utils.invoke_api(na_server,
+                                     api_name='snapmirror-get-iter',
+                                     api_family='cm', query=query,
+                                     is_iter=True)
+        for res in result:
+            attr_list = res.get_child_by_name('attributes-list')
+            if attr_list:
+                snap_info = attr_list.get_children()
+                for snap in snap_info:
+                    src_volume = snap.get_child_content('source-volume')
+                    v_snap = {}
+                    v_snap['dest_loc'] =\
+                        snap.get_child_content('destination-location')
+                    v_snap['rel_type'] =\
+                        snap.get_child_content('relationship-type')
+                    v_snap['mirr_state'] =\
+                        snap.get_child_content('mirror-state')
+                    if mirrored_vols.get(src_volume):
+                        mirrored_vols.get(src_volume).append(v_snap)
+                    else:
+                        mirrored_vols[src_volume] = [v_snap]
+    except Exception as e:
+        LOG.debug(_("Exception querying mirror information. %s"), e)
     return mirrored_vols
 
 
 def query_aggr_storage_disk(na_server, aggr):
-    """Queries for storage disks assosiated to an aggregate."""
+    """Queries for storage disks associated to an aggregate."""
     query = {'storage-disk-info': {'disk-raid-info':
                                    {'disk-aggregate-info':
                                        {'aggregate-name': aggr}}}}
     des_attr = {'storage-disk-info':
                 {'disk-raid-info': ['effective-disk-type']}}
-    result = na_utils.invoke_api(na_server,
-                                 api_name='storage-disk-get-iter',
-                                 api_family='cm', query=query,
-                                 des_result=des_attr,
-                                 additional_elems=None,
-                                 is_iter=True)
-    for res in result:
-        attr_list = res.get_child_by_name('attributes-list')
-        if attr_list:
-            storage_disks = attr_list.get_children()
-            for disk in storage_disks:
-                raid_info = disk.get_child_by_name('disk-raid-info')
-                if raid_info:
-                    eff_disk_type =\
-                        raid_info.get_child_content('effective-disk-type')
-                    if eff_disk_type:
-                        return eff_disk_type
-                    else:
-                        continue
+    try:
+        result = na_utils.invoke_api(na_server,
+                                     api_name='storage-disk-get-iter',
+                                     api_family='cm', query=query,
+                                     des_result=des_attr,
+                                     additional_elems=None,
+                                     is_iter=True)
+        for res in result:
+            attr_list = res.get_child_by_name('attributes-list')
+            if attr_list:
+                storage_disks = attr_list.get_children()
+                for disk in storage_disks:
+                    raid_info = disk.get_child_by_name('disk-raid-info')
+                    if raid_info:
+                        eff_disk_type =\
+                            raid_info.get_child_content('effective-disk-type')
+                        if eff_disk_type:
+                            return eff_disk_type
+                        else:
+                            continue
+    except Exception as e:
+        LOG.debug(_("Exception querying storage disk. %s"), e)
     return 'unknown'
 
 
@@ -373,13 +393,13 @@ def get_cluster_ssc(na_server, vserver):
                'compression': compress_vols,
                'thin': thin_prov_vols, 'all': netapp_volumes}
     for vol in netapp_volumes:
-        if vol.sis['dedup']:
+        if vol.sis.get('dedup'):
             dedup_vols.add(vol)
-        if vol.sis['compression']:
+        if vol.sis.get('compression'):
             compress_vols.add(vol)
-        if vol.mirror['mirrored']:
+        if vol.mirror.get('mirrored'):
             mirror_vols.add(vol)
-        if vol.space['thin_provisioned']:
+        if vol.space.get('thin_provisioned'):
             thin_prov_vols.add(vol)
     return ssc_map
 
@@ -419,13 +439,13 @@ def refresh_cluster_stale_ssc(*args, **kwargs):
                     for k in ssc_vols_copy:
                         vol_set = ssc_vols_copy[k]
                         vol_set.discard(vol)
-                        if k == "mirrored" and vol.mirror['mirrored']:
+                        if k == "mirrored" and vol.mirror.get('mirrored'):
                             vol_set.add(vol)
-                        if k == "dedup" and vol.sis['dedup']:
+                        if k == "dedup" and vol.sis.get('dedup'):
                             vol_set.add(vol)
-                        if k == "compression" and vol.sis['compression']:
+                        if k == "compression" and vol.sis.get('compression'):
                             vol_set.add(vol)
-                        if k == "thin" and vol.space['thin_provisioned']:
+                        if k == "thin" and vol.space.get('thin_provisioned'):
                             vol_set.add(vol)
                         if k == "all":
                             vol_set.add(vol)
@@ -583,3 +603,26 @@ def get_volumes_for_specs(ssc_vols, specs):
                 if qos_policy_group.lower() != vol_qos:
                     result.discard(vol)
     return result
+
+
+def check_ssc_api_permissions(na_server):
+    """Checks backend ssc api permissions for the user."""
+    api_map = {'storage-disk-get-iter': ['disk type'],
+               'snapmirror-get-iter': ['data protection mirror'],
+               'sis-get-iter': ['deduplication', 'compression'],
+               'aggr-options-list-info': ['raid type'],
+               'volume-get-iter': ['volume information']}
+    failed_apis = na_utils.check_apis_on_cluster(na_server, api_map.keys())
+    if failed_apis:
+        if 'volume-get-iter' in failed_apis:
+            msg = _("Fatal error: User not permitted"
+                    " to query NetApp volumes.")
+            raise exception.VolumeBackendAPIException(data=msg)
+        else:
+            unsupp_ssc_features = []
+            for fail in failed_apis:
+                unsupp_ssc_features.extend(api_map[fail])
+            LOG.warn(_("The user does not have access or sufficient"
+                       " privileges to use all ssc apis. The ssc"
+                       " features %s may not work as expected."),
+                     unsupp_ssc_features)
index 961d7c13d1f768b6170cf8eabe580dc305eb3d8c..0f8d2e9294ef8e8776e212dd69e8a4c4a2545787 100644 (file)
@@ -32,6 +32,7 @@ from cinder.openstack.common import timeutils
 from cinder import utils
 from cinder.volume.drivers.netapp.api import NaApiError
 from cinder.volume.drivers.netapp.api import NaElement
+from cinder.volume.drivers.netapp.api import NaErrors
 from cinder.volume.drivers.netapp.api import NaServer
 from cinder.volume import volume_types
 
@@ -89,7 +90,7 @@ def provide_ems(requester, server, stats, netapp_backend,
             vs_info = attr_list.get_child_by_name('vserver-info')
             vs_name = vs_info.get_child_content('vserver-name')
             return vs_name
-        raise NaApiError(code='Not found', message='No records found')
+        return None
 
     do_ems = True
     if hasattr(requester, 'last_ems'):
@@ -103,15 +104,28 @@ def provide_ems(requester, server, stats, netapp_backend,
         ems = _create_ems(stats, netapp_backend, server_type)
         try:
             if server_type == "cluster":
-                node = _get_cluster_node(na_server)
+                api_version = na_server.get_api_version()
+                if api_version:
+                    major, minor = api_version
+                else:
+                    raise NaApiError(code='Not found',
+                                     message='No api version found')
+                if major == 1 and minor > 15:
+                    node = getattr(requester, 'vserver', None)
+                else:
+                    node = _get_cluster_node(na_server)
+                if node is None:
+                    raise NaApiError(code='Not found',
+                                     message='No vserver found')
                 na_server.set_vserver(node)
             else:
                 na_server.set_vfiler(None)
             na_server.invoke_successfully(ems, True)
-            requester.last_ems = timeutils.utcnow()
             LOG.debug(_("ems executed successfully."))
         except NaApiError as e:
-            LOG.debug(_("Failed to invoke ems. Message : %s") % e)
+            LOG.warn(_("Failed to invoke ems. Message : %s") % e)
+        finally:
+            requester.last_ems = timeutils.utcnow()
 
 
 def validate_instantiation(**kwargs):
@@ -249,3 +263,62 @@ def get_volume_extra_specs(volume):
         volume_type = volume_types.get_volume_type(ctxt, type_id)
         specs = volume_type.get('extra_specs')
     return specs
+
+
+def check_apis_on_cluster(na_server, api_list=[]):
+    """Checks api availability and permissions on cluster.
+
+    Checks api availability and permissions for executing user.
+    Returns a list of failed apis.
+    """
+    failed_apis = []
+    if api_list:
+        api_version = na_server.get_api_version()
+        if api_version:
+            major, minor = api_version
+            if major == 1 and minor < 20:
+                for api_name in api_list:
+                    na_el = NaElement(api_name)
+                    try:
+                        na_server.invoke_successfully(na_el)
+                    except Exception as e:
+                        if isinstance(e, NaApiError):
+                            if (e.code == NaErrors['API_NOT_FOUND'].code or
+                                    e.code ==
+                                    NaErrors['INSUFFICIENT_PRIVS'].code):
+                                failed_apis.append(api_name)
+            elif major == 1 and minor >= 20:
+                failed_apis = copy.copy(api_list)
+                result = invoke_api(
+                    na_server,
+                    api_name='system-user-capability-get-iter',
+                    api_family='cm',
+                    additional_elems=None,
+                    is_iter=True)
+                for res in result:
+                    attr_list = res.get_child_by_name('attributes-list')
+                    if attr_list:
+                        capabilities = attr_list.get_children()
+                        for capability in capabilities:
+                            op_list = capability.get_child_by_name(
+                                'operation-list')
+                            if op_list:
+                                ops = op_list.get_children()
+                                for op in ops:
+                                    apis = op.get_child_content('api-name')
+                                    if apis:
+                                        api_list = apis.split(',')
+                                        for api_name in api_list:
+                                            if (api_name and
+                                                    api_name.strip()
+                                                    in failed_apis):
+                                                failed_apis.remove(api_name)
+                                    else:
+                                        continue
+            else:
+                msg = _("Unsupported Clustered Data ONTAP version.")
+                raise exception.VolumeBackendAPIException(data=msg)
+        else:
+            msg = _("Api version could not be determined.")
+            raise exception.VolumeBackendAPIException(data=msg)
+    return failed_apis