]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Copy-paste RPC Service class for backwards compatibility
authorIhar Hrachyshka <ihrachys@redhat.com>
Mon, 2 Jun 2014 15:44:20 +0000 (17:44 +0200)
committerIhar Hrachyshka <ihrachys@redhat.com>
Mon, 16 Jun 2014 10:44:22 +0000 (12:44 +0200)
blueprint oslo-messaging

Change-Id: Ie48de6d3636d6404316f19d73c7e8453298ecf14

neutron/common/rpc_compat.py
neutron/service.py
neutron/services/loadbalancer/agent/agent.py
neutron/tests/unit/services/loadbalancer/agent/test_agent.py

index 2e95ee2bd52b3251afd20e2dbe1564d4a22da128..bf8d40c35c63487a3815a9549436163776f9946a 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import rpc
 from neutron.openstack.common.rpc import common as rpc_common
+from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
 from neutron.openstack.common.rpc import proxy
+from neutron.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
 
 
 class RpcProxy(proxy.RpcProxy):
@@ -35,6 +42,57 @@ class RpcCallback(object):
     '''
 
 
+class Service(service.Service):
+    """Service object for binaries running on hosts.
+
+    A service enables rpc by listening to queues based on topic and host.
+    """
+    def __init__(self, host, topic, manager=None, serializer=None):
+        super(Service, self).__init__()
+        self.host = host
+        self.topic = topic
+        self.serializer = serializer
+        if manager is None:
+            self.manager = self
+        else:
+            self.manager = manager
+
+    def start(self):
+        super(Service, self).start()
+
+        self.conn = rpc.create_connection(new=True)
+        LOG.debug("Creating Consumer connection for Service %s" %
+                  self.topic)
+
+        dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
+                                                  self.serializer)
+
+        # Share this same connection for these Consumers
+        self.conn.create_consumer(self.topic, dispatcher, fanout=False)
+
+        node_topic = '%s.%s' % (self.topic, self.host)
+        self.conn.create_consumer(node_topic, dispatcher, fanout=False)
+
+        self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+
+        # Hook to allow the manager to do other initializations after
+        # the rpc connection is created.
+        if callable(getattr(self.manager, 'initialize_service_hook', None)):
+            self.manager.initialize_service_hook(self)
+
+        # Consume from all consumers in a thread
+        self.conn.consume_in_thread()
+
+    def stop(self):
+        # Try to shut the connection down, but if we get any sort of
+        # errors, go ahead and ignore them.. as we're shutting down anyway
+        try:
+            self.conn.close()
+        except Exception:
+            pass
+        super(Service, self).stop()
+
+
 # exceptions
 RPCException = rpc_common.RPCException
 RemoteError = rpc_common.RemoteError
index a3224545bf334f60e2dfd5406f008fdad22d2769..9b3073b5fb4ded9990a7fa9228304c9ab17b7d9c 100644 (file)
@@ -22,6 +22,7 @@ import random
 from oslo.config import cfg
 
 from neutron.common import config
+from neutron.common import rpc_compat
 from neutron import context
 from neutron.db import api as session
 from neutron import manager
@@ -29,7 +30,6 @@ from neutron.openstack.common import excutils
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import service
 from neutron.openstack.common import service as common_service
 from neutron import wsgi
 
@@ -178,7 +178,7 @@ def _run_wsgi(app_name):
     return server
 
 
-class Service(service.Service):
+class Service(rpc_compat.Service):
     """Service object for binaries running on hosts.
 
     A service takes a manager and enables rpc by listening to queues based
index 0ec14554d9b8f419b71bb74c136aba146938e6e3..7a830c63150b6508cb71eb1a77aa9f899f61f0e6 100644 (file)
@@ -23,8 +23,8 @@ from oslo.config import cfg
 
 from neutron.agent.common import config
 from neutron.agent.linux import interface
+from neutron.common import rpc_compat
 from neutron.common import topics
-from neutron.openstack.common.rpc import service as rpc_service
 from neutron.openstack.common import service
 from neutron.services.loadbalancer.agent import agent_manager as manager
 
@@ -37,7 +37,7 @@ OPTS = [
 ]
 
 
-class LbaasAgentService(rpc_service.Service):
+class LbaasAgentService(rpc_compat.Service):
     def start(self):
         super(LbaasAgentService, self).start()
         self.tg.add_timer(
index b00c5233008c935789b363a63bf260a47debd6ed..955d6e1a842c4e7a9a045bef5c2b01c6e3ed8ca1 100644 (file)
@@ -27,7 +27,7 @@ from neutron.tests import base
 class TestLbaasService(base.BaseTestCase):
     def test_start(self):
         with mock.patch.object(
-            agent.rpc_service.Service, 'start'
+            agent.rpc_compat.Service, 'start'
         ) as mock_start:
 
             mgr = mock.Mock()