From 4cac1bd3227fd8b65744e75f0df018fb34bcb1c1 Mon Sep 17 00:00:00 2001 From: LarryLiu Date: Fri, 14 Feb 2014 15:46:07 +0800 Subject: [PATCH] Storwize/SVC: Change volume copy task to async If Cinder crashes during a migration or retype (where data is moved and the operation can take a long time), the storage ends up with multiple copies of the same volume which requires storage admin intervention. This patch maintain a list of pending operations which is backed up in admin metadata, and a periodic task reviews the list and removes volume copies whose copy operation completed. When Cinder comes up, check the admin metadata and rebuild the list. Change-Id: I6549712bb0083996faced89c2207a4c438ae953d Closes-Bug: #1278035 --- cinder/tests/test_storwize_svc.py | 101 +++++----- cinder/volume/driver.py | 1 + .../drivers/ibm/storwize_svc/__init__.py | 185 ++++++++++++++---- .../drivers/ibm/storwize_svc/helpers.py | 30 ++- cinder/volume/drivers/ibm/storwize_svc/ssh.py | 5 - cinder/volume/manager.py | 3 +- 6 files changed, 213 insertions(+), 112 deletions(-) diff --git a/cinder/tests/test_storwize_svc.py b/cinder/tests/test_storwize_svc.py index 55cf20115..97f86efbf 100644 --- a/cinder/tests/test_storwize_svc.py +++ b/cinder/tests/test_storwize_svc.py @@ -25,9 +25,11 @@ import re from cinder import context from cinder import exception from cinder.openstack.common import excutils +from cinder.openstack.common import importutils from cinder.openstack.common import log as logging from cinder.openstack.common import processutils from cinder import test +from cinder.tests import utils as testutils from cinder import units from cinder import utils from cinder.volume import configuration as conf @@ -38,17 +40,6 @@ from cinder.volume import volume_types LOG = logging.getLogger(__name__) -class StorwizeSVCFakeDB: - def __init__(self): - self.volume = None - - def volume_get(self, ctxt, vol_id): - return self.volume - - def volume_set(self, vol): - self.volume = vol - - class StorwizeSVCManagementSimulator: def __init__(self, pool_name): self._flags = {'storwize_svc_volpool_name': pool_name} @@ -1564,7 +1555,10 @@ class StorwizeSVCDriverTestCase(test.TestCase): self._connector = utils.brick_get_connector_properties() self._reset_flags() - self.driver.db = StorwizeSVCFakeDB() + self.ctxt = context.get_admin_context() + db_driver = self.driver.configuration.db_driver + self.db = importutils.import_module(db_driver) + self.driver.db = self.db self.driver.do_setup(None) self.driver.check_for_setup_error() self.sleeppatch = mock.patch('eventlet.greenthread.sleep') @@ -1688,8 +1682,17 @@ class StorwizeSVCDriverTestCase(test.TestCase): 'volume_type_id': None, 'mdisk_grp_name': 'openstack'} + def _create_volume(self, **kwargs): + vol = testutils.create_volume(self.ctxt, **kwargs) + self.driver.create_volume(vol) + return vol + + def _delete_volume(self, volume): + self.driver.delete_volume(volume) + self.db.volume_destroy(self.ctxt, volume['id']) + def _create_test_vol(self, opts): - ctxt = context.get_admin_context() + ctxt = testutils.get_test_admin_context() type_ref = volume_types.create(ctxt, 'testtype', opts) volume = self._generate_vol_info(None, None) type_id = type_ref['id'] @@ -1704,9 +1707,7 @@ class StorwizeSVCDriverTestCase(test.TestCase): return attrs def test_storwize_svc_snapshots(self): - vol1 = self._generate_vol_info(None, None) - self.driver.create_volume(vol1) - self.driver.db.volume_set(vol1) + vol1 = self._create_volume() snap1 = self._generate_vol_info(vol1['name'], vol1['id']) # Test timeout and volume cleanup @@ -1749,9 +1750,7 @@ class StorwizeSVCDriverTestCase(test.TestCase): self.driver.delete_snapshot(snap1) def test_storwize_svc_create_volfromsnap_clone(self): - vol1 = self._generate_vol_info(None, None) - self.driver.create_volume(vol1) - self.driver.db.volume_set(vol1) + vol1 = self._create_volume() snap1 = self._generate_vol_info(vol1['name'], vol1['id']) self.driver.create_snapshot(snap1) vol2 = self._generate_vol_info(None, None) @@ -2193,9 +2192,7 @@ class StorwizeSVCDriverTestCase(test.TestCase): def test_storwize_svc_delete_volume_snapshots(self): # Create a volume with two snapshots - master = self._generate_vol_info(None, None) - self.driver.create_volume(master) - self.driver.db.volume_set(master) + master = self._create_volume() # Fail creating a snapshot - will force delete the snapshot if self.USESIM and False: @@ -2286,9 +2283,7 @@ class StorwizeSVCDriverTestCase(test.TestCase): self.assertAlmostEqual(stats['free_capacity_gb'], 3287.5) def test_storwize_svc_extend_volume(self): - volume = self._generate_vol_info(None, None) - self.driver.db.volume_set(volume) - self.driver.create_volume(volume) + volume = self._create_volume() self.driver.extend_volume(volume, '13') attrs = self.driver._helpers.get_vdisk_attributes(volume['name']) vol_size = int(attrs['capacity']) / units.GiB @@ -2320,39 +2315,18 @@ class StorwizeSVCDriverTestCase(test.TestCase): cap = {'location_info': 'StorwizeSVCDriver:foo:bar'} self._check_loc_info(cap, {'moved': False, 'model_update': None}) - def test_storwize_svc_migrate_same_extent_size(self): + def test_storwize_svc_volume_migrate(self): # Make sure we don't call migrate_volume_vdiskcopy - with mock.patch.object(self.driver._helpers, - 'migrate_volume_vdiskcopy') as migr_vdiskcopy: - migr_vdiskcopy.side_effect = KeyError - self.driver.do_setup(None) - loc = ('StorwizeSVCDriver:' + self.driver._state['system_id'] + - ':openstack2') - cap = {'location_info': loc, 'extent_size': '256'} - host = {'host': 'foo', 'capabilities': cap} - ctxt = context.get_admin_context() - volume = self._generate_vol_info(None, None) - volume['volume_type_id'] = None - self.driver.create_volume(volume) - self.driver.migrate_volume(ctxt, volume, host) - self.driver.delete_volume(volume) - - def test_storwize_svc_migrate_diff_extent_size(self): self.driver.do_setup(None) loc = ('StorwizeSVCDriver:' + self.driver._state['system_id'] + - ':openstack3') - cap = {'location_info': loc, 'extent_size': '128'} + ':openstack2') + cap = {'location_info': loc, 'extent_size': '256'} host = {'host': 'foo', 'capabilities': cap} ctxt = context.get_admin_context() - volume = self._generate_vol_info(None, None) + volume = self._create_volume() volume['volume_type_id'] = None - self.driver.create_volume(volume) - self.assertNotEqual(cap['extent_size'], - self.driver._state['extent_size']) self.driver.migrate_volume(ctxt, volume, host) - attrs = self.driver._helpers.get_vdisk_attributes(volume['name']) - self.assertIn('openstack3', attrs['mdisk_grp_name']) - self.driver.delete_volume(volume) + self._delete_volume(volume) def test_storwize_svc_retype_no_copy(self): self.driver.do_setup(None) @@ -2446,8 +2420,29 @@ class StorwizeSVCDriverTestCase(test.TestCase): def test_set_storage_code_level_success(self): res = self.driver._helpers.get_system_info() - self.assertEqual((7, 2, 0, 0), res['code_level'], - 'Get code level error') + if self.USESIM: + self.assertEqual((7, 2, 0, 0), res['code_level'], + 'Get code level error') + + def test_storwize_vdisk_copy_ops(self): + ctxt = testutils.get_test_admin_context() + volume = self._create_volume() + driver = self.driver + dest_pool = self.driver.configuration.storwize_svc_volpool_name + new_ops = driver._helpers.add_vdisk_copy(volume['name'], dest_pool, + None, self.driver._state, + self.driver.configuration) + self.driver._add_vdisk_copy_op(ctxt, volume, new_ops) + admin_metadata = self.db.volume_admin_metadata_get(ctxt, volume['id']) + self.assertEqual(":".join(x for x in new_ops), + admin_metadata['vdiskcopyops'], + 'Storwize driver add vdisk copy error.') + self.driver._check_volume_copy_ops() + self.driver._rm_vdisk_copy_op(ctxt, volume, new_ops[0], new_ops[1]) + admin_metadata = self.db.volume_admin_metadata_get(ctxt, volume['id']) + self.assertEqual(None, admin_metadata.get('vdiskcopyops', None), + 'Storwize driver delete vdisk copy error') + self._delete_volume(volume) class CLIResponseTestCase(test.TestCase): diff --git a/cinder/volume/driver.py b/cinder/volume/driver.py index cd96acc1e..22dc586b6 100644 --- a/cinder/volume/driver.py +++ b/cinder/volume/driver.py @@ -137,6 +137,7 @@ class VolumeDriver(object): def __init__(self, execute=utils.execute, *args, **kwargs): # NOTE(vish): db is set by Manager self.db = kwargs.get('db') + self.host = kwargs.get('host') self.configuration = kwargs.get('configuration', None) if self.configuration: self.configuration.append_config_values(volume_opts) diff --git a/cinder/volume/drivers/ibm/storwize_svc/__init__.py b/cinder/volume/drivers/ibm/storwize_svc/__init__.py index ea150df42..b2c13eab2 100644 --- a/cinder/volume/drivers/ibm/storwize_svc/__init__.py +++ b/cinder/volume/drivers/ibm/storwize_svc/__init__.py @@ -40,6 +40,7 @@ from cinder import context from cinder import exception from cinder.openstack.common import excutils from cinder.openstack.common import log as logging +from cinder.openstack.common import loopingcall from cinder import units from cinder import utils from cinder.volume.drivers.ibm.storwize_svc import helpers as storwize_helpers @@ -114,21 +115,24 @@ class StorwizeSVCDriver(san.SanDriver): 1.2.3 - Fix Fibre Channel connectivity: bug #1279758 (add delim to lsfabric, clear unused data from connections, ensure matching WWPNs by comparing lower case + 1.2.4 - Fix bug #1278035 (async migration/retype) """ - VERSION = "1.2.3" + VERSION = "1.2.4" + VDISKCOPYOPS_INTERVAL = 600 def __init__(self, *args, **kwargs): super(StorwizeSVCDriver, self).__init__(*args, **kwargs) self.configuration.append_config_values(storwize_svc_opts) self._helpers = storwize_helpers.StorwizeHelpers(self._run_ssh) + self._vdiskcopyops = {} + self._vdiskcopyops_loop = None self._state = {'storage_nodes': {}, 'enabled_protocols': set(), 'compression_enabled': False, 'available_iogrps': [], 'system_name': None, 'system_id': None, - 'extent_size': None, 'code_level': None, } @@ -142,11 +146,10 @@ class StorwizeSVCDriver(san.SanDriver): # Validate that the pool exists pool = self.configuration.storwize_svc_volpool_name try: - attributes = self._helpers.get_pool_attrs(pool) + self._helpers.get_pool_attrs(pool) except exception.VolumeBackendAPIException: msg = _('Failed getting details for pool %s') % pool raise exception.InvalidInput(reason=msg) - self._state['extent_size'] = attributes['extent_size'] # Check if compression is supported self._state['compression_enabled'] = \ @@ -184,6 +187,28 @@ class StorwizeSVCDriver(san.SanDriver): msg = _('do_setup: No configured nodes.') LOG.error(msg) raise exception.VolumeDriverException(message=msg) + + # Build the list of in-progress vdisk copy operations + if ctxt is None: + admin_context = context.get_admin_context() + else: + admin_context = ctxt.elevated() + volumes = self.db.volume_get_all_by_host(admin_context, self.host) + + for volume in volumes: + metadata = self.db.volume_admin_metadata_get(admin_context, + volume['id']) + curr_ops = metadata.get('vdiskcopyops', None) + if curr_ops: + ops = [tuple(x.split(':')) for x in curr_ops.split(';')] + self._vdiskcopyops[volume['id']] = ops + + # if vdiskcopy exists in database, start the looping call + if len(self._vdiskcopyops) >= 1: + self._vdiskcopyops_loop = loopingcall.LoopingCall( + self._check_volume_copy_ops) + self._vdiskcopyops_loop.start(interval=self.VDISKCOPYOPS_INTERVAL) + LOG.debug(_('leave: do_setup')) def check_for_setup_error(self): @@ -197,9 +222,6 @@ class StorwizeSVCDriver(san.SanDriver): if self._state['system_id'] is None: exception_msg = (_('Unable to determine system id')) raise exception.VolumeBackendAPIException(data=exception_msg) - if self._state['extent_size'] is None: - exception_msg = (_('Unable to determine pool extent size')) - raise exception.VolumeBackendAPIException(data=exception_msg) required_flags = ['san_ip', 'san_ssh_port', 'san_login', 'storwize_svc_volpool_name'] @@ -285,7 +307,6 @@ class StorwizeSVCDriver(san.SanDriver): 'conn': str(connector)}) vol_opts = self._get_vdisk_params(volume['volume_type_id']) - host_name = connector['host'] volume_name = volume['name'] # Delete irrelevant connection information that later could result @@ -453,7 +474,12 @@ class StorwizeSVCDriver(san.SanDriver): def create_snapshot(self, snapshot): ctxt = context.get_admin_context() - source_vol = self.db.volume_get(ctxt, snapshot['volume_id']) + try: + source_vol = self.db.volume_get(ctxt, snapshot['volume_id']) + except Exception: + msg = (_('create_snapshot: get source volume failed.')) + LOG.error(msg) + raise exception.VolumeDriverException(message=msg) opts = self._get_vdisk_params(source_vol['volume_type_id']) self._helpers.create_copy(snapshot['volume_name'], snapshot['name'], snapshot['volume_id'], self.configuration, @@ -500,13 +526,112 @@ class StorwizeSVCDriver(san.SanDriver): self._helpers.extend_vdisk(volume['name'], extend_amt) LOG.debug(_('leave: extend_volume: volume %s') % volume['id']) + def _add_vdisk_copy_op(self, ctxt, volume, new_op): + metadata = self.db.volume_admin_metadata_get(ctxt.elevated(), + volume['id']) + curr_ops = metadata.get('vdiskcopyops', None) + if curr_ops: + curr_ops_list = [tuple(x.split(':')) for x in curr_ops.split(';')] + new_ops_list = curr_ops_list.append(new_op) + else: + new_ops_list = [new_op] + new_ops_str = ';'.join([':'.join(x) for x in new_ops_list]) + self.db.volume_admin_metadata_update(ctxt.elevated(), volume['id'], + {'vdiskcopyops': new_ops_str}, + False) + if volume['id'] in self._vdiskcopyops: + self._vdiskcopyops[volume['id']].append(new_op) + else: + self._vdiskcopyops[volume['id']] = [new_op] + + # We added the first copy operation, so start the looping call + if len(self._vdiskcopyops) == 1: + self._vdiskcopyops_loop = loopingcall.LoopingCall( + self._check_volume_copy_ops) + self._vdiskcopyops_loop.start(interval=self.VDISKCOPYOPS_INTERVAL) + + def _rm_vdisk_copy_op(self, ctxt, volume, orig_copy_id, new_copy_id): + try: + self._vdiskcopyops[volume['id']].remove((orig_copy_id, + new_copy_id)) + if not len(self._vdiskcopyops[volume['id']]): + del self._vdiskcopyops[volume['id']] + if not len(self._vdiskcopyops): + self._vdiskcopyops_loop.stop() + self._vdiskcopyops_loop = None + except IndexError: + msg = (_('_rm_vdisk_copy_op: Volume %s does not have any ' + 'registered vdisk copy operations.') % volume['id']) + LOG.error(msg) + return + except ValueError: + msg = (_('_rm_vdisk_copy_op: Volume %(vol)s does not have the ' + 'specified vdisk copy operation: orig=%(orig)s ' + 'new=%(new)s.') + % {'vol': volume['id'], 'orig': orig_copy_id, + 'new': new_copy_id}) + LOG.error(msg) + return + + metadata = self.db.volume_admin_metadata_get(ctxt.elevated(), + volume['id']) + curr_ops = metadata.get('vdiskcopyops', None) + if not curr_ops: + msg = (_('_rm_vdisk_copy_op: Volume metadata %s does not have any ' + 'registered vdisk copy operations.') % volume['id']) + LOG.error(msg) + return + curr_ops_list = [tuple(x.split(':')) for x in curr_ops.split(';')] + try: + curr_ops_list.remove((orig_copy_id, new_copy_id)) + except ValueError: + msg = (_('_rm_vdisk_copy_op: Volume %(vol)s metadata does not ' + 'have the specified vdisk copy operation: orig=%(orig)s ' + 'new=%(new)s.') + % {'vol': volume['id'], 'orig': orig_copy_id, + 'new': new_copy_id}) + LOG.error(msg) + return + + if len(curr_ops_list): + new_ops_str = ';'.join([':'.join(x) for x in curr_ops_list]) + self.db.volume_admin_metadata_update(ctxt.elevated(), volume['id'], + {'vdiskcopyops': new_ops_str}, + False) + else: + self.db.volume_admin_metadata_delete(ctxt.elevated(), volume['id'], + 'vdiskcopyops') + + def _check_volume_copy_ops(self): + LOG.debug(_("enter: update volume copy status")) + ctxt = context.get_admin_context() + copy_items = self._vdiskcopyops.items() + for vol_id, copy_ops in copy_items: + volume = self.db.volume_get(ctxt, vol_id) + for copy_op in copy_ops: + try: + synced = self._helpers.is_vdisk_copy_synced(volume['name'], + copy_op[1]) + except Exception: + msg = (_('_check_volume_copy_ops: Volume %(vol)s does not ' + 'have the specified vdisk copy operation: ' + 'orig=%(orig)s new=%(new)s.') + % {'vol': volume['id'], 'orig': copy_op[0], + 'new': copy_op[1]}) + LOG.info(msg) + else: + if synced: + self._helpers.rm_vdisk_copy(volume['name'], copy_op[0]) + self._rm_vdisk_copy_op(ctxt, volume, copy_op[0], + copy_op[1]) + LOG.debug(_("exit: update volume copy status")) + def migrate_volume(self, ctxt, volume, host): """Migrate directly if source and dest are managed by same storage. - The method uses the migratevdisk method, which returns almost - immediately, if the source and target pools have the same extent_size. - Otherwise, it uses addvdiskcopy and rmvdiskcopy, which require waiting - for the copy operation to complete. + We create a new vdisk copy in the desired pool, and add the original + vdisk copy to the admin_metadata of the volume to be deleted. The + deletion will occur using a periodic task once the new copy is synced. :param ctxt: Context :param volume: A dictionary describing the volume to migrate @@ -522,24 +647,17 @@ class StorwizeSVCDriver(san.SanDriver): if dest_pool is None: return false_ret - if 'extent_size' not in host['capabilities']: - return false_ret - if host['capabilities']['extent_size'] == self._state['extent_size']: - # If source and dest pools have the same extent size, migratevdisk - self._helpers.migrate_vdisk(volume['name'], dest_pool) + ctxt = context.get_admin_context() + if volume['volume_type_id'] is not None: + volume_type_id = volume['volume_type_id'] + vol_type = volume_types.get_volume_type(ctxt, volume_type_id) else: - # If source and dest pool extent size differ, add/delete vdisk copy - ctxt = context.get_admin_context() - if volume['volume_type_id'] is not None: - volume_type_id = volume['volume_type_id'] - vol_type = volume_types.get_volume_type(ctxt, volume_type_id) - else: - vol_type = None - self._helpers.migrate_volume_vdiskcopy(volume['name'], dest_pool, - vol_type, - self._state, - self.configuration) + vol_type = None + new_op = self._helpers.add_vdisk_copy(volume['name'], dest_pool, + vol_type, self._state, + self.configuration) + self._add_vdisk_copy_op(ctxt, volume, new_op) LOG.debug(_('leave: migrate_volume: id=%(id)s, host=%(host)s') % {'id': volume['id'], 'host': host['host']}) return (True, None) @@ -590,10 +708,10 @@ class StorwizeSVCDriver(san.SanDriver): if dest_pool is None: return False - self._helpers.migrate_volume_vdiskcopy(volume['name'], dest_pool, - new_type, - self._state, - self.configuration) + new_op = self._helpers.add_vdisk_copy(volume['name'], dest_pool, + new_type, self._state, + self.configuration) + self._add_vdisk_copy_op(ctxt, volume, new_op) else: self._helpers.change_vdisk_options(volume['name'], vdisk_changes, new_opts, self._state) @@ -650,7 +768,6 @@ class StorwizeSVCDriver(san.SanDriver): units.GiB) data['easytier_support'] = attributes['easy_tier'] in ['on', 'auto'] data['compression_support'] = self._state['compression_enabled'] - data['extent_size'] = self._state['extent_size'] data['location_info'] = ('StorwizeSVCDriver:%(sys_id)s:%(pool)s' % {'sys_id': self._state['system_id'], 'pool': pool}) diff --git a/cinder/volume/drivers/ibm/storwize_svc/helpers.py b/cinder/volume/drivers/ibm/storwize_svc/helpers.py index b71cb4372..18c814acb 100644 --- a/cinder/volume/drivers/ibm/storwize_svc/helpers.py +++ b/cinder/volume/drivers/ibm/storwize_svc/helpers.py @@ -679,13 +679,8 @@ class StorwizeHelpers(object): def extend_vdisk(self, vdisk, amount): self.ssh.expandvdisksize(vdisk, amount) - def migrate_volume_vdiskcopy(self, vdisk, dest_pool, volume_type, - state, config): - """Migrate a volume using addvdiskcopy and rmvdiskcopy. - - This will add a vdisk copy with the given volume type in the given - pool, wait until it syncs, and delete the original copy. - """ + def add_vdisk_copy(self, vdisk, dest_pool, volume_type, state, config): + """Add a vdisk copy in the given pool.""" this_pool = config.storwize_svc_volpool_name resp = self.ssh.lsvdiskcopy(vdisk) orig_copy_id = None @@ -694,7 +689,7 @@ class StorwizeHelpers(object): orig_copy_id = copy_id if orig_copy_id is None: - msg = (_('migrate_volume started without a vdisk copy in the ' + msg = (_('add_vdisk_copy started without a vdisk copy in the ' 'expected pool.')) LOG.error(msg) raise exception.VolumeDriverException(message=msg) @@ -706,19 +701,16 @@ class StorwizeHelpers(object): volume_type=volume_type) params = self._get_vdisk_create_params(opts) new_copy_id = self.ssh.addvdiskcopy(vdisk, dest_pool, params) + return (orig_copy_id, new_copy_id) - sync = False - while not sync: - sync = self.ssh.lsvdiskcopy(vdisk, copy_id=new_copy_id)[0]['sync'] - if sync == 'yes': - sync = True - else: - greenthread.sleep(10) - - self.ssh.rmvdiskcopy(vdisk, orig_copy_id) + def is_vdisk_copy_synced(self, vdisk, copy_id): + sync = self.ssh.lsvdiskcopy(vdisk, copy_id=copy_id)[0]['sync'] + if sync == 'yes': + return True + return False - def migrate_vdisk(self, vdisk, dest_pool): - self.ssh.migratevdisk(vdisk, dest_pool) + def rm_vdisk_copy(self, vdisk, copy_id): + self.ssh.rmvdiskcopy(vdisk, copy_id) @staticmethod def can_migrate_to_host(host, state): diff --git a/cinder/volume/drivers/ibm/storwize_svc/ssh.py b/cinder/volume/drivers/ibm/storwize_svc/ssh.py index a6f6f7d3d..e05cfaf97 100644 --- a/cinder/volume/drivers/ibm/storwize_svc/ssh.py +++ b/cinder/volume/drivers/ibm/storwize_svc/ssh.py @@ -231,11 +231,6 @@ class StorwizeSSH(object): '-unit', 'gb', vdisk]) self.run_ssh_assert_no_output(ssh_cmd) - def migratevdisk(self, vdisk, dest_pool): - ssh_cmd = ['svctask', 'migratevdisk', '-mdiskgrp', dest_pool, - '-vdisk', vdisk] - self.run_ssh_assert_no_output(ssh_cmd) - def mkfcmap(self, source, target, full_copy): ssh_cmd = ['svctask', 'mkfcmap', '-source', source, '-target', target, '-autodelete'] diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 6ea225bb8..54246cc96 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -189,7 +189,8 @@ class VolumeManager(manager.SchedulerDependentManager): self.driver = importutils.import_object( volume_driver, configuration=self.configuration, - db=self.db) + db=self.db, + host=self.host) def _add_to_threadpool(self, func, *args, **kwargs): self._tp.spawn_n(func, *args, **kwargs) -- 2.45.2