# Mutable available resources.
# These will change as resources are virtually "consumed".
self.total_capacity_gb = 0
+ # Capacity that has been allocated from Cinder's point of view,
+ # which should equal sum(vol['size'] for vol in vols_on_host)
+ self.allocated_capacity_gb = 0
self.free_capacity_gb = None
self.reserved_percentage = 0
self.total_capacity_gb = capability['total_capacity_gb']
self.free_capacity_gb = capability['free_capacity_gb']
+ self.allocated_capacity_gb = capability.get(
+ 'allocated_capacity_gb', 0)
self.reserved_percentage = capability['reserved_percentage']
self.updated = capability['timestamp']
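For context, a back-end's capability report now carries the new counter next to the existing keys; the .get() default above keeps older drivers that omit it working. A hypothetical report (values invented for illustration):

    capability = {'total_capacity_gb': 1024,
                  'free_capacity_gb': 512,
                  'allocated_capacity_gb': 256,
                  'reserved_percentage': 10,
                  'timestamp': None}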
def consume_from_volume(self, volume):
"""Incrementally update host state from an volume."""
volume_gb = volume['size']
+ self.allocated_capacity_gb += volume_gb
if self.free_capacity_gb == 'infinite':
# There's virtually infinite space on back-end
pass
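A minimal sketch of how consumption moves these counters (not the real HostState class; the free-space decrement for finite back-ends is assumed from the surrounding code):

    class HostStateSketch(object):
        def __init__(self, total_gb, free_gb):
            self.total_capacity_gb = total_gb
            self.free_capacity_gb = free_gb
            self.allocated_capacity_gb = 0

        def consume_from_volume(self, volume):
            # Allocation always grows; free space shrinks unless infinite.
            self.allocated_capacity_gb += volume['size']
            if self.free_capacity_gb != 'infinite':
                self.free_capacity_gb -= volume['size']

    hs = HostStateSketch(total_gb=100, free_gb=100)
    hs.consume_from_volume({'size': 10})
    assert hs.allocated_capacity_gb == 10 and hs.free_capacity_gb == 90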
+# Copyright (c) 2013 eBay Inc.
# Copyright (c) 2012 OpenStack Foundation
# All Rights Reserved.
#
# License for the specific language governing permissions and limitations
# under the License.
"""
-Capacity Weigher. Weigh hosts by their available capacity.
+Weighers that weigh hosts by their capacity, including the following two
+weighers:
+
+1. Capacity Weigher. Weigh hosts by their available capacity.
The default is to spread volumes across all hosts evenly. If you prefer
stacking, you can set the 'capacity_weight_multiplier' option to a negative
number and the weighing has the opposite effect of the default.
+
+2. Allocated Capacity Weigher. Weigh hosts by their allocated capacity.
+
+The default behavior is to place a new volume on the host that has
+allocated the least space. This weigher is intended to simulate the
+behavior of SimpleScheduler. If you prefer to place volumes on the host
+that has allocated the most space, you can set the
+'allocated_capacity_weight_multiplier' option to a positive number and
+the weighing has the opposite effect of the default.
"""
default=1.0,
help='Multiplier used for weighing volume capacity. '
'Negative numbers mean to stack vs spread.'),
+ cfg.FloatOpt('allocated_capacity_weight_multiplier',
+ default=-1.0,
+ help='Multiplier used for weighing allocated capacity. '
+ 'Positive numbers mean to stack vs spread.'),
]
CONF = cfg.CONF
else:
free = math.floor(host_state.free_capacity_gb * (1 - reserved))
return free
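Worked through with the fake hosts used by the capacity weigher tests, the reserved-space arithmetic above gives:

    import math
    # host2: 300 GB free, 10% reserved -> 270 weighable GB
    assert math.floor(300 * (1 - 0.10)) == 270
    # host4: 200 GB free, 5% reserved -> 190 weighable GB
    assert math.floor(200 * (1 - 0.05)) == 190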
+
+
+class AllocatedCapacityWeigher(weights.BaseHostWeigher):
+ """Weigh hosts by their allocated capacity."""
+
+ def _weight_multiplier(self):
+ """Override the weight multiplier."""
+ return CONF.allocated_capacity_weight_multiplier
+
+ def _weigh_object(self, host_state, weight_properties):
+ # Higher weights win. We want spreading (choose host with lowest
+ # allocated_capacity first) to be the default.
+ allocated_space = host_state.allocated_capacity_gb
+ return allocated_space
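Roughly, the weight handler multiplies the raw value by the weigher's multiplier before ranking hosts by descending weight; a sketch of the scheme, not the handler's actual code:

    def effective_weight(weigher, host_state, weight_properties):
        return (weigher._weight_multiplier() *
                weigher._weigh_object(host_state, weight_properties))

With the default multiplier of -1.0 the least-allocated host therefore gets the highest weight and is picked first.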
def test_reset_status_as_non_admin(self):
# current status is 'error'
volume = db.volume_create(context.get_admin_context(),
- {'status': 'error'})
+ {'status': 'error', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available'})
+ volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available'})
+ volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available',
+ volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1,
'attach_status': 'attached'})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
- volume = db.volume_create(ctx, {'status': 'available',
+ volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1,
'attach_status': 'detached'})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# snapshot in 'error_deleting'
- volume = db.volume_create(ctx, {})
+ volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'error_deleting',
'volume_id': volume['id']})
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# snapshot in 'available'
- volume = db.volume_create(ctx, {})
+ volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
+ 'provider_location': '', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'available',
'volume_id': volume['id']})
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
- volume = db.volume_create(ctx, {'status': 'creating'})
+ volume = db.volume_create(ctx, {'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
- volume = db.volume_create(ctx, {'host': 'test'})
+ volume = db.volume_create(ctx, {'host': 'test', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'creating',
'volume_size': 1,
'volume_id': volume['id']})
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
connector = {}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
- 'provider_location': ''})
+ 'provider_location': '', 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',
self.service_states = {
'host1': {'total_capacity_gb': 1024,
'free_capacity_gb': 1024,
+ 'allocated_capacity_gb': 0,
'reserved_percentage': 10,
'timestamp': None},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
+ 'allocated_capacity_gb': 1748,
'reserved_percentage': 10,
'timestamp': None},
'host3': {'total_capacity_gb': 512,
- 'free_capacity_gb': 512,
+ 'free_capacity_gb': 256,
+ 'allocated_capacity_gb': 256,
'reserved_percentage': 0,
'timestamp': None},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
+ 'allocated_capacity_gb': 1848,
'reserved_percentage': 5,
'timestamp': None},
}
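The fake figures are chosen so that total = free + allocated holds on every host (e.g. host2: 300 + 1748 = 2048); a quick sanity check:

    for cap in service_states.values():
        assert (cap['free_capacity_gb'] + cap['allocated_capacity_gb'] ==
                cap['total_capacity_gb'])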
--- /dev/null
+# Copyright 2013 eBay Inc.
+#
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Allocated Capacity Weigher.
+"""
+
+import mock
+from oslo.config import cfg
+
+from cinder import context
+from cinder.openstack.common.scheduler.weights import HostWeightHandler
+from cinder.scheduler.weights.capacity import AllocatedCapacityWeigher as ACW
+from cinder import test
+from cinder.tests.scheduler import fakes
+
+CONF = cfg.CONF
+
+
+class AllocatedCapacityWeigherTestCase(test.TestCase):
+ def setUp(self):
+ super(AllocatedCapacityWeigherTestCase, self).setUp()
+ self.host_manager = fakes.FakeHostManager()
+ self.weight_handler = HostWeightHandler('cinder.scheduler.weights')
+
+ def _get_weighed_host(self, hosts, weight_properties=None):
+ if weight_properties is None:
+ weight_properties = {}
+ return self.weight_handler.get_weighed_objects([ACW], hosts,
+ weight_properties)[0]
+
+ @mock.patch('cinder.db.sqlalchemy.api.service_get_all_by_topic')
+ def _get_all_hosts(self, _mock_service_get_all_by_topic):
+ ctxt = context.get_admin_context()
+ fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic)
+ host_states = self.host_manager.get_all_host_states(ctxt)
+ _mock_service_get_all_by_topic.assert_called_once_with(
+ ctxt, CONF.volume_topic)
+ return host_states
+
+ def test_default_of_spreading_first(self):
+ hostinfo_list = self._get_all_hosts()
+
+ # host1: allocated_capacity_gb=0, weight=0
+ # host2: allocated_capacity_gb=1748, weight=-1748
+ # host3: allocated_capacity_gb=256, weight=-256
+ # host4: allocated_capacity_gb=1848, weight=-1848
+
+ # so, host1 should win:
+ weighed_host = self._get_weighed_host(hostinfo_list)
+ self.assertEqual(weighed_host.weight, 0)
+ self.assertEqual(weighed_host.obj.host, 'host1')
+
+ def test_capacity_weight_multiplier1(self):
+ self.flags(allocated_capacity_weight_multiplier=1.0)
+ hostinfo_list = self._get_all_hosts()
+
+ # host1: allocated_capacity_gb=0, weight=0
+ # host2: allocated_capacity_gb=1748, weight=1748
+ # host3: allocated_capacity_gb=256, weight=256
+ # host4: allocated_capacity_gb=1848, weight=1848
+
+ # so, host4 should win:
+ weighed_host = self._get_weighed_host(hostinfo_list)
+ self.assertEqual(weighed_host.weight, 1848.0)
+ self.assertEqual(weighed_host.obj.host, 'host4')
+
+ def test_capacity_weight_multiplier2(self):
+ self.flags(allocated_capacity_weight_multiplier=-2.0)
+ hostinfo_list = self._get_all_hosts()
+
+ # host1: allocated_capacity_gb=0, weight=0
+ # host2: allocated_capacity_gb=1748, weight=-3496
+ # host3: allocated_capacity_gb=256, weight=-512
+ # host4: allocated_capacity_gb=1848, weight=-3696
+
+ # so, host1 should win:
+ weighed_host = self._get_weighed_host(hostinfo_list)
+ self.assertEqual(weighed_host.weight, 0)
+ self.assertEqual(weighed_host.obj.host, 'host1')
# host1: free_capacity_gb=1024, free=1024*(1-0.1)
# host2: free_capacity_gb=300, free=300*(1-0.1)
- # host3: free_capacity_gb=512, free=512
+ # host3: free_capacity_gb=256, free=256
# host4: free_capacity_gb=200, free=200*(1-0.05)
# so, host1 should win:
# host1: free_capacity_gb=1024, free=-1024*(1-0.1)
# host2: free_capacity_gb=300, free=-300*(1-0.1)
- # host3: free_capacity_gb=512, free=-512
+ # host3: free_capacity_gb=256, free=-256
# host4: free_capacity_gb=200, free=-200*(1-0.05)
# so, host4 should win:
# host1: free_capacity_gb=1024, free=1024*(1-0.1)*2
# host2: free_capacity_gb=300, free=300*(1-0.1)*2
- # host3: free_capacity_gb=512, free=512*2
+ # host3: free_capacity_gb=256, free=256*2
# host4: free_capacity_gb=200, free=200*(1-0.05)*2
# so, host1 should win:
self.volume = importutils.import_object(CONF.volume_manager)
self.volume.driver.set_execute(self._execute_wrapper)
self.volume.driver.set_initialized()
+ self.volume.stats = dict(allocated_capacity_gb=0)
self.stubs.Set(GPFSDriver, '_create_gpfs_snap',
self._fake_gpfs_snap)
super(ManagedRBDTestCase, self).setUp()
fake_image.stub_out_image_service(self.stubs)
self.volume.driver.set_initialized()
+ self.volume.stats = {'allocated_capacity_gb': 0}
self.called = []
def _create_volume_from_image(self, expected_status, raw=False,
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
+ self.volume.stats = {'allocated_capacity_gb': 0}
# keep ordered record of what we execute
self.called = []
self.configuration = Configuration(volume_manager_opts,
config_group=service_name)
self._tp = GreenPool()
+ self.stats = {}
if not volume_driver:
# Get from configuration, which will get the default
LOG.debug(_("Re-exporting %s volumes"), len(volumes))
try:
+ total_allocated = 0
+ self.stats.update({'allocated_capacity_gb': total_allocated})
for volume in volumes:
if volume['status'] in ['available', 'in-use']:
+ # Accumulate the allocated capacity for this driver.
+ total_allocated += volume['size']
+ self.stats['allocated_capacity_gb'] = total_allocated
self.driver.ensure_export(ctxt, volume)
elif volume['status'] == 'downloading':
LOG.info(_("volume %s stuck in a downloading state"),
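For the final value, the seeding above is equivalent to a comprehension over the exportable volumes (a sketch; the in-loop assignment additionally keeps the stat current while exports are re-ensured):

    total_allocated = sum(vol['size'] for vol in volumes
                          if vol['status'] in ('available', 'in-use'))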
# Fetch created volume from storage
volume_ref = flow_engine.storage.fetch('volume')
+ # Update volume stats
+ self.stats['allocated_capacity_gb'] += volume_ref['size']
return volume_ref['id']
@utils.require_driver_initialized
if reservations:
QUOTAS.commit(context, reservations, project_id=project_id)
+ self.stats['allocated_capacity_gb'] -= volume_ref['size']
self.publish_service_capabilities(context)
return True
else:
volume_stats = self.driver.get_volume_stats(refresh=True)
if volume_stats:
- # This will grab info about the host and queue it
- # to be sent to the Schedulers.
+ # Add 'allocated_capacity_gb' to the volume stats,
+ volume_stats.update(self.stats)
+ # then queue them to be sent to the Schedulers.
self.update_service_capabilities(volume_stats)
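After the update, the dict sent to the scheduler carries the manager-tracked counter alongside the driver-reported figures, e.g. (hypothetical values):

    # {'total_capacity_gb': 1024, 'free_capacity_gb': 512,
    #  'reserved_percentage': 0, 'timestamp': ...,
    #  'allocated_capacity_gb': 42}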
def publish_service_capabilities(self, context):
QUOTAS.commit(context, reservations)
self.db.volume_update(context, volume['id'], {'size': int(new_size),
'status': 'available'})
+ self.stats['allocated_capacity_gb'] += size_increase
self._notify_about_volume_usage(
context, volume, "resize.end",
extra_usage_info={'size': int(new_size)})
# numbers mean to stack vs spread. (floating point value)
#capacity_weight_multiplier=1.0
+# Multiplier used for weighing allocated capacity. Positive
+# numbers mean to stack vs spread. (floating point value)
+#allocated_capacity_weight_multiplier=-1.0
+
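For example, to make the scheduler stack new volumes on the most-allocated back-end, a cinder.conf could combine the new weigher with a positive multiplier (an illustrative excerpt; scheduler_default_weighers is the existing option for selecting weighers):

    [DEFAULT]
    scheduler_default_weighers = AllocatedCapacityWeigher
    allocated_capacity_weight_multiplier = 1.0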
#
# Options defined in cinder.transfer.api
JsonFilter = cinder.openstack.common.scheduler.filters.json_filter:JsonFilter
RetryFilter = cinder.scheduler.filters.retry_filter:RetryFilter
cinder.scheduler.weights =
+ AllocatedCapacityWeigher = cinder.scheduler.weights.capacity:AllocatedCapacityWeigher
CapacityWeigher = cinder.scheduler.weights.capacity:CapacityWeigher
ChanceWeigher = cinder.scheduler.weights.chance:ChanceWeigher