from cinder.openstack.common import imageutils
from cinder.openstack.common import log as logging
from cinder import utils
+from cinder.volume import throttling
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
return imageutils.QemuImgInfo(out)
-def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
+def _convert_image(prefix, source, dest, out_format, run_as_root=True):
"""Convert image to other format."""
- cmd = ('qemu-img', 'convert',
- '-O', out_format, source, dest)
+ cmd = prefix + ('qemu-img', 'convert',
+ '-O', out_format, source, dest)
# Check whether O_DIRECT is supported and set '-t none' if it is
# This is needed to ensure that all data hit the device before
volume_utils.check_for_odirect_support(source,
dest,
'oflag=direct')):
- cmd = ('qemu-img', 'convert',
- '-t', 'none',
- '-O', out_format, source, dest)
+ cmd = prefix + ('qemu-img', 'convert',
+ '-t', 'none',
+ '-O', out_format, source, dest)
start_time = timeutils.utcnow()
- cgcmd = volume_utils.setup_blkio_cgroup(source, dest, bps_limit)
- if cgcmd:
- cmd = tuple(cgcmd) + cmd
utils.execute(*cmd, run_as_root=run_as_root)
-
duration = timeutils.delta_seconds(start_time, timeutils.utcnow())
# NOTE(jdg): use a default of 1, mostly for unit test, but in
LOG.info(msg % {"sz": fsz_mb, "mbps": mbps})
+def convert_image(source, dest, out_format, run_as_root=True, throttle=None):
+ if not throttle:
+ throttle = throttling.Throttle.get_default()
+ with throttle.subcommand(source, dest) as throttle_cmd:
+ _convert_image(tuple(throttle_cmd['prefix']),
+ source, dest,
+ out_format, run_as_root=run_as_root)
+
+
def resize_image(source, size, run_as_root=False):
"""Changes the virtual size of the image."""
cmd = ('qemu-img', 'resize', source, '%sG' % size)
LOG.debug("%s was %s, converting to %s " % (image_id, fmt,
volume_format))
convert_image(tmp, dest, volume_format,
- bps_limit=CONF.volume_copy_bps_limit,
run_as_root=run_as_root)
data = qemu_img_info(dest, run_as_root=run_as_root)
LOG.debug("%s was %s, converting to %s" %
(image_id, volume_format, image_meta['disk_format']))
convert_image(volume_path, tmp, image_meta['disk_format'],
- bps_limit=CONF.volume_copy_bps_limit,
run_as_root=run_as_root)
data = qemu_img_info(tmp, run_as_root=run_as_root)
from cinder import exception
from cinder.image import image_utils
from cinder import test
+from cinder.volume import throttling
class TestQemuImgInfo(test.TestCase):
class TestConvertImage(test.TestCase):
@mock.patch('cinder.image.image_utils.os.stat')
@mock.patch('cinder.utils.execute')
- @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
- return_value=(mock.sentinel.cgcmd, ))
@mock.patch('cinder.utils.is_blk_device', return_value=True)
- def test_defaults_block_dev(self, mock_isblk, mock_cgroup, mock_exec,
+ def test_defaults_block_dev(self, mock_isblk, mock_exec,
mock_stat):
source = mock.sentinel.source
dest = mock.sentinel.dest
out_format = mock.sentinel.out_format
- cgcmd = mock.sentinel.cgcmd
mock_stat.return_value.st_size = 1048576
+ throttle = throttling.Throttle(prefix=['cgcmd'])
with mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=True):
- output = image_utils.convert_image(source, dest, out_format)
+ output = image_utils.convert_image(source, dest, out_format,
+ throttle=throttle)
self.assertIsNone(output)
- mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert',
+ mock_exec.assert_called_once_with('cgcmd', 'qemu-img', 'convert',
'-t', 'none', '-O', out_format,
source, dest, run_as_root=True)
output = image_utils.convert_image(source, dest, out_format)
self.assertIsNone(output)
- mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert',
+ mock_exec.assert_called_once_with('qemu-img', 'convert',
'-O', out_format, source, dest,
run_as_root=True)
return_value=True)
@mock.patch('cinder.image.image_utils.os.stat')
@mock.patch('cinder.utils.execute')
- @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
- return_value=(mock.sentinel.cgcmd, ))
@mock.patch('cinder.utils.is_blk_device', return_value=False)
- def test_defaults_not_block_dev(self, mock_isblk, mock_cgroup, mock_exec,
+ def test_defaults_not_block_dev(self, mock_isblk, mock_exec,
mock_stat, mock_odirect):
source = mock.sentinel.source
dest = mock.sentinel.dest
out_format = mock.sentinel.out_format
- cgcmd = mock.sentinel.cgcmd
mock_stat.return_value.st_size = 1048576
output = image_utils.convert_image(source, dest, out_format)
self.assertIsNone(output)
- mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert', '-O',
+ mock_exec.assert_called_once_with('qemu-img', 'convert', '-O',
out_format, source, dest,
run_as_root=True)
'disk_format': mock.sentinel.disk_format}
volume_path = mock.sentinel.volume_path
mock_os.name = 'posix'
- mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit
data = mock_info.return_value
data.file_format = mock.sentinel.disk_format
temp_file = mock_temp.return_value.__enter__.return_value
mock_convert.assert_called_once_with(volume_path,
temp_file,
mock.sentinel.disk_format,
- bps_limit=mock.sentinel.bps_limit,
run_as_root=True)
mock_info.assert_called_once_with(temp_file, run_as_root=True)
mock_open.assert_called_once_with(temp_file, 'rb')
'disk_format': mock.sentinel.disk_format}
volume_path = mock.sentinel.volume_path
mock_os.name = 'posix'
- mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit
data = mock_info.return_value
data.file_format = mock.sentinel.other_disk_format
temp_file = mock_temp.return_value.__enter__.return_value
mock_convert.assert_called_once_with(volume_path,
temp_file,
mock.sentinel.disk_format,
- bps_limit=mock.sentinel.bps_limit,
run_as_root=True)
mock_info.assert_called_once_with(temp_file, run_as_root=True)
self.assertFalse(image_service.update.called)
volume_format = mock.sentinel.volume_format
blocksize = mock.sentinel.blocksize
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
self.assertFalse(mock_repl_xen.called)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
- bps_limit=bps_limit,
run_as_root=True)
@mock.patch('cinder.image.image_utils.convert_image')
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
self.assertFalse(mock_repl_xen.called)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
- bps_limit=bps_limit,
run_as_root=run_as_root)
@mock.patch('cinder.image.image_utils.convert_image')
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
tmp = mock_temp.return_value.__enter__.return_value
image_service.show.return_value = {'disk_format': 'raw',
'size': 41126400}
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
tmp = mock_temp.return_value.__enter__.return_value
image_service.show.return_value = {'disk_format': 'not_raw'}
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
tmp = mock_temp.return_value.__enter__.return_value
image_service.show.return_value = None
size = 1234
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = None
data.backing_file = None
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = mock.sentinel.backing_file
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = mock.sentinel.file_format
data.backing_file = None
self.assertFalse(mock_repl_xen.called)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
- bps_limit=bps_limit,
run_as_root=run_as_root)
@mock.patch('cinder.image.image_utils.convert_image')
size = 4321
run_as_root = mock.sentinel.run_as_root
- bps_limit = mock.sentinel.bps_limit
- mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
mock_repl_xen.assert_called_once_with(tmp)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
- bps_limit=bps_limit,
run_as_root=run_as_root)
self.stubs.Set(volutils, 'clear_volume',
lambda a, b, volume_clear=mox.IgnoreArg(),
volume_clear_size=mox.IgnoreArg(),
- lvm_type=mox.IgnoreArg(): None)
+ lvm_type=mox.IgnoreArg(),
+ throttle=mox.IgnoreArg(): None)
self.stubs.Set(tgt.TgtAdm,
'create_iscsi_target',
self._fake_create_iscsi_target)
pass
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
- size=None):
+ size=None, throttle=None):
pass
def fake_clone_image(ctx, volume_ref,
--- /dev/null
+# Copyright (c) 2015 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tests for volume copy throttling helpers."""
+
+import mock
+from oslo_config import cfg
+
+from cinder import test
+from cinder import utils
+from cinder.volume import throttling
+
+
+CONF = cfg.CONF
+
+
+class ThrottleTestCase(test.TestCase):
+
+ def test_NoThrottle(self):
+ with throttling.Throttle().subcommand('volume1', 'volume2') as cmd:
+ self.assertEqual([], cmd['prefix'])
+
+ @mock.patch.object(utils, 'get_blkdev_major_minor')
+ def test_BlkioCgroup(self, mock_major_minor):
+
+ def fake_get_blkdev_major_minor(path):
+ return {'src_volume1': "253:0", 'dst_volume1': "253:1",
+ 'src_volume2': "253:2", 'dst_volume2': "253:3"}[path]
+
+ mock_major_minor.side_effect = fake_get_blkdev_major_minor
+
+ self.exec_cnt = 0
+
+ def fake_execute(*cmd, **kwargs):
+ cmd_set = ['cgset', '-r',
+ 'blkio.throttle.%s_bps_device=%s %d', 'fake_group']
+ set_order = [None,
+ ('read', '253:0', 1024),
+ ('write', '253:1', 1024),
+ # a nested job starts; bps limit are set to the half
+ ('read', '253:0', 512),
+ ('read', '253:2', 512),
+ ('write', '253:1', 512),
+ ('write', '253:3', 512),
+ # a nested job ends; bps limit is resumed
+ ('read', '253:0', 1024),
+ ('write', '253:1', 1024)]
+
+ if set_order[self.exec_cnt] is None:
+ self.assertEqual(('cgcreate', '-g', 'blkio:fake_group'), cmd)
+ else:
+ cmd_set[2] %= set_order[self.exec_cnt]
+ self.assertEqual(tuple(cmd_set), cmd)
+
+ self.exec_cnt += 1
+
+ with mock.patch.object(utils, 'execute', side_effect=fake_execute):
+ throttle = throttling.BlkioCgroup(1024, 'fake_group')
+ with throttle.subcommand('src_volume1', 'dst_volume1') as cmd:
+ self.assertEqual(['cgexec', '-g', 'blkio:fake_group'],
+ cmd['prefix'])
+
+ # a nested job
+ with throttle.subcommand('src_volume2', 'dst_volume2') as cmd:
+ self.assertEqual(['cgexec', '-g', 'blkio:fake_group'],
+ cmd['prefix'])
from cinder.openstack.common import log as logging
from cinder import test
from cinder import utils
+from cinder.volume import throttling
from cinder.volume import utils as volume_utils
self.assertIsNone(output)
mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1024,
'1M', sync=True,
- execute=utils.execute, ionice='-c3')
+ execute=utils.execute, ionice='-c3',
+ throttle=None)
@mock.patch('cinder.volume.utils.copy_volume', return_value=None)
@mock.patch('cinder.volume.utils.CONF')
self.assertIsNone(output)
mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1,
'1M', sync=True,
- execute=utils.execute, ionice='-c0')
+ execute=utils.execute, ionice='-c0',
+ throttle=None)
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
class CopyVolumeTestCase(test.TestCase):
- @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
- return_value=['cg_cmd'])
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
def test_copy_volume_dd_iflag_and_oflag(self, mock_conf, mock_exec,
- mock_support, mock_count, mock_cg):
- mock_conf.volume_copy_bps_limit = 10
+ mock_support, mock_count):
+ fake_throttle = throttling.Throttle(['fake_throttle'])
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
- ionice=None)
+ ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
- mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+ mock_exec.assert_called_once_with('fake_throttle', 'dd',
+ 'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', 'iflag=direct',
'oflag=direct', run_as_root=True)
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=False, execute=utils.execute,
- ionice=None)
+ ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
- mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+ mock_exec.assert_called_once_with('fake_throttle', 'dd',
+ 'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', 'iflag=direct',
'oflag=direct', run_as_root=True)
- @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
- return_value=['cg_cmd'])
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=False)
@mock.patch('cinder.utils.execute')
- @mock.patch('cinder.volume.utils.CONF')
- def test_copy_volume_dd_no_iflag_or_oflag(self, mock_conf, mock_exec,
- mock_support, mock_count,
- mock_cg):
- mock_conf.volume_copy_bps_limit = 10
+ def test_copy_volume_dd_no_iflag_or_oflag(self, mock_exec,
+ mock_support, mock_count):
+ fake_throttle = throttling.Throttle(['fake_throttle'])
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
- ionice=None)
+ ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
- mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+ mock_exec.assert_called_once_with('fake_throttle', 'dd',
+ 'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', 'conv=fdatasync',
run_as_root=True)
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=False, execute=utils.execute,
- ionice=None)
+ ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
- mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+ mock_exec.assert_called_once_with('fake_throttle', 'dd',
+ 'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', run_as_root=True)
- @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
- return_value=None)
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=False)
@mock.patch('cinder.utils.execute')
- @mock.patch('cinder.volume.utils.CONF')
- def test_copy_volume_dd_no_cgroup(self, mock_conf, mock_exec, mock_support,
- mock_count, mock_cg):
- mock_conf.volume_copy_bps_limit = 10
+ def test_copy_volume_dd_no_throttle(self, mock_exec, mock_support,
+ mock_count):
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
ionice=None)
'count=5678', 'bs=1234',
'conv=fdatasync', run_as_root=True)
- @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
- return_value=None)
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=False)
@mock.patch('cinder.utils.execute')
- @mock.patch('cinder.volume.utils.CONF')
- def test_copy_volume_dd_with_ionice(self, mock_conf, mock_exec,
- mock_support, mock_count, mock_cg):
- mock_conf.volume_copy_bps_limit = 10
+ def test_copy_volume_dd_with_ionice(self, mock_exec,
+ mock_support, mock_count):
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
ionice='-c3')
'conv=fdatasync', run_as_root=True)
-class BlkioCgroupTestCase(test.TestCase):
- def test_bps_limit_zero(self):
- mock_exec = mock.Mock()
- output = volume_utils.setup_blkio_cgroup('src', 'dst', 0,
- execute=mock_exec)
- self.assertIsNone(output)
- self.assertFalse(mock_exec.called)
-
- @mock.patch('cinder.utils.get_blkdev_major_minor',
- side_effect=exception.Error)
- def test_get_blkdev_error(self, mock_get_blkdev):
- mock_exec = mock.Mock()
- output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
- execute=mock_exec)
- self.assertIsNone(output)
- mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
- self.assertFalse(mock_exec.called)
-
- @mock.patch('cinder.utils.get_blkdev_major_minor',
- side_effect=lambda x: x)
- @mock.patch('cinder.volume.utils.CONF')
- def test_cgcreate_fail(self, mock_conf, mock_get_blkdev):
- mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
- mock_exec = mock.Mock()
- mock_exec.side_effect = processutils.ProcessExecutionError
- output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
- execute=mock_exec)
- self.assertIsNone(output)
- mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
- mock_exec.assert_called_once_with('cgcreate', '-g', 'blkio:test_group',
- run_as_root=True)
-
- @mock.patch('cinder.utils.get_blkdev_major_minor',
- side_effect=lambda x: x)
- @mock.patch('cinder.volume.utils.CONF')
- def test_cgset_fail(self, mock_conf, mock_get_blkdev):
- mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
- mock_exec = mock.Mock()
-
- def cgset_exception(*args, **kwargs):
- if 'cgset' in args:
- raise processutils.ProcessExecutionError
-
- mock_exec.side_effect = cgset_exception
- output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
- execute=mock_exec)
- self.assertIsNone(output)
- mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
- mock_exec.assert_has_calls([
- mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True),
- mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1',
- 'test_group', run_as_root=True)])
-
- @mock.patch('cinder.utils.get_blkdev_major_minor',
- side_effect=lambda x: x)
- @mock.patch('cinder.volume.utils.CONF')
- def test_setup_blkio_cgroup(self, mock_conf, mock_get_blkdev):
- mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
- mock_exec = mock.Mock()
- output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
- execute=mock_exec)
- self.assertEqual(['cgexec', '-g', 'blkio:test_group'], output)
- mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
- mock_exec.assert_has_calls([
- mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True),
- mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1',
- 'test_group', run_as_root=True),
- mock.call('cgset', '-r', 'blkio.throttle.write_bps_device=dst 1',
- 'test_group', run_as_root=True)])
-
-
class VolumeUtilsTestCase(test.TestCase):
def test_null_safe_str(self):
self.assertEqual('', volume_utils.null_safe_str(None))
from cinder.openstack.common import log as logging
from cinder import utils
from cinder.volume import rpcapi as volume_rpcapi
+from cinder.volume import throttling
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
def initialized(self):
return self._initialized
+ def set_throttle(self):
+ bps_limit = ((self.configuration and
+ self.configuration.safe_get('volume_copy_bps_limit')) or
+ CONF.volume_copy_bps_limit)
+ cgroup_name = ((self.configuration and
+ self.configuration.safe_get(
+ 'volume_copy_blkio_cgroup_name')) or
+ CONF.volume_copy_blkio_cgroup_name)
+ self._throttle = None
+ if bps_limit:
+ try:
+ self._throttle = throttling.BlkioCgroup(int(bps_limit),
+ cgroup_name)
+ except processutils.ProcessExecutionError as err:
+ LOG.warning(_LW('Failed to activate volume copy throttling: '
+ '%(err)s'), {'err': six.text_type(err)})
+ throttling.Throttle.set_default(self._throttle)
+
def get_version(self):
"""Get the current version of this driver."""
return self.VERSION
src_attach_info['device']['path'],
dest_attach_info['device']['path'],
size_in_mb,
- self.configuration.volume_dd_blocksize)
+ self.configuration.volume_dd_blocksize,
+ throttle=self._throttle)
copy_error = False
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(ex)
return
+ self.driver.set_throttle()
+
# at this point the driver is considered initialized.
self.driver.set_initialized()
--- /dev/null
+# Copyright (c) 2015 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Volume copy throttling helpers."""
+
+
+import contextlib
+
+from oslo_concurrency import processutils
+
+from cinder import exception
+from cinder.i18n import _LW, _LE
+from cinder.openstack.common import log as logging
+from cinder import utils
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Throttle(object):
+ """Base class for throttling disk I/O bandwidth"""
+
+ DEFAULT = None
+
+ @staticmethod
+ def set_default(throttle):
+ Throttle.DEFAULT = throttle
+
+ @staticmethod
+ def get_default():
+ return Throttle.DEFAULT or Throttle()
+
+ def __init__(self, prefix=None):
+ self.prefix = prefix or []
+
+ @contextlib.contextmanager
+ def subcommand(self, srcpath, dstpath):
+ """Throttle disk I/O bandwidth used by a sub-command, such as 'dd',
+ that reads from srcpath and writes to dstpath. The sub-command
+ must be executed with the generated prefix command.
+ """
+ yield {'prefix': self.prefix}
+
+
+class BlkioCgroup(Throttle):
+ """Throttle disk I/O bandwidth using blkio cgroups."""
+
+ def __init__(self, bps_limit, cgroup_name):
+ self.bps_limit = bps_limit
+ self.cgroup = cgroup_name
+ self.srcdevs = {}
+ self.dstdevs = {}
+
+ try:
+ utils.execute('cgcreate', '-g', 'blkio:%s' % self.cgroup,
+ run_as_root=True)
+ except processutils.ProcessExecutionError:
+ LOG.error(_LE('Failed to create blkio cgroup \'%(name)s\'.'),
+ {'name': cgroup_name})
+ raise
+
+ def _get_device_number(self, path):
+ try:
+ return utils.get_blkdev_major_minor(path)
+ except exception.Error as e:
+ LOG.error(_LE('Failed to get device number for throttling: '
+ '%(error)s'), {'error': e})
+
+ def _limit_bps(self, rw, dev, bps):
+ try:
+ utils.execute('cgset', '-r', 'blkio.throttle.%s_bps_device=%s %d'
+ % (rw, dev, bps), self.cgroup, run_as_root=True)
+ except processutils.ProcessExecutionError:
+ LOG.warn(_LW('Failed to setup blkio cgroup to throttle the '
+ 'device \'%(device)s\'.'), {'device': dev})
+
+ def _set_limits(self, rw, devs):
+ total = sum(devs.itervalues())
+ for dev in devs:
+ self._limit_bps(rw, dev, self.bps_limit * devs[dev] / total)
+
+ @utils.synchronized('BlkioCgroup')
+ def _inc_device(self, srcdev, dstdev):
+ if srcdev:
+ self.srcdevs[srcdev] = self.srcdevs.get(srcdev, 0) + 1
+ self._set_limits('read', self.srcdevs)
+ if dstdev:
+ self.dstdevs[dstdev] = self.dstdevs.get(dstdev, 0) + 1
+ self._set_limits('write', self.dstdevs)
+
+ @utils.synchronized('BlkioCgroup')
+ def _dec_device(self, srcdev, dstdev):
+ if srcdev:
+ self.srcdevs[srcdev] -= 1
+ if self.srcdevs[srcdev] == 0:
+ del self.srcdevs[srcdev]
+ self._set_limits('read', self.srcdevs)
+ if dstdev:
+ self.dstdevs[dstdev] -= 1
+ if self.dstdevs[dstdev] == 0:
+ del self.dstdevs[dstdev]
+ self._set_limits('write', self.dstdevs)
+
+ @contextlib.contextmanager
+ def subcommand(self, srcpath, dstpath):
+ srcdev = self._get_device_number(srcpath)
+ dstdev = self._get_device_number(dstpath)
+
+ if srcdev is None and dstdev is None:
+ yield {'prefix': []}
+ return
+
+ self._inc_device(srcdev, dstdev)
+ try:
+ yield {'prefix': ['cgexec', '-g', 'blkio:%s' % self.cgroup]}
+ finally:
+ self._dec_device(srcdev, dstdev)
from cinder.brick.local_dev import lvm as brick_lvm
from cinder import exception
-from cinder.i18n import _, _LI, _LW
+from cinder.i18n import _, _LI
from cinder.openstack.common import log as logging
from cinder import rpc
from cinder import utils
+from cinder.volume import throttling
CONF = cfg.CONF
usage_info)
-def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute):
- if not bps_limit:
- LOG.debug('Not using bps rate limiting on volume copy')
- return None
-
- try:
- srcdev = utils.get_blkdev_major_minor(srcpath)
- except exception.Error as e:
- msg = (_('Failed to get device number for read throttling: %(error)s')
- % {'error': e})
- LOG.error(msg)
- srcdev = None
-
- try:
- dstdev = utils.get_blkdev_major_minor(dstpath)
- except exception.Error as e:
- msg = (_('Failed to get device number for write throttling: %(error)s')
- % {'error': e})
- LOG.error(msg)
- dstdev = None
-
- if not srcdev and not dstdev:
- return None
-
- group_name = CONF.volume_copy_blkio_cgroup_name
- LOG.debug('Setting rate limit to %s bps for blkio '
- 'group: %s' % (bps_limit, group_name))
- try:
- execute('cgcreate', '-g', 'blkio:%s' % group_name, run_as_root=True)
- except processutils.ProcessExecutionError:
- LOG.warn(_LW('Failed to create blkio cgroup'))
- return None
-
- try:
- if srcdev:
- execute('cgset', '-r', 'blkio.throttle.read_bps_device=%s %d'
- % (srcdev, bps_limit), group_name, run_as_root=True)
- if dstdev:
- execute('cgset', '-r', 'blkio.throttle.write_bps_device=%s %d'
- % (dstdev, bps_limit), group_name, run_as_root=True)
- except processutils.ProcessExecutionError:
- msg = (_('Failed to setup blkio cgroup to throttle the devices: '
- '\'%(src)s\',\'%(dst)s\'')
- % {'src': srcdev, 'dst': dstdev})
- LOG.warn(msg)
- return None
-
- return ['cgexec', '-g', 'blkio:%s' % group_name]
-
-
def _calculate_count(size_in_m, blocksize):
# Check if volume_dd_blocksize is valid
return False
-def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
- execute=utils.execute, ionice=None):
+def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
+ execute=utils.execute, ionice=None):
# Use O_DIRECT to avoid thrashing the system buffer cache
extra_flags = []
if check_for_odirect_support(srcstr, deststr, 'iflag=direct'):
if ionice is not None:
cmd = ['ionice', ionice] + cmd
- cgcmd = setup_blkio_cgroup(srcstr, deststr, CONF.volume_copy_bps_limit)
- if cgcmd:
- cmd = cgcmd + cmd
+ cmd = prefix + cmd
# Perform the copy
start_time = timeutils.utcnow()
LOG.info(mesg % {'size_in_m': size_in_m, 'mbps': mbps})
+def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
+ execute=utils.execute, ionice=None, throttle=None):
+ if not throttle:
+ throttle = throttling.Throttle.get_default()
+ with throttle.subcommand(srcstr, deststr) as throttle_cmd:
+ _copy_volume(throttle_cmd['prefix'], srcstr, deststr,
+ size_in_m, blocksize, sync=sync,
+ execute=execute, ionice=ionice)
+
+
def clear_volume(volume_size, volume_path, volume_clear=None,
- volume_clear_size=None, volume_clear_ionice=None):
+ volume_clear_size=None, volume_clear_ionice=None,
+ throttle=None):
"""Unprovision old volumes to prevent data leaking between users."""
if volume_clear is None:
volume_clear = CONF.volume_clear
return copy_volume('/dev/zero', volume_path, volume_clear_size,
CONF.volume_dd_blocksize,
sync=True, execute=utils.execute,
- ionice=volume_clear_ionice)
+ ionice=volume_clear_ionice,
+ throttle=throttle)
elif volume_clear == 'shred':
clear_cmd = ['shred', '-n3']
if volume_clear_size: