]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Limit volume copy bandwidth per backend
authorTomoki Sekiyama <tomoki.sekiyama@hds.com>
Mon, 19 Jan 2015 21:34:18 +0000 (16:34 -0500)
committerTomoki Sekiyama <tomoki.sekiyama@hds.com>
Mon, 23 Feb 2015 21:30:20 +0000 (16:30 -0500)
Currently, by setting volume_copy_bps_limit in cinder.conf,
volume/image copy can be throttled to mitigate slow down of
instances' volume access during the copy. However, this is a global
setting and applied to all the backends.
This change enables admins to configure volume_copy_bps_limit to
different values among backends by specifying it in each backend
section.

In addition, with this change, the bps limit will be divided when
multiple volume copy operations run concurrently on the same backend.
For example, if volume_copy_bps_limit is set to 100MB/s, and 2 copies
are running on a backend, each copy can use up to 50MB/s.
This behavior will be useful for QoS which ensures a certain amount
of bandwidth is kept for instances' volume access.

This introduces Throttle classes to count volume copies in a backend
and setup bandwidth limit.

Change-Id: Ie390b46538556fa704b06ffc79cd6cc000bd5936
Implents: blueprint limit-volume-copy-bps-per-backend

cinder/image/image_utils.py
cinder/tests/test_image_utils.py
cinder/tests/test_volume.py
cinder/tests/test_volume_throttling.py [new file with mode: 0644]
cinder/tests/test_volume_utils.py
cinder/volume/driver.py
cinder/volume/manager.py
cinder/volume/throttling.py [new file with mode: 0644]
cinder/volume/utils.py

index bb1da6d0053bc955dfe53d6f28f13ccec758a038..ed724be617bcd0c3549632a29c5fd6b91550541a 100644 (file)
@@ -40,6 +40,7 @@ from cinder.openstack.common import fileutils
 from cinder.openstack.common import imageutils
 from cinder.openstack.common import log as logging
 from cinder import utils
+from cinder.volume import throttling
 from cinder.volume import utils as volume_utils
 
 LOG = logging.getLogger(__name__)
@@ -62,11 +63,11 @@ def qemu_img_info(path, run_as_root=True):
     return imageutils.QemuImgInfo(out)
 
 
-def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
+def _convert_image(prefix, source, dest, out_format, run_as_root=True):
     """Convert image to other format."""
 
-    cmd = ('qemu-img', 'convert',
-           '-O', out_format, source, dest)
+    cmd = prefix + ('qemu-img', 'convert',
+                    '-O', out_format, source, dest)
 
     # Check whether O_DIRECT is supported and set '-t none' if it is
     # This is needed to ensure that all data hit the device before
@@ -81,16 +82,12 @@ def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
             volume_utils.check_for_odirect_support(source,
                                                    dest,
                                                    'oflag=direct')):
-        cmd = ('qemu-img', 'convert',
-               '-t', 'none',
-               '-O', out_format, source, dest)
+        cmd = prefix + ('qemu-img', 'convert',
+                        '-t', 'none',
+                        '-O', out_format, source, dest)
 
     start_time = timeutils.utcnow()
-    cgcmd = volume_utils.setup_blkio_cgroup(source, dest, bps_limit)
-    if cgcmd:
-        cmd = tuple(cgcmd) + cmd
     utils.execute(*cmd, run_as_root=run_as_root)
-
     duration = timeutils.delta_seconds(start_time, timeutils.utcnow())
 
     # NOTE(jdg): use a default of 1, mostly for unit test, but in
@@ -110,6 +107,15 @@ def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
     LOG.info(msg % {"sz": fsz_mb, "mbps": mbps})
 
 
+def convert_image(source, dest, out_format, run_as_root=True, throttle=None):
+    if not throttle:
+        throttle = throttling.Throttle.get_default()
+    with throttle.subcommand(source, dest) as throttle_cmd:
+        _convert_image(tuple(throttle_cmd['prefix']),
+                       source, dest,
+                       out_format, run_as_root=run_as_root)
+
+
 def resize_image(source, size, run_as_root=False):
     """Changes the virtual size of the image."""
     cmd = ('qemu-img', 'resize', source, '%sG' % size)
@@ -278,7 +284,6 @@ def fetch_to_volume_format(context, image_service,
         LOG.debug("%s was %s, converting to %s " % (image_id, fmt,
                                                     volume_format))
         convert_image(tmp, dest, volume_format,
-                      bps_limit=CONF.volume_copy_bps_limit,
                       run_as_root=run_as_root)
 
         data = qemu_img_info(dest, run_as_root=run_as_root)
@@ -310,7 +315,6 @@ def upload_volume(context, image_service, image_meta, volume_path,
         LOG.debug("%s was %s, converting to %s" %
                   (image_id, volume_format, image_meta['disk_format']))
         convert_image(volume_path, tmp, image_meta['disk_format'],
-                      bps_limit=CONF.volume_copy_bps_limit,
                       run_as_root=run_as_root)
 
         data = qemu_img_info(tmp, run_as_root=run_as_root)
index 7217e726bb5324d05e6c0c3fcceec366ab73d5ed..9eb4a853ec36008c07623b51a70537373e15c6d0 100644 (file)
@@ -24,6 +24,7 @@ from oslo_utils import units
 from cinder import exception
 from cinder.image import image_utils
 from cinder import test
+from cinder.volume import throttling
 
 
 class TestQemuImgInfo(test.TestCase):
@@ -72,23 +73,22 @@ class TestQemuImgInfo(test.TestCase):
 class TestConvertImage(test.TestCase):
     @mock.patch('cinder.image.image_utils.os.stat')
     @mock.patch('cinder.utils.execute')
-    @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
-                return_value=(mock.sentinel.cgcmd, ))
     @mock.patch('cinder.utils.is_blk_device', return_value=True)
-    def test_defaults_block_dev(self, mock_isblk, mock_cgroup, mock_exec,
+    def test_defaults_block_dev(self, mock_isblk, mock_exec,
                                 mock_stat):
         source = mock.sentinel.source
         dest = mock.sentinel.dest
         out_format = mock.sentinel.out_format
-        cgcmd = mock.sentinel.cgcmd
         mock_stat.return_value.st_size = 1048576
+        throttle = throttling.Throttle(prefix=['cgcmd'])
 
         with mock.patch('cinder.volume.utils.check_for_odirect_support',
                         return_value=True):
-            output = image_utils.convert_image(source, dest, out_format)
+            output = image_utils.convert_image(source, dest, out_format,
+                                               throttle=throttle)
 
             self.assertIsNone(output)
-            mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert',
+            mock_exec.assert_called_once_with('cgcmd', 'qemu-img', 'convert',
                                               '-t', 'none', '-O', out_format,
                                               source, dest, run_as_root=True)
 
@@ -99,7 +99,7 @@ class TestConvertImage(test.TestCase):
             output = image_utils.convert_image(source, dest, out_format)
 
             self.assertIsNone(output)
-            mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert',
+            mock_exec.assert_called_once_with('qemu-img', 'convert',
                                               '-O', out_format, source, dest,
                                               run_as_root=True)
 
@@ -107,21 +107,18 @@ class TestConvertImage(test.TestCase):
                 return_value=True)
     @mock.patch('cinder.image.image_utils.os.stat')
     @mock.patch('cinder.utils.execute')
-    @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
-                return_value=(mock.sentinel.cgcmd, ))
     @mock.patch('cinder.utils.is_blk_device', return_value=False)
-    def test_defaults_not_block_dev(self, mock_isblk, mock_cgroup, mock_exec,
+    def test_defaults_not_block_dev(self, mock_isblk, mock_exec,
                                     mock_stat, mock_odirect):
         source = mock.sentinel.source
         dest = mock.sentinel.dest
         out_format = mock.sentinel.out_format
-        cgcmd = mock.sentinel.cgcmd
         mock_stat.return_value.st_size = 1048576
 
         output = image_utils.convert_image(source, dest, out_format)
 
         self.assertIsNone(output)
-        mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert', '-O',
+        mock_exec.assert_called_once_with('qemu-img', 'convert', '-O',
                                           out_format, source, dest,
                                           run_as_root=True)
 
@@ -339,7 +336,6 @@ class TestUploadVolume(test.TestCase):
                       'disk_format': mock.sentinel.disk_format}
         volume_path = mock.sentinel.volume_path
         mock_os.name = 'posix'
-        mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit
         data = mock_info.return_value
         data.file_format = mock.sentinel.disk_format
         temp_file = mock_temp.return_value.__enter__.return_value
@@ -351,7 +347,6 @@ class TestUploadVolume(test.TestCase):
         mock_convert.assert_called_once_with(volume_path,
                                              temp_file,
                                              mock.sentinel.disk_format,
-                                             bps_limit=mock.sentinel.bps_limit,
                                              run_as_root=True)
         mock_info.assert_called_once_with(temp_file, run_as_root=True)
         mock_open.assert_called_once_with(temp_file, 'rb')
@@ -430,7 +425,6 @@ class TestUploadVolume(test.TestCase):
                       'disk_format': mock.sentinel.disk_format}
         volume_path = mock.sentinel.volume_path
         mock_os.name = 'posix'
-        mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit
         data = mock_info.return_value
         data.file_format = mock.sentinel.other_disk_format
         temp_file = mock_temp.return_value.__enter__.return_value
@@ -441,7 +435,6 @@ class TestUploadVolume(test.TestCase):
         mock_convert.assert_called_once_with(volume_path,
                                              temp_file,
                                              mock.sentinel.disk_format,
-                                             bps_limit=mock.sentinel.bps_limit,
                                              run_as_root=True)
         mock_info.assert_called_once_with(temp_file, run_as_root=True)
         self.assertFalse(image_service.update.called)
@@ -544,8 +537,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         volume_format = mock.sentinel.volume_format
         blocksize = mock.sentinel.blocksize
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = volume_format
         data.backing_file = None
@@ -568,7 +559,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         self.assertFalse(mock_repl_xen.called)
         self.assertFalse(mock_copy.called)
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
-                                             bps_limit=bps_limit,
                                              run_as_root=True)
 
     @mock.patch('cinder.image.image_utils.convert_image')
@@ -594,8 +584,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = volume_format
         data.backing_file = None
@@ -619,7 +607,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         self.assertFalse(mock_repl_xen.called)
         self.assertFalse(mock_copy.called)
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
-                                             bps_limit=bps_limit,
                                              run_as_root=run_as_root)
 
     @mock.patch('cinder.image.image_utils.convert_image')
@@ -647,8 +634,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         tmp = mock_temp.return_value.__enter__.return_value
         image_service.show.return_value = {'disk_format': 'raw',
                                            'size': 41126400}
@@ -695,8 +680,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         tmp = mock_temp.return_value.__enter__.return_value
         image_service.show.return_value = {'disk_format': 'not_raw'}
 
@@ -740,8 +723,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         tmp = mock_temp.return_value.__enter__.return_value
         image_service.show.return_value = None
 
@@ -783,8 +764,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 1234
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = volume_format
         data.backing_file = None
@@ -833,8 +812,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = None
         data.backing_file = None
@@ -883,8 +860,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = volume_format
         data.backing_file = mock.sentinel.backing_file
@@ -933,8 +908,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = mock.sentinel.file_format
         data.backing_file = None
@@ -959,7 +932,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         self.assertFalse(mock_repl_xen.called)
         self.assertFalse(mock_copy.called)
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
-                                             bps_limit=bps_limit,
                                              run_as_root=run_as_root)
 
     @mock.patch('cinder.image.image_utils.convert_image')
@@ -986,8 +958,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         size = 4321
         run_as_root = mock.sentinel.run_as_root
 
-        bps_limit = mock.sentinel.bps_limit
-        mock_conf.volume_copy_bps_limit = bps_limit
         data = mock_info.return_value
         data.file_format = volume_format
         data.backing_file = None
@@ -1011,7 +981,6 @@ class TestFetchToVolumeFormat(test.TestCase):
         mock_repl_xen.assert_called_once_with(tmp)
         self.assertFalse(mock_copy.called)
         mock_convert.assert_called_once_with(tmp, dest, volume_format,
-                                             bps_limit=bps_limit,
                                              run_as_root=run_as_root)
 
 
index 24c2024972a3e79a4223e9408a40348f4a58d5e2..0637363ad899f98040b3321e983eb72ee062e46f 100644 (file)
@@ -238,7 +238,8 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.stubs.Set(volutils, 'clear_volume',
                        lambda a, b, volume_clear=mox.IgnoreArg(),
                        volume_clear_size=mox.IgnoreArg(),
-                       lvm_type=mox.IgnoreArg(): None)
+                       lvm_type=mox.IgnoreArg(),
+                       throttle=mox.IgnoreArg(): None)
         self.stubs.Set(tgt.TgtAdm,
                        'create_iscsi_target',
                        self._fake_create_iscsi_target)
@@ -2426,7 +2427,7 @@ class VolumeTestCase(BaseVolumeTestCase):
             pass
 
         def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
-                              size=None):
+                              size=None, throttle=None):
             pass
 
         def fake_clone_image(ctx, volume_ref,
diff --git a/cinder/tests/test_volume_throttling.py b/cinder/tests/test_volume_throttling.py
new file mode 100644 (file)
index 0000000..77f1543
--- /dev/null
@@ -0,0 +1,78 @@
+# Copyright (c) 2015 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Tests for volume copy throttling helpers."""
+
+import mock
+from oslo_config import cfg
+
+from cinder import test
+from cinder import utils
+from cinder.volume import throttling
+
+
+CONF = cfg.CONF
+
+
+class ThrottleTestCase(test.TestCase):
+
+    def test_NoThrottle(self):
+        with throttling.Throttle().subcommand('volume1', 'volume2') as cmd:
+            self.assertEqual([], cmd['prefix'])
+
+    @mock.patch.object(utils, 'get_blkdev_major_minor')
+    def test_BlkioCgroup(self, mock_major_minor):
+
+        def fake_get_blkdev_major_minor(path):
+            return {'src_volume1': "253:0", 'dst_volume1': "253:1",
+                    'src_volume2': "253:2", 'dst_volume2': "253:3"}[path]
+
+        mock_major_minor.side_effect = fake_get_blkdev_major_minor
+
+        self.exec_cnt = 0
+
+        def fake_execute(*cmd, **kwargs):
+            cmd_set = ['cgset', '-r',
+                       'blkio.throttle.%s_bps_device=%s %d', 'fake_group']
+            set_order = [None,
+                         ('read', '253:0', 1024),
+                         ('write', '253:1', 1024),
+                         # a nested job starts; bps limit are set to the half
+                         ('read', '253:0', 512),
+                         ('read', '253:2', 512),
+                         ('write', '253:1', 512),
+                         ('write', '253:3', 512),
+                         # a nested job ends; bps limit is resumed
+                         ('read', '253:0', 1024),
+                         ('write', '253:1', 1024)]
+
+            if set_order[self.exec_cnt] is None:
+                self.assertEqual(('cgcreate', '-g', 'blkio:fake_group'), cmd)
+            else:
+                cmd_set[2] %= set_order[self.exec_cnt]
+                self.assertEqual(tuple(cmd_set), cmd)
+
+            self.exec_cnt += 1
+
+        with mock.patch.object(utils, 'execute', side_effect=fake_execute):
+            throttle = throttling.BlkioCgroup(1024, 'fake_group')
+            with throttle.subcommand('src_volume1', 'dst_volume1') as cmd:
+                self.assertEqual(['cgexec', '-g', 'blkio:fake_group'],
+                                 cmd['prefix'])
+
+                # a nested job
+                with throttle.subcommand('src_volume2', 'dst_volume2') as cmd:
+                    self.assertEqual(['cgexec', '-g', 'blkio:fake_group'],
+                                     cmd['prefix'])
index a68454fe12c437d90e753b7b9307c3bc4e275280..357342a34dbabc81c8f385292f860b4392804b59 100644 (file)
@@ -23,6 +23,7 @@ from cinder import exception
 from cinder.openstack.common import log as logging
 from cinder import test
 from cinder import utils
+from cinder.volume import throttling
 from cinder.volume import utils as volume_utils
 
 
@@ -380,7 +381,8 @@ class ClearVolumeTestCase(test.TestCase):
         self.assertIsNone(output)
         mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1024,
                                           '1M', sync=True,
-                                          execute=utils.execute, ionice='-c3')
+                                          execute=utils.execute, ionice='-c3',
+                                          throttle=None)
 
     @mock.patch('cinder.volume.utils.copy_volume', return_value=None)
     @mock.patch('cinder.volume.utils.CONF')
@@ -394,7 +396,8 @@ class ClearVolumeTestCase(test.TestCase):
         self.assertIsNone(output)
         mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1,
                                           '1M', sync=True,
-                                          execute=utils.execute, ionice='-c0')
+                                          execute=utils.execute, ionice='-c0',
+                                          throttle=None)
 
     @mock.patch('cinder.utils.execute')
     @mock.patch('cinder.volume.utils.CONF')
@@ -429,8 +432,6 @@ class ClearVolumeTestCase(test.TestCase):
 
 
 class CopyVolumeTestCase(test.TestCase):
-    @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
-                return_value=['cg_cmd'])
     @mock.patch('cinder.volume.utils._calculate_count',
                 return_value=(1234, 5678))
     @mock.patch('cinder.volume.utils.check_for_odirect_support',
@@ -438,13 +439,14 @@ class CopyVolumeTestCase(test.TestCase):
     @mock.patch('cinder.utils.execute')
     @mock.patch('cinder.volume.utils.CONF')
     def test_copy_volume_dd_iflag_and_oflag(self, mock_conf, mock_exec,
-                                            mock_support, mock_count, mock_cg):
-        mock_conf.volume_copy_bps_limit = 10
+                                            mock_support, mock_count):
+        fake_throttle = throttling.Throttle(['fake_throttle'])
         output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
                                           sync=True, execute=utils.execute,
-                                          ionice=None)
+                                          ionice=None, throttle=fake_throttle)
         self.assertIsNone(output)
-        mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+        mock_exec.assert_called_once_with('fake_throttle', 'dd',
+                                          'if=/dev/zero',
                                           'of=/dev/null', 'count=5678',
                                           'bs=1234', 'iflag=direct',
                                           'oflag=direct', run_as_root=True)
@@ -453,30 +455,28 @@ class CopyVolumeTestCase(test.TestCase):
 
         output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
                                           sync=False, execute=utils.execute,
-                                          ionice=None)
+                                          ionice=None, throttle=fake_throttle)
         self.assertIsNone(output)
-        mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+        mock_exec.assert_called_once_with('fake_throttle', 'dd',
+                                          'if=/dev/zero',
                                           'of=/dev/null', 'count=5678',
                                           'bs=1234', 'iflag=direct',
                                           'oflag=direct', run_as_root=True)
 
-    @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
-                return_value=['cg_cmd'])
     @mock.patch('cinder.volume.utils._calculate_count',
                 return_value=(1234, 5678))
     @mock.patch('cinder.volume.utils.check_for_odirect_support',
                 return_value=False)
     @mock.patch('cinder.utils.execute')
-    @mock.patch('cinder.volume.utils.CONF')
-    def test_copy_volume_dd_no_iflag_or_oflag(self, mock_conf, mock_exec,
-                                              mock_support, mock_count,
-                                              mock_cg):
-        mock_conf.volume_copy_bps_limit = 10
+    def test_copy_volume_dd_no_iflag_or_oflag(self, mock_exec,
+                                              mock_support, mock_count):
+        fake_throttle = throttling.Throttle(['fake_throttle'])
         output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
                                           sync=True, execute=utils.execute,
-                                          ionice=None)
+                                          ionice=None, throttle=fake_throttle)
         self.assertIsNone(output)
-        mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+        mock_exec.assert_called_once_with('fake_throttle', 'dd',
+                                          'if=/dev/zero',
                                           'of=/dev/null', 'count=5678',
                                           'bs=1234', 'conv=fdatasync',
                                           run_as_root=True)
@@ -485,23 +485,20 @@ class CopyVolumeTestCase(test.TestCase):
 
         output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
                                           sync=False, execute=utils.execute,
-                                          ionice=None)
+                                          ionice=None, throttle=fake_throttle)
         self.assertIsNone(output)
-        mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
+        mock_exec.assert_called_once_with('fake_throttle', 'dd',
+                                          'if=/dev/zero',
                                           'of=/dev/null', 'count=5678',
                                           'bs=1234', run_as_root=True)
 
-    @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
-                return_value=None)
     @mock.patch('cinder.volume.utils._calculate_count',
                 return_value=(1234, 5678))
     @mock.patch('cinder.volume.utils.check_for_odirect_support',
                 return_value=False)
     @mock.patch('cinder.utils.execute')
-    @mock.patch('cinder.volume.utils.CONF')
-    def test_copy_volume_dd_no_cgroup(self, mock_conf, mock_exec, mock_support,
-                                      mock_count, mock_cg):
-        mock_conf.volume_copy_bps_limit = 10
+    def test_copy_volume_dd_no_throttle(self, mock_exec, mock_support,
+                                        mock_count):
         output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
                                           sync=True, execute=utils.execute,
                                           ionice=None)
@@ -510,17 +507,13 @@ class CopyVolumeTestCase(test.TestCase):
                                           'count=5678', 'bs=1234',
                                           'conv=fdatasync', run_as_root=True)
 
-    @mock.patch('cinder.volume.utils.setup_blkio_cgroup',
-                return_value=None)
     @mock.patch('cinder.volume.utils._calculate_count',
                 return_value=(1234, 5678))
     @mock.patch('cinder.volume.utils.check_for_odirect_support',
                 return_value=False)
     @mock.patch('cinder.utils.execute')
-    @mock.patch('cinder.volume.utils.CONF')
-    def test_copy_volume_dd_with_ionice(self, mock_conf, mock_exec,
-                                        mock_support, mock_count, mock_cg):
-        mock_conf.volume_copy_bps_limit = 10
+    def test_copy_volume_dd_with_ionice(self, mock_exec,
+                                        mock_support, mock_count):
         output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
                                           sync=True, execute=utils.execute,
                                           ionice='-c3')
@@ -531,77 +524,6 @@ class CopyVolumeTestCase(test.TestCase):
                                           'conv=fdatasync', run_as_root=True)
 
 
-class BlkioCgroupTestCase(test.TestCase):
-    def test_bps_limit_zero(self):
-        mock_exec = mock.Mock()
-        output = volume_utils.setup_blkio_cgroup('src', 'dst', 0,
-                                                 execute=mock_exec)
-        self.assertIsNone(output)
-        self.assertFalse(mock_exec.called)
-
-    @mock.patch('cinder.utils.get_blkdev_major_minor',
-                side_effect=exception.Error)
-    def test_get_blkdev_error(self, mock_get_blkdev):
-        mock_exec = mock.Mock()
-        output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
-                                                 execute=mock_exec)
-        self.assertIsNone(output)
-        mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
-        self.assertFalse(mock_exec.called)
-
-    @mock.patch('cinder.utils.get_blkdev_major_minor',
-                side_effect=lambda x: x)
-    @mock.patch('cinder.volume.utils.CONF')
-    def test_cgcreate_fail(self, mock_conf, mock_get_blkdev):
-        mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
-        mock_exec = mock.Mock()
-        mock_exec.side_effect = processutils.ProcessExecutionError
-        output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
-                                                 execute=mock_exec)
-        self.assertIsNone(output)
-        mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
-        mock_exec.assert_called_once_with('cgcreate', '-g', 'blkio:test_group',
-                                          run_as_root=True)
-
-    @mock.patch('cinder.utils.get_blkdev_major_minor',
-                side_effect=lambda x: x)
-    @mock.patch('cinder.volume.utils.CONF')
-    def test_cgset_fail(self, mock_conf, mock_get_blkdev):
-        mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
-        mock_exec = mock.Mock()
-
-        def cgset_exception(*args, **kwargs):
-            if 'cgset' in args:
-                raise processutils.ProcessExecutionError
-
-        mock_exec.side_effect = cgset_exception
-        output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
-                                                 execute=mock_exec)
-        self.assertIsNone(output)
-        mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
-        mock_exec.assert_has_calls([
-            mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True),
-            mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1',
-                      'test_group', run_as_root=True)])
-
-    @mock.patch('cinder.utils.get_blkdev_major_minor',
-                side_effect=lambda x: x)
-    @mock.patch('cinder.volume.utils.CONF')
-    def test_setup_blkio_cgroup(self, mock_conf, mock_get_blkdev):
-        mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
-        mock_exec = mock.Mock()
-        output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
-                                                 execute=mock_exec)
-        self.assertEqual(['cgexec', '-g', 'blkio:test_group'], output)
-        mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
-        mock_exec.assert_has_calls([
-            mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True),
-            mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1',
-                      'test_group', run_as_root=True),
-            mock.call('cgset', '-r', 'blkio.throttle.write_bps_device=dst 1',
-                      'test_group', run_as_root=True)])
-
-
 class VolumeUtilsTestCase(test.TestCase):
     def test_null_safe_str(self):
         self.assertEqual('', volume_utils.null_safe_str(None))
index cea0eea8340ed74c0985fa27bd1b203a15f537a6..ed39ebbd082f5090d3e45d60b7fdb7520f23c3b7 100644 (file)
@@ -32,6 +32,7 @@ from cinder.openstack.common import fileutils
 from cinder.openstack.common import log as logging
 from cinder import utils
 from cinder.volume import rpcapi as volume_rpcapi
+from cinder.volume import throttling
 from cinder.volume import utils as volume_utils
 
 LOG = logging.getLogger(__name__)
@@ -338,6 +339,24 @@ class BaseVD(object):
     def initialized(self):
         return self._initialized
 
+    def set_throttle(self):
+        bps_limit = ((self.configuration and
+                      self.configuration.safe_get('volume_copy_bps_limit')) or
+                     CONF.volume_copy_bps_limit)
+        cgroup_name = ((self.configuration and
+                        self.configuration.safe_get(
+                            'volume_copy_blkio_cgroup_name')) or
+                       CONF.volume_copy_blkio_cgroup_name)
+        self._throttle = None
+        if bps_limit:
+            try:
+                self._throttle = throttling.BlkioCgroup(int(bps_limit),
+                                                        cgroup_name)
+            except processutils.ProcessExecutionError as err:
+                LOG.warning(_LW('Failed to activate volume copy throttling: '
+                                '%(err)s'), {'err': six.text_type(err)})
+        throttling.Throttle.set_default(self._throttle)
+
     def get_version(self):
         """Get the current version of this driver."""
         return self.VERSION
@@ -435,7 +454,8 @@ class BaseVD(object):
                 src_attach_info['device']['path'],
                 dest_attach_info['device']['path'],
                 size_in_mb,
-                self.configuration.volume_dd_blocksize)
+                self.configuration.volume_dd_blocksize,
+                throttle=self._throttle)
             copy_error = False
         except Exception:
             with excutils.save_and_reraise_exception():
index 5033c0689ba7f9d47d2124e937e2443941dbaa8b..4172f9833ac77b6e73066c5aea62671dc801636c 100644 (file)
@@ -327,6 +327,8 @@ class VolumeManager(manager.SchedulerDependentManager):
             LOG.exception(ex)
             return
 
+        self.driver.set_throttle()
+
         # at this point the driver is considered initialized.
         self.driver.set_initialized()
 
diff --git a/cinder/volume/throttling.py b/cinder/volume/throttling.py
new file mode 100644 (file)
index 0000000..3d7aca9
--- /dev/null
@@ -0,0 +1,129 @@
+# Copyright (c) 2015 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""Volume copy throttling helpers."""
+
+
+import contextlib
+
+from oslo_concurrency import processutils
+
+from cinder import exception
+from cinder.i18n import _LW, _LE
+from cinder.openstack.common import log as logging
+from cinder import utils
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Throttle(object):
+    """Base class for throttling disk I/O bandwidth"""
+
+    DEFAULT = None
+
+    @staticmethod
+    def set_default(throttle):
+        Throttle.DEFAULT = throttle
+
+    @staticmethod
+    def get_default():
+        return Throttle.DEFAULT or Throttle()
+
+    def __init__(self, prefix=None):
+        self.prefix = prefix or []
+
+    @contextlib.contextmanager
+    def subcommand(self, srcpath, dstpath):
+        """Throttle disk I/O bandwidth used by a sub-command, such as 'dd',
+        that reads from srcpath and writes to dstpath. The sub-command
+        must be executed with the generated prefix command.
+        """
+        yield {'prefix': self.prefix}
+
+
+class BlkioCgroup(Throttle):
+    """Throttle disk I/O bandwidth using blkio cgroups."""
+
+    def __init__(self, bps_limit, cgroup_name):
+        self.bps_limit = bps_limit
+        self.cgroup = cgroup_name
+        self.srcdevs = {}
+        self.dstdevs = {}
+
+        try:
+            utils.execute('cgcreate', '-g', 'blkio:%s' % self.cgroup,
+                          run_as_root=True)
+        except processutils.ProcessExecutionError:
+            LOG.error(_LE('Failed to create blkio cgroup \'%(name)s\'.'),
+                      {'name': cgroup_name})
+            raise
+
+    def _get_device_number(self, path):
+        try:
+            return utils.get_blkdev_major_minor(path)
+        except exception.Error as e:
+            LOG.error(_LE('Failed to get device number for throttling: '
+                          '%(error)s'), {'error': e})
+
+    def _limit_bps(self, rw, dev, bps):
+        try:
+            utils.execute('cgset', '-r', 'blkio.throttle.%s_bps_device=%s %d'
+                          % (rw, dev, bps), self.cgroup, run_as_root=True)
+        except processutils.ProcessExecutionError:
+            LOG.warn(_LW('Failed to setup blkio cgroup to throttle the '
+                         'device \'%(device)s\'.'), {'device': dev})
+
+    def _set_limits(self, rw, devs):
+        total = sum(devs.itervalues())
+        for dev in devs:
+            self._limit_bps(rw, dev, self.bps_limit * devs[dev] / total)
+
+    @utils.synchronized('BlkioCgroup')
+    def _inc_device(self, srcdev, dstdev):
+        if srcdev:
+            self.srcdevs[srcdev] = self.srcdevs.get(srcdev, 0) + 1
+            self._set_limits('read', self.srcdevs)
+        if dstdev:
+            self.dstdevs[dstdev] = self.dstdevs.get(dstdev, 0) + 1
+            self._set_limits('write', self.dstdevs)
+
+    @utils.synchronized('BlkioCgroup')
+    def _dec_device(self, srcdev, dstdev):
+        if srcdev:
+            self.srcdevs[srcdev] -= 1
+            if self.srcdevs[srcdev] == 0:
+                del self.srcdevs[srcdev]
+            self._set_limits('read', self.srcdevs)
+        if dstdev:
+            self.dstdevs[dstdev] -= 1
+            if self.dstdevs[dstdev] == 0:
+                del self.dstdevs[dstdev]
+            self._set_limits('write', self.dstdevs)
+
+    @contextlib.contextmanager
+    def subcommand(self, srcpath, dstpath):
+        srcdev = self._get_device_number(srcpath)
+        dstdev = self._get_device_number(dstpath)
+
+        if srcdev is None and dstdev is None:
+            yield {'prefix': []}
+            return
+
+        self._inc_device(srcdev, dstdev)
+        try:
+            yield {'prefix': ['cgexec', '-g', 'blkio:%s' % self.cgroup]}
+        finally:
+            self._dec_device(srcdev, dstdev)
index a223e4e21a5553e6cfff3cf703ea107530caf6b8..e96b282e13e02de53649343290e69e41530130e6 100644 (file)
@@ -26,10 +26,11 @@ from oslo_utils import units
 
 from cinder.brick.local_dev import lvm as brick_lvm
 from cinder import exception
-from cinder.i18n import _, _LI, _LW
+from cinder.i18n import _, _LI
 from cinder.openstack.common import log as logging
 from cinder import rpc
 from cinder import utils
+from cinder.volume import throttling
 
 
 CONF = cfg.CONF
@@ -246,56 +247,6 @@ def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix,
         usage_info)
 
 
-def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute):
-    if not bps_limit:
-        LOG.debug('Not using bps rate limiting on volume copy')
-        return None
-
-    try:
-        srcdev = utils.get_blkdev_major_minor(srcpath)
-    except exception.Error as e:
-        msg = (_('Failed to get device number for read throttling: %(error)s')
-               % {'error': e})
-        LOG.error(msg)
-        srcdev = None
-
-    try:
-        dstdev = utils.get_blkdev_major_minor(dstpath)
-    except exception.Error as e:
-        msg = (_('Failed to get device number for write throttling: %(error)s')
-               % {'error': e})
-        LOG.error(msg)
-        dstdev = None
-
-    if not srcdev and not dstdev:
-        return None
-
-    group_name = CONF.volume_copy_blkio_cgroup_name
-    LOG.debug('Setting rate limit to %s bps for blkio '
-              'group: %s' % (bps_limit, group_name))
-    try:
-        execute('cgcreate', '-g', 'blkio:%s' % group_name, run_as_root=True)
-    except processutils.ProcessExecutionError:
-        LOG.warn(_LW('Failed to create blkio cgroup'))
-        return None
-
-    try:
-        if srcdev:
-            execute('cgset', '-r', 'blkio.throttle.read_bps_device=%s %d'
-                    % (srcdev, bps_limit), group_name, run_as_root=True)
-        if dstdev:
-            execute('cgset', '-r', 'blkio.throttle.write_bps_device=%s %d'
-                    % (dstdev, bps_limit), group_name, run_as_root=True)
-    except processutils.ProcessExecutionError:
-        msg = (_('Failed to setup blkio cgroup to throttle the devices: '
-                 '\'%(src)s\',\'%(dst)s\'')
-               % {'src': srcdev, 'dst': dstdev})
-        LOG.warn(msg)
-        return None
-
-    return ['cgexec', '-g', 'blkio:%s' % group_name]
-
-
 def _calculate_count(size_in_m, blocksize):
 
     # Check if volume_dd_blocksize is valid
@@ -332,8 +283,8 @@ def check_for_odirect_support(src, dest, flag='oflag=direct'):
         return False
 
 
-def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
-                execute=utils.execute, ionice=None):
+def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
+                 execute=utils.execute, ionice=None):
     # Use O_DIRECT to avoid thrashing the system buffer cache
     extra_flags = []
     if check_for_odirect_support(srcstr, deststr, 'iflag=direct'):
@@ -357,9 +308,7 @@ def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
     if ionice is not None:
         cmd = ['ionice', ionice] + cmd
 
-    cgcmd = setup_blkio_cgroup(srcstr, deststr, CONF.volume_copy_bps_limit)
-    if cgcmd:
-        cmd = cgcmd + cmd
+    cmd = prefix + cmd
 
     # Perform the copy
     start_time = timeutils.utcnow()
@@ -381,8 +330,19 @@ def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
     LOG.info(mesg % {'size_in_m': size_in_m, 'mbps': mbps})
 
 
+def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
+                execute=utils.execute, ionice=None, throttle=None):
+    if not throttle:
+        throttle = throttling.Throttle.get_default()
+    with throttle.subcommand(srcstr, deststr) as throttle_cmd:
+        _copy_volume(throttle_cmd['prefix'], srcstr, deststr,
+                     size_in_m, blocksize, sync=sync,
+                     execute=execute, ionice=ionice)
+
+
 def clear_volume(volume_size, volume_path, volume_clear=None,
-                 volume_clear_size=None, volume_clear_ionice=None):
+                 volume_clear_size=None, volume_clear_ionice=None,
+                 throttle=None):
     """Unprovision old volumes to prevent data leaking between users."""
     if volume_clear is None:
         volume_clear = CONF.volume_clear
@@ -402,7 +362,8 @@ def clear_volume(volume_size, volume_path, volume_clear=None,
         return copy_volume('/dev/zero', volume_path, volume_clear_size,
                            CONF.volume_dd_blocksize,
                            sync=True, execute=utils.execute,
-                           ionice=volume_clear_ionice)
+                           ionice=volume_clear_ionice,
+                           throttle=throttle)
     elif volume_clear == 'shred':
         clear_cmd = ['shred', '-n3']
         if volume_clear_size: