]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Implement filter scheduler
authorZhiteng Huang <zhiteng.huang@intel.com>
Fri, 5 Oct 2012 16:08:09 +0000 (00:08 +0800)
committerZhiteng Huang <zhiteng.huang@intel.com>
Thu, 10 Jan 2013 03:22:56 +0000 (11:22 +0800)
In order to do more sophisticated scheduling (e.g. schedule based on volume
type), filter scheduler is introduced. Several changes are made to make this
possible, some of them are similar to the counterpart in Nova:

- add HostState class to host_manager in order to store volume capabilities
- implement get_volume_stats() method of iSCSIDriver as an example to
demonstrate how volume backend driver reports capabilities as well as status
- add scheduler_options.py and 'scheduler_json_config_location' flag to be
allow loading json configuration file for scheduler at run time
- port common filters/weights from oslo
- add capacity weigher (scheduler/weights/capacity.py) for picking up
target volume backend by weighing free capacity of the host. The default
behavior is to spread volumes across hosts; by changing the
'capacity_weight_multiplier' to negative number, volume placing behavior will
become stacking.
- add capacity filter which filters those hosts have insufficient storage space
to serve the request.
- add 'reserved_percentage' config option to indicate how much space is
reserved. Capacity reservation is needed when volume resize is enabled.
- add 'publish_service_capabilities()' method to volume RPC API to allow
scheduler to have volume service report capabilities. This bumps volume RPC
API to version 1.2
- remove 'volume_force_update_capabilities' config option, volume status will be
report to scheduler in every checking.

The implication of this change to storage/backend driver developer:
- implementation of get_volume_stats() of the driver is now a *MUST*, filter
scheduler heavily relies on the status/capabilities reported by backend driver
to makeplacement decision.  To ensure Cinder works seamlessly on the storage
system, driver should at least report following capabilities/status:
----------------------+---------------------------+---------------------------
  Capability/Status   |      Description          |         Example
----------------------+---------------------------+---------------------------
 'volume_backend_name'| back-end name, string     | 'Example_Storage_Backend'
----------------------+---------------------------+---------------------------
  'vendor_name'       | vendor name, string       | 'OpenStackCinder'
----------------------+---------------------------+---------------------------
  'driver_version'    | version, string           |  '1.0a'
----------------------+---------------------------+---------------------------
  'storage_protocol'  | supported protocols,      | 'iSCSI', 'RBD', 'FC', 'NFS'
                      | string or list of strings | ['iSCSI', 'NFS', 'FC']
----------------------+---------------------------+---------------------------
  'total_capacity_gb' | capacity in GB, integer   |  102400
----------------------+---------------------------+---------------------------
  'free_capacity_gb'  | available capacity in GB, |  1000
                      | integer                   |
----------------------+---------------------------+---------------------------
'reserved_percentage' | reserved space in         |  0, 10
                      | percentage, integer       |
----------------------+---------------------------+---------------------------

The implication of this change to Cinder administrator:
- the default setting for filter scheduler should work well with the benefits
of:
  * being able to fully utilize capacity of backends (driver now has to report
  actul total space and space utilization and scheduler uses these info) not
  limited by the 'max_gigabytes' config option any more;
  * being able to choose placement policy between spreading & stacking for
  volume creation (by modifying the 'capacity_weight_multiplier' in
  CapacityWeigher)
- with filter scheduler, Cinder is now able to utilize the advanced features/
capabilities provided by different storage back-ends via: defining different
volume types with proper extra_specs. Volume types can be considered as sets
of back-end capabilities requirement.
 For example, a volume type which has 'storage_protocol':'FC' key/value pair
definition in its extra_spec can only be served by those back-ends who report
they support FiberChannel protocol. Another example is volume type has 'QoS'
requirement can only be served by back-ends support QoS.

Note/TODO:
* Currently scheduler makes its decision based on the status and capabilities
information reported by volume nodes, and these information is stored in memory
of scheduler process. More sophisticated way may be add on table in DB to
record status/capabilities of all volume nodes, like Nova does for compute nodes.

implement bp volume-type-scheduler

DocImpact

Change-Id: I296b3727db8de0d4cf085fac602d122a7b474842

30 files changed:
cinder/exception.py
cinder/openstack/common/scheduler/__init__.py [new file with mode: 0644]
cinder/openstack/common/scheduler/filter.py [new file with mode: 0644]
cinder/openstack/common/scheduler/filters/__init__.py [new file with mode: 0644]
cinder/openstack/common/scheduler/filters/availability_zone_filter.py [new file with mode: 0644]
cinder/openstack/common/scheduler/filters/capabilities_filter.py [new file with mode: 0644]
cinder/openstack/common/scheduler/filters/extra_specs_ops.py [new file with mode: 0644]
cinder/openstack/common/scheduler/filters/json_filter.py [new file with mode: 0644]
cinder/openstack/common/scheduler/weight.py [new file with mode: 0644]
cinder/openstack/common/scheduler/weights/__init__.py [new file with mode: 0644]
cinder/scheduler/filter_scheduler.py [new file with mode: 0644]
cinder/scheduler/filters/__init__.py [new file with mode: 0644]
cinder/scheduler/filters/capacity_filter.py [new file with mode: 0644]
cinder/scheduler/host_manager.py
cinder/scheduler/manager.py
cinder/scheduler/scheduler_options.py [new file with mode: 0644]
cinder/scheduler/weights/__init__.py [new file with mode: 0644]
cinder/scheduler/weights/capacity.py [new file with mode: 0644]
cinder/tests/scheduler/fakes.py
cinder/tests/scheduler/test_capacity_weigher.py [new file with mode: 0644]
cinder/tests/scheduler/test_filter_scheduler.py [new file with mode: 0644]
cinder/tests/scheduler/test_host_filters.py [new file with mode: 0644]
cinder/tests/scheduler/test_host_manager.py [new file with mode: 0644]
cinder/tests/scheduler/test_scheduler_options.py [new file with mode: 0644]
cinder/volume/driver.py
cinder/volume/manager.py
cinder/volume/rpcapi.py
openstack-common.conf
setup.py
tools/pip-requires

index 7cb9bd9931bd6ce64a3cdf4fb8099c0e2b287dd5..3abed63d81a30b9e930fde6cc13ad7bf4cff9277 100644 (file)
@@ -314,6 +314,14 @@ class HostNotFound(NotFound):
     message = _("Host %(host)s could not be found.")
 
 
+class SchedulerHostFilterNotFound(NotFound):
+    message = _("Scheduler Host Filter %(filter_name)s could not be found.")
+
+
+class SchedulerHostWeigherNotFound(NotFound):
+    message = _("Scheduler Host Weigher %(weigher_name)s could not be found.")
+
+
 class HostBinaryNotFound(NotFound):
     message = _("Could not find binary %(binary)s on host %(host)s.")
 
diff --git a/cinder/openstack/common/scheduler/__init__.py b/cinder/openstack/common/scheduler/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/cinder/openstack/common/scheduler/filter.py b/cinder/openstack/common/scheduler/filter.py
new file mode 100644 (file)
index 0000000..0bdb10d
--- /dev/null
@@ -0,0 +1,71 @@
+# Copyright (c) 2011-2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Filter support
+"""
+
+import inspect
+
+from stevedore import extension
+
+
+class BaseFilter(object):
+    """Base class for all filter classes."""
+    def _filter_one(self, obj, filter_properties):
+        """Return True if it passes the filter, False otherwise.
+        Override this in a subclass.
+        """
+        return True
+
+    def filter_all(self, filter_obj_list, filter_properties):
+        """Yield objects that pass the filter.
+
+        Can be overriden in a subclass, if you need to base filtering
+        decisions on all objects.  Otherwise, one can just override
+        _filter_one() to filter a single object.
+        """
+        for obj in filter_obj_list:
+            if self._filter_one(obj, filter_properties):
+                yield obj
+
+
+class BaseFilterHandler(object):
+    """ Base class to handle loading filter classes.
+
+    This class should be subclassed where one needs to use filters.
+    """
+    def __init__(self, filter_class_type, filter_namespace):
+        self.namespace = filter_namespace
+        self.filter_class_type = filter_class_type
+        self.filter_manager = extension.ExtensionManager(filter_namespace)
+
+    def _is_correct_class(self, obj):
+        """Return whether an object is a class of the correct type and
+        is not prefixed with an underscore.
+        """
+        return (inspect.isclass(obj) and
+                not obj.__name__.startswith('_') and
+                issubclass(obj, self.filter_class_type))
+
+    def get_all_classes(self):
+        return [x.plugin for x in self.filter_manager
+                if self._is_correct_class(x.plugin)]
+
+    def get_filtered_objects(self, filter_classes, objs,
+                             filter_properties):
+        for filter_cls in filter_classes:
+            objs = filter_cls().filter_all(objs, filter_properties)
+        return list(objs)
diff --git a/cinder/openstack/common/scheduler/filters/__init__.py b/cinder/openstack/common/scheduler/filters/__init__.py
new file mode 100644 (file)
index 0000000..e47f835
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Scheduler host filters
+"""
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.scheduler import filter
+
+LOG = logging.getLogger(__name__)
+
+
+class BaseHostFilter(filter.BaseFilter):
+    """Base class for host filters."""
+    def _filter_one(self, obj, filter_properties):
+        """Return True if the object passes the filter, otherwise False."""
+        return self.host_passes(obj, filter_properties)
+
+    def host_passes(self, host_state, filter_properties):
+        """Return True if the HostState passes the filter, otherwise False.
+        Override this in a subclass.
+        """
+        raise NotImplementedError()
+
+
+class HostFilterHandler(filter.BaseFilterHandler):
+    def __init__(self, namespace):
+        super(HostFilterHandler, self).__init__(BaseHostFilter, namespace)
diff --git a/cinder/openstack/common/scheduler/filters/availability_zone_filter.py b/cinder/openstack/common/scheduler/filters/availability_zone_filter.py
new file mode 100644 (file)
index 0000000..0be4bd1
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (c) 2011-2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from cinder.openstack.common.scheduler import filters
+
+
+class AvailabilityZoneFilter(filters.BaseHostFilter):
+    """Filters Hosts by availability zone."""
+
+    def host_passes(self, host_state, filter_properties):
+        spec = filter_properties.get('request_spec', {})
+        props = spec.get('resource_properties', [])
+        availability_zone = props.get('availability_zone')
+
+        if availability_zone:
+            return availability_zone == host_state.service['availability_zone']
+        return True
diff --git a/cinder/openstack/common/scheduler/filters/capabilities_filter.py b/cinder/openstack/common/scheduler/filters/capabilities_filter.py
new file mode 100644 (file)
index 0000000..cd84460
--- /dev/null
@@ -0,0 +1,63 @@
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.scheduler import filters
+from cinder.openstack.common.scheduler.filters import extra_specs_ops
+
+
+LOG = logging.getLogger(__name__)
+
+
+class CapabilitiesFilter(filters.BaseHostFilter):
+    """HostFilter to work with resource (instance & volume) type records."""
+
+    def _satisfies_extra_specs(self, capabilities, resource_type):
+        """Check that the capabilities provided by the services
+        satisfy the extra specs associated with the instance type"""
+        extra_specs = resource_type.get('extra_specs', [])
+        if not extra_specs:
+            return True
+
+        for key, req in extra_specs.iteritems():
+            # Either not scope format, or in capabilities scope
+            scope = key.split(':')
+            if len(scope) > 1 and scope[0] != "capabilities":
+                continue
+            elif scope[0] == "capabilities":
+                del scope[0]
+
+            cap = capabilities
+            for index in range(0, len(scope)):
+                try:
+                    cap = cap.get(scope[index], None)
+                except AttributeError:
+                    return False
+                if cap is None:
+                    return False
+            if not extra_specs_ops.match(cap, req):
+                return False
+        return True
+
+    def host_passes(self, host_state, filter_properties):
+        """Return a list of hosts that can create instance_type."""
+        # Note(zhiteng) Currently only Cinder and Nova are using
+        # this filter, so the resource type is either instance or
+        # volume.
+        resource_type = filter_properties.get('resource_type')
+        if not self._satisfies_extra_specs(host_state.capabilities,
+                                           resource_type):
+            return False
+        return True
diff --git a/cinder/openstack/common/scheduler/filters/extra_specs_ops.py b/cinder/openstack/common/scheduler/filters/extra_specs_ops.py
new file mode 100644 (file)
index 0000000..f4d4ff4
--- /dev/null
@@ -0,0 +1,68 @@
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import operator
+
+# 1. The following operations are supported:
+#   =, s==, s!=, s>=, s>, s<=, s<, <in>, <or>, ==, !=, >=, <=
+# 2. Note that <or> is handled in a different way below.
+# 3. If the first word in the extra_specs is not one of the operators,
+#   it is ignored.
+_op_methods = {'=': lambda x, y: float(x) >= float(y),
+               '<in>': lambda x, y: y in x,
+               '==': lambda x, y: float(x) == float(y),
+               '!=': lambda x, y: float(x) != float(y),
+               '>=': lambda x, y: float(x) >= float(y),
+               '<=': lambda x, y: float(x) <= float(y),
+               's==': operator.eq,
+               's!=': operator.ne,
+               's<': operator.lt,
+               's<=': operator.le,
+               's>': operator.gt,
+               's>=': operator.ge}
+
+
+def match(value, req):
+    words = req.split()
+
+    op = method = None
+    if words:
+        op = words.pop(0)
+        method = _op_methods.get(op)
+
+    if op != '<or>' and not method:
+        return value == req
+
+    if value is None:
+        return False
+
+    if op == '<or>':  # Ex: <or> v1 <or> v2 <or> v3
+        while True:
+            if words.pop(0) == value:
+                return True
+            if not words:
+                break
+            op = words.pop(0)  # remove a keyword <or>
+            if not words:
+                break
+        return False
+
+    try:
+        if words and method(value, words[0]):
+            return True
+    except ValueError:
+        pass
+
+    return False
diff --git a/cinder/openstack/common/scheduler/filters/json_filter.py b/cinder/openstack/common/scheduler/filters/json_filter.py
new file mode 100644 (file)
index 0000000..0e3bb47
--- /dev/null
@@ -0,0 +1,150 @@
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+import operator
+
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common.scheduler import filters
+
+
+class JsonFilter(filters.BaseHostFilter):
+    """Host Filter to allow simple JSON-based grammar for
+    selecting hosts.
+    """
+    def _op_compare(self, args, op):
+        """Returns True if the specified operator can successfully
+        compare the first item in the args with all the rest. Will
+        return False if only one item is in the list.
+        """
+        if len(args) < 2:
+            return False
+        if op is operator.contains:
+            bad = not args[0] in args[1:]
+        else:
+            bad = [arg for arg in args[1:]
+                   if not op(args[0], arg)]
+        return not bool(bad)
+
+    def _equals(self, args):
+        """First term is == all the other terms."""
+        return self._op_compare(args, operator.eq)
+
+    def _less_than(self, args):
+        """First term is < all the other terms."""
+        return self._op_compare(args, operator.lt)
+
+    def _greater_than(self, args):
+        """First term is > all the other terms."""
+        return self._op_compare(args, operator.gt)
+
+    def _in(self, args):
+        """First term is in set of remaining terms"""
+        return self._op_compare(args, operator.contains)
+
+    def _less_than_equal(self, args):
+        """First term is <= all the other terms."""
+        return self._op_compare(args, operator.le)
+
+    def _greater_than_equal(self, args):
+        """First term is >= all the other terms."""
+        return self._op_compare(args, operator.ge)
+
+    def _not(self, args):
+        """Flip each of the arguments."""
+        return [not arg for arg in args]
+
+    def _or(self, args):
+        """True if any arg is True."""
+        return any(args)
+
+    def _and(self, args):
+        """True if all args are True."""
+        return all(args)
+
+    commands = {
+        '=': _equals,
+        '<': _less_than,
+        '>': _greater_than,
+        'in': _in,
+        '<=': _less_than_equal,
+        '>=': _greater_than_equal,
+        'not': _not,
+        'or': _or,
+        'and': _and,
+    }
+
+    def _parse_string(self, string, host_state):
+        """Strings prefixed with $ are capability lookups in the
+        form '$variable' where 'variable' is an attribute in the
+        HostState class.  If $variable is a dictionary, you may
+        use: $variable.dictkey
+        """
+        if not string:
+            return None
+        if not string.startswith("$"):
+            return string
+
+        path = string[1:].split(".")
+        obj = getattr(host_state, path[0], None)
+        if obj is None:
+            return None
+        for item in path[1:]:
+            obj = obj.get(item, None)
+            if obj is None:
+                return None
+        return obj
+
+    def _process_filter(self, query, host_state):
+        """Recursively parse the query structure."""
+        if not query:
+            return True
+        cmd = query[0]
+        method = self.commands[cmd]
+        cooked_args = []
+        for arg in query[1:]:
+            if isinstance(arg, list):
+                arg = self._process_filter(arg, host_state)
+            elif isinstance(arg, basestring):
+                arg = self._parse_string(arg, host_state)
+            if arg is not None:
+                cooked_args.append(arg)
+        result = method(self, cooked_args)
+        return result
+
+    def host_passes(self, host_state, filter_properties):
+        """Return a list of hosts that can fulfill the requirements
+        specified in the query.
+        """
+        # TODO(zhiteng) Add description for filter_properties structure
+        # and scheduler_hints.
+        try:
+            query = filter_properties['scheduler_hints']['query']
+        except KeyError:
+            query = None
+        if not query:
+            return True
+
+        # NOTE(comstud): Not checking capabilities or service for
+        # enabled/disabled so that a provided json filter can decide
+
+        result = self._process_filter(jsonutils.loads(query), host_state)
+        if isinstance(result, list):
+            # If any succeeded, include the host
+            result = any(result)
+        if result:
+            # Filter it out.
+            return True
+        return False
diff --git a/cinder/openstack/common/scheduler/weight.py b/cinder/openstack/common/scheduler/weight.py
new file mode 100644 (file)
index 0000000..c5df9da
--- /dev/null
@@ -0,0 +1,91 @@
+# Copyright (c) 2011-2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Pluggable Weighing support
+"""
+
+import inspect
+
+from stevedore import extension
+
+
+class WeighedObject(object):
+    """Object with weight information."""
+    def __init__(self, obj, weight):
+        self.obj = obj
+        self.weight = weight
+
+    def __repr__(self):
+        return "<WeighedObject '%s': %s>" % (self.obj, self.weight)
+
+
+class BaseWeigher(object):
+    """Base class for pluggable weighers."""
+    def _weight_multiplier(self):
+        """How weighted this weigher should be.  Normally this would
+        be overriden in a subclass based on a config value.
+        """
+        return 1.0
+
+    def _weigh_object(self, obj, weight_properties):
+        """Override in a subclass to specify a weight for a specific
+        object.
+        """
+        return 0.0
+
+    def weigh_objects(self, weighed_obj_list, weight_properties):
+        """Weigh multiple objects.  Override in a subclass if you need
+        need access to all objects in order to manipulate weights.
+        """
+        constant = self._weight_multiplier()
+        for obj in weighed_obj_list:
+            obj.weight += (constant *
+                           self._weigh_object(obj.obj, weight_properties))
+
+
+class BaseWeightHandler(object):
+    object_class = WeighedObject
+
+    def __init__(self, weighed_object_type, weight_namespace):
+        self.namespace = weight_namespace
+        self.weighed_object_type = weighed_object_type
+        self.weight_manager = extension.ExtensionManager(weight_namespace)
+
+    def _is_correct_class(self, obj):
+        """Return whether an object is a class of the correct type and
+        is not prefixed with an underscore.
+        """
+        return (inspect.isclass(obj) and
+                not obj.__name__.startswith('_') and
+                issubclass(obj, self.weighed_object_type))
+
+    def get_all_classes(self):
+        return [x.plugin for x in self.weight_manager
+                if self._is_correct_class(x.plugin)]
+
+    def get_weighed_objects(self, weigher_classes, obj_list,
+                            weighing_properties):
+        """Return a sorted (highest score first) list of WeighedObjects."""
+
+        if not obj_list:
+            return []
+
+        weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
+        for weigher_cls in weigher_classes:
+            weigher = weigher_cls()
+            weigher.weigh_objects(weighed_objs, weighing_properties)
+
+        return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)
diff --git a/cinder/openstack/common/scheduler/weights/__init__.py b/cinder/openstack/common/scheduler/weights/__init__.py
new file mode 100644 (file)
index 0000000..dd4b332
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Scheduler host weights
+"""
+
+
+from cinder.openstack.common.scheduler import weight
+
+
+class WeighedHost(weight.WeighedObject):
+    def to_dict(self):
+        return {
+            'weight': self.weight,
+            'host': self.obj.host,
+        }
+
+    def __repr__(self):
+        return ("WeighedHost [host: %s, weight: %s]" %
+                (self.obj.host, self.weight))
+
+
+class BaseHostWeigher(weight.BaseWeigher):
+    """Base class for host weights."""
+    pass
+
+
+class HostWeightHandler(weight.BaseWeightHandler):
+    object_class = WeighedHost
+
+    def __init__(self, namespace):
+        super(HostWeightHandler, self).__init__(BaseHostWeigher, namespace)
diff --git a/cinder/scheduler/filter_scheduler.py b/cinder/scheduler/filter_scheduler.py
new file mode 100644 (file)
index 0000000..ea8dc4e
--- /dev/null
@@ -0,0 +1,130 @@
+# Copyright (c) 2011 Intel Corporation
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+The FilterScheduler is for creating volumes.
+You can customize this scheduler by specifying your own volume Filters and
+Weighing Functions.
+"""
+
+import operator
+
+from cinder import exception
+from cinder import flags
+from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.scheduler import driver
+from cinder.scheduler import scheduler_options
+
+
+FLAGS = flags.FLAGS
+LOG = logging.getLogger(__name__)
+
+
+class FilterScheduler(driver.Scheduler):
+    """Scheduler that can be used for filtering and weighing."""
+    def __init__(self, *args, **kwargs):
+        super(FilterScheduler, self).__init__(*args, **kwargs)
+        self.cost_function_cache = None
+        self.options = scheduler_options.SchedulerOptions()
+
+    def schedule(self, context, topic, method, *args, **kwargs):
+        """The schedule() contract requires we return the one
+        best-suited host for this request.
+        """
+        self._schedule(context, topic, *args, **kwargs)
+
+    def _get_configuration_options(self):
+        """Fetch options dictionary. Broken out for testing."""
+        return self.options.get_configuration()
+
+    def populate_filter_properties(self, request_spec, filter_properties):
+        """Stuff things into filter_properties.  Can be overridden in a
+        subclass to add more data.
+        """
+        vol = request_spec['volume_properties']
+        filter_properties['size'] = vol['size']
+        filter_properties['availability_zone'] = vol.get('availability_zone')
+        filter_properties['user_id'] = vol.get('user_id')
+        filter_properties['metadata'] = vol.get('metadata')
+
+    def schedule_create_volume(self, context, request_spec, filter_properties):
+        weighed_host = self._schedule(context, request_spec,
+                                      filter_properties)
+
+        if not weighed_host:
+            raise exception.NoValidHost(reason="")
+
+        host = weighed_host.obj.host
+        volume_id = request_spec['volume_id']
+        snapshot_id = request_spec['snapshot_id']
+        image_id = request_spec['image_id']
+
+        updated_volume = driver.volume_update_db(context, volume_id, host)
+        self.volume_rpcapi.create_volume(context, updated_volume, host,
+                                         snapshot_id, image_id)
+
+    def _schedule(self, context, request_spec, filter_properties=None):
+        """Returns a list of hosts that meet the required specs,
+        ordered by their fitness.
+        """
+        elevated = context.elevated()
+
+        volume_properties = request_spec['volume_properties']
+        # Since Nova is using mixed filters from Oslo and it's own, which
+        # takes 'resource_XX' and 'instance_XX' as input respectively, copying
+        # 'instance_XX' to 'resource_XX' will make both filters happy.
+        resource_properties = volume_properties.copy()
+        volume_type = request_spec.get("volume_type", None)
+        resource_type = request_spec.get("volume_type", None)
+        request_spec.update({'resource_properties': resource_properties})
+
+        config_options = self._get_configuration_options()
+
+        if filter_properties is None:
+            filter_properties = {}
+        filter_properties.update({'context': context,
+                                  'request_spec': request_spec,
+                                  'config_options': config_options,
+                                  'volume_type': volume_type,
+                                  'resource_type': resource_type})
+
+        self.populate_filter_properties(request_spec,
+                                        filter_properties)
+
+        # Find our local list of acceptable hosts by filtering and
+        # weighing our options. we virtually consume resources on
+        # it so subsequent selections can adjust accordingly.
+
+        # Note: remember, we are using an iterator here. So only
+        # traverse this list once.
+        hosts = self.host_manager.get_all_host_states(elevated)
+
+        # Filter local hosts based on requirements ...
+        hosts = self.host_manager.get_filtered_hosts(hosts,
+                                                     filter_properties)
+        if not hosts:
+            return None
+
+        LOG.debug(_("Filtered %(hosts)s") % locals())
+        # weighted_host = WeightedHost() ... the best
+        # host for the job.
+        weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
+                                                            filter_properties)
+        best_host = weighed_hosts[0]
+        LOG.debug(_("Choosing %(best_host)s") % locals())
+        best_host.obj.consume_from_volume(volume_properties)
+        return best_host
diff --git a/cinder/scheduler/filters/__init__.py b/cinder/scheduler/filters/__init__.py
new file mode 100644 (file)
index 0000000..ce4951d
--- /dev/null
@@ -0,0 +1,14 @@
+# Copyright (c) 2013 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/cinder/scheduler/filters/capacity_filter.py b/cinder/scheduler/filters/capacity_filter.py
new file mode 100644 (file)
index 0000000..edd5024
--- /dev/null
@@ -0,0 +1,44 @@
+# Copyright (c) 2012 Intel
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+import math
+
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.scheduler import filters
+
+
+LOG = logging.getLogger(__name__)
+
+
+class CapacityFilter(filters.BaseHostFilter):
+    """CapacityFilter filters based on volume host's capacity utilization."""
+
+    def host_passes(self, host_state, filter_properties):
+        """Return True if host has sufficient capacity."""
+        volume_size = filter_properties.get('size')
+
+        if not host_state.free_capacity_gb:
+            # Fail Safe
+            LOG.warning(_("Free capacity not set;"
+                          "volume node info collection broken."))
+            return False
+
+        reserved = float(host_state.reserved_percentage) / 100
+        free = math.floor(host_state.free_capacity_gb * (1 - reserved))
+
+        return free >= volume_size
index 99632bc86910de27b2ab6ee79675b2492ec82ece..6a76136c9318cc3423577201d9e32e92e25ed3a4 100644 (file)
 Manage hosts in the current zone.
 """
 
-# FIXME(ja): this code was written only for compute. re-implement for volumes
+import UserDict
+
+from cinder import db
+from cinder import exception
+from cinder import flags
+from cinder.openstack.common import cfg
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.scheduler import filters
+from cinder.openstack.common.scheduler import weights
+from cinder.openstack.common import timeutils
+from cinder import utils
+
+
+host_manager_opts = [
+    cfg.ListOpt('scheduler_default_filters',
+                default=[
+                    'AvailabilityZoneFilter',
+                    'CapacityFilter',
+                    'CapabilitiesFilter'
+                ],
+                help='Which filter class names to use for filtering hosts '
+                     'when not specified in the request.'),
+    cfg.ListOpt('scheduler_default_weighers',
+                default=[
+                    'CapacityWeigher'
+                ],
+                help='Which weigher class names to use for weighing hosts.')
+]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(host_manager_opts)
+
+LOG = logging.getLogger(__name__)
+
+
+class ReadOnlyDict(UserDict.IterableUserDict):
+    """A read-only dict."""
+    def __init__(self, source=None):
+        self.data = {}
+        self.update(source)
+
+    def __setitem__(self, key, item):
+        raise TypeError
+
+    def __delitem__(self, key):
+        raise TypeError
+
+    def clear(self):
+        raise TypeError
+
+    def pop(self, key, *args):
+        raise TypeError
+
+    def popitem(self):
+        raise TypeError
+
+    def update(self, source=None):
+        if source is None:
+            return
+        elif isinstance(source, UserDict.UserDict):
+            self.data = source.data
+        elif isinstance(source, type({})):
+            self.data = source
+        else:
+            raise TypeError
 
 
 class HostState(object):
-    pass
+    """Mutable and immutable information tracked for a host."""
+
+    def __init__(self, host, capabilities=None, service=None):
+        self.host = host
+        self.update_capabilities(capabilities, service)
+
+        self.volume_backend_name = None
+        self.vendor_name = None
+        self.driver_version = 0
+        self.storage_protocol = None
+        self.QoS_support = False
+        # Mutable available resources.
+        # These will change as resources are virtually "consumed".
+        self.total_capacity_gb = 0
+        self.free_capacity_gb = 0
+        self.reserved_percentage = 0
+
+        self.updated = None
+
+    def update_capabilities(self, capabilities=None, service=None):
+        # Read-only capability dicts
+
+        if capabilities is None:
+            capabilities = {}
+        self.capabilities = ReadOnlyDict(capabilities)
+        if service is None:
+            service = {}
+        self.service = ReadOnlyDict(service)
+
+    def update_from_volume_capability(self, capability):
+        """Update information about a host from its volume_node info."""
+        if self.updated and self.updated > capability['timestamp']:
+            return
+
+        if capability:
+            self.volume_backend = capability.get('volume_backend_name', None)
+            self.vendor_name = capability.get('vendor_name', None)
+            self.driver_version = capability.get('driver_version', None)
+            self.storage_protocol = capability.get('storage_protocol', None)
+            self.QoS_support = capability.get('QoS_support', False)
+
+            self.total_capacity_gb = capability['total_capacity_gb']
+            self.free_capacity_gb = capability['free_capacity_gb']
+            self.reserved_percentage = capability['reserved_percentage']
+
+            self.updated = capability['timestamp']
+
+    def consume_from_volume(self, volume):
+        """Incrementally update host state from an volume"""
+        volume_gb = volume['size']
+        self.free_capacity_gb -= volume_gb
+        self.updated = timeutils.utcnow()
+
+    def __repr__(self):
+        return ("host '%s': free_capacity_gb: %s" %
+                (self.host, self.free_capacity_gb))
 
 
 class HostManager(object):
+    """Base HostManager class."""
+
+    host_state_cls = HostState
+
+    def __init__(self):
+        self.service_states = {}  # { <host>: {<service>: {cap k : v}}}
+        self.host_state_map = {}
+        self.filter_handler = filters.HostFilterHandler('cinder.scheduler.'
+                                                        'filters')
+        self.filter_classes = self.filter_handler.get_all_classes()
+        self.weight_handler = weights.HostWeightHandler('cinder.scheduler.'
+                                                        'weights')
+        self.weight_classes = self.weight_handler.get_all_classes()
+
+    def _choose_host_filters(self, filter_cls_names):
+        """Since the caller may specify which filters to use we need
+        to have an authoritative list of what is permissible. This
+        function checks the filter names against a predefined set
+        of acceptable filters.
+        """
+        if filter_cls_names is None:
+            filter_cls_names = FLAGS.scheduler_default_filters
+        if not isinstance(filter_cls_names, (list, tuple)):
+            filter_cls_names = [filter_cls_names]
+        good_filters = []
+        bad_filters = []
+        for filter_name in filter_cls_names:
+            found_class = False
+            for cls in self.filter_classes:
+                if cls.__name__ == filter_name:
+                    found_class = True
+                    good_filters.append(cls)
+                    break
+            if not found_class:
+                bad_filters.append(filter_name)
+        if bad_filters:
+            msg = ", ".join(bad_filters)
+            raise exception.SchedulerHostFilterNotFound(filter_name=msg)
+        return good_filters
+
+    def _choose_host_weighers(self, weight_cls_names):
+        """Since the caller may specify which weighers to use, we need
+        to have an authoritative list of what is permissible. This
+        function checks the weigher names against a predefined set
+        of acceptable weighers.
+        """
+        if weight_cls_names is None:
+            weight_cls_names = FLAGS.scheduler_default_weighers
+        if not isinstance(weight_cls_names, (list, tuple)):
+            weight_cls_names = [weight_cls_names]
+
+        good_weighers = []
+        bad_weighers = []
+        for weigher_name in weight_cls_names:
+            found_class = False
+            for cls in self.weight_classes:
+                if cls.__name__ == weigher_name:
+                    good_weighers.append(cls)
+                    found_class = True
+                    break
+            if not found_class:
+                bad_weighers.append(weigher_name)
+        if bad_weighers:
+            msg = ", ".join(bad_weighers)
+            raise exception.SchedulerHostWeigherNotFound(weigher_name=msg)
+        return good_weighers
+
+    def get_filtered_hosts(self, hosts, filter_properties,
+                           filter_class_names=None):
+        """Filter hosts and return only ones passing all filters"""
+        filter_classes = self._choose_host_filters(filter_class_names)
+        return self.filter_handler.get_filtered_objects(filter_classes,
+                                                        hosts,
+                                                        filter_properties)
+
+    def get_weighed_hosts(self, hosts, weight_properties,
+                          weigher_class_names=None):
+        """Weigh the hosts"""
+        weigher_classes = self._choose_host_weighers(weigher_class_names)
+        return self.weight_handler.get_weighed_objects(weigher_classes,
+                                                       hosts,
+                                                       weight_properties)
+
+    def update_service_capabilities(self, service_name, host, capabilities):
+        """Update the per-service capabilities based on this notification."""
+        if service_name != 'volume':
+            LOG.debug(_('Ignoring %(service_name)s service update '
+                        'from %(host)s'), locals())
+            return
+
+        LOG.debug(_("Received %(service_name)s service update from "
+                    "%(host)s.") % locals())
+
+        # Copy the capabilities, so we don't modify the original dict
+        capab_copy = dict(capabilities)
+        capab_copy["timestamp"] = timeutils.utcnow()  # Reported time
+        self.service_states[host] = capab_copy
+
+    def get_all_host_states(self, context):
+        """Returns a dict of all the hosts the HostManager
+          knows about. Also, each of the consumable resources in HostState
+          are pre-populated and adjusted based on data in the db.
 
-    def get_host_list(self, *args):
-        pass
+          For example:
+          {'192.168.1.100': HostState(), ...}
+        """
 
-    def update_service_capabilities(self, *args):
-        pass
+        # Get resource usage across the available volume nodes:
+        topic = FLAGS.volume_topic
+        volume_services = db.service_get_all_by_topic(context, topic)
+        for service in volume_services:
+            if not utils.service_is_up(service) or service['disabled']:
+                LOG.warn(_("service is down or disabled."))
+                continue
+            host = service['host']
+            capabilities = self.service_states.get(host, None)
+            host_state = self.host_state_map.get(host)
+            if host_state:
+                # copy capabilities to host_state.capabilities
+                host_state.update_capabilities(capabilities,
+                                               dict(service.iteritems()))
+            else:
+                host_state = self.host_state_cls(host,
+                                                 capabilities=capabilities,
+                                                 service=
+                                                 dict(service.iteritems()))
+                self.host_state_map[host] = host_state
+            # update host_state
+            host_state.update_from_volume_capability(capabilities)
 
-    def get_service_capabilities(self, *args):
-        pass
+        return self.host_state_map.itervalues()
index a423e8dc719f14854ef99ed0fef17dbf925d3202..f3f170dc672ff0fa1c60a53070e34835b3082960 100644 (file)
@@ -21,8 +21,7 @@
 Scheduler Service
 """
 
-import functools
-
+from cinder import context
 from cinder import db
 from cinder import exception
 from cinder import flags
@@ -32,14 +31,15 @@ from cinder.openstack.common import excutils
 from cinder.openstack.common import importutils
 from cinder.openstack.common import log as logging
 from cinder.openstack.common.notifier import api as notifier
+from cinder.volume import rpcapi as volume_rpcapi
 
 
 LOG = logging.getLogger(__name__)
 
-scheduler_driver_opt = cfg.StrOpt(
-    'scheduler_driver',
-    default='cinder.scheduler.simple.SimpleScheduler',
-    help='Default driver to use for the scheduler')
+scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
+                                  default='cinder.scheduler.simple.'
+                                          'SimpleScheduler',
+                                  help='Default scheduler driver to use')
 
 FLAGS = flags.FLAGS
 FLAGS.register_opt(scheduler_driver_opt)
@@ -56,6 +56,10 @@ class SchedulerManager(manager.Manager):
         self.driver = importutils.import_object(scheduler_driver)
         super(SchedulerManager, self).__init__(*args, **kwargs)
 
+    def init_host(self):
+        ctxt = context.get_admin_context()
+        self.request_service_capabilities(ctxt)
+
     def get_host_list(self, context):
         """Get a list of hosts from the HostManager."""
         return self.driver.get_host_list()
@@ -130,3 +134,6 @@ class SchedulerManager(manager.Manager):
 
         notifier.notify(context, notifier.publisher_id("scheduler"),
                         'scheduler.' + method, notifier.ERROR, payload)
