From: Tomoki Sekiyama Date: Mon, 19 Jan 2015 21:34:18 +0000 (-0500) Subject: Limit volume copy bandwidth per backend X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=ccd7463ef625b7b9b7968b8c8b1c4ef8de3cb3c0;p=openstack-build%2Fcinder-build.git Limit volume copy bandwidth per backend Currently, by setting volume_copy_bps_limit in cinder.conf, volume/image copy can be throttled to mitigate slow down of instances' volume access during the copy. However, this is a global setting and applied to all the backends. This change enables admins to configure volume_copy_bps_limit to different values among backends by specifying it in each backend section. In addition, with this change, the bps limit will be divided when multiple volume copy operations run concurrently on the same backend. For example, if volume_copy_bps_limit is set to 100MB/s, and 2 copies are running on a backend, each copy can use up to 50MB/s. This behavior will be useful for QoS which ensures a certain amount of bandwidth is kept for instances' volume access. This introduces Throttle classes to count volume copies in a backend and setup bandwidth limit. Change-Id: Ie390b46538556fa704b06ffc79cd6cc000bd5936 Implents: blueprint limit-volume-copy-bps-per-backend --- diff --git a/cinder/image/image_utils.py b/cinder/image/image_utils.py index bb1da6d00..ed724be61 100644 --- a/cinder/image/image_utils.py +++ b/cinder/image/image_utils.py @@ -40,6 +40,7 @@ from cinder.openstack.common import fileutils from cinder.openstack.common import imageutils from cinder.openstack.common import log as logging from cinder import utils +from cinder.volume import throttling from cinder.volume import utils as volume_utils LOG = logging.getLogger(__name__) @@ -62,11 +63,11 @@ def qemu_img_info(path, run_as_root=True): return imageutils.QemuImgInfo(out) -def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True): +def _convert_image(prefix, source, dest, out_format, run_as_root=True): """Convert image to other format.""" - cmd = ('qemu-img', 'convert', - '-O', out_format, source, dest) + cmd = prefix + ('qemu-img', 'convert', + '-O', out_format, source, dest) # Check whether O_DIRECT is supported and set '-t none' if it is # This is needed to ensure that all data hit the device before @@ -81,16 +82,12 @@ def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True): volume_utils.check_for_odirect_support(source, dest, 'oflag=direct')): - cmd = ('qemu-img', 'convert', - '-t', 'none', - '-O', out_format, source, dest) + cmd = prefix + ('qemu-img', 'convert', + '-t', 'none', + '-O', out_format, source, dest) start_time = timeutils.utcnow() - cgcmd = volume_utils.setup_blkio_cgroup(source, dest, bps_limit) - if cgcmd: - cmd = tuple(cgcmd) + cmd utils.execute(*cmd, run_as_root=run_as_root) - duration = timeutils.delta_seconds(start_time, timeutils.utcnow()) # NOTE(jdg): use a default of 1, mostly for unit test, but in @@ -110,6 +107,15 @@ def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True): LOG.info(msg % {"sz": fsz_mb, "mbps": mbps}) +def convert_image(source, dest, out_format, run_as_root=True, throttle=None): + if not throttle: + throttle = throttling.Throttle.get_default() + with throttle.subcommand(source, dest) as throttle_cmd: + _convert_image(tuple(throttle_cmd['prefix']), + source, dest, + out_format, run_as_root=run_as_root) + + def resize_image(source, size, run_as_root=False): """Changes the virtual size of the image.""" cmd = ('qemu-img', 'resize', source, '%sG' % size) @@ -278,7 +284,6 @@ def fetch_to_volume_format(context, image_service, LOG.debug("%s was %s, converting to %s " % (image_id, fmt, volume_format)) convert_image(tmp, dest, volume_format, - bps_limit=CONF.volume_copy_bps_limit, run_as_root=run_as_root) data = qemu_img_info(dest, run_as_root=run_as_root) @@ -310,7 +315,6 @@ def upload_volume(context, image_service, image_meta, volume_path, LOG.debug("%s was %s, converting to %s" % (image_id, volume_format, image_meta['disk_format'])) convert_image(volume_path, tmp, image_meta['disk_format'], - bps_limit=CONF.volume_copy_bps_limit, run_as_root=run_as_root) data = qemu_img_info(tmp, run_as_root=run_as_root) diff --git a/cinder/tests/test_image_utils.py b/cinder/tests/test_image_utils.py index 7217e726b..9eb4a853e 100644 --- a/cinder/tests/test_image_utils.py +++ b/cinder/tests/test_image_utils.py @@ -24,6 +24,7 @@ from oslo_utils import units from cinder import exception from cinder.image import image_utils from cinder import test +from cinder.volume import throttling class TestQemuImgInfo(test.TestCase): @@ -72,23 +73,22 @@ class TestQemuImgInfo(test.TestCase): class TestConvertImage(test.TestCase): @mock.patch('cinder.image.image_utils.os.stat') @mock.patch('cinder.utils.execute') - @mock.patch('cinder.volume.utils.setup_blkio_cgroup', - return_value=(mock.sentinel.cgcmd, )) @mock.patch('cinder.utils.is_blk_device', return_value=True) - def test_defaults_block_dev(self, mock_isblk, mock_cgroup, mock_exec, + def test_defaults_block_dev(self, mock_isblk, mock_exec, mock_stat): source = mock.sentinel.source dest = mock.sentinel.dest out_format = mock.sentinel.out_format - cgcmd = mock.sentinel.cgcmd mock_stat.return_value.st_size = 1048576 + throttle = throttling.Throttle(prefix=['cgcmd']) with mock.patch('cinder.volume.utils.check_for_odirect_support', return_value=True): - output = image_utils.convert_image(source, dest, out_format) + output = image_utils.convert_image(source, dest, out_format, + throttle=throttle) self.assertIsNone(output) - mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert', + mock_exec.assert_called_once_with('cgcmd', 'qemu-img', 'convert', '-t', 'none', '-O', out_format, source, dest, run_as_root=True) @@ -99,7 +99,7 @@ class TestConvertImage(test.TestCase): output = image_utils.convert_image(source, dest, out_format) self.assertIsNone(output) - mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert', + mock_exec.assert_called_once_with('qemu-img', 'convert', '-O', out_format, source, dest, run_as_root=True) @@ -107,21 +107,18 @@ class TestConvertImage(test.TestCase): return_value=True) @mock.patch('cinder.image.image_utils.os.stat') @mock.patch('cinder.utils.execute') - @mock.patch('cinder.volume.utils.setup_blkio_cgroup', - return_value=(mock.sentinel.cgcmd, )) @mock.patch('cinder.utils.is_blk_device', return_value=False) - def test_defaults_not_block_dev(self, mock_isblk, mock_cgroup, mock_exec, + def test_defaults_not_block_dev(self, mock_isblk, mock_exec, mock_stat, mock_odirect): source = mock.sentinel.source dest = mock.sentinel.dest out_format = mock.sentinel.out_format - cgcmd = mock.sentinel.cgcmd mock_stat.return_value.st_size = 1048576 output = image_utils.convert_image(source, dest, out_format) self.assertIsNone(output) - mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert', '-O', + mock_exec.assert_called_once_with('qemu-img', 'convert', '-O', out_format, source, dest, run_as_root=True) @@ -339,7 +336,6 @@ class TestUploadVolume(test.TestCase): 'disk_format': mock.sentinel.disk_format} volume_path = mock.sentinel.volume_path mock_os.name = 'posix' - mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit data = mock_info.return_value data.file_format = mock.sentinel.disk_format temp_file = mock_temp.return_value.__enter__.return_value @@ -351,7 +347,6 @@ class TestUploadVolume(test.TestCase): mock_convert.assert_called_once_with(volume_path, temp_file, mock.sentinel.disk_format, - bps_limit=mock.sentinel.bps_limit, run_as_root=True) mock_info.assert_called_once_with(temp_file, run_as_root=True) mock_open.assert_called_once_with(temp_file, 'rb') @@ -430,7 +425,6 @@ class TestUploadVolume(test.TestCase): 'disk_format': mock.sentinel.disk_format} volume_path = mock.sentinel.volume_path mock_os.name = 'posix' - mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit data = mock_info.return_value data.file_format = mock.sentinel.other_disk_format temp_file = mock_temp.return_value.__enter__.return_value @@ -441,7 +435,6 @@ class TestUploadVolume(test.TestCase): mock_convert.assert_called_once_with(volume_path, temp_file, mock.sentinel.disk_format, - bps_limit=mock.sentinel.bps_limit, run_as_root=True) mock_info.assert_called_once_with(temp_file, run_as_root=True) self.assertFalse(image_service.update.called) @@ -544,8 +537,6 @@ class TestFetchToVolumeFormat(test.TestCase): volume_format = mock.sentinel.volume_format blocksize = mock.sentinel.blocksize - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = volume_format data.backing_file = None @@ -568,7 +559,6 @@ class TestFetchToVolumeFormat(test.TestCase): self.assertFalse(mock_repl_xen.called) self.assertFalse(mock_copy.called) mock_convert.assert_called_once_with(tmp, dest, volume_format, - bps_limit=bps_limit, run_as_root=True) @mock.patch('cinder.image.image_utils.convert_image') @@ -594,8 +584,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = volume_format data.backing_file = None @@ -619,7 +607,6 @@ class TestFetchToVolumeFormat(test.TestCase): self.assertFalse(mock_repl_xen.called) self.assertFalse(mock_copy.called) mock_convert.assert_called_once_with(tmp, dest, volume_format, - bps_limit=bps_limit, run_as_root=run_as_root) @mock.patch('cinder.image.image_utils.convert_image') @@ -647,8 +634,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit tmp = mock_temp.return_value.__enter__.return_value image_service.show.return_value = {'disk_format': 'raw', 'size': 41126400} @@ -695,8 +680,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit tmp = mock_temp.return_value.__enter__.return_value image_service.show.return_value = {'disk_format': 'not_raw'} @@ -740,8 +723,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit tmp = mock_temp.return_value.__enter__.return_value image_service.show.return_value = None @@ -783,8 +764,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 1234 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = volume_format data.backing_file = None @@ -833,8 +812,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = None data.backing_file = None @@ -883,8 +860,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = volume_format data.backing_file = mock.sentinel.backing_file @@ -933,8 +908,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = mock.sentinel.file_format data.backing_file = None @@ -959,7 +932,6 @@ class TestFetchToVolumeFormat(test.TestCase): self.assertFalse(mock_repl_xen.called) self.assertFalse(mock_copy.called) mock_convert.assert_called_once_with(tmp, dest, volume_format, - bps_limit=bps_limit, run_as_root=run_as_root) @mock.patch('cinder.image.image_utils.convert_image') @@ -986,8 +958,6 @@ class TestFetchToVolumeFormat(test.TestCase): size = 4321 run_as_root = mock.sentinel.run_as_root - bps_limit = mock.sentinel.bps_limit - mock_conf.volume_copy_bps_limit = bps_limit data = mock_info.return_value data.file_format = volume_format data.backing_file = None @@ -1011,7 +981,6 @@ class TestFetchToVolumeFormat(test.TestCase): mock_repl_xen.assert_called_once_with(tmp) self.assertFalse(mock_copy.called) mock_convert.assert_called_once_with(tmp, dest, volume_format, - bps_limit=bps_limit, run_as_root=run_as_root) diff --git a/cinder/tests/test_volume.py b/cinder/tests/test_volume.py index 24c202497..0637363ad 100644 --- a/cinder/tests/test_volume.py +++ b/cinder/tests/test_volume.py @@ -238,7 +238,8 @@ class VolumeTestCase(BaseVolumeTestCase): self.stubs.Set(volutils, 'clear_volume', lambda a, b, volume_clear=mox.IgnoreArg(), volume_clear_size=mox.IgnoreArg(), - lvm_type=mox.IgnoreArg(): None) + lvm_type=mox.IgnoreArg(), + throttle=mox.IgnoreArg(): None) self.stubs.Set(tgt.TgtAdm, 'create_iscsi_target', self._fake_create_iscsi_target) @@ -2426,7 +2427,7 @@ class VolumeTestCase(BaseVolumeTestCase): pass def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize, - size=None): + size=None, throttle=None): pass def fake_clone_image(ctx, volume_ref, diff --git a/cinder/tests/test_volume_throttling.py b/cinder/tests/test_volume_throttling.py new file mode 100644 index 000000000..77f154375 --- /dev/null +++ b/cinder/tests/test_volume_throttling.py @@ -0,0 +1,78 @@ +# Copyright (c) 2015 Hitachi Data Systems, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Tests for volume copy throttling helpers.""" + +import mock +from oslo_config import cfg + +from cinder import test +from cinder import utils +from cinder.volume import throttling + + +CONF = cfg.CONF + + +class ThrottleTestCase(test.TestCase): + + def test_NoThrottle(self): + with throttling.Throttle().subcommand('volume1', 'volume2') as cmd: + self.assertEqual([], cmd['prefix']) + + @mock.patch.object(utils, 'get_blkdev_major_minor') + def test_BlkioCgroup(self, mock_major_minor): + + def fake_get_blkdev_major_minor(path): + return {'src_volume1': "253:0", 'dst_volume1': "253:1", + 'src_volume2': "253:2", 'dst_volume2': "253:3"}[path] + + mock_major_minor.side_effect = fake_get_blkdev_major_minor + + self.exec_cnt = 0 + + def fake_execute(*cmd, **kwargs): + cmd_set = ['cgset', '-r', + 'blkio.throttle.%s_bps_device=%s %d', 'fake_group'] + set_order = [None, + ('read', '253:0', 1024), + ('write', '253:1', 1024), + # a nested job starts; bps limit are set to the half + ('read', '253:0', 512), + ('read', '253:2', 512), + ('write', '253:1', 512), + ('write', '253:3', 512), + # a nested job ends; bps limit is resumed + ('read', '253:0', 1024), + ('write', '253:1', 1024)] + + if set_order[self.exec_cnt] is None: + self.assertEqual(('cgcreate', '-g', 'blkio:fake_group'), cmd) + else: + cmd_set[2] %= set_order[self.exec_cnt] + self.assertEqual(tuple(cmd_set), cmd) + + self.exec_cnt += 1 + + with mock.patch.object(utils, 'execute', side_effect=fake_execute): + throttle = throttling.BlkioCgroup(1024, 'fake_group') + with throttle.subcommand('src_volume1', 'dst_volume1') as cmd: + self.assertEqual(['cgexec', '-g', 'blkio:fake_group'], + cmd['prefix']) + + # a nested job + with throttle.subcommand('src_volume2', 'dst_volume2') as cmd: + self.assertEqual(['cgexec', '-g', 'blkio:fake_group'], + cmd['prefix']) diff --git a/cinder/tests/test_volume_utils.py b/cinder/tests/test_volume_utils.py index a68454fe1..357342a34 100644 --- a/cinder/tests/test_volume_utils.py +++ b/cinder/tests/test_volume_utils.py @@ -23,6 +23,7 @@ from cinder import exception from cinder.openstack.common import log as logging from cinder import test from cinder import utils +from cinder.volume import throttling from cinder.volume import utils as volume_utils @@ -380,7 +381,8 @@ class ClearVolumeTestCase(test.TestCase): self.assertIsNone(output) mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1024, '1M', sync=True, - execute=utils.execute, ionice='-c3') + execute=utils.execute, ionice='-c3', + throttle=None) @mock.patch('cinder.volume.utils.copy_volume', return_value=None) @mock.patch('cinder.volume.utils.CONF') @@ -394,7 +396,8 @@ class ClearVolumeTestCase(test.TestCase): self.assertIsNone(output) mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1, '1M', sync=True, - execute=utils.execute, ionice='-c0') + execute=utils.execute, ionice='-c0', + throttle=None) @mock.patch('cinder.utils.execute') @mock.patch('cinder.volume.utils.CONF') @@ -429,8 +432,6 @@ class ClearVolumeTestCase(test.TestCase): class CopyVolumeTestCase(test.TestCase): - @mock.patch('cinder.volume.utils.setup_blkio_cgroup', - return_value=['cg_cmd']) @mock.patch('cinder.volume.utils._calculate_count', return_value=(1234, 5678)) @mock.patch('cinder.volume.utils.check_for_odirect_support', @@ -438,13 +439,14 @@ class CopyVolumeTestCase(test.TestCase): @mock.patch('cinder.utils.execute') @mock.patch('cinder.volume.utils.CONF') def test_copy_volume_dd_iflag_and_oflag(self, mock_conf, mock_exec, - mock_support, mock_count, mock_cg): - mock_conf.volume_copy_bps_limit = 10 + mock_support, mock_count): + fake_throttle = throttling.Throttle(['fake_throttle']) output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1, sync=True, execute=utils.execute, - ionice=None) + ionice=None, throttle=fake_throttle) self.assertIsNone(output) - mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero', + mock_exec.assert_called_once_with('fake_throttle', 'dd', + 'if=/dev/zero', 'of=/dev/null', 'count=5678', 'bs=1234', 'iflag=direct', 'oflag=direct', run_as_root=True) @@ -453,30 +455,28 @@ class CopyVolumeTestCase(test.TestCase): output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1, sync=False, execute=utils.execute, - ionice=None) + ionice=None, throttle=fake_throttle) self.assertIsNone(output) - mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero', + mock_exec.assert_called_once_with('fake_throttle', 'dd', + 'if=/dev/zero', 'of=/dev/null', 'count=5678', 'bs=1234', 'iflag=direct', 'oflag=direct', run_as_root=True) - @mock.patch('cinder.volume.utils.setup_blkio_cgroup', - return_value=['cg_cmd']) @mock.patch('cinder.volume.utils._calculate_count', return_value=(1234, 5678)) @mock.patch('cinder.volume.utils.check_for_odirect_support', return_value=False) @mock.patch('cinder.utils.execute') - @mock.patch('cinder.volume.utils.CONF') - def test_copy_volume_dd_no_iflag_or_oflag(self, mock_conf, mock_exec, - mock_support, mock_count, - mock_cg): - mock_conf.volume_copy_bps_limit = 10 + def test_copy_volume_dd_no_iflag_or_oflag(self, mock_exec, + mock_support, mock_count): + fake_throttle = throttling.Throttle(['fake_throttle']) output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1, sync=True, execute=utils.execute, - ionice=None) + ionice=None, throttle=fake_throttle) self.assertIsNone(output) - mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero', + mock_exec.assert_called_once_with('fake_throttle', 'dd', + 'if=/dev/zero', 'of=/dev/null', 'count=5678', 'bs=1234', 'conv=fdatasync', run_as_root=True) @@ -485,23 +485,20 @@ class CopyVolumeTestCase(test.TestCase): output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1, sync=False, execute=utils.execute, - ionice=None) + ionice=None, throttle=fake_throttle) self.assertIsNone(output) - mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero', + mock_exec.assert_called_once_with('fake_throttle', 'dd', + 'if=/dev/zero', 'of=/dev/null', 'count=5678', 'bs=1234', run_as_root=True) - @mock.patch('cinder.volume.utils.setup_blkio_cgroup', - return_value=None) @mock.patch('cinder.volume.utils._calculate_count', return_value=(1234, 5678)) @mock.patch('cinder.volume.utils.check_for_odirect_support', return_value=False) @mock.patch('cinder.utils.execute') - @mock.patch('cinder.volume.utils.CONF') - def test_copy_volume_dd_no_cgroup(self, mock_conf, mock_exec, mock_support, - mock_count, mock_cg): - mock_conf.volume_copy_bps_limit = 10 + def test_copy_volume_dd_no_throttle(self, mock_exec, mock_support, + mock_count): output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1, sync=True, execute=utils.execute, ionice=None) @@ -510,17 +507,13 @@ class CopyVolumeTestCase(test.TestCase): 'count=5678', 'bs=1234', 'conv=fdatasync', run_as_root=True) - @mock.patch('cinder.volume.utils.setup_blkio_cgroup', - return_value=None) @mock.patch('cinder.volume.utils._calculate_count', return_value=(1234, 5678)) @mock.patch('cinder.volume.utils.check_for_odirect_support', return_value=False) @mock.patch('cinder.utils.execute') - @mock.patch('cinder.volume.utils.CONF') - def test_copy_volume_dd_with_ionice(self, mock_conf, mock_exec, - mock_support, mock_count, mock_cg): - mock_conf.volume_copy_bps_limit = 10 + def test_copy_volume_dd_with_ionice(self, mock_exec, + mock_support, mock_count): output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1, sync=True, execute=utils.execute, ionice='-c3') @@ -531,77 +524,6 @@ class CopyVolumeTestCase(test.TestCase): 'conv=fdatasync', run_as_root=True) -class BlkioCgroupTestCase(test.TestCase): - def test_bps_limit_zero(self): - mock_exec = mock.Mock() - output = volume_utils.setup_blkio_cgroup('src', 'dst', 0, - execute=mock_exec) - self.assertIsNone(output) - self.assertFalse(mock_exec.called) - - @mock.patch('cinder.utils.get_blkdev_major_minor', - side_effect=exception.Error) - def test_get_blkdev_error(self, mock_get_blkdev): - mock_exec = mock.Mock() - output = volume_utils.setup_blkio_cgroup('src', 'dst', 1, - execute=mock_exec) - self.assertIsNone(output) - mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')]) - self.assertFalse(mock_exec.called) - - @mock.patch('cinder.utils.get_blkdev_major_minor', - side_effect=lambda x: x) - @mock.patch('cinder.volume.utils.CONF') - def test_cgcreate_fail(self, mock_conf, mock_get_blkdev): - mock_conf.volume_copy_blkio_cgroup_name = 'test_group' - mock_exec = mock.Mock() - mock_exec.side_effect = processutils.ProcessExecutionError - output = volume_utils.setup_blkio_cgroup('src', 'dst', 1, - execute=mock_exec) - self.assertIsNone(output) - mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')]) - mock_exec.assert_called_once_with('cgcreate', '-g', 'blkio:test_group', - run_as_root=True) - - @mock.patch('cinder.utils.get_blkdev_major_minor', - side_effect=lambda x: x) - @mock.patch('cinder.volume.utils.CONF') - def test_cgset_fail(self, mock_conf, mock_get_blkdev): - mock_conf.volume_copy_blkio_cgroup_name = 'test_group' - mock_exec = mock.Mock() - - def cgset_exception(*args, **kwargs): - if 'cgset' in args: - raise processutils.ProcessExecutionError - - mock_exec.side_effect = cgset_exception - output = volume_utils.setup_blkio_cgroup('src', 'dst', 1, - execute=mock_exec) - self.assertIsNone(output) - mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')]) - mock_exec.assert_has_calls([ - mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True), - mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1', - 'test_group', run_as_root=True)]) - - @mock.patch('cinder.utils.get_blkdev_major_minor', - side_effect=lambda x: x) - @mock.patch('cinder.volume.utils.CONF') - def test_setup_blkio_cgroup(self, mock_conf, mock_get_blkdev): - mock_conf.volume_copy_blkio_cgroup_name = 'test_group' - mock_exec = mock.Mock() - output = volume_utils.setup_blkio_cgroup('src', 'dst', 1, - execute=mock_exec) - self.assertEqual(['cgexec', '-g', 'blkio:test_group'], output) - mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')]) - mock_exec.assert_has_calls([ - mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True), - mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1', - 'test_group', run_as_root=True), - mock.call('cgset', '-r', 'blkio.throttle.write_bps_device=dst 1', - 'test_group', run_as_root=True)]) - - class VolumeUtilsTestCase(test.TestCase): def test_null_safe_str(self): self.assertEqual('', volume_utils.null_safe_str(None)) diff --git a/cinder/volume/driver.py b/cinder/volume/driver.py index cea0eea83..ed39ebbd0 100644 --- a/cinder/volume/driver.py +++ b/cinder/volume/driver.py @@ -32,6 +32,7 @@ from cinder.openstack.common import fileutils from cinder.openstack.common import log as logging from cinder import utils from cinder.volume import rpcapi as volume_rpcapi +from cinder.volume import throttling from cinder.volume import utils as volume_utils LOG = logging.getLogger(__name__) @@ -338,6 +339,24 @@ class BaseVD(object): def initialized(self): return self._initialized + def set_throttle(self): + bps_limit = ((self.configuration and + self.configuration.safe_get('volume_copy_bps_limit')) or + CONF.volume_copy_bps_limit) + cgroup_name = ((self.configuration and + self.configuration.safe_get( + 'volume_copy_blkio_cgroup_name')) or + CONF.volume_copy_blkio_cgroup_name) + self._throttle = None + if bps_limit: + try: + self._throttle = throttling.BlkioCgroup(int(bps_limit), + cgroup_name) + except processutils.ProcessExecutionError as err: + LOG.warning(_LW('Failed to activate volume copy throttling: ' + '%(err)s'), {'err': six.text_type(err)}) + throttling.Throttle.set_default(self._throttle) + def get_version(self): """Get the current version of this driver.""" return self.VERSION @@ -435,7 +454,8 @@ class BaseVD(object): src_attach_info['device']['path'], dest_attach_info['device']['path'], size_in_mb, - self.configuration.volume_dd_blocksize) + self.configuration.volume_dd_blocksize, + throttle=self._throttle) copy_error = False except Exception: with excutils.save_and_reraise_exception(): diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 5033c0689..4172f9833 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -327,6 +327,8 @@ class VolumeManager(manager.SchedulerDependentManager): LOG.exception(ex) return + self.driver.set_throttle() + # at this point the driver is considered initialized. self.driver.set_initialized() diff --git a/cinder/volume/throttling.py b/cinder/volume/throttling.py new file mode 100644 index 000000000..3d7aca979 --- /dev/null +++ b/cinder/volume/throttling.py @@ -0,0 +1,129 @@ +# Copyright (c) 2015 Hitachi Data Systems, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Volume copy throttling helpers.""" + + +import contextlib + +from oslo_concurrency import processutils + +from cinder import exception +from cinder.i18n import _LW, _LE +from cinder.openstack.common import log as logging +from cinder import utils + + +LOG = logging.getLogger(__name__) + + +class Throttle(object): + """Base class for throttling disk I/O bandwidth""" + + DEFAULT = None + + @staticmethod + def set_default(throttle): + Throttle.DEFAULT = throttle + + @staticmethod + def get_default(): + return Throttle.DEFAULT or Throttle() + + def __init__(self, prefix=None): + self.prefix = prefix or [] + + @contextlib.contextmanager + def subcommand(self, srcpath, dstpath): + """Throttle disk I/O bandwidth used by a sub-command, such as 'dd', + that reads from srcpath and writes to dstpath. The sub-command + must be executed with the generated prefix command. + """ + yield {'prefix': self.prefix} + + +class BlkioCgroup(Throttle): + """Throttle disk I/O bandwidth using blkio cgroups.""" + + def __init__(self, bps_limit, cgroup_name): + self.bps_limit = bps_limit + self.cgroup = cgroup_name + self.srcdevs = {} + self.dstdevs = {} + + try: + utils.execute('cgcreate', '-g', 'blkio:%s' % self.cgroup, + run_as_root=True) + except processutils.ProcessExecutionError: + LOG.error(_LE('Failed to create blkio cgroup \'%(name)s\'.'), + {'name': cgroup_name}) + raise + + def _get_device_number(self, path): + try: + return utils.get_blkdev_major_minor(path) + except exception.Error as e: + LOG.error(_LE('Failed to get device number for throttling: ' + '%(error)s'), {'error': e}) + + def _limit_bps(self, rw, dev, bps): + try: + utils.execute('cgset', '-r', 'blkio.throttle.%s_bps_device=%s %d' + % (rw, dev, bps), self.cgroup, run_as_root=True) + except processutils.ProcessExecutionError: + LOG.warn(_LW('Failed to setup blkio cgroup to throttle the ' + 'device \'%(device)s\'.'), {'device': dev}) + + def _set_limits(self, rw, devs): + total = sum(devs.itervalues()) + for dev in devs: + self._limit_bps(rw, dev, self.bps_limit * devs[dev] / total) + + @utils.synchronized('BlkioCgroup') + def _inc_device(self, srcdev, dstdev): + if srcdev: + self.srcdevs[srcdev] = self.srcdevs.get(srcdev, 0) + 1 + self._set_limits('read', self.srcdevs) + if dstdev: + self.dstdevs[dstdev] = self.dstdevs.get(dstdev, 0) + 1 + self._set_limits('write', self.dstdevs) + + @utils.synchronized('BlkioCgroup') + def _dec_device(self, srcdev, dstdev): + if srcdev: + self.srcdevs[srcdev] -= 1 + if self.srcdevs[srcdev] == 0: + del self.srcdevs[srcdev] + self._set_limits('read', self.srcdevs) + if dstdev: + self.dstdevs[dstdev] -= 1 + if self.dstdevs[dstdev] == 0: + del self.dstdevs[dstdev] + self._set_limits('write', self.dstdevs) + + @contextlib.contextmanager + def subcommand(self, srcpath, dstpath): + srcdev = self._get_device_number(srcpath) + dstdev = self._get_device_number(dstpath) + + if srcdev is None and dstdev is None: + yield {'prefix': []} + return + + self._inc_device(srcdev, dstdev) + try: + yield {'prefix': ['cgexec', '-g', 'blkio:%s' % self.cgroup]} + finally: + self._dec_device(srcdev, dstdev) diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index a223e4e21..e96b282e1 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -26,10 +26,11 @@ from oslo_utils import units from cinder.brick.local_dev import lvm as brick_lvm from cinder import exception -from cinder.i18n import _, _LI, _LW +from cinder.i18n import _, _LI from cinder.openstack.common import log as logging from cinder import rpc from cinder import utils +from cinder.volume import throttling CONF = cfg.CONF @@ -246,56 +247,6 @@ def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix, usage_info) -def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute): - if not bps_limit: - LOG.debug('Not using bps rate limiting on volume copy') - return None - - try: - srcdev = utils.get_blkdev_major_minor(srcpath) - except exception.Error as e: - msg = (_('Failed to get device number for read throttling: %(error)s') - % {'error': e}) - LOG.error(msg) - srcdev = None - - try: - dstdev = utils.get_blkdev_major_minor(dstpath) - except exception.Error as e: - msg = (_('Failed to get device number for write throttling: %(error)s') - % {'error': e}) - LOG.error(msg) - dstdev = None - - if not srcdev and not dstdev: - return None - - group_name = CONF.volume_copy_blkio_cgroup_name - LOG.debug('Setting rate limit to %s bps for blkio ' - 'group: %s' % (bps_limit, group_name)) - try: - execute('cgcreate', '-g', 'blkio:%s' % group_name, run_as_root=True) - except processutils.ProcessExecutionError: - LOG.warn(_LW('Failed to create blkio cgroup')) - return None - - try: - if srcdev: - execute('cgset', '-r', 'blkio.throttle.read_bps_device=%s %d' - % (srcdev, bps_limit), group_name, run_as_root=True) - if dstdev: - execute('cgset', '-r', 'blkio.throttle.write_bps_device=%s %d' - % (dstdev, bps_limit), group_name, run_as_root=True) - except processutils.ProcessExecutionError: - msg = (_('Failed to setup blkio cgroup to throttle the devices: ' - '\'%(src)s\',\'%(dst)s\'') - % {'src': srcdev, 'dst': dstdev}) - LOG.warn(msg) - return None - - return ['cgexec', '-g', 'blkio:%s' % group_name] - - def _calculate_count(size_in_m, blocksize): # Check if volume_dd_blocksize is valid @@ -332,8 +283,8 @@ def check_for_odirect_support(src, dest, flag='oflag=direct'): return False -def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False, - execute=utils.execute, ionice=None): +def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False, + execute=utils.execute, ionice=None): # Use O_DIRECT to avoid thrashing the system buffer cache extra_flags = [] if check_for_odirect_support(srcstr, deststr, 'iflag=direct'): @@ -357,9 +308,7 @@ def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False, if ionice is not None: cmd = ['ionice', ionice] + cmd - cgcmd = setup_blkio_cgroup(srcstr, deststr, CONF.volume_copy_bps_limit) - if cgcmd: - cmd = cgcmd + cmd + cmd = prefix + cmd # Perform the copy start_time = timeutils.utcnow() @@ -381,8 +330,19 @@ def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False, LOG.info(mesg % {'size_in_m': size_in_m, 'mbps': mbps}) +def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False, + execute=utils.execute, ionice=None, throttle=None): + if not throttle: + throttle = throttling.Throttle.get_default() + with throttle.subcommand(srcstr, deststr) as throttle_cmd: + _copy_volume(throttle_cmd['prefix'], srcstr, deststr, + size_in_m, blocksize, sync=sync, + execute=execute, ionice=ionice) + + def clear_volume(volume_size, volume_path, volume_clear=None, - volume_clear_size=None, volume_clear_ionice=None): + volume_clear_size=None, volume_clear_ionice=None, + throttle=None): """Unprovision old volumes to prevent data leaking between users.""" if volume_clear is None: volume_clear = CONF.volume_clear @@ -402,7 +362,8 @@ def clear_volume(volume_size, volume_path, volume_clear=None, return copy_volume('/dev/zero', volume_path, volume_clear_size, CONF.volume_dd_blocksize, sync=True, execute=utils.execute, - ionice=volume_clear_ionice) + ionice=volume_clear_ionice, + throttle=throttle) elif volume_clear == 'shred': clear_cmd = ['shred', '-n3'] if volume_clear_size: