]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Use openstack-common service.py
authorAngus Salkeld <asalkeld@redhat.com>
Fri, 2 Nov 2012 11:02:00 +0000 (22:02 +1100)
committerAngus Salkeld <asalkeld@redhat.com>
Fri, 2 Nov 2012 11:02:00 +0000 (22:02 +1100)
Change-Id: Idbc145209c039e9362cf97c9926e050f809ef0fa

bin/heat-engine
heat/engine/service.py [moved from heat/engine/manager.py with 83% similarity]
heat/manager.py [deleted file]
heat/service.py [deleted file]
heat/tests/functional/test_WordPress_2_Instances.py
heat/tests/functional/test_WordPress_2_Instances_With_EBS.py
heat/tests/functional/test_WordPress_Single_Instance_With_EBS.py
heat/tests/functional/test_WordPress_Single_Instance_With_EBS_EIP.py
heat/tests/functional/test_WordPress_Single_Instance_With_EIP.py
heat/tests/test_engine_service.py [moved from heat/tests/test_engine_manager.py with 94% similarity]
heat/tests/test_validate.py

index 22f4ae9a4fc48f99f2da8aaa1bf32ecb812134e0..9a4550c336ad80dfa70b88898a7d6317b875b894 100755 (executable)
@@ -38,28 +38,26 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'heat', '__init__.py')):
 
 gettext.install('heat', unicode=1)
 
-from heat.openstack.common import log as logging
 from heat.openstack.common import cfg
-from heat import service
+from heat.openstack.common import log as logging
+from heat.openstack.common import service
+
 from heat.common import config
-from heat.common import utils
 from heat.db import api as db_api
+from heat.engine import service as engine
+
 
 logger = logging.getLogger('heat.engine')
 
 if __name__ == '__main__':
 
-    default_manager = 'heat.engine.manager.EngineManager'
     cfg.CONF(project='heat', prog='heat-engine')
 
     config.setup_logging()
     config.register_engine_opts()
     db_api.configure()
 
-    #utils.monkey_patch()
-    server = service.Service.create(binary='heat-engine',
-                                    topic='engine',
-                                    manager=default_manager,
-                                    config=cfg.CONF)
-    service.serve(server)
-    service.wait()
+    srv = engine.EngineService(cfg.CONF.host,
+                                'engine')
+    launcher = service.launch(srv)
+    launcher.wait()
similarity index 83%
rename from heat/engine/manager.py
rename to heat/engine/service.py
index 335c09e52a760f8b77e36cbc29c8f3052cbf876a..36872edb74999de0cc8a572f50c2c10e773d61bc 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-
-from copy import deepcopy
-import datetime
 import webob
-import json
-import urlparse
-import httplib
-import eventlet
-import collections
 
-from heat import manager
+from heat.common import context
 from heat.db import api as db_api
-from heat.common import config
-from heat.common import utils as heat_utils
 from heat.engine import api
 from heat.engine import identifier
 from heat.engine import parser
 from heat.engine import watchrule
-from heat.engine import auth
 
 from heat.openstack.common import cfg
-from heat.openstack.common import timeutils
 from heat.openstack.common import log as logging
+from heat.openstack.common import threadgroup
+from heat.openstack.common.gettextutils import _
+from heat.openstack.common.rpc import service
 
-from novaclient.v1_1 import client
-from novaclient.exceptions import BadRequest
-from novaclient.exceptions import NotFound
-from novaclient.exceptions import AuthorizationFailure
 
-logger = logging.getLogger('heat.engine.manager')
-greenpool = eventlet.GreenPool()
+logger = logging.getLogger(__name__)
 
 
-class EngineManager(manager.Manager):
+class EngineService(service.Service):
     """
     Manages the running instances from creation to destruction.
     All the methods in here are called from the RPC backend.  This is
@@ -56,35 +42,23 @@ class EngineManager(manager.Manager):
     are also dynamically added and will be named as keyword arguments
     by the RPC caller.
     """
-
-    def __init__(self, *args, **kwargs):
-        """Load configuration options and connect to the hypervisor."""
-
-        # Maintain a dict mapping stack ids to in-progress greenthreads
-        # allows us to kill any pending create|update before delete_stack
-        #
-        # Currently we should only ever have one outstanding thread, but
-        # the implementation makes this a dict-of-sets so we could use
-        # the same method to cancel multiple threads, e.g if long-running
-        # query actions need to be spawned instead of run immediately
-        self.stack_threads = collections.defaultdict(set)
-
-    def _gt_done_callback(self, gt, **kwargs):
-        '''
-        Callback function to be passed to GreenThread.link() when we spawn()
-        Removes the thread ID from the stack_threads set of pending threads
-        kwargs should contain 'stack_id'
-        '''
-        if not 'stack_id' in kwargs:
-            logger.error("_gt_done_callback called with no stack_id!")
-        else:
-            stack_id = kwargs['stack_id']
-            if stack_id in self.stack_threads:
-                logger.debug("Thread done callback for stack %s, %s" %
-                             (stack_id, gt))
-                self.stack_threads[stack_id].discard(gt)
-                if not len(self.stack_threads[stack_id]):
-                    del self.stack_threads[stack_id]
+    def __init__(self, host, topic, manager=None):
+        super(EngineService, self).__init__(host, topic)
+        # stg == "Stack Thread Groups"
+        self.stg = {}
+
+    def _start_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
+        if stack_id not in self.stg:
+            thr_name = '%s-%s' % (stack_name, stack_id)
+            self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
+        self.stg[stack_id].add_thread(func, *args, **kwargs)
+
+    def start(self):
+        super(EngineService, self).start()
+        admin_context = context.get_admin_context()
+        self.tg.add_timer(cfg.CONF.periodic_interval,
+                    self._periodic_watcher_task,
+                    context=admin_context)
 
     def identify_stack(self, context, stack_name):
         """
@@ -165,11 +139,7 @@ class EngineManager(manager.Manager):
 
         stack_id = stack.store()
 
-        # Spawn a greenthread to do the create, and register a
-        # callback to remove the thread from stack_threads when done
-        gt = greenpool.spawn(stack.create)
-        gt.link(self._gt_done_callback, stack_id=stack_id)
-        self.stack_threads[stack_id].add(gt)
+        self._start_in_thread(stack_id, stack_name, stack.create)
 
         return dict(stack.identifier())
 
@@ -206,11 +176,9 @@ class EngineManager(manager.Manager):
         if response:
             return {'Description': response}
 
-        # Spawn a greenthread to do the update, and register a
-        # callback to remove the thread from stack_threads when done
-        gt = greenpool.spawn(current_stack.update, updated_stack)
-        gt.link(self._gt_done_callback, stack_id=db_stack.id)
-        self.stack_threads[db_stack.id].add(gt)
+        self._start_in_thread(db_stack.id, db_stack.name,
+                              current_stack.update,
+                              updated_stack)
 
         return dict(current_stack.identifier())
 
@@ -279,17 +247,13 @@ class EngineManager(manager.Manager):
 
         stack = parser.Stack.load(context, st.id)
 
-        # Kill any in-progress create or update threads
-        if st.id in self.stack_threads:
-            # Note we must use set.copy() here or we get an error when thread
-            # rescheduling happens on t.kill() and _gt_done_callback modifies
-            # stack_threads[st.id] mid-iteration
-            for t in self.stack_threads[st.id].copy():
-                logger.warning("Killing running thread %s for stack %s" %
-                               (t, st.name))
-                t.kill()
-
-        greenpool.spawn_n(stack.delete)
+        # TODO Angus do we need a kill or will stop do?
+        if st.id in self.stg:
+            self.stg[st.id].stop()
+            self.stg[st.id].wait()
+            del self.stg[st.id]
+        # use the service ThreadGroup for deletes
+        self.tg.add_thread(stack.delete)
         return None
 
     def list_events(self, context, stack_identity):
@@ -438,10 +402,7 @@ class EngineManager(manager.Manager):
 
         return [None, resource.metadata]
 
-    @manager.periodic_task
     def _periodic_watcher_task(self, context):
-
-        now = timeutils.utcnow()
         try:
             wrn = [w.name for w in db_api.watch_rule_get_all(context)]
         except Exception as ex:
diff --git a/heat/manager.py b/heat/manager.py
deleted file mode 100644 (file)
index 7313537..0000000
+++ /dev/null
@@ -1,182 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-"""Base Manager class.
-
-Managers are responsible for a certain aspect of the system.  It is a logical
-grouping of code relating to a portion of the system.  In general other
-components should be using the manager to make changes to the components that
-it is responsible for.
-
-For example, other components that need to deal with volumes in some way,
-should do so by calling methods on the VolumeManager instead of directly
-changing fields in the database.  This allows us to keep all of the code
-relating to volumes in the same place.
-
-We have adopted a basic strategy of Smart managers and dumb data, which means
-rather than attaching methods to data objects, components should call manager
-methods that act on the data.
-
-Methods on managers that can be executed locally should be called directly. If
-a particular method must execute on a remote host, this should be done via rpc
-to the service that wraps the manager
-
-Managers should be responsible for most of the db access, and
-non-implementation specific data.  Anything implementation specific that can't
-be generalized should be done by the Driver.
-
-In general, we prefer to have one manager with multiple drivers for different
-implementations, but sometimes it makes sense to have multiple managers.  You
-can think of it this way: Abstract different overall strategies at the manager
-level(FlatNetwork vs VlanNetwork), and different implementations at the driver
-level(LinuxNetDriver vs CiscoNetDriver).
-
-Managers will often provide methods for initial setup of a host or periodic
-tasks to a wrapping service.
-
-This module provides Manager, a base class for managers.
-
-"""
-
-from heat import version
-
-from heat.openstack.common.rpc import dispatcher as rpc_dispatcher
-from heat.openstack.common import log as logging
-from heat.openstack.common import cfg
-
-
-LOG = logging.getLogger(__name__)
-
-
-def periodic_task(*args, **kwargs):
-    """Decorator to indicate that a method is a periodic task.
-
-    This decorator can be used in two ways:
-
-        1. Without arguments '@periodic_task', this will be run on every tick
-           of the periodic scheduler.
-
-        2. With arguments, @periodic_task(ticks_between_runs=N), this will be
-           run on every N ticks of the periodic scheduler.
-    """
-    def decorator(f):
-        f._periodic_task = True
-        f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
-        return f
-
-    # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
-    # and without parens.
-    #
-    # In the 'with-parens' case (with kwargs present), this function needs to
-    # return a decorator function since the interpreter will invoke it like:
-    #
-    #   periodic_task(*args, **kwargs)(f)
-    #
-    # In the 'without-parens' case, the original function will be passed
-    # in as the first argument, like:
-    #
-    #   periodic_task(f)
-    if kwargs:
-        return decorator
-    else:
-        return decorator(args[0])
-
-
-class ManagerMeta(type):
-    def __init__(cls, names, bases, dict_):
-        """Metaclass that allows us to collect decorated periodic tasks."""
-        super(ManagerMeta, cls).__init__(names, bases, dict_)
-
-        # NOTE(sirp): if the attribute is not present then we must be the base
-        # class, so, go ahead an initialize it. If the attribute is present,
-        # then we're a subclass so make a copy of it so we don't step on our
-        # parent's toes.
-        try:
-            cls._periodic_tasks = cls._periodic_tasks[:]
-        except AttributeError:
-            cls._periodic_tasks = []
-
-        try:
-            cls._ticks_to_skip = cls._ticks_to_skip.copy()
-        except AttributeError:
-            cls._ticks_to_skip = {}
-
-        for value in cls.__dict__.values():
-            if getattr(value, '_periodic_task', False):
-                task = value
-                name = task.__name__
-                cls._periodic_tasks.append((name, task))
-                cls._ticks_to_skip[name] = task._ticks_between_runs
-
-
-class Manager(object):
-    __metaclass__ = ManagerMeta
-
-    def __init__(self, host=None, db_driver=None):
-        if not host:
-            host = cfg.CONF.host
-        self.host = host
-        super(Manager, self).__init__(db_driver)
-
-    def create_rpc_dispatcher(self):
-        '''Get the rpc dispatcher for this manager.
-
-        If a manager would like to set an rpc API version, or support more than
-        one class as the target of rpc messages, override this method.
-        '''
-        return rpc_dispatcher.RpcDispatcher([self])
-
-    def periodic_tasks(self, context, raise_on_error=False):
-        """Tasks to be run at a periodic interval."""
-        for task_name, task in self._periodic_tasks:
-            full_task_name = '.'.join([self.__class__.__name__, task_name])
-
-            ticks_to_skip = self._ticks_to_skip[task_name]
-            if ticks_to_skip > 0:
-                LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
-                            " ticks left until next run"), locals())
-                self._ticks_to_skip[task_name] -= 1
-                continue
-
-            self._ticks_to_skip[task_name] = task._ticks_between_runs
-            LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
-
-            try:
-                task(self, context)
-            except Exception as e:
-                if raise_on_error:
-                    raise
-                LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
-                              locals())
-
-    def init_host(self):
-        """Handle initialization if this is a standalone service.
-
-        Child classes should override this method.
-
-        """
-        pass
-
-    def service_version(self, context):
-        return version.version_string()
-
-    def service_config(self, context):
-        config = {}
-        for key in cfg.CONF:
-            config[key] = cfg.CONF.get(key, None)
-        return config
diff --git a/heat/service.py b/heat/service.py
deleted file mode 100644 (file)
index 5552444..0000000
+++ /dev/null
@@ -1,241 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# Copyright 2011 Justin Santa Barbara
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-"""Generic Node base class for all workers that run on hosts."""
-
-import inspect
-import os
-
-import eventlet
-import greenlet
-
-from heat.openstack.common import rpc
-from heat.openstack.common import cfg
-from heat.openstack.common import importutils
-from heat.openstack.common import log as logging
-
-from heat.common import utils as heat_utils
-from heat.common import exception
-from heat.common import context
-
-from heat import version
-
-LOG = logging.getLogger(__name__)
-
-
-class Launcher(object):
-    """
-    Launch one or more services and wait for them to complete.
-    """
-
-    def __init__(self):
-        """Initialize the service launcher.
-
-        :returns: None
-
-        """
-        self._services = []
-
-    @staticmethod
-    def run_server(server):
-        """Start and wait for a server to finish.
-
-        :param service: Server to run and wait for.
-        :returns: None
-
-        """
-        server.start()
-        server.wait()
-
-    def launch_server(self, server):
-        """Load and start the given server.
-
-        :param server: The server you would like to start.
-        :returns: None
-
-        """
-        gt = eventlet.spawn(self.run_server, server)
-        self._services.append(gt)
-
-    def stop(self):
-        """Stop all services which are currently running.
-
-        :returns: None
-
-        """
-        for service in self._services:
-            service.kill()
-
-    def wait(self):
-        """Waits until all services have been stopped, and then returns.
-
-        :returns: None
-
-        """
-        for service in self._services:
-            try:
-                service.wait()
-            except greenlet.GreenletExit:
-                pass
-
-
-class Service(object):
-    """Service object for binaries running on hosts.
-
-    A service takes a manager and enables rpc by listening to queues based
-    on topic. It also periodically runs tasks on the manager and reports
-    it state to the database services table."""
-
-    def __init__(self, host, binary, topic, manager,
-                 periodic_interval=None, *args, **kwargs):
-        self.host = host
-        self.binary = binary
-        self.topic = topic
-        self.manager_class_name = manager
-        manager_class = importutils.import_class(self.manager_class_name)
-        self.manager = manager_class(host=self.host, *args, **kwargs)
-        self.periodic_interval = periodic_interval
-        super(Service, self).__init__(*args, **kwargs)
-        self.saved_args, self.saved_kwargs = args, kwargs
-        self.timers = []
-
-    def start(self):
-        vcs_string = version.version_string(type='long')
-        LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'),
-                  {'topic': self.topic, 'vcs_string': vcs_string})
-        # TODO do we need this ? -> utils.cleanup_file_locks()
-        self.manager.init_host()
-        self.model_disconnected = False
-        ctxt = context.get_admin_context()
-        # self._create_service_ref(ctxt)
-
-        self.conn = rpc.create_connection(new=True)
-        LOG.debug(_("Creating Consumer connection for Service %s") %
-                  self.topic)
-
-        rpc_dispatcher = self.manager.create_rpc_dispatcher()
-
-        # Share this same connection for these Consumers
-        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
-
-        node_topic = '%s.%s' % (self.topic, self.host)
-        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
-
-        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
-
-        # Consume from all consumers in a thread
-        self.conn.consume_in_thread()
-
-        if self.periodic_interval:
-            periodic = heat_utils.LoopingCall(self.periodic_tasks)
-            periodic.start(interval=self.periodic_interval, now=False)
-            self.timers.append(periodic)
-
-    def __getattr__(self, key):
-        manager = self.__dict__.get('manager', None)
-        return getattr(manager, key)
-
-    @classmethod
-    def create(cls, host=None, binary=None, topic=None, manager=None,
-               periodic_interval=None, config=None):
-        """Instantiates class and passes back application object.
-
-        :param host: defaults to cfg.CONF.host
-        :param binary: defaults to basename of executable
-        :param topic: defaults to bin_name - 'heat-' part
-        :param manager: defaults to cfg.CONF.<topic>_manager
-        :param periodic_interval: defaults to cfg.CONF.periodic_interval
-
-        """
-        if not host:
-            host = cfg.CONF.host
-        if not binary:
-            binary = os.path.basename(inspect.stack()[-1][1])
-        if not topic:
-            topic = binary.rpartition('heat-')[2]
-        if not manager:
-            manager = cfg.CONF.get('%s_manager' % topic, None)
-        if not periodic_interval:
-            periodic_interval = cfg.CONF.periodic_interval
-        service_obj = cls(host, binary, topic, manager,
-                          periodic_interval)
-
-        return service_obj
-
-    def kill(self):
-        self.stop()
-
-    def stop(self):
-        # Try to shut the connection down, but if we get any sort of
-        # errors, go ahead and ignore them.. as we're shutting down anyway
-        try:
-            self.conn.close()
-        except Exception:
-            pass
-        for x in self.timers:
-            try:
-                x.stop()
-            except Exception:
-                pass
-        self.timers = []
-
-    def wait(self):
-        for x in self.timers:
-            try:
-                x.wait()
-            except Exception:
-                pass
-
-    def periodic_tasks(self, raise_on_error=False):
-        """Tasks to be run at a periodic interval."""
-        ctxt = context.get_admin_context()
-        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
-
-
-# NOTE(vish): the global launcher is to maintain the existing
-#             functionality of calling service.serve +
-#             service.wait
-_launcher = None
-
-
-def serve(*servers):
-    global _launcher
-    if not _launcher:
-        _launcher = Launcher()
-    for server in servers:
-        _launcher.launch_server(server)
-
-
-def wait():
-    LOG.debug(_('Full set of CONF:'))
-    for flag in cfg.CONF:
-        flag_get = cfg.CONF.get(flag, None)
-        # hide flag contents from log if contains a password
-        # should use secret flag when switch over to openstack-common
-        if ("_password" in flag or "_key" in flag or
-                (flag == "sql_connection" and "mysql:" in flag_get)):
-            LOG.debug(_('%(flag)s : FLAG SET ') % locals())
-        else:
-            LOG.debug('%(flag)s : %(flag_get)s' % locals())
-    try:
-        _launcher.wait()
-        rpc.cleanup()
-    except KeyboardInterrupt:
-        rpc.cleanup()
-        _launcher.stop()
index cf2ed8d4443afc3a3c81f27880e3135d815f21de..e9e9d81fa7ec510c00a986c466280eedc6ceeb05 100644 (file)
 
 import util
 import verify
-import nose
 from nose.plugins.attrib import attr
 
-from heat.common import context
-from heat.engine import manager
 import unittest
 import os
 
index 9e369ef88ba4332383f83455aae1583ed742aa14..03e88a7176487441a6f940486295a7ea309e4073 100644 (file)
 
 import util
 import verify
-import nose
 from nose.plugins.attrib import attr
 
-from heat.common import context
-from heat.engine import manager
 import unittest
 import os
 
index 29db9786e493503f176dc70e22b0b6477d703c22..dd1cbf4e20696fb61a1dc2a0ae338a9574f69038 100644 (file)
 
 import util
 import verify
-import nose
 from nose.plugins.attrib import attr
 
-from heat.common import context
-from heat.engine import manager
 import unittest
 import os
 
index d0c74e500d09b3b0e5fb5c76028fa42f05fbd93c..2102019d2fd81c42bc4f9eb3601363aa22ceef00 100644 (file)
 
 import util
 import verify
-import nose
 from nose.plugins.attrib import attr
 
-from heat.common import context
-from heat.engine import manager
 import unittest
 import os
 
index ce0fd8fae91363fcdb4a8b14482b26b966e0d747..da70be9d3cd844378657689c02de5608c21fe0ba 100644 (file)
 
 import util
 import verify
-import nose
 from nose.plugins.attrib import attr
 
-from heat.common import context
-from heat.engine import manager
 import unittest
 import os
 
similarity index 94%
rename from heat/tests/test_engine_manager.py
rename to heat/tests/test_engine_service.py
index a095192c87cabd64c1e78d37975b10a50d3f9b3e..c19970a533d26d36919e41389db94bf43afaf2dc 100644 (file)
@@ -20,19 +20,17 @@ import nose
 import unittest
 import mox
 import json
-import sqlalchemy
 from nose.plugins.attrib import attr
-from nose import with_setup
 
 from heat.common import context
 from heat.tests.v1_1 import fakes
 import heat.engine.api as engine_api
 import heat.db as db_api
 from heat.engine import parser
-from heat.engine import manager
-from heat.engine import auth
+from heat.engine import service
 from heat.engine.resources import instance as instances
 from heat.engine import watchrule
+from heat.openstack.common import threadgroup
 
 
 tests_dir = os.path.dirname(os.path.realpath(__file__))
@@ -81,8 +79,14 @@ def setup_mocks(mocks, stack):
                       meta=None).AndReturn(fc.servers.list()[-1])
 
 
-class DummyGreenThread():
-    def link(self, gt, **kwargs):
+class DummyThreadGroup(object):
+    def add_thread(self, callback, *args, **kwargs):
+        pass
+
+    def stop(self):
+        pass
+
+    def wait(self):
         pass
 
 
@@ -128,23 +132,23 @@ class stackCreateTest(unittest.TestCase):
         self.assertEqual(db_s.status, 'DELETE_COMPLETE')
 
 
-@attr(tag=['unit', 'engine-api', 'engine-manager'])
+@attr(tag=['unit', 'engine-api', 'engine-service'])
 @attr(speed='fast')
-class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
+class stackServiceCreateUpdateDeleteTest(unittest.TestCase):
 
     def setUp(self):
         self.m = mox.Mox()
-        self.username = 'stack_manager_create_test_user'
-        self.tenant = 'stack_manager_create_test_tenant'
+        self.username = 'stack_service_create_test_user'
+        self.tenant = 'stack_service_create_test_tenant'
         self.ctx = create_context(self.m, self.username, self.tenant)
 
-        self.man = manager.EngineManager()
+        self.man = service.EngineService('a-host', 'a-topic')
 
     def tearDown(self):
         self.m.UnsetStubs()
 
     def test_stack_create(self):
-        stack_name = 'manager_create_test_stack'
+        stack_name = 'service_create_test_stack'
         params = {'foo': 'bar'}
         template = '{ "Template": "data" }'
 
@@ -164,8 +168,9 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.StubOutWithMock(stack, 'validate')
         stack.validate().AndReturn(None)
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn')
-        manager.greenpool.spawn(stack.create).AndReturn(DummyGreenThread())
+        self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
+        name_match = mox.StrContains(stack.name)
+        threadgroup.ThreadGroup(name_match).AndReturn(DummyThreadGroup())
 
         self.m.ReplayAll()
 
@@ -177,7 +182,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
     def test_stack_create_verify_err(self):
-        stack_name = 'manager_create_verify_err_test_stack'
+        stack_name = 'service_create_verify_err_test_stack'
         params = {'foo': 'bar'}
         template = '{ "Template": "data" }'
 
@@ -198,7 +203,8 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         error = 'fubar'
         stack.validate().AndReturn(error)
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn')
+        #self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
+        #threadgroup.ThreadGroup(stack_name).AndReturn(DummyThreadGroup())
 
         self.m.ReplayAll()
 
@@ -208,15 +214,15 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
     def test_stack_delete(self):
-        stack_name = 'manager_delete_test_stack'
+        stack_name = 'service_delete_test_stack'
         stack = get_wordpress_stack(stack_name, self.ctx)
         stack.store()
 
         self.m.StubOutWithMock(parser.Stack, 'load')
-        self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+        #self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
+        #threadgroup.ThreadGroup(stack_name).AndReturn(DummyThreadGroup())
 
         parser.Stack.load(self.ctx, stack.id).AndReturn(stack)
-        manager.greenpool.spawn_n(stack.delete)
 
         self.m.ReplayAll()
 
@@ -225,7 +231,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
     def test_stack_delete_nonexist(self):
-        stack_name = 'manager_delete_nonexist_test_stack'
+        stack_name = 'service_delete_nonexist_test_stack'
         stack = get_wordpress_stack(stack_name, self.ctx)
 
         self.m.ReplayAll()
@@ -236,7 +242,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
     def test_stack_update(self):
-        stack_name = 'manager_update_test_stack'
+        stack_name = 'service_update_test_stack'
         params = {'foo': 'bar'}
         template = '{ "Template": "data" }'
 
@@ -262,10 +268,6 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.StubOutWithMock(stack, 'validate')
         stack.validate().AndReturn(None)
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn')
-        manager.greenpool.spawn(old_stack.update, stack).AndReturn(
-                                DummyGreenThread())
-
         self.m.ReplayAll()
 
         result = self.man.update_stack(self.ctx, old_stack.identifier(),
@@ -276,7 +278,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
     def test_stack_update_verify_err(self):
-        stack_name = 'manager_update_verify_err_test_stack'
+        stack_name = 'service_update_verify_err_test_stack'
         params = {'foo': 'bar'}
         template = '{ "Template": "data" }'
 
@@ -303,8 +305,6 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         error = 'fubar'
         stack.validate().AndReturn(error)
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn')
-
         self.m.ReplayAll()
 
         result = self.man.update_stack(self.ctx, old_stack.identifier(),
@@ -313,7 +313,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
     def test_stack_update_nonexist(self):
-        stack_name = 'manager_update_nonexist_test_stack'
+        stack_name = 'service_update_nonexist_test_stack'
         params = {'foo': 'bar'}
         template = '{ "Template": "data" }'
         stack = get_wordpress_stack(stack_name, self.ctx)
@@ -326,16 +326,16 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.VerifyAll()
 
 
-@attr(tag=['unit', 'engine-api', 'engine-manager'])
+@attr(tag=['unit', 'engine-api', 'engine-service'])
 @attr(speed='fast')
-class stackManagerTest(unittest.TestCase):
+class stackServiceTest(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         m = mox.Mox()
-        cls.username = 'stack_manager_test_user'
-        cls.tenant = 'stack_manager_test_tenant'
+        cls.username = 'stack_service_test_user'
+        cls.tenant = 'stack_service_test_tenant'
         ctx = create_context(m, cls.username, cls.tenant)
-        cls.stack_name = 'manager_test_stack'
+        cls.stack_name = 'service_test_stack'
 
         stack = get_wordpress_stack(cls.stack_name, ctx)
         setup_mocks(m, stack)
@@ -366,7 +366,7 @@ class stackManagerTest(unittest.TestCase):
         setup_mocks(self.m, self.stack)
         self.m.ReplayAll()
 
-        self.man = manager.EngineManager()
+        self.man = service.EngineService('a-host', 'a-topic')
 
     def tearDown(self):
         self.m.UnsetStubs()
index 9f504869817ea2e989cb402402cc5690144f9d98..3cbd4642e4711b447720463d57f07449cc3289e1 100644 (file)
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
-
-
 import sys
-import os
-
 import nose
 import unittest
 import mox
 import json
-import sqlalchemy
 
 from nose.plugins.attrib import attr
-from nose import with_setup
 
 from heat.tests.v1_1 import fakes
 from heat.engine.resources import instance as instances
-from heat.engine.resources import volume as volumes
-from heat.engine import manager as managers
+from heat.engine import service
 import heat.db as db_api
 from heat.engine import parser
-from heat.engine import auth
 
 test_template_volumeattach = '''
 {
@@ -257,8 +249,8 @@ class validateTest(unittest.TestCase):
         instances.Instance.nova().AndReturn(self.fc)
         self.m.ReplayAll()
 
-        manager = managers.EngineManager()
-        res = dict(manager.
+        engine = service.EngineService('a', 't')
+        res = dict(engine.
             validate_template(None, t))
         print 'res %s' % res
         self.assertEqual(res['Description'], 'test.')
@@ -270,8 +262,8 @@ class validateTest(unittest.TestCase):
         instances.Instance.nova().AndReturn(self.fc)
         self.m.ReplayAll()
 
-        manager = managers.EngineManager()
-        res = dict(manager.
+        engine = service.EngineService('a', 't')
+        res = dict(engine.
             validate_template(None, t))
         self.assertNotEqual(res['Description'], 'Successfully validated')
 
@@ -282,8 +274,8 @@ class validateTest(unittest.TestCase):
         instances.Instance.nova().AndReturn(self.fc)
         self.m.ReplayAll()
 
-        manager = managers.EngineManager()
-        res = dict(manager.
+        engine = service.EngineService('a', 't')
+        res = dict(engine.
             validate_template(None, t))
         self.assertEqual(res['Description'], 'test.')
 
@@ -294,8 +286,8 @@ class validateTest(unittest.TestCase):
         instances.Instance.nova().AndReturn(self.fc)
         self.m.ReplayAll()
 
-        manager = managers.EngineManager()
-        res = dict(manager.
+        engine = service.EngineService('a', 't')
+        res = dict(engine.
             validate_template(None, t))
         self.assertNotEqual(res['Description'], 'Successfully validated')