+
+    def request_service_capabilities(self, context):
+        volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
diff --git a/cinder/scheduler/scheduler_options.py b/cinder/scheduler/scheduler_options.py
new file mode 100644 (file)
index 0000000..039831b
--- /dev/null
@@ -0,0 +1,105 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+SchedulerOptions monitors a local .json file for changes and loads
+it if needed. This file is converted to a data structure and passed
+into the filtering and weighing functions which can use it for
+dynamic configuration.
+"""
+
+import datetime
+import json
+import os
+
+from cinder import flags
+from cinder.openstack.common import cfg
+from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
+
+
+scheduler_json_config_location_opt = cfg.StrOpt(
+        'scheduler_json_config_location',
+        default='',
+        help='Absolute path to scheduler configuration JSON file.')
+
+FLAGS = flags.FLAGS
+FLAGS.register_opt(scheduler_json_config_location_opt)
+
+LOG = logging.getLogger(__name__)
+
+
+class SchedulerOptions(object):
+    """
+    SchedulerOptions monitors a local .json file for changes and loads it
+    if needed. This file is converted to a data structure and passed into
+    the filtering and weighing functions which can use it for dynamic
+    configuration.
+    """
+
+    def __init__(self):
+        super(SchedulerOptions, self).__init__()
+        self.data = {}
+        self.last_modified = None
+        self.last_checked = None
+
+    def _get_file_handle(self, filename):
+        """Get file handle. Broken out for testing."""
+        return open(filename)
+
+    def _get_file_timestamp(self, filename):
+        """Get the last modified datetime. Broken out for testing."""
+        try:
+            return os.path.getmtime(filename)
+        except os.error, e:
+            LOG.exception(_("Could not stat scheduler options file "
+                            "%(filename)s: '%(e)s'"), locals())
+            raise
+
+    def _load_file(self, handle):
+        """Decode the JSON file. Broken out for testing."""
+        try:
+            return json.load(handle)
+        except ValueError, e:
+            LOG.exception(_("Could not decode scheduler options: "
+                            "'%(e)s'") % locals())
+            return {}
+
+    def _get_time_now(self):
+        """Get current UTC. Broken out for testing."""
+        return timeutils.utcnow()
+
+    def get_configuration(self, filename=None):
+        """Check the json file for changes and load it if needed."""
+        if not filename:
+            filename = FLAGS.scheduler_json_config_location
+        if not filename:
+            return self.data
+        if self.last_checked:
+            now = self._get_time_now()
+            if now - self.last_checked < datetime.timedelta(minutes=5):
+                return self.data
+
+        last_modified = self._get_file_timestamp(filename)
+        if (not last_modified or not self.last_modified or
+            last_modified > self.last_modified):
+            self.data = self._load_file(self._get_file_handle(filename))
+            self.last_modified = last_modified
+        if not self.data:
+            self.data = {}
+
+        return self.data
diff --git a/cinder/scheduler/weights/__init__.py b/cinder/scheduler/weights/__init__.py
new file mode 100644 (file)
index 0000000..ce4951d
--- /dev/null
@@ -0,0 +1,14 @@
+# Copyright (c) 2013 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/cinder/scheduler/weights/capacity.py b/cinder/scheduler/weights/capacity.py
new file mode 100644 (file)
index 0000000..2bb5a0b
--- /dev/null
@@ -0,0 +1,50 @@
+# Copyright (c) 2012 OpenStack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Capacity Weigher.  Weigh hosts by their available capacity.
+
+The default is to spread volumes across all hosts evenly.  If you prefer
+stacking, you can set the 'capacity_weight_multiplier' option to a negative
+number and the weighing has the opposite effect of the default.
+"""
+
+import math
+
+from cinder import flags
+from cinder.openstack.common import cfg
+from cinder.openstack.common.scheduler import weights
+
+
+capacity_weight_opts = [
+        cfg.FloatOpt('capacity_weight_multiplier',
+                     default=1.0,
+                     help='Multiplier used for weighing volume capacity. '
+                          'Negative numbers mean to stack vs spread.'),
+]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(capacity_weight_opts)
+
+
+class CapacityWeigher(weights.BaseHostWeigher):
+    def _weight_multiplier(self):
+        """Override the weight multiplier."""
+        return FLAGS.capacity_weight_multiplier
+
+    def _weigh_object(self, host_state, weight_properties):
+        """Higher weights win.  We want spreading to be the default."""
+        reserved = float(host_state.reserved_percentage) / 100
+        free = math.floor(host_state.free_capacity_gb * (1 - reserved))
+        return free
index 97a4f8cd85b3fb6b99c596498d2e7094f1649c2f..6668a1087cf96969546d72010800ff6f2a020943 100644 (file)
 Fakes For Scheduler tests.
 """
 
+import mox
 
+from cinder import db
+from cinder.openstack.common import timeutils
+from cinder.scheduler import filter_scheduler
 from cinder.scheduler import host_manager
 
 
-class FakeHostManager(host_manager.HostManager):
-    """host1: free_ram_mb=1024-512-512=0, free_disk_gb=1024-512-512=0
-       host2: free_ram_mb=2048-512=1536  free_disk_gb=2048-512=1536
-       host3: free_ram_mb=4096-1024=3072  free_disk_gb=4096-1024=3072
-       host4: free_ram_mb=8192  free_disk_gb=8192"""
+VOLUME_SERVICES = [
+    dict(id=1, host='host1', topic='volume', disabled=False,
+         availability_zone='zone1', updated_at=timeutils.utcnow()),
+    dict(id=2, host='host2', topic='volume', disabled=False,
+         availability_zone='zone1', updated_at=timeutils.utcnow()),
+    dict(id=3, host='host3', topic='volume', disabled=False,
+         availability_zone='zone2', updated_at=timeutils.utcnow()),
+    dict(id=4, host='host4', topic='volume', disabled=False,
+         availability_zone='zone3', updated_at=timeutils.utcnow()),
+    # service on host5 is disabled
+    dict(id=5, host='host5', topic='volume', disabled=True,
+         availability_zone='zone4', updated_at=timeutils.utcnow()),
+]
+
+
+class FakeFilterScheduler(filter_scheduler.FilterScheduler):
+    def __init__(self, *args, **kwargs):
+        super(FakeFilterScheduler, self).__init__(*args, **kwargs)
+        self.host_manager = host_manager.HostManager()
 
+
+class FakeHostManager(host_manager.HostManager):
     def __init__(self):
         super(FakeHostManager, self).__init__()
 
         self.service_states = {
-            'host1': {
-                'compute': {'host_memory_free': 1073741824},
-            },
-            'host2': {
-                'compute': {'host_memory_free': 2147483648},
-            },
-            'host3': {
-                'compute': {'host_memory_free': 3221225472},
-            },
-            'host4': {
-                'compute': {'host_memory_free': 999999999},
-            },
+            'host1': {'total_capacity_gb': 1024,
+                      'free_capacity_gb': 1024,
+                      'reserved_percentage': 10,
+                      'timestamp': None},
+            'host2': {'total_capacity_gb': 2048,
+                      'free_capacity_gb': 300,
+                      'reserved_percentage': 10,
+                      'timestamp': None},
+            'host3': {'total_capacity_gb': 512,
+                      'free_capacity_gb': 512,
+                      'reserved_percentage': 0,
+                      'timestamp': None},
+            'host4': {'total_capacity_gb': 2048,
+                      'free_capacity_gb': 200,
+                      'reserved_percentage': 5,
+                      'timestamp': None},
         }
 
-    def get_host_list_from_db(self, context):
-        return [
-            ('host1', dict(free_disk_gb=1024, free_ram_mb=1024)),
-            ('host2', dict(free_disk_gb=2048, free_ram_mb=2048)),
-            ('host3', dict(free_disk_gb=4096, free_ram_mb=4096)),
-            ('host4', dict(free_disk_gb=8192, free_ram_mb=8192)),
-        ]
-
 
 class FakeHostState(host_manager.HostState):
-    def __init__(self, host, topic, attribute_dict):
-        super(FakeHostState, self).__init__(host, topic)
+    def __init__(self, host, attribute_dict):
+        super(FakeHostState, self).__init__(host)
         for (key, val) in attribute_dict.iteritems():
             setattr(self, key, val)
+
+
+def mox_host_manager_db_calls(mock, context):
+    mock.StubOutWithMock(db, 'service_get_all_by_topic')
+
+    db.service_get_all_by_topic(mox.IgnoreArg(),
+                                mox.IgnoreArg()).AndReturn(VOLUME_SERVICES)
diff --git a/cinder/tests/scheduler/test_capacity_weigher.py b/cinder/tests/scheduler/test_capacity_weigher.py
new file mode 100644 (file)
index 0000000..471364f
--- /dev/null
@@ -0,0 +1,87 @@
+# Copyright 2011-2012 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests For Capacity Weigher.
+"""
+
+from cinder import context
+from cinder.openstack.common.scheduler.weights import HostWeightHandler
+from cinder import test
+from cinder.tests.scheduler import fakes
+
+
+class CapacityWeigherTestCase(test.TestCase):
+    def setUp(self):
+        super(CapacityWeigherTestCase, self).setUp()
+        self.host_manager = fakes.FakeHostManager()
+        self.weight_handler = HostWeightHandler('cinder.scheduler.weights')
+        self.weight_classes = self.weight_handler.get_all_classes()
+
+    def _get_weighed_host(self, hosts, weight_properties=None):
+        if weight_properties is None:
+            weight_properties = {}
+        return self.weight_handler.get_weighed_objects(self.weight_classes,
+                                                       hosts,
+                                                       weight_properties)[0]
+
+    def _get_all_hosts(self):
+        ctxt = context.get_admin_context()
+        fakes.mox_host_manager_db_calls(self.mox, ctxt)
+        self.mox.ReplayAll()
+        host_states = self.host_manager.get_all_host_states(ctxt)
+        self.mox.VerifyAll()
+        self.mox.ResetAll()
+        return host_states
+
+    def test_default_of_spreading_first(self):
+        hostinfo_list = self._get_all_hosts()
+
+        # host1: free_capacity_gb=1024, free=1024*(1-0.1)
+        # host2: free_capacity_gb=300, free=300*(1-0.1)
+        # host3: free_capacity_gb=512, free=512
+        # host4: free_capacity_gb=200, free=200*(1-0.05)
+
+        # so, host1 should win:
+        weighed_host = self._get_weighed_host(hostinfo_list)
+        self.assertEqual(weighed_host.weight, 921.0)
+        self.assertEqual(weighed_host.obj.host, 'host1')
+
+    def test_capacity_weight_multiplier1(self):
+        self.flags(capacity_weight_multiplier=-1.0)
+        hostinfo_list = self._get_all_hosts()
+
+        # host1: free_capacity_gb=1024, free=-1024*(1-0.1)
+        # host2: free_capacity_gb=300, free=-300*(1-0.1)
+        # host3: free_capacity_gb=512, free=-512
+        # host4: free_capacity_gb=200, free=-200*(1-0.05)
+
+        # so, host4 should win:
+        weighed_host = self._get_weighed_host(hostinfo_list)
+        self.assertEqual(weighed_host.weight, -190.0)
+        self.assertEqual(weighed_host.obj.host, 'host4')
+
+    def test_capacity_weight_multiplier2(self):
+        self.flags(capacity_weight_multiplier=2.0)
+        hostinfo_list = self._get_all_hosts()
+
+        # host1: free_capacity_gb=1024, free=1024*(1-0.1)*2
+        # host2: free_capacity_gb=300, free=300*(1-0.1)*2
+        # host3: free_capacity_gb=512, free=512*2
+        # host4: free_capacity_gb=200, free=200*(1-0.05)*2
+
+        # so, host1 should win:
+        weighed_host = self._get_weighed_host(hostinfo_list)
+        self.assertEqual(weighed_host.weight, 921.0 * 2)
+        self.assertEqual(weighed_host.obj.host, 'host1')
diff --git a/cinder/tests/scheduler/test_filter_scheduler.py b/cinder/tests/scheduler/test_filter_scheduler.py
new file mode 100644 (file)
index 0000000..6f56575
--- /dev/null
@@ -0,0 +1,107 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests For Filter Scheduler.
+"""
+
+import mox
+
+from cinder import context
+from cinder import exception
+from cinder.openstack.common.scheduler import weights
+from cinder.scheduler import driver
+from cinder.scheduler import filter_scheduler
+from cinder.scheduler import host_manager
+from cinder.tests.scheduler import fakes
+from cinder.tests.scheduler import test_scheduler
+
+
+def fake_get_filtered_hosts(hosts, filter_properties):
+    return list(hosts)
+
+
+class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
+    """Test case for Filter Scheduler."""
+
+    driver_cls = filter_scheduler.FilterScheduler
+
+    def test_create_volume_no_hosts(self):
+        """
+        Ensure empty hosts & child_zones result in NoValidHosts exception.
+        """
+        def _fake_empty_call_zone_method(*args, **kwargs):
+            return []
+
+        sched = fakes.FakeFilterScheduler()
+
+        fake_context = context.RequestContext('user', 'project')
+        request_spec = {'volume_properties': {'project_id': 1,
+                                              'size': 1},
+                        'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_id': ['fake-id1']}
+        self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
+                          fake_context, request_spec, None)
+
+    def test_create_volume_non_admin(self):
+        """Test creating an instance locally using run_instance, passing
+        a non-admin context.  DB actions should work."""
+        self.was_admin = False
+
+        def fake_get(context, *args, **kwargs):
+            # make sure this is called with admin context, even though
+            # we're using user context below
+            self.was_admin = context.is_admin
+            return {}
+
+        sched = fakes.FakeFilterScheduler()
+        self.stubs.Set(sched.host_manager, 'get_all_host_states', fake_get)
+
+        fake_context = context.RequestContext('user', 'project')
+
+        request_spec = {'volume_properties': {'project_id': 1,
+                                              'size': 1},
+                        'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_id': ['fake-id1']}
+        self.assertRaises(exception.NoValidHost, sched.schedule_create_volume,
+                          fake_context, request_spec, None)
+        self.assertTrue(self.was_admin)
+
+    def test_schedule_happy_day(self):
+        """Make sure there's nothing glaringly wrong with _schedule()
+        by doing a happy day pass through."""
+
+        self.next_weight = 1.0
+
+        def _fake_weigh_objects(_self, functions, hosts, options):
+            self.next_weight += 2.0
+            host_state = hosts[0]
+            return [weights.WeighedHost(host_state, self.next_weight)]
+
+        sched = fakes.FakeFilterScheduler()
+        fake_context = context.RequestContext('user', 'project',
+                                              is_admin=True)
+
+        self.stubs.Set(sched.host_manager, 'get_filtered_hosts',
+                       fake_get_filtered_hosts)
+        self.stubs.Set(weights.HostWeightHandler,
+                       'get_weighed_objects', _fake_weigh_objects)
+        fakes.mox_host_manager_db_calls(self.mox, fake_context)
+
+        request_spec = {'volume_type': {'name': 'LVM_iSCSI'},
+                        'volume_properties': {'project_id': 1,
+                                              'size': 1}}
+        self.mox.ReplayAll()
+        weighed_host = sched._schedule(fake_context, request_spec, {})
+        self.assertTrue(weighed_host.obj is not None)
diff --git a/cinder/tests/scheduler/test_host_filters.py b/cinder/tests/scheduler/test_host_filters.py
new file mode 100644 (file)
index 0000000..5bcffd6
--- /dev/null
@@ -0,0 +1,99 @@
+# Copyright 2011 OpenStack LLC.  # All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests For Scheduler Host Filters.
+"""
+
+import httplib
+import stubout
+
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common.scheduler import filters
+from cinder import test
+from cinder.tests.scheduler import fakes
+from cinder import utils
+
+
+DATA = ''
+
+
+def stub_out_https_backend(stubs):
+    """
+    Stubs out the httplib.HTTPRequest.getresponse to return
+    faked-out data instead of grabbing actual contents of a resource
+
+    The stubbed getresponse() returns an iterator over
+    the data "I am a teapot, short and stout\n"
+
+    :param stubs: Set of stubout stubs
+    """
+
+    class FakeHTTPResponse(object):
+
+        def read(self):
+            return DATA
+
+    def fake_do_request(self, *args, **kwargs):
+        return httplib.OK, FakeHTTPResponse()
+
+
+class HostFiltersTestCase(test.TestCase):
+    """Test case for host filters."""
+
+    def setUp(self):
+        super(HostFiltersTestCase, self).setUp()
+        self.stubs = stubout.StubOutForTesting()
+        stub_out_https_backend(self.stubs)
+        self.context = context.RequestContext('fake', 'fake')
+        self.json_query = jsonutils.dumps(
+                ['and', ['>=', '$free_capacity_gb', 1024],
+                 ['>=', '$total_capacity_gb', 10 * 1024]])
+        # This has a side effect of testing 'get_filter_classes'
+        # when specifying a method (in this case, our standard filters)
+        filter_handler = filters.HostFilterHandler('cinder.scheduler.filters')
+        classes = filter_handler.get_all_classes()
+        self.class_map = {}
+        for cls in classes:
+            self.class_map[cls.__name__] = cls
+
+    def _stub_service_is_up(self, ret_value):
+        def fake_service_is_up(service):
+            return ret_value
+        self.stubs.Set(utils, 'service_is_up', fake_service_is_up)
+
+    def test_capacity_filter_passes(self):
+        self._stub_service_is_up(True)
+        filt_cls = self.class_map['CapacityFilter']()
+        filter_properties = {'size': 100}
+        service = {'disabled': False}
+        host = fakes.FakeHostState('host1',
+                                   {'free_capacity_gb': 200,
+                                    'updated_at': None,
+                                    'service': service})
+        self.assertTrue(filt_cls.host_passes(host, filter_properties))
+
+    def test_capacity_filter_fails(self):
+        self._stub_service_is_up(True)
+        filt_cls = self.class_map['CapacityFilter']()
+        filter_properties = {'size': 100}
+        service = {'disabled': False}
+        host = fakes.FakeHostState('host1',
+                                   {'free_capacity_gb': 120,
+                                    'reserved_percentage': 20,
+                                    'updated_at': None,
+                                    'service': service})
+        self.assertFalse(filt_cls.host_passes(host, filter_properties))
diff --git a/cinder/tests/scheduler/test_host_manager.py b/cinder/tests/scheduler/test_host_manager.py
new file mode 100644 (file)
index 0000000..0d2207e
--- /dev/null
@@ -0,0 +1,176 @@
+# Copyright (c) 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests For HostManager
+"""
+
+
+from cinder import db
+from cinder import exception
+from cinder import flags
+from cinder.openstack.common.scheduler import filters
+from cinder.openstack.common import timeutils
+from cinder.scheduler import host_manager
+from cinder import test
+from cinder.tests.scheduler import fakes
+
+
+FLAGS = flags.FLAGS
+
+
+class FakeFilterClass1(filters.BaseHostFilter):
+    def host_passes(self, host_state, filter_properties):
+        pass
+
+
+class FakeFilterClass2(filters.BaseHostFilter):
+    def host_passes(self, host_state, filter_properties):
+        pass
+
+
+class HostManagerTestCase(test.TestCase):
+    """Test case for HostManager class"""
+
+    def setUp(self):
+        super(HostManagerTestCase, self).setUp()
+        self.host_manager = host_manager.HostManager()
+        self.fake_hosts = [host_manager.HostState('fake_host%s' % x)
+                           for x in xrange(1, 5)]
+
+    def test_choose_host_filters_not_found(self):
+        self.flags(scheduler_default_filters='FakeFilterClass3')
+        self.host_manager.filter_classes = [FakeFilterClass1,
+                                            FakeFilterClass2]
+        self.assertRaises(exception.SchedulerHostFilterNotFound,
+                          self.host_manager._choose_host_filters, None)
+
+    def test_choose_host_filters(self):
+        self.flags(scheduler_default_filters=['FakeFilterClass2'])
+        self.host_manager.filter_classes = [FakeFilterClass1,
+                                            FakeFilterClass2]
+
+        # Test 'volume' returns 1 correct function
+        filter_classes = self.host_manager._choose_host_filters(None)
+        self.assertEqual(len(filter_classes), 1)
+        self.assertEqual(filter_classes[0].__name__, 'FakeFilterClass2')
+
+    def _mock_get_filtered_hosts(self, info, specified_filters=None):
+        self.mox.StubOutWithMock(self.host_manager, '_choose_host_filters')
+
+        info['got_objs'] = []
+        info['got_fprops'] = []
+
+        def fake_filter_one(_self, obj, filter_props):
+            info['got_objs'].append(obj)
+            info['got_fprops'].append(filter_props)
+            return True
+
+        self.stubs.Set(FakeFilterClass1, '_filter_one', fake_filter_one)
+        self.host_manager._choose_host_filters(specified_filters).AndReturn(
+                [FakeFilterClass1])
+
+    def _verify_result(self, info, result):
+        for x in info['got_fprops']:
+            self.assertEqual(x, info['expected_fprops'])
+        self.assertEqual(set(info['expected_objs']), set(info['got_objs']))
+        self.assertEqual(set(result), set(info['got_objs']))
+
+    def test_get_filtered_hosts(self):
+        fake_properties = {'moo': 1, 'cow': 2}
+
+        info = {'expected_objs': self.fake_hosts,
+                'expected_fprops': fake_properties}
+
+        self._mock_get_filtered_hosts(info)
+
+        self.mox.ReplayAll()
+        result = self.host_manager.get_filtered_hosts(self.fake_hosts,
+                                                      fake_properties)
+        self._verify_result(info, result)
+
+    def test_update_service_capabilities(self):
+        service_states = self.host_manager.service_states
+        self.assertDictMatch(service_states, {})
+        self.mox.StubOutWithMock(timeutils, 'utcnow')
+        timeutils.utcnow().AndReturn(31337)
+        timeutils.utcnow().AndReturn(31338)
+        timeutils.utcnow().AndReturn(31339)
+
+        host1_volume_capabs = dict(free_capacity_gb=4321, timestamp=1)
+        host2_volume_capabs = dict(free_capacity_gb=5432, timestamp=1)
+        host3_volume_capabs = dict(free_capacity_gb=6543, timestamp=1)
+
+        self.mox.ReplayAll()
+        service_name = 'volume'
+        self.host_manager.update_service_capabilities(service_name, 'host1',
+                                                      host1_volume_capabs)
+        self.host_manager.update_service_capabilities(service_name, 'host2',
+                                                      host2_volume_capabs)
+        self.host_manager.update_service_capabilities(service_name, 'host3',
+                                                      host3_volume_capabs)
+
+        # Make sure dictionary isn't re-assigned
+        self.assertEqual(self.host_manager.service_states, service_states)
+        # Make sure original dictionary wasn't copied
+        self.assertEqual(host1_volume_capabs['timestamp'], 1)
+
+        host1_volume_capabs['timestamp'] = 31337
+        host2_volume_capabs['timestamp'] = 31338
+        host3_volume_capabs['timestamp'] = 31339
+
+        expected = {'host1': host1_volume_capabs,
+                    'host2': host2_volume_capabs,
+                    'host3': host3_volume_capabs}
+        self.assertDictMatch(service_states, expected)
+
+    def test_get_all_host_states(self):
+        context = 'fake_context'
+        topic = FLAGS.volume_topic
+
+        self.mox.StubOutWithMock(db, 'service_get_all_by_topic')
+        self.mox.StubOutWithMock(host_manager.LOG, 'warn')
+
+        ret_services = fakes.VOLUME_SERVICES
+        db.service_get_all_by_topic(context, topic).AndReturn(ret_services)
+        # Disabled service
+        host_manager.LOG.warn("service is down or disabled.")
+
+        self.mox.ReplayAll()
+        self.host_manager.get_all_host_states(context)
+        host_state_map = self.host_manager.host_state_map
+
+        self.assertEqual(len(host_state_map), 4)
+        # Check that service is up
+        for i in xrange(4):
+            volume_node = fakes.VOLUME_SERVICES[i]
+            host = volume_node['host']
+            self.assertEqual(host_state_map[host].service,
+                             volume_node)
+
+
+class HostStateTestCase(test.TestCase):
+    """Test case for HostState class"""
+
+    def test_update_from_volume_capability(self):
+        fake_host = host_manager.HostState('host1')
+        self.assertEqual(fake_host.free_capacity_gb, 0)
+
+        volume_capability = {'total_capacity_gb': 1024,
+                             'free_capacity_gb': 512,
+                             'reserved_percentage': 0,
+                             'timestamp': None}
+
+        fake_host.update_from_volume_capability(volume_capability)
+        self.assertEqual(fake_host.free_capacity_gb, 512)
diff --git a/cinder/tests/scheduler/test_scheduler_options.py b/cinder/tests/scheduler/test_scheduler_options.py
new file mode 100644 (file)
index 0000000..7ecb772
--- /dev/null
@@ -0,0 +1,138 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+"""
+Tests For PickledScheduler.
+"""
+
+import datetime
+import StringIO
+
+from cinder.openstack.common import jsonutils
+from cinder.scheduler import scheduler_options
+from cinder import test
+
+
+class FakeSchedulerOptions(scheduler_options.SchedulerOptions):
+    def __init__(self, last_checked, now, file_old, file_now, data, filedata):
+        super(FakeSchedulerOptions, self).__init__()
+        # Change internals ...
+        self.last_modified = file_old
+        self.last_checked = last_checked
+        self.data = data
+
+        # For overrides ...
+        self._time_now = now
+        self._file_now = file_now
+        self._file_data = filedata
+
+        self.file_was_loaded = False
+
+    def _get_file_timestamp(self, filename):
+        return self._file_now
+
+    def _get_file_handle(self, filename):
+        self.file_was_loaded = True
+        return StringIO.StringIO(self._file_data)
+
+    def _get_time_now(self):
+        return self._time_now
+
+
+class SchedulerOptionsTestCase(test.TestCase):
+    def test_get_configuration_first_time_no_flag(self):
+        last_checked = None
+        now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_old = None
+        file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+
+        data = dict(a=1, b=2, c=3)
+        jdata = jsonutils.dumps(data)
+
+        fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
+                                    {}, jdata)
+        self.assertEquals({}, fake.get_configuration())
+        self.assertFalse(fake.file_was_loaded)
+
+    def test_get_configuration_first_time_empty_file(self):
+        last_checked = None
+        now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_old = None
+        file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+
+        data = dict(a=1, b=2, c=3)
+        jdata = ""
+
+        fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
+                                    {}, jdata)
+        self.assertEquals({}, fake.get_configuration('foo.json'))
+        self.assertTrue(fake.file_was_loaded)
+
+    def test_get_configuration_first_time_happy_day(self):
+        last_checked = None
+        now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_old = None
+        file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+
+        data = dict(a=1, b=2, c=3)
+        jdata = jsonutils.dumps(data)
+
+        fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
+                                    {}, jdata)
+        self.assertEquals(data, fake.get_configuration('foo.json'))
+        self.assertTrue(fake.file_was_loaded)
+
+    def test_get_configuration_second_time_no_change(self):
+        last_checked = datetime.datetime(2011, 1, 1, 1, 1, 1)
+        now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_old = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+
+        data = dict(a=1, b=2, c=3)
+        jdata = jsonutils.dumps(data)
+
+        fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
+                                    data, jdata)
+        self.assertEquals(data, fake.get_configuration('foo.json'))
+        self.assertFalse(fake.file_was_loaded)
+
+    def test_get_configuration_second_time_too_fast(self):
+        last_checked = datetime.datetime(2011, 1, 1, 1, 1, 1)
+        now = datetime.datetime(2011, 1, 1, 1, 1, 2)
+        file_old = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_now = datetime.datetime(2013, 1, 1, 1, 1, 1)
+
+        old_data = dict(a=1, b=2, c=3)
+        data = dict(a=11, b=12, c=13)
+        jdata = jsonutils.dumps(data)
+
+        fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
+                                    old_data, jdata)
+        self.assertEquals(old_data, fake.get_configuration('foo.json'))
+        self.assertFalse(fake.file_was_loaded)
+
+    def test_get_configuration_second_time_change(self):
+        last_checked = datetime.datetime(2011, 1, 1, 1, 1, 1)
+        now = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_old = datetime.datetime(2012, 1, 1, 1, 1, 1)
+        file_now = datetime.datetime(2013, 1, 1, 1, 1, 1)
+
+        old_data = dict(a=1, b=2, c=3)
+        data = dict(a=11, b=12, c=13)
+        jdata = jsonutils.dumps(data)
+
+        fake = FakeSchedulerOptions(last_checked, now, file_old, file_now,
+                                    old_data, jdata)
+        self.assertEquals(data, fake.get_configuration('foo.json'))
+        self.assertTrue(fake.file_was_loaded)
index 1ae2e95c08faacbf69b3b8fa8059e1b8badbc244..2450f2b6777729c7056c6a5abff635471877cd85 100644 (file)
@@ -30,6 +30,7 @@ from cinder import flags
 from cinder.image import image_utils
 from cinder.openstack.common import cfg
 from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
 from cinder import utils
 from cinder.volume import iscsi
 
@@ -61,7 +62,11 @@ volume_opts = [
                help='use this ip for iscsi'),
     cfg.IntOpt('iscsi_port',
                default=3260,
-               help='The port that the iSCSI daemon is listening on'), ]
+               help='The port that the iSCSI daemon is listening on'),
+    cfg.IntOpt('reserved_percentage',
+               default=0,
+               help='The percentage of backend capacity is reserved'),
+]
 
 FLAGS = flags.FLAGS
 FLAGS.register_opts(volume_opts)
@@ -73,6 +78,7 @@ class VolumeDriver(object):
         # NOTE(vish): db is set by Manager
         self.db = None
         self.set_execute(execute)
+        self._stats = {}
 
     def set_execute(self, execute):
         self._execute = execute
@@ -619,6 +625,49 @@ class ISCSIDriver(VolumeDriver):
     def terminate_connection(self, volume, connector, **kwargs):
         pass
 
+    def get_volume_stats(self, refresh=False):
+        """Get volume status.
+
+        If 'refresh' is True, run update the stats first."""
+        if refresh:
+            self._update_volume_status()
+
+        return self._stats
+
+    def _update_volume_status(self):
+        """Retrieve status info from volume group."""
+
+        LOG.debug(_("Updating volume status"))
+        data = {}
+
+        # Note(zhiteng): These information are driver/backend specific,
+        # each driver may define these values in its own config options
+        # or fetch from driver specific configuration file.
+        data["volume_backend_name"] = 'LVM_iSCSI'
+        data["vendor_name"] = 'Open Source'
+        data["driver_version"] = '1.0'
+        data["storage_protocol"] = 'iSCSI'
+
+        data['total_capacity_gb'] = 0
+        data['free_capacity_gb'] = 0
+        data['reserved_percentage'] = FLAGS.reserved_percentage
+        data['QoS_support'] = False
+
+        try:
+            out, err = self._execute('vgs', '--noheadings', '--nosuffix',
+                                     '--unit=G', '-o', 'name,size,free',
+                                     FLAGS.volume_group, run_as_root=True)
+        except exception.ProcessExecutionError as exc:
+            LOG.error(_("Error retrieving volume status: "), exc.stderr)
+            out = False
+
+        if out:
+            volume = out.split()
+            data['total_capacity_gb'] = float(volume[1])
+            data['free_capacity_gb'] = float(volume[2])
+
+        self._stats = data
+
     def copy_image_to_volume(self, context, volume, image_service, image_id):
         """Fetch the image from image_service and write it to the volume."""
         image_utils.fetch_to_raw(context,
index 7400f62a7fe33c1b5f81568c59120321f144977d..3567c985dc79cc2e5b44694db4b468c94e29bac3 100644 (file)
@@ -61,9 +61,7 @@ volume_manager_opts = [
     cfg.StrOpt('volume_driver',
                default='cinder.volume.driver.ISCSIDriver',
                help='Driver to use for volume creation'),
-    cfg.BoolOpt('volume_force_update_capabilities',
-                default=False,
-                help='if True will force update capabilities on each check'), ]
+]
 
 FLAGS = flags.FLAGS
 FLAGS.register_opts(volume_manager_opts)
@@ -103,7 +101,7 @@ MAPPING = {
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.1'
+    RPC_API_VERSION = '1.2'
 
     def __init__(self, volume_driver=None, *args, **kwargs):
         """Load the driver from the one specified in args, or from flags."""
@@ -120,7 +118,6 @@ class VolumeManager(manager.SchedulerDependentManager):
         # NOTE(vish): Implementation specific db handling is done
         #             by the driver.
         self.driver.db = self.db
-        self._last_volume_stats = []
 
     def init_host(self):
         """Do any initialization that needs to be run if this is a
