# be activated when the subnet gateway_ip is None. The guest instance must
# be configured to request host routes via DHCP (Option 121).
# enable_isolated_metadata = False
+
# Allows for serving metadata requests coming from a dedicated metadata
# access network whose cidr is 169.254.169.254/16 (or larger prefix), and
# is connected to a Quantum router from which the VMs send metadata
# they will be able to reach 169.254.169.254 through a router.
# This option requires enable_isolated_metadata = True
# enable_metadata_network = False
+
+# The Quantum DHCP agent manager.
+# dhcp_agent_manager = quantum.agent.dhcp_agent.DhcpAgentWithStateReport
# DHCP Lease duration (in seconds)
# dhcp_lease_duration = 120
+# Allow sending resource operation notification to DHCP agent
+# dhcp_agent_notification = True
+
# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# Enable or disable overlapping IPs for subnets
from quantum.common import exceptions
from quantum.common import topics
from quantum import context
+from quantum import manager
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import proxy
+from quantum.openstack.common import service
from quantum.openstack.common import uuidutils
+from quantum import service as quantum_service
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qdhcp-'
METADATA_PORT = 80
-class DhcpAgent(object):
+class DhcpAgent(manager.Manager):
OPTS = [
cfg.IntOpt('resync_interval', default=5,
help=_("Interval to resync.")),
cfg.BoolOpt('enable_metadata_network', default=False,
help=_("Allows for serving metadata requests from a "
"dedicate network. Requires "
- "enable isolated_metadata = True "))
+ "enable isolated_metadata = True ")),
+ cfg.StrOpt('dhcp_agent_manager',
+ default='quantum.agent.dhcp_agent.'
+ 'DhcpAgentWithStateReport',
+ help=_("The Quantum DHCP agent manager.")),
]
- def __init__(self, conf):
+ def __init__(self, host=None):
+ super(DhcpAgent, self).__init__(host=host)
self.needs_resync = False
- self.conf = conf
+ self.conf = cfg.CONF
self.cache = NetworkCache()
- self.root_helper = config.get_root_helper(conf)
-
- self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
+ self.root_helper = config.get_root_helper(self.conf)
+ self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
-
self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
- self.notifications = agent_rpc.NotificationDispatcher()
self.lease_relay = DhcpLeaseRelay(self.update_lease)
+ def after_start(self):
+ self.run()
+ LOG.info(_("DHCP agent started"))
+
def run(self):
"""Activate the DHCP agent."""
self.sync_state()
self.periodic_resync()
self.lease_relay.start()
- self.notifications.run_dispatch(self)
def _ns_name(self, network):
if self.conf.use_namespaces:
else:
self.disable_dhcp_helper(network.id)
- def network_create_end(self, payload):
+ def network_create_end(self, context, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)
- def network_update_end(self, payload):
+ def network_update_end(self, context, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
else:
self.disable_dhcp_helper(network_id)
- def network_delete_end(self, payload):
+ def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
self.disable_dhcp_helper(payload['network_id'])
- def subnet_update_end(self, payload):
+ def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event."""
network_id = payload['subnet']['network_id']
self.refresh_dhcp_helper(network_id)
# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
- def subnet_delete_end(self, payload):
+ def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event."""
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if network:
self.refresh_dhcp_helper(network.id)
- def port_update_end(self, payload):
+ def port_update_end(self, context, payload):
"""Handle the port.update.end notification event."""
port = DictModel(payload['port'])
network = self.cache.get_network_by_id(port.network_id)
# Use the update handler for the port create event.
port_create_end = port_update_end
- def port_delete_end(self, payload):
+ def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""
port = self.cache.get_port_by_id(payload['port_id'])
if port:
if port.id == port_id:
return port
+ def get_state(self):
+ net_ids = self.get_network_ids()
+ num_nets = len(net_ids)
+ num_subnets = 0
+ num_ports = 0
+ for net_id in net_ids:
+ network = self.get_network_by_id(net_id)
+ num_subnets += len(network.subnets)
+ num_ports += len(network.ports)
+ return {'networks': num_nets,
+ 'subnets': num_subnets,
+ 'ports': num_ports}
+
class DeviceManager(object):
OPTS = [
eventlet.spawn(eventlet.serve, listener, self._handler)
+class DhcpAgentWithStateReport(DhcpAgent):
+ def __init__(self, host=None):
+ super(DhcpAgentWithStateReport, self).__init__(host=host)
+ self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
+ self.agent_state = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': host,
+ 'topic': topics.DHCP_AGENT,
+ 'configurations': {
+ 'dhcp_driver': cfg.CONF.dhcp_driver,
+ 'use_namespaces': cfg.CONF.use_namespaces,
+ 'dhcp_lease_time': cfg.CONF.dhcp_lease_time},
+ 'start_flag': True,
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.LoopingCall(self._report_state)
+ heartbeat.start(interval=report_interval)
+
+ def _report_state(self):
+ try:
+ self.agent_state.get('configurations').update(
+ self.cache.get_state())
+ ctx = context.get_admin_context_without_session()
+ self.state_rpc.report_state(ctx,
+ self.agent_state)
+ except Exception:
+ LOG.exception(_("Failed reporting state!"))
+ return
+ if self.agent_state.pop('start_flag', None):
+ self.run()
+
+ def after_start(self):
+ LOG.info(_("DHCP agent started"))
+
+
def main():
eventlet.monkey_patch()
cfg.CONF.register_opts(DhcpAgent.OPTS)
+ config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(DeviceManager.OPTS)
cfg.CONF.register_opts(DhcpLeaseRelay.OPTS)
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF(project='quantum')
config.setup_logging(cfg.CONF)
-
- mgr = DhcpAgent(cfg.CONF)
- mgr.run()
+ server = quantum_service.Service.create(
+ binary='quantum-dhcp-agent',
+ topic=topics.DHCP_AGENT,
+ report_interval=cfg.CONF.AGENT.report_interval)
+ service.launch(server).wait()
return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip),
topic=self.topic)
-
-
-class NotificationDispatcher(object):
- def __init__(self):
- # Set the Queue size to 1 so that messages stay on server rather than
- # being buffered in the process.
- self.queue = eventlet.queue.Queue(1)
- self.connection = rpc.create_connection(new=True)
- topic = '%s.%s' % (rpc_notifier.CONF.notification_topics[0],
- api.CONF.default_notification_level.lower())
- queue_name = 'notification_listener_%s' % uuidutils.generate_uuid()
- self.connection.declare_topic_consumer(topic=topic,
- queue_name=queue_name,
- callback=self._add_to_queue)
- self.connection.consume_in_thread()
-
- def _add_to_queue(self, msg):
- self.queue.put(msg)
-
- def run_dispatch(self, handler):
- while True:
- msg = self.queue.get()
- name = msg['event_type'].replace('.', '_')
-
- try:
- if hasattr(handler, name):
- getattr(handler, name)(msg['payload'])
- else:
- LOG.debug(_('Unknown event_type: %s.'), msg['event_type'])
- except Exception, e:
- LOG.warn(_('Error processing message. Exception: %s'), e)
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import topics
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
+class DhcpAgentNotifyAPI(proxy.RpcProxy):
+ """API for plugin to notify DHCP agent."""
+ BASE_RPC_API_VERSION = '1.0'
+ # It seems dhcp agent does not support bulk operation
+ VALID_RESOURCES = ['network', 'subnet', 'port']
+ VALID_METHOD_NAMES = ['network.create.end',
+ 'network.update.end',
+ 'network.delete.end',
+ 'subnet.create.end',
+ 'subnet.update.end',
+ 'subnet.delete.end',
+ 'port.create.end',
+ 'port.update.end',
+ 'port.delete.end']
+
+ def __init__(self, topic=topics.DHCP_AGENT):
+ super(DhcpAgentNotifyAPI, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def _notification(self, context, method, payload):
+ """Notify all the agents that are hosting the network"""
+ # By now, we have no scheduling feature, so we fanout
+ # to all of the DHCP agents
+ self._notification_fanout(context, method, payload)
+
+ def _notification_fanout(self, context, method, payload):
+ """Fanout the payload to all dhcp agents"""
+ self.fanout_cast(
+ context, self.make_msg(method,
+ payload=payload),
+ topic=topics.DHCP_AGENT)
+
+ def notify(self, context, data, methodname):
+ # data is {'key' : 'value'} with only one key
+ if methodname not in self.VALID_METHOD_NAMES:
+ return
+ obj_type = data.keys()[0]
+ if obj_type not in self.VALID_RESOURCES:
+ return
+ obj_value = data[obj_type]
+ methodname = methodname.replace(".", "_")
+ if methodname.endswith("_delete_end"):
+ if 'id' in obj_value:
+ self._notification(context, methodname,
+ {obj_type + '_id': obj_value['id']})
+ else:
+ self._notification(context, methodname, data)
import netaddr
import webob.exc
+from oslo.config import cfg
+
+from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.api.v2 import resource as wsgi_resource
from quantum.common import exceptions
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network')
+ self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self._member_actions = member_actions
if parent:
policy.enforce(request.context, action, obj, plugin=self._plugin)
return obj
+ def _send_dhcp_notification(self, context, data, methodname):
+ if cfg.CONF.dhcp_agent_notification:
+ self._dhcp_agent_notifier.notify(context, data, methodname)
+
def index(self, request, **kwargs):
"""Returns a list of the requested entity"""
parent_id = kwargs.get(self._parent_id_name)
**kwargs)
def notify(create_result):
+ notifier_method = self._resource + '.create.end'
notifier_api.notify(request.context,
self._publisher_id,
- self._resource + '.create.end',
+ notifier_method,
notifier_api.CONF.default_notification_level,
create_result)
+ self._send_dhcp_notification(request.context,
+ create_result,
+ notifier_method)
return create_result
kwargs = {self._parent_id_name: parent_id} if parent_id else {}
obj_deleter = getattr(self._plugin, action)
obj_deleter(request.context, id, **kwargs)
+ notifier_method = self._resource + '.delete.end'
notifier_api.notify(request.context,
self._publisher_id,
- self._resource + '.delete.end',
+ notifier_method,
notifier_api.CONF.default_notification_level,
{self._resource + '_id': id})
+ result = {self._resource: self._view(obj)}
+ self._send_dhcp_notification(request.context,
+ result,
+ notifier_method)
def update(self, request, id, body=None, **kwargs):
"""Updates the specified entity's attributes"""
kwargs[self._parent_id_name] = parent_id
obj = obj_updater(request.context, id, **kwargs)
result = {self._resource: self._view(obj)}
+ notifier_method = self._resource + '.update.end'
notifier_api.notify(request.context,
self._publisher_id,
- self._resource + '.update.end',
+ notifier_method,
notifier_api.CONF.default_notification_level,
result)
+ self._send_dhcp_notification(request.context,
+ result,
+ notifier_method)
return result
@staticmethod
help=_("Maximum number of host routes per subnet")),
cfg.IntOpt('dhcp_lease_duration', default=120,
help=_("DHCP lease duration")),
+ cfg.BoolOpt('dhcp_agent_notification', default=True,
+ help=_("Allow sending resource operation"
+ " notification to DHCP agent")),
cfg.BoolOpt('allow_overlapping_ips', default=False,
help=_("Allow overlapping IP support in Quantum")),
cfg.StrOpt('host', default=utils.get_hostname(),
DHCP = 'q-dhcp-notifer'
L3_AGENT = 'l3_agent'
+DHCP_AGENT = 'dhcp_agent'
def get_topic_name(prefix, table, operation):
with mock.patch(call_to_patch) as create_connection:
conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
-
-
-class AgentRPCNotificationDispatcher(unittest.TestCase):
- def setUp(self):
- self.create_connection_p = mock.patch(
- 'quantum.openstack.common.rpc.create_connection')
- self.create_connection = self.create_connection_p.start()
- cfg.CONF.set_override('default_notification_level', 'INFO')
- cfg.CONF.set_override('notification_topics', ['notifications'])
-
- def tearDown(self):
- self.create_connection_p.stop()
- cfg.CONF.reset()
-
- def test_init(self):
- nd = rpc.NotificationDispatcher()
-
- expected = [
- mock.call(new=True),
- mock.call().declare_topic_consumer(topic='notifications.info',
- queue_name=mock.ANY,
- callback=nd._add_to_queue),
- mock.call().consume_in_thread()
- ]
- self.create_connection.assert_has_calls(expected)
-
- def test_add_to_queue(self):
- nd = rpc.NotificationDispatcher()
- nd._add_to_queue('foo')
- self.assertEqual(nd.queue.get(), 'foo')
-
- def _test_run_dispatch_helper(self, msg, handler):
- msgs = [msg]
-
- def side_effect(*args):
- return msgs.pop(0)
-
- with mock.patch('eventlet.Queue.get') as queue_get:
- queue_get.side_effect = side_effect
- nd = rpc.NotificationDispatcher()
- # catch the assertion so that the loop runs once
- self.assertRaises(IndexError, nd.run_dispatch, handler)
-
- def test_run_dispatch_once(self):
- class SimpleHandler:
- def __init__(self):
- self.network_delete_end = mock.Mock()
-
- msg = dict(event_type='network.delete.end',
- payload=dict(network_id='a'))
-
- handler = SimpleHandler()
- self._test_run_dispatch_helper(msg, handler)
- handler.network_delete_end.called_once_with(msg['payload'])
-
- def test_run_dispatch_missing_handler(self):
- class SimpleHandler:
- self.subnet_create_start = mock.Mock()
-
- msg = dict(event_type='network.delete.end',
- payload=dict(network_id='a'))
-
- handler = SimpleHandler()
-
- with mock.patch('quantum.agent.rpc.LOG') as log:
- self._test_run_dispatch_helper(msg, handler)
- log.assert_has_calls([mock.call.debug(mock.ANY, mock.ANY)])
-
- def test_run_dispatch_handler_raises(self):
- class SimpleHandler:
- def network_delete_end(self, payload):
- raise Exception('foo')
-
- msg = dict(event_type='network.delete.end',
- payload=dict(network_id='a'))
-
- handler = SimpleHandler()
-
- with mock.patch('quantum.agent.rpc.LOG') as log:
- self._test_run_dispatch_helper(msg, handler)
- log.assert_has_calls([mock.call.warn(mock.ANY, mock.ANY)])
import sys
import uuid
+import eventlet
import mock
from oslo.config import cfg
import unittest2 as unittest
from quantum.agent.common import config
from quantum.agent import dhcp_agent
+from quantum.agent.dhcp_agent import DhcpAgentWithStateReport
+from quantum.agent.linux import dhcp
from quantum.agent.linux import interface
from quantum.common import constants
from quantum.common import exceptions
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
ETCDIR = os.path.join(ROOTDIR, 'etc')
+HOSTNAME = 'hostname'
def etcdir(*p):
self.driver = mock.Mock(name='driver')
self.driver_cls = self.driver_cls_p.start()
self.driver_cls.return_value = self.driver
- self.notification_p = mock.patch(
- 'quantum.agent.rpc.NotificationDispatcher')
- self.notification = self.notification_p.start()
def tearDown(self):
- self.notification_p.stop()
self.driver_cls_p.stop()
cfg.CONF.reset()
- def test_dhcp_agent_main(self):
+ def test_dhcp_agent_manager(self):
+ state_rpc_str = 'quantum.agent.rpc.PluginReportStateAPI'
+ lease_relay_str = 'quantum.agent.dhcp_agent.DhcpLeaseRelay'
+ with mock.patch.object(DhcpAgentWithStateReport,
+ 'sync_state',
+ autospec=True) as mock_sync_state:
+ with mock.patch.object(DhcpAgentWithStateReport,
+ 'periodic_resync',
+ autospec=True) as mock_periodic_resync:
+ with mock.patch(state_rpc_str) as state_rpc:
+ with mock.patch(lease_relay_str) as mock_lease_relay:
+ with mock.patch.object(sys, 'argv') as sys_argv:
+ sys_argv.return_value = [
+ 'dhcp', '--config-file',
+ etcdir('quantum.conf.test')]
+ cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
+ config.register_agent_state_opts_helper(cfg.CONF)
+ config.register_root_helper(cfg.CONF)
+ cfg.CONF.register_opts(
+ dhcp_agent.DeviceManager.OPTS)
+ cfg.CONF.register_opts(
+ dhcp_agent.DhcpLeaseRelay.OPTS)
+ cfg.CONF.register_opts(dhcp.OPTS)
+ cfg.CONF.register_opts(interface.OPTS)
+ cfg.CONF(project='quantum')
+ agent_mgr = DhcpAgentWithStateReport('testhost')
+ eventlet.greenthread.sleep(1)
+ agent_mgr.after_start()
+ mock_sync_state.assert_called_once_with(agent_mgr)
+ mock_periodic_resync.assert_called_once_with(
+ agent_mgr)
+ state_rpc.assert_has_calls(
+ [mock.call(mock.ANY),
+ mock.call().report_state(mock.ANY, mock.ANY)])
+ mock_lease_relay.assert_has_calls(
+ [mock.call(mock.ANY),
+ mock.call().start()])
+
+ def test_dhcp_agent_main_agent_manager(self):
logging_str = 'quantum.agent.common.config.setup_logging'
- manager_str = 'quantum.agent.dhcp_agent.DeviceManager'
- agent_str = 'quantum.agent.dhcp_agent.DhcpAgent'
+ launcher_str = 'quantum.openstack.common.service.ServiceLauncher'
with mock.patch(logging_str):
- with mock.patch(manager_str) as dev_mgr:
- with mock.patch(agent_str) as dhcp:
- with mock.patch.object(sys, 'argv') as sys_argv:
- sys_argv.return_value = ['dhcp', '--config-file',
- etcdir('quantum.conf.test')]
- dhcp_agent.main()
- dev_mgr.assert_called_once(mock.ANY, 'sudo')
- dhcp.assert_has_calls([
- mock.call(mock.ANY),
- mock.call().run()])
+ with mock.patch.object(sys, 'argv') as sys_argv:
+ with mock.patch(launcher_str) as launcher:
+ sys_argv.return_value = ['dhcp', '--config-file',
+ etcdir('quantum.conf.test')]
+ dhcp_agent.main()
+ launcher.assert_has_calls(
+ [mock.call(), mock.call().launch_service(mock.ANY),
+ mock.call().wait()])
def test_run_completes_single_pass(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
['sync_state', 'lease_relay', 'periodic_resync']])
mocks['periodic_resync'].assert_called_once_with()
mocks['lease_relay'].assert_has_mock_calls(
[mock.call.start()])
- self.notification.assert_has_calls([mock.call.run_dispatch()])
def test_ns_name(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
mock_net = mock.Mock(id='foo')
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo')
def test_ns_name_disabled_namespace(self):
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
cfg.CONF.set_override('use_namespaces', False)
mock_net = mock.Mock(id='foo')
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertIsNone(dhcp._ns_name(mock_net))
def test_call_driver(self):
self.driver.return_value.foo.side_effect = Exception
with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertIsNone(dhcp.call_driver('foo', network))
self.assertTrue(dev_mgr.called)
self.driver.assert_called_once_with(cfg.CONF,
def test_update_lease(self):
with mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi') as plug:
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.update_lease('net_id', '192.168.1.1', 120)
plug.assert_has_calls(
[mock.call().update_lease_expiration(
plug.return_value.update_lease_expiration.side_effect = Exception
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.update_lease('net_id', '192.168.1.1', 120)
plug.assert_has_calls(
[mock.call().update_lease_expiration(
mock_plugin.get_active_networks.return_value = active_networks
plug.return_value = mock_plugin
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
plug.return_value = mock_plugin
with mock.patch.object(dhcp_agent.LOG, 'exception') as log:
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.sync_state()
self.assertTrue(log.called)
self.assertTrue(dhcp.needs_resync)
def test_periodic_resync(self):
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
with mock.patch.object(dhcp_agent.eventlet, 'spawn') as spawn:
dhcp.periodic_resync()
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
def test_periodoc_resync_helper(self):
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
- dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
dhcp.needs_resync = True
with mock.patch.object(dhcp, 'sync_state') as sync_state:
sync_state.side_effect = RuntimeError
'quantum.agent.linux.interface.NullDriver')
config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
- self.notification_p = mock.patch(
- 'quantum.agent.rpc.NotificationDispatcher')
- self.notification = self.notification_p.start()
self.plugin_p = mock.patch('quantum.agent.dhcp_agent.DhcpPluginApi')
plugin_cls = self.plugin_p.start()
self.cache = mock.Mock()
cache_cls.return_value = self.cache
- self.dhcp = dhcp_agent.DhcpAgent(cfg.CONF)
+ self.dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.call_driver_p = mock.patch.object(self.dhcp, 'call_driver')
self.call_driver = self.call_driver_p.start()
self.call_driver_p.stop()
self.cache_p.stop()
self.plugin_p.stop()
- self.notification_p.stop()
def test_enable_dhcp_helper(self):
self.plugin.get_network_info.return_value = fake_network
payload = dict(network=dict(id=fake_network.id))
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
- self.dhcp.network_create_end(payload)
+ self.dhcp.network_create_end(None, payload)
enable.assertCalledOnceWith(fake_network.id)
def test_network_update_end_admin_state_up(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=True))
with mock.patch.object(self.dhcp, 'enable_dhcp_helper') as enable:
- self.dhcp.network_update_end(payload)
+ self.dhcp.network_update_end(None, payload)
enable.assertCalledOnceWith(fake_network.id)
def test_network_update_end_admin_state_down(self):
payload = dict(network=dict(id=fake_network.id, admin_state_up=False))
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
- self.dhcp.network_update_end(payload)
+ self.dhcp.network_update_end(None, payload)
disable.assertCalledOnceWith(fake_network.id)
def test_network_delete_end(self):
payload = dict(network_id=fake_network.id)
with mock.patch.object(self.dhcp, 'disable_dhcp_helper') as disable:
- self.dhcp.network_delete_end(payload)
+ self.dhcp.network_delete_end(None, payload)
disable.assertCalledOnceWith(fake_network.id)
def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = fake_network
- self.dhcp.subnet_update_end(payload)
+ self.dhcp.subnet_update_end(None, payload)
self.cache.assert_has_calls([mock.call.put(fake_network)])
self.call_driver.assert_called_once_with('reload_allocations',
fake_network)
- def test_subnet_update_end(self):
+ def test_subnet_update_end_restart(self):
new_state = FakeModel(fake_network.id,
tenant_id=fake_network.tenant_id,
admin_state_up=True,
self.cache.get_network_by_id.return_value = fake_network
self.plugin.get_network_info.return_value = new_state
- self.dhcp.subnet_update_end(payload)
+ self.dhcp.subnet_update_end(None, payload)
self.cache.assert_has_calls([mock.call.put(new_state)])
self.call_driver.assert_called_once_with('restart',
self.cache.get_network_by_id.return_value = prev_state
self.plugin.get_network_info.return_value = fake_network
- self.dhcp.subnet_delete_end(payload)
+ self.dhcp.subnet_delete_end(None, payload)
self.cache.assert_has_calls([
mock.call.get_network_by_subnet_id(
def test_port_update_end(self):
payload = dict(port=vars(fake_port2))
self.cache.get_network_by_id.return_value = fake_network
- self.dhcp.port_update_end(payload)
+ self.dhcp.port_update_end(None, payload)
self.cache.assert_has_calls(
[mock.call.get_network_by_id(fake_port2.network_id),
mock.call.put_port(mock.ANY)])
self.cache.get_network_by_id.return_value = fake_network
self.cache.get_port_by_id.return_value = fake_port2
- self.dhcp.port_delete_end(payload)
+ self.dhcp.port_delete_end(None, payload)
self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id),
payload = dict(port_id='unknown')
self.cache.get_port_by_id.return_value = None
- self.dhcp.port_delete_end(payload)
+ self.dhcp.port_delete_end(None, payload)
self.cache.assert_has_calls([mock.call.get_port_by_id('unknown')])
self.assertEqual(self.call_driver.call_count, 0)