]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Modify dhcp agent for agent management extension
authorgongysh <gongysh@linux.vnet.ibm.com>
Mon, 18 Feb 2013 08:42:34 +0000 (16:42 +0800)
committergongysh <gongysh@linux.vnet.ibm.com>
Tue, 19 Feb 2013 12:28:02 +0000 (20:28 +0800)
2nd part of blueprint quantum-scheduler

Remove openstack openstack listener on DHCP agent side.
Add DHCPagent notifier on quantum server side.

Change-Id: I196691650a99ba865bf06081a1fc4546f9fac7bd

12 files changed:
etc/dhcp_agent.ini
etc/quantum.conf
quantum/agent/dhcp_agent.py
quantum/agent/rpc.py
quantum/api/rpc/__init__.py [new file with mode: 0644]
quantum/api/rpc/agentnotifiers/__init__.py [new file with mode: 0644]
quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py [new file with mode: 0644]
quantum/api/v2/base.py
quantum/common/config.py
quantum/common/topics.py
quantum/tests/unit/test_agent_rpc.py
quantum/tests/unit/test_dhcp_agent.py

index a3c798f3859351974af3c6a267f335ea33eeb553..fd8d646610c8a2960963ba8ae18961c9fc9de6a3 100644 (file)
@@ -36,6 +36,7 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
 # be activated when the subnet gateway_ip is None.  The guest instance must
 # be configured to request host routes via DHCP (Option 121).
 # enable_isolated_metadata = False
+
 # Allows for serving metadata requests coming from a dedicated metadata
 # access network whose cidr is 169.254.169.254/16 (or larger prefix), and
 # is connected to a Quantum router from which the VMs send metadata
@@ -43,3 +44,6 @@ dhcp_driver = quantum.agent.linux.dhcp.Dnsmasq
 # they will be able to reach 169.254.169.254 through a router.
 # This option requires enable_isolated_metadata = True
 # enable_metadata_network = False
+
+# The Quantum DHCP agent manager.
+# dhcp_agent_manager = quantum.agent.dhcp_agent.DhcpAgentWithStateReport
index ebf2343d864a59791087eb6756fc6db6627040c4..2de2687523ff388cda6e66103fe4f448cf9fa9b1 100644 (file)
@@ -64,6 +64,9 @@ api_paste_config = api-paste.ini
 # DHCP Lease duration (in seconds)
 # dhcp_lease_duration = 120
 
+# Allow sending resource operation notification to DHCP agent
+# dhcp_agent_notification = True
+
 # Enable or disable bulk create/update/delete operations
 # allow_bulk = True
 # Enable or disable overlapping IPs for subnets
index 3ea81a87d39dafcf3c88b57c681c708073f5dda9..333fa86c3449d140ce238a4a6175eedeea0856f6 100644 (file)
@@ -33,11 +33,15 @@ from quantum.common import constants
 from quantum.common import exceptions
 from quantum.common import topics
 from quantum import context
+from quantum import manager
 from quantum.openstack.common import importutils
 from quantum.openstack.common import jsonutils
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
 from quantum.openstack.common.rpc import proxy
+from quantum.openstack.common import service
 from quantum.openstack.common import uuidutils
+from quantum import service as quantum_service
 
 LOG = logging.getLogger(__name__)
 NS_PREFIX = 'qdhcp-'
@@ -46,7 +50,7 @@ METADATA_DEFAULT_IP = '169.254.169.254/%d' % METADATA_DEFAULT_PREFIX
 METADATA_PORT = 80
 
 
-class DhcpAgent(object):
+class DhcpAgent(manager.Manager):
     OPTS = [
         cfg.IntOpt('resync_interval', default=5,
                    help=_("Interval to resync.")),
@@ -60,29 +64,34 @@ class DhcpAgent(object):
         cfg.BoolOpt('enable_metadata_network', default=False,
                     help=_("Allows for serving metadata requests from a "
                            "dedicate network. Requires "
-                           "enable isolated_metadata = True "))
+                           "enable isolated_metadata = True ")),
+        cfg.StrOpt('dhcp_agent_manager',
+                   default='quantum.agent.dhcp_agent.'
+                   'DhcpAgentWithStateReport',
+                   help=_("The Quantum DHCP agent manager.")),
     ]
 
-    def __init__(self, conf):
+    def __init__(self, host=None):
+        super(DhcpAgent, self).__init__(host=host)
         self.needs_resync = False
-        self.conf = conf
+        self.conf = cfg.CONF
         self.cache = NetworkCache()
-        self.root_helper = config.get_root_helper(conf)
-
-        self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
+        self.root_helper = config.get_root_helper(self.conf)
+        self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
         ctx = context.get_admin_context_without_session()
         self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
-
         self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
-        self.notifications = agent_rpc.NotificationDispatcher()
         self.lease_relay = DhcpLeaseRelay(self.update_lease)
 
+    def after_start(self):
+        self.run()
+        LOG.info(_("DHCP agent started"))
+
     def run(self):
         """Activate the DHCP agent."""
         self.sync_state()
         self.periodic_resync()
         self.lease_relay.start()
-        self.notifications.run_dispatch(self)
 
     def _ns_name(self, network):
         if self.conf.use_namespaces:
@@ -199,12 +208,12 @@ class DhcpAgent(object):
         else:
             self.disable_dhcp_helper(network.id)
 
-    def network_create_end(self, payload):
+    def network_create_end(self, context, payload):
         """Handle the network.create.end notification event."""
         network_id = payload['network']['id']
         self.enable_dhcp_helper(network_id)
 
-    def network_update_end(self, payload):
+    def network_update_end(self, context, payload):
         """Handle the network.update.end notification event."""
         network_id = payload['network']['id']
         if payload['network']['admin_state_up']:
@@ -212,11 +221,11 @@ class DhcpAgent(object):
         else:
             self.disable_dhcp_helper(network_id)
 
-    def network_delete_end(self, payload):
+    def network_delete_end(self, context, payload):
         """Handle the network.delete.end notification event."""
         self.disable_dhcp_helper(payload['network_id'])
 
-    def subnet_update_end(self, payload):
+    def subnet_update_end(self, context, payload):
         """Handle the subnet.update.end notification event."""
         network_id = payload['subnet']['network_id']
         self.refresh_dhcp_helper(network_id)
@@ -224,14 +233,14 @@ class DhcpAgent(object):
     # Use the update handler for the subnet create event.
     subnet_create_end = subnet_update_end
 
-    def subnet_delete_end(self, payload):
+    def subnet_delete_end(self, context, payload):
         """Handle the subnet.delete.end notification event."""
         subnet_id = payload['subnet_id']
         network = self.cache.get_network_by_subnet_id(subnet_id)
         if network:
             self.refresh_dhcp_helper(network.id)
 
-    def port_update_end(self, payload):
+    def port_update_end(self, context, payload):
         """Handle the port.update.end notification event."""
         port = DictModel(payload['port'])
         network = self.cache.get_network_by_id(port.network_id)
@@ -242,7 +251,7 @@ class DhcpAgent(object):
     # Use the update handler for the port create event.
     port_create_end = port_update_end
 
-    def port_delete_end(self, payload):
+    def port_delete_end(self, context, payload):
         """Handle the port.delete.end notification event."""
         port = self.cache.get_port_by_id(payload['port_id'])
         if port:
@@ -434,6 +443,19 @@ class NetworkCache(object):
                 if port.id == port_id:
                     return port
 
+    def get_state(self):
+        net_ids = self.get_network_ids()
+        num_nets = len(net_ids)
+        num_subnets = 0
+        num_ports = 0
+        for net_id in net_ids:
+            network = self.get_network_by_id(net_id)
+            num_subnets += len(network.subnets)
+            num_ports += len(network.ports)
+        return {'networks': num_nets,
+                'subnets': num_subnets,
+                'ports': num_ports}
+
 
 class DeviceManager(object):
     OPTS = [
@@ -626,9 +648,46 @@ class DhcpLeaseRelay(object):
         eventlet.spawn(eventlet.serve, listener, self._handler)
 
 
+class DhcpAgentWithStateReport(DhcpAgent):
+    def __init__(self, host=None):
+        super(DhcpAgentWithStateReport, self).__init__(host=host)
+        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
+        self.agent_state = {
+            'binary': 'quantum-dhcp-agent',
+            'host': host,
+            'topic': topics.DHCP_AGENT,
+            'configurations': {
+                'dhcp_driver': cfg.CONF.dhcp_driver,
+                'use_namespaces': cfg.CONF.use_namespaces,
+                'dhcp_lease_time': cfg.CONF.dhcp_lease_time},
+            'start_flag': True,
+            'agent_type': constants.AGENT_TYPE_DHCP}
+        report_interval = cfg.CONF.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.LoopingCall(self._report_state)
+            heartbeat.start(interval=report_interval)
+
+    def _report_state(self):
+        try:
+            self.agent_state.get('configurations').update(
+                self.cache.get_state())
+            ctx = context.get_admin_context_without_session()
+            self.state_rpc.report_state(ctx,
+                                        self.agent_state)
+        except Exception:
+            LOG.exception(_("Failed reporting state!"))
+            return
+        if self.agent_state.pop('start_flag', None):
+            self.run()
+
+    def after_start(self):
+        LOG.info(_("DHCP agent started"))
+
+
 def main():
     eventlet.monkey_patch()
     cfg.CONF.register_opts(DhcpAgent.OPTS)
+    config.register_agent_state_opts_helper(cfg.CONF)
     config.register_root_helper(cfg.CONF)
     cfg.CONF.register_opts(DeviceManager.OPTS)
     cfg.CONF.register_opts(DhcpLeaseRelay.OPTS)
@@ -636,6 +695,8 @@ def main():
     cfg.CONF.register_opts(interface.OPTS)
     cfg.CONF(project='quantum')
     config.setup_logging(cfg.CONF)
-
-    mgr = DhcpAgent(cfg.CONF)
-    mgr.run()
+    server = quantum_service.Service.create(
+        binary='quantum-dhcp-agent',
+        topic=topics.DHCP_AGENT,
+        report_interval=cfg.CONF.AGENT.report_interval)
+    service.launch(server).wait()
index 97d9c800f4b7f860135389eb9f26235f9840041b..4f33fb2dd1d0cc899877d3b984c4b69555a0ad7b 100644 (file)
@@ -100,34 +100,3 @@ class PluginApi(proxy.RpcProxy):
         return self.call(context,
                          self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
                          topic=self.topic)
-
-
-class NotificationDispatcher(object):
-    def __init__(self):
-        # Set the Queue size to 1 so that messages stay on server rather than
-        # being buffered in the process.
-        self.queue = eventlet.queue.Queue(1)
-        self.connection = rpc.create_connection(new=True)
-        topic = '%s.%s' % (rpc_notifier.CONF.notification_topics[0],
-                           api.CONF.default_notification_level.lower())
-        queue_name = 'notification_listener_%s' % uuidutils.generate_uuid()
-        self.connection.declare_topic_consumer(topic=topic,
-                                               queue_name=queue_name,
-                                               callback=self._add_to_queue)
-        self.connection.consume_in_thread()
-
-    def _add_to_queue(self, msg):
-        self.queue.put(msg)
-
-    def run_dispatch(self, handler):
-        while True:
-            msg = self.queue.get()
-            name = msg['event_type'].replace('.', '_')
-
-            try:
-                if hasattr(handler, name):
-                    getattr(handler, name)(msg['payload'])
-                else:
-                    LOG.debug(_('Unknown event_type: %s.'), msg['event_type'])
-            except Exception, e:
-                LOG.warn(_('Error processing message. Exception: %s'), e)
diff --git a/quantum/api/rpc/__init__.py b/quantum/api/rpc/__init__.py
new file mode 100644 (file)
index 0000000..c6fab2e
--- /dev/null
@@ -0,0 +1,14 @@
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/quantum/api/rpc/agentnotifiers/__init__.py b/quantum/api/rpc/agentnotifiers/__init__.py
new file mode 100644 (file)
index 0000000..c6fab2e
--- /dev/null
@@ -0,0 +1,14 @@
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/quantum/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
new file mode 100644 (file)
index 0000000..5cf0369
--- /dev/null
@@ -0,0 +1,70 @@
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import topics
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
+class DhcpAgentNotifyAPI(proxy.RpcProxy):
+    """API for plugin to notify DHCP agent."""
+    BASE_RPC_API_VERSION = '1.0'
+    # It seems dhcp agent does not support bulk operation
+    VALID_RESOURCES = ['network', 'subnet', 'port']
+    VALID_METHOD_NAMES = ['network.create.end',
+                          'network.update.end',
+                          'network.delete.end',
+                          'subnet.create.end',
+                          'subnet.update.end',
+                          'subnet.delete.end',
+                          'port.create.end',
+                          'port.update.end',
+                          'port.delete.end']
+
+    def __init__(self, topic=topics.DHCP_AGENT):
+        super(DhcpAgentNotifyAPI, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+    def _notification(self, context, method, payload):
+        """Notify all the agents that are hosting the network"""
+        # By now, we have no scheduling feature, so we fanout
+        # to all of the DHCP agents
+        self._notification_fanout(context, method, payload)
+
+    def _notification_fanout(self, context, method, payload):
+        """Fanout the payload to all dhcp agents"""
+        self.fanout_cast(
+            context, self.make_msg(method,
+                                   payload=payload),
+            topic=topics.DHCP_AGENT)
+
+    def notify(self, context, data, methodname):
+        # data is {'key' : 'value'} with only one key
+        if methodname not in self.VALID_METHOD_NAMES:
+            return
+        obj_type = data.keys()[0]
+        if obj_type not in self.VALID_RESOURCES:
+            return
+        obj_value = data[obj_type]
+        methodname = methodname.replace(".", "_")
+        if methodname.endswith("_delete_end"):
+            if 'id' in obj_value:
+                self._notification(context, methodname,
+                                   {obj_type + '_id': obj_value['id']})
+        else:
+            self._notification(context, methodname, data)
index e5620047933054a07bd025276be176b6b61a819c..463fb77f554ab4d5d541a26943e51393de167347 100644 (file)
@@ -18,6 +18,9 @@
 import netaddr
 import webob.exc
 
+from oslo.config import cfg
+
+from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from quantum.api.v2 import attributes
 from quantum.api.v2 import resource as wsgi_resource
 from quantum.common import exceptions
@@ -94,6 +97,7 @@ class Controller(object):
         self._policy_attrs = [name for (name, info) in self._attr_info.items()
                               if info.get('required_by_policy')]
         self._publisher_id = notifier_api.publisher_id('network')
+        self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
         self._member_actions = member_actions
 
         if parent:
@@ -193,6 +197,10 @@ class Controller(object):
             policy.enforce(request.context, action, obj, plugin=self._plugin)
         return obj
 
+    def _send_dhcp_notification(self, context, data, methodname):
+        if cfg.CONF.dhcp_agent_notification:
+            self._dhcp_agent_notifier.notify(context, data, methodname)
+
     def index(self, request, **kwargs):
         """Returns a list of the requested entity"""
         parent_id = kwargs.get(self._parent_id_name)
@@ -298,11 +306,15 @@ class Controller(object):
                                          **kwargs)
 
         def notify(create_result):
+            notifier_method = self._resource + '.create.end'
             notifier_api.notify(request.context,
                                 self._publisher_id,
-                                self._resource + '.create.end',
+                                notifier_method,
                                 notifier_api.CONF.default_notification_level,
                                 create_result)
+            self._send_dhcp_notification(request.context,
+                                         create_result,
+                                         notifier_method)
             return create_result
 
         kwargs = {self._parent_id_name: parent_id} if parent_id else {}
@@ -348,11 +360,16 @@ class Controller(object):
 
         obj_deleter = getattr(self._plugin, action)
         obj_deleter(request.context, id, **kwargs)
+        notifier_method = self._resource + '.delete.end'
         notifier_api.notify(request.context,
                             self._publisher_id,
-                            self._resource + '.delete.end',
+                            notifier_method,
                             notifier_api.CONF.default_notification_level,
                             {self._resource + '_id': id})
+        result = {self._resource: self._view(obj)}
+        self._send_dhcp_notification(request.context,
+                                     result,
+                                     notifier_method)
 
     def update(self, request, id, body=None, **kwargs):
         """Updates the specified entity's attributes"""
@@ -398,11 +415,15 @@ class Controller(object):
             kwargs[self._parent_id_name] = parent_id
         obj = obj_updater(request.context, id, **kwargs)
         result = {self._resource: self._view(obj)}
+        notifier_method = self._resource + '.update.end'
         notifier_api.notify(request.context,
                             self._publisher_id,
-                            self._resource + '.update.end',
+                            notifier_method,
                             notifier_api.CONF.default_notification_level,
                             result)
+        self._send_dhcp_notification(request.context,
+                                     result,
+                                     notifier_method)
         return result
 
     @staticmethod
index a33f319e97ba54de9701e4cea600daa149274fd5..c880a8fe0e329643e8140b9bbee034d13e770fe8 100644 (file)
@@ -62,6 +62,9 @@ core_opts = [
                help=_("Maximum number of host routes per subnet")),
     cfg.IntOpt('dhcp_lease_duration', default=120,
                help=_("DHCP lease duration")),
+    cfg.BoolOpt('dhcp_agent_notification', default=True,
+                help=_("Allow sending resource operation"
+                       " notification to DHCP agent")),
     cfg.BoolOpt('allow_overlapping_ips', default=False,
                 help=_("Allow overlapping IP support in Quantum")),
     cfg.StrOpt('host', default=utils.get_hostname(),
index c9f3b115c03576db4fb7da59b6a120b93eaa9716..91970f2f982c1997af380039f2be9a981736b53d 100644 (file)
@@ -27,6 +27,7 @@ PLUGIN = 'q-plugin'
 DHCP = 'q-dhcp-notifer'
 
 L3_AGENT = 'l3_agent'
+DHCP_AGENT = 'dhcp_agent'
 
 
 def get_topic_name(prefix, table, operation):
index e210d9d5d4e5d7550206c2201ebb14d943ef7aa9..d9a6ffd28facac632e22333750451612f4806abc 100644 (file)
@@ -62,84 +62,3 @@ class AgentRPCMethods(unittest.TestCase):
         with mock.patch(call_to_patch) as create_connection:
             conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
             create_connection.assert_has_calls(expected)
-
-
-class AgentRPCNotificationDispatcher(unittest.TestCase):
-    def setUp(self):
-        self.create_connection_p = mock.patch(
-            'quantum.openstack.common.rpc.create_connection')
-        self.create_connection = self.create_connection_p.start()
-        cfg.CONF.set_override('default_notification_level', 'INFO')
-        cfg.CONF.set_override('notification_topics', ['notifications'])
-
-    def tearDown(self):
-        self.create_connection_p.stop()
-        cfg.CONF.reset()
-
-    def test_init(self):
-        nd = rpc.NotificationDispatcher()
-
-        expected = [
-            mock.call(new=True),
-            mock.call().declare_topic_consumer(topic='notifications.info',
-                                               queue_name=mock.ANY,
-                                               callback=nd._add_to_queue),
-            mock.call().consume_in_thread()
-        ]
-        self.create_connection.assert_has_calls(expected)
-
-    def test_add_to_queue(self):
-        nd = rpc.NotificationDispatcher()
-        nd._add_to_queue('foo')
-        self.assertEqual(nd.queue.get(), 'foo')
-
-    def _test_run_dispatch_helper(self, msg, handler):
-        msgs = [msg]
-
-        def side_effect(*args):
-            return msgs.pop(0)
-
-        with mock.patch('eventlet.Queue.get') as queue_get:
-            queue_get.side_effect = side_effect
-            nd = rpc.NotificationDispatcher()
-            # catch the assertion so that the loop runs once
-            self.assertRaises(IndexError, nd.run_dispatch, handler)
-
-    def test_run_dispatch_once(self):
-        class SimpleHandler:
-            def __init__(self):
-                self.network_delete_end = mock.Mock()
-
-        msg = dict(event_type='network.delete.end',
-                   payload=dict(network_id='a'))
-
-        handler = SimpleHandler()
-        self._test_run_dispatch_helper(msg, handler)
-        handler.network_delete_end.called_once_with(msg['payload'])
-
-    def test_run_dispatch_missing_handler(self):
-        class SimpleHandler:
-            self.subnet_create_start = mock.Mock()
-
-        msg = dict(event_type='network.delete.end',
-                   payload=dict(network_id='a'))
-
-        handler = SimpleHandler()
-
-        with mock.patch('quantum.agent.rpc.LOG') as log:
-            self._test_run_dispatch_helper(msg, handler)
-            log.assert_has_calls([mock.call.debug(mock.ANY, mock.ANY)])
-
-    def test_run_dispatch_handler_raises(self):
-        class SimpleHandler:
-            def network_delete_end(self, payload):
-                raise Exception('foo')
-
-        msg = dict(event_type='network.delete.end',
-                   payload=dict(network_id='a'))
-
-        handler = SimpleHandler()
-
-        with mock.patch('quantum.agent.rpc.LOG') as log:
-            self._test_run_dispatch_helper(msg, handler)
-            log.assert_has_calls([mock.call.warn(mock.ANY, mock.ANY)])
index 0b6e52537851ccae2696cad09d0d36814067c778..b109f12e66a845d1114978a6995f777895e4d1db 100644 (file)
@@ -19,12 +19,15 @@ import socket
 import sys
 import uuid
 
+import eventlet
 import mock
 from oslo.config import cfg
 import unittest2 as unittest
 
 from quantum.agent.common import config
 from quantum.agent import dhcp_agent
+from quantum.agent.dhcp_agent import DhcpAgentWithStateReport
+from quantum.agent.linux import dhcp
 from quantum.agent.linux import interface
 from quantum.common import constants
 from quantum.common import exceptions
@@ -33,6 +36,7 @@ from quantum.openstack.common import jsonutils
 
 ROOTDIR = os.path.dirname(os.path.dirname(__file__))
 ETCDIR = os.path.join(ROOTDIR, 'etc')
+HOSTNAME = 'hostname'
 
 
 def etcdir(*p):
@@ -114,34 +118,65 @@ class TestDhcpAgent(unittest.TestCase):
         self.driver = mock.Mock(name='driver')
         self.driver_cls = self.driver_cls_p.start()
         self.driver_cls.return_value = self.driver
-        self.notification_p = mock.patch(
-            'quantum.agent.rpc.NotificationDispatcher')
-        self.notification = self.notification_p.start()
 
     def tearDown(self):
-        self.notification_p.stop()
         self.driver_cls_p.stop()
         cfg.CONF.reset()
 
-    def test_dhcp_agent_main(self):
+    def test_dhcp_agent_manager(self):
+        state_rpc_str = 'quantum.agent.rpc.PluginReportStateAPI'
+        lease_relay_str = 'quantum.agent.dhcp_agent.DhcpLeaseRelay'
+        with mock.patch.object(DhcpAgentWithStateReport,
+                               'sync_state',
+                               autospec=True) as mock_sync_state:
+            with mock.patch.object(DhcpAgentWithStateReport,
+                                   'periodic_resync',
+                                   autospec=True) as mock_periodic_resync:
+                with mock.patch(state_rpc_str) as state_rpc:
+                    with mock.patch(lease_relay_str) as mock_lease_relay:
+                        with mock.patch.object(sys, 'argv') as sys_argv:
+                            sys_argv.return_value = [
+                                'dhcp', '--config-file',
+                                etcdir('quantum.conf.test')]
+                            cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
+                            config.register_agent_state_opts_helper(cfg.CONF)
+                            config.register_root_helper(cfg.CONF)
+                            cfg.CONF.register_opts(
+                                dhcp_agent.DeviceManager.OPTS)
+                            cfg.CONF.register_opts(
+                                dhcp_agent.DhcpLeaseRelay.OPTS)
+                            cfg.CONF.register_opts(dhcp.OPTS)
+                            cfg.CONF.register_opts(interface.OPTS)
+                            cfg.CONF(project='quantum')
+                            agent_mgr = DhcpAgentWithStateReport('testhost')
+                            eventlet.greenthread.sleep(1)
+                            agent_mgr.after_start()
+                            mock_sync_state.assert_called_once_with(agent_mgr)
+                            mock_periodic_resync.assert_called_once_with(
+                                agent_mgr)
+                            state_rpc.assert_has_calls(
+                                [mock.call(mock.ANY),
+                                 mock.call().report_state(mock.ANY, mock.ANY)])
+                            mock_lease_relay.assert_has_calls(
+                                [mock.call(mock.ANY),
+                                 mock.call().start()])
+
+    def test_dhcp_agent_main_agent_manager(self):
         logging_str = 'quantum.agent.common.config.setup_logging'
-        manager_str = 'quantum.agent.dhcp_agent.DeviceManager'
-        agent_str = 'quantum.agent.dhcp_agent.DhcpAgent'
+        launcher_str = 'quantum.openstack.common.service.ServiceLauncher'
         with mock.patch(logging_str):
-            with mock.patch(manager_str) as dev_mgr:
-                with mock.patch(agent_str) as dhcp:
-                    with mock.patch.object(sys, 'argv') as sys_argv:
-                        sys_argv.return_value = ['dhcp', '--config-file',
-                                                 etcdir('quantum.conf.test')]
-                        dhcp_agent.main()
-                        dev_mgr.assert_called_once(mock.ANY, 'sudo')
-                        dhcp.assert_has_calls([
-                            mock.call(mock.ANY),
-                            mock.call().run()])
+            with mock.patch.object(sys, 'argv') as sys_argv:
+                with mock.patch(launcher_str) as launcher:
+                    sys_argv.return_value = ['dhcp', '--config-file',
+                                             etcdir('quantum.conf.test')]
+                    dhcp_agent.main()
+                    launcher.assert_has_calls(
+                        [mock.call(), mock.call().launch_service(mock.ANY),
+                         mock.call().wait()])
 
     def test_run_completes_single_pass(self):
         with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
-            dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+            dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
             attrs_to_mock = dict(
                 [(a, mock.DEFAULT) for a in
                  ['sync_state', 'lease_relay', 'periodic_resync']])
@@ -151,19 +186,18 @@ class TestDhcpAgent(unittest.TestCase):
                 mocks['periodic_resync'].assert_called_once_with()
                 mocks['lease_relay'].assert_has_mock_calls(
                     [mock.call.start()])
-                self.notification.assert_has_calls([mock.call.run_dispatch()])
 
     def test_ns_name(self):
         with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
             mock_net = mock.Mock(id='foo')
-            dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+            dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
             self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo')
 
     def test_ns_name_disabled_namespace(self):
         with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
             cfg.CONF.set_override('use_namespaces', False)
             mock_net = mock.Mock(id='foo')
-            dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+            dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
             self.assertIsNone(dhcp._ns_name(mock_net))
 
     def test_call_driver(self):
@@ -185,7 +219,7 @@ class TestDhcpAgent(unittest.TestCase):
         self.driver.return_value.foo.side_effect = Exception
         with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
             with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
-                dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+                dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
                 self.assertIsNone(dhcp.call_driver('foo', network))
                 self.assertTrue(dev_mgr.called)
                 self.driver.assert_called_once_with(cfg.CONF,
@@ -198,7 +232,7 @@ class TestDhcpAgent(unittest.TestCase):
 
     def test_update_lease(self):
         with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
-            dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+            dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
             dhcp.update_lease('net_id', '192.168.1.1', 120)
             plug.assert_has_calls(
                 [mock.call().update_lease_expiration(
@@ -209,7 +243,7 @@ class TestDhcpAgent(unittest.TestCase):
             plug.return_value.update_lease_expiration.side_effect = Exception
 
             with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
-                dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+                dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
                 dhcp.update_lease('net_id', '192.168.1.1', 120)
                 plug.assert_has_calls(
                     [mock.call().update_lease_expiration(
@@ -224,7 +258,7 @@ class TestDhcpAgent(unittest.TestCase):
             mock_plugin.get_active_networks.return_value = active_networks
             plug.return_value = mock_plugin
 
-            dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+            dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
 
             attrs_to_mock = dict(
                 [(a, mock.DEFAULT) for a in
@@ -260,21 +294,21 @@ class TestDhcpAgent(unittest.TestCase):
             plug.return_value = mock_plugin
 
             with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
-                dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+                dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
                 dhcp.sync_state()
 
                 self.assertTrue(log.called)
                 self.assertTrue(dhcp.needs_resync)
 
     def test_periodic_resync(self):
-        dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+        dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
         with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
             dhcp.periodic_resync()
             spawn.assert_called_once_with(dhcp._periodic_resync_helper)
 
     def test_periodoc_resync_helper(self):
         with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
-            dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+            dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
             dhcp.needs_resync = True
             with mock.patch.object(dhcp, 'sync_state') as sync_state:
                 sync_state.side_effect = RuntimeError
@@ -293,9 +327,6 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
                               'quantum.agent.linux.interface.NullDriver')
         config.register_root_helper(cfg.CONF)
         cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
-        self.notification_p = mock.patch(
-            'quantum.agent.rpc.NotificationDispatcher')
-        self.notification = self.notification_p.start()
 
         self.plugin_p = mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi')
         plugin_cls = self.plugin_p.start()
@@ -307,7 +338,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         self.cache = mock.Mock()
         cache_cls.return_value = self.cache
 
-        self.dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+        self.dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
         self.call_driver_p = mock.patch.object(self.dhcp, 'call_driver')
 
         self.call_driver = self.call_driver_p.start()
@@ -321,7 +352,6 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         self.call_driver_p.stop()
         self.cache_p.stop()
         self.plugin_p.stop()
-        self.notification_p.stop()
 
     def test_enable_dhcp_helper(self):
         self.plugin.get_network_info.return_value = fake_network
@@ -462,26 +492,26 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         payload = dict(network=dict(id=fake_network.id))
 
         with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
-            self.dhcp.network_create_end(payload)
+            self.dhcp.network_create_end(None, payload)
             enable.assertCalledOnceWith(fake_network.id)
 
     def test_network_update_end_admin_state_up(self):
         payload = dict(network=dict(id=fake_network.id, admin_state_up=True))
         with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
-            self.dhcp.network_update_end(payload)
+            self.dhcp.network_update_end(None, payload)
             enable.assertCalledOnceWith(fake_network.id)
 
     def test_network_update_end_admin_state_down(self):
         payload = dict(network=dict(id=fake_network.id, admin_state_up=False))
         with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
-            self.dhcp.network_update_end(payload)
+            self.dhcp.network_update_end(None, payload)
             disable.assertCalledOnceWith(fake_network.id)
 
     def test_network_delete_end(self):
         payload = dict(network_id=fake_network.id)
 
         with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
-            self.dhcp.network_delete_end(payload)
+            self.dhcp.network_delete_end(None, payload)
             disable.assertCalledOnceWith(fake_network.id)
 
     def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
@@ -523,13 +553,13 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         self.cache.get_network_by_id.return_value = fake_network
         self.plugin.get_network_info.return_value = fake_network
 
-        self.dhcp.subnet_update_end(payload)
+        self.dhcp.subnet_update_end(None, payload)
 
         self.cache.assert_has_calls([mock.call.put(fake_network)])
         self.call_driver.assert_called_once_with('reload_allocations',
                                                  fake_network)
 
-    def test_subnet_update_end(self):
+    def test_subnet_update_end_restart(self):
         new_state = FakeModel(fake_network.id,
                               tenant_id=fake_network.tenant_id,
                               admin_state_up=True,
@@ -540,7 +570,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         self.cache.get_network_by_id.return_value = fake_network
         self.plugin.get_network_info.return_value = new_state
 
-        self.dhcp.subnet_update_end(payload)
+        self.dhcp.subnet_update_end(None, payload)
 
         self.cache.assert_has_calls([mock.call.put(new_state)])
         self.call_driver.assert_called_once_with('restart',
@@ -558,7 +588,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         self.cache.get_network_by_id.return_value = prev_state
         self.plugin.get_network_info.return_value = fake_network
 
-        self.dhcp.subnet_delete_end(payload)
+        self.dhcp.subnet_delete_end(None, payload)
 
         self.cache.assert_has_calls([
             mock.call.get_network_by_subnet_id(
@@ -571,7 +601,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
     def test_port_update_end(self):
         payload = dict(port=vars(fake_port2))
         self.cache.get_network_by_id.return_value = fake_network
-        self.dhcp.port_update_end(payload)
+        self.dhcp.port_update_end(None, payload)
         self.cache.assert_has_calls(
             [mock.call.get_network_by_id(fake_port2.network_id),
              mock.call.put_port(mock.ANY)])
@@ -583,7 +613,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         self.cache.get_network_by_id.return_value = fake_network
         self.cache.get_port_by_id.return_value = fake_port2
 
-        self.dhcp.port_delete_end(payload)
+        self.dhcp.port_delete_end(None, payload)
 
         self.cache.assert_has_calls(
             [mock.call.get_port_by_id(fake_port2.id),
@@ -596,7 +626,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
         payload = dict(port_id='unknown')
         self.cache.get_port_by_id.return_value = None
 
-        self.dhcp.port_delete_end(payload)
+        self.dhcp.port_delete_end(None, payload)
 
         self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')])
         self.assertEqual(self.call_driver.call_count, 0)