@@ -144,6 +141,9 @@ class VolumeManager(manager.SchedulerDependentManager):
                 LOG.info(_('Resuming delete on volume: %s') % volume['id'])
                 self.delete_volume(ctxt, volume['id'])
 
+        # collect and publish service capabilities
+        self.publish_service_capabilities(ctxt)
+
     def create_volume(self, context, volume_id, snapshot_id=None,
                       image_id=None, source_volid=None):
         """Creates and exports the volume."""
@@ -490,33 +490,19 @@ class VolumeManager(manager.SchedulerDependentManager):
         volume_ref = self.db.volume_get(context, volume_id)
         self.driver.terminate_connection(volume_ref, connector, force=force)
 
-    def _volume_stats_changed(self, stat1, stat2):
-        if FLAGS.volume_force_update_capabilities:
-            return True
-        if len(stat1) != len(stat2):
-            return True
-        for (k, v) in stat1.iteritems():
-            if (k, v) not in stat2.iteritems():
-                return True
-        return False
-
     @manager.periodic_task
     def _report_driver_status(self, context):
+        LOG.info(_("Updating volume status"))
         volume_stats = self.driver.get_volume_stats(refresh=True)
         if volume_stats:
-            LOG.info(_("Checking volume capabilities"))
-
-            if self._volume_stats_changed(self._last_volume_stats,
-                                          volume_stats):
-                LOG.info(_("New capabilities found: %s"), volume_stats)
-                self._last_volume_stats = volume_stats
-
-                # This will grab info about the host and queue it
-                # to be sent to the Schedulers.
-                self.update_service_capabilities(self._last_volume_stats)
-            else:
-                # avoid repeating fanouts
-                self.update_service_capabilities(None)
+            # This will grab info about the host and queue it
+            # to be sent to the Schedulers.
+            self.update_service_capabilities(volume_stats)
+
+    def publish_service_capabilities(self, context):
+        """ Collect driver status and then publish """
+        self._report_driver_status(context)
+        self._publish_service_capabilities(context)
 
     def _reset_stats(self):
         LOG.info(_("Clear capabilities"))
index 54bfabd03c391f74bcecb90280b398876f470f63..0f7621016f56a156378b9fb3e4c109990a050fcf 100644 (file)
@@ -34,6 +34,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
 
         1.0 - Initial version.
         1.1 - Adds clone volume option to create_volume.
+        1.2 - Add publish_service_capabilities() method.
     '''
 
     BASE_RPC_API_VERSION = '1.0'
@@ -114,3 +115,7 @@ class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
                          topic=rpc.queue_get_for(ctxt,
                                                  self.topic,
                                                  volume['host']))
+
+    def publish_service_capabilities(self, ctxt):
+        self.fanout_cast(ctxt, self.make_msg('publish_service_capabilities'),
+                         version='1.2')
index 3f8044fcbb0d8e1001b2f3ec10ca2cc967b08c2c..93efc6ff04d520784aa62a052745f1a2fcfe374f 100644 (file)
@@ -1,7 +1,7 @@
 [DEFAULT]
 
 # The list of modules to copy from openstack-common
-modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy,uuidutils,lockutils,fileutils,gettextutils
+modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils,log,setup,notifier,context,network_utils,policy,uuidutils,lockutils,fileutils,gettextutils,scheduler,scheduler.filters,scheduler.weights
 
 # The base module to hold the copy of openstack.common
 base=cinder
index 55f69c5fa284e38c23592fe38b1fc92b3f98216e..6b7cccf3a0cce8e86b4b96f31d6cba89973f6506 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,23 @@ from cinder import version
 
 requires = common_setup.parse_requirements()
 
+filters = [
+    "AvailabilityZoneFilter = "
+    "cinder.openstack.common.scheduler.filters."
+    "availability_zone_filter:AvailabilityZoneFilter",
+    "CapabilitiesFilter = "
+    "cinder.openstack.common.scheduler.filters."
+    "capabilities_filter:CapabilitiesFilter",
+    "CapacityFilter = "
+    "cinder.scheduler.filters.capacity_filter:CapacityFilter",
+    "JsonFilter = "
+    "cinder.openstack.common.scheduler.filters.json_filter:JsonFilter",
+]
+
+weights = [
+    "CapacityWeigher = cinder.scheduler.weights.capacity:CapacityWeigher",
+]
+
 setuptools.setup(
     name='cinder',
     version=version.canonical_version_string(),
@@ -43,6 +60,10 @@ setuptools.setup(
     cmdclass=common_setup.get_cmdclass(),
     packages=setuptools.find_packages(exclude=['bin', 'smoketests']),
     install_requires=requires,
+    entry_points={
+        'cinder.scheduler.filters': filters,
+        'cinder.scheduler.weights': weights,
+    },
     include_package_data=True,
     test_suite='nose.collector',
     setup_requires=['setuptools_git>=0.4'],
index 9ef13a00d31cd7e69a00278d938157a60f9c678f..556a7012dc9944395b3fb252bcfd7ac0f459013a 100644 (file)
@@ -12,6 +12,7 @@ greenlet>=0.3.1
 PasteDeploy==1.5.0
 paste
 sqlalchemy-migrate>=0.7.2
+stevedore>=0.8.0
 suds==0.4
 paramiko
 Babel>=0.9.6