# admin_state_up set to True to alive agents.
# allow_automatic_l3agent_failover = False
+# Allow automatic removal of networks from dead DHCP agents with
+# admin_state_up set to True.
+# Networks could then be rescheduled if network_auto_schedule is True
+# allow_automatic_dhcp_failover = True
+
# Number of DHCP agents scheduled to host a network. This enables redundant
# DHCP agents for configured networks.
# dhcp_agents_per_network = 1
# License for the specific language governing permissions and limitations
# under the License.
+import datetime
+import random
+import time
+
from oslo.config import cfg
+from oslo.utils import timeutils
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron.common import constants
from neutron.common import utils
+from neutron import context as ncontext
from neutron.db import agents_db
from neutron.db import model_base
from neutron.extensions import agent as ext_agent
from neutron.extensions import dhcpagentscheduler
+from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
help=_('Driver to use for scheduling network to DHCP agent')),
cfg.BoolOpt('network_auto_schedule', default=True,
help=_('Allow auto scheduling networks to DHCP agent.')),
+ cfg.BoolOpt('allow_automatic_dhcp_failover', default=True,
+ help=_('Automatically remove networks from offline DHCP '
+ 'agents.')),
cfg.IntOpt('dhcp_agents_per_network', default=1,
help=_('Number of DHCP agents scheduled to host a network.')),
]
original_agent['host'])
return result
+ def setup_agent_status_check(self, function):
+ self.periodic_agent_loop = loopingcall.FixedIntervalLoopingCall(
+ function)
+ # TODO(enikanorov): make interval configurable rather than computed
+ interval = max(cfg.CONF.agent_down_time / 2, 1)
+ # add random initial delay to allow agents to check in after the
+ # neutron server first starts. random to offset multiple servers
+ initial_delay = random.randint(interval, interval * 2)
+ self.periodic_agent_loop.start(interval=interval,
+ initial_delay=initial_delay)
+
+ def agent_dead_limit_seconds(self):
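+        # Give agents extra time to handle transient failures.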
+ return cfg.CONF.agent_down_time * 2
+
+ def wait_down_agents(self, agent_type, agent_dead_limit):
+ """Gives chance for agents to send a heartbeat."""
+ # check for an abrupt clock change since last check. if a change is
+ # detected, sleep for a while to let the agents check in.
+ tdelta = timeutils.utcnow() - getattr(self, '_clock_jump_canary',
+ timeutils.utcnow())
+ if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
+ LOG.warn(_LW("Time since last %s agent reschedule check has "
+ "exceeded the interval between checks. Waiting "
+ "before check to allow agents to send a heartbeat "
+ "in case there was a clock adjustment."), agent_type)
+ time.sleep(agent_dead_limit)
+ self._clock_jump_canary = timeutils.utcnow()
+
+ def get_cutoff_time(self, agent_dead_limit):
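+        # Agents whose last heartbeat is older than this cutoff are
+        # considered dead.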
+ cutoff = timeutils.utcnow() - datetime.timedelta(
+ seconds=agent_dead_limit)
+ return cutoff
+
class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
.DhcpAgentSchedulerPluginBase,
network_scheduler = None
+ def start_periodic_dhcp_agent_status_check(self):
+ if not cfg.CONF.allow_automatic_dhcp_failover:
+ LOG.info(_LI("Skipping periodic DHCP agent status check because "
+ "automatic network rescheduling is disabled."))
+ return
+
+ self.setup_agent_status_check(self.remove_networks_from_down_agents)
+
+ def _agent_starting_up(self, context, agent):
+ """Check if agent was just started.
+
+        Returns True if the agent is in its 'starting up' period.
+        The return value depends on the number of networks assigned to
+        the agent. The latest heartbeat timestamp is not considered, as
+        this method is assumed to be called only for agents that are
+        considered dead.
+ """
+ agent_dead_limit = datetime.timedelta(
+ seconds=self.agent_dead_limit_seconds())
+ network_count = (context.session.query(NetworkDhcpAgentBinding).
+ filter_by(dhcp_agent_id=agent['id']).count())
+        # The number of networks assigned to the agent affects how much
+        # time we give it to start up. Tests show that it is more or less
+        # safe to assume that a DHCP agent processes each network in less
+        # than 2 seconds, so give it that much additional time for each of
+        # its networks.
+ additional_time = datetime.timedelta(seconds=2 * network_count)
+ LOG.debug("Checking if agent starts up and giving it additional %s",
+ additional_time)
+ agent_expected_up = (agent['started_at'] + agent_dead_limit +
+ additional_time)
+ return agent_expected_up > timeutils.utcnow()
+
+ def _schedule_network(self, context, network_id, dhcp_notifier):
+ LOG.info(_LI("Scheduling unhosted network %s"), network_id)
+ try:
+ # TODO(enikanorov): have to issue redundant db query
+ # to satisfy scheduling interface
+ network = self.get_network(context, network_id)
+ agents = self.schedule_network(context, network)
+ if not agents:
+ LOG.info(_LI("Failed to schedule network %s, "
+ "no eligible agents or it might be "
+ "already scheduled by another server"),
+ network_id)
+ return
+ if not dhcp_notifier:
+ return
+ for agent in agents:
+ LOG.info(_LI("Adding network %(net)s to agent "
+ "%(agent)%s on host %(host)s"),
+ {'net': network_id,
+ 'agent': agent.id,
+ 'host': agent.host})
+ dhcp_notifier.network_added_to_agent(
+ context, network_id, agent.host)
+ except Exception:
+            # Catch any exception during scheduling so that, when
+            # _schedule_network is invoked in a loop, the loop can continue
+            # regardless of individual failures.
+ LOG.exception(_LE("Failed to schedule network %s"), network_id)
+
+ def _filter_bindings(self, context, bindings):
+ """Skip bindings for which the agent is dead, but starting up."""
+
+        # To save a few db calls, cache already checked agents in a dict:
+        # id -> is_agent_starting_up
+ checked_agents = {}
+ for binding in bindings:
+ agent_id = binding.dhcp_agent['id']
+ if agent_id not in checked_agents:
+ if self._agent_starting_up(context, binding.dhcp_agent):
+                    # When an agent starts with many networks to process,
+                    # it may fail to send state reports within the defined
+                    # interval. The server would then consider it dead and
+                    # try to remove networks from it.
+ checked_agents[agent_id] = True
+ LOG.debug("Agent %s is starting up, skipping", agent_id)
+ else:
+ checked_agents[agent_id] = False
+ if not checked_agents[agent_id]:
+ yield binding
+
+ def remove_networks_from_down_agents(self):
+ """Remove networks from down DHCP agents if admin state is up.
+
+        Reschedule them if network_auto_schedule is enabled.
+ """
+
+ agent_dead_limit = self.agent_dead_limit_seconds()
+ self.wait_down_agents('DHCP', agent_dead_limit)
+ cutoff = self.get_cutoff_time(agent_dead_limit)
+
+ context = ncontext.get_admin_context()
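+        # Bindings whose DHCP agent missed the heartbeat cutoff but still
+        # has admin_state_up set to True.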
+ down_bindings = (
+ context.session.query(NetworkDhcpAgentBinding).
+ join(agents_db.Agent).
+ filter(agents_db.Agent.heartbeat_timestamp < cutoff,
+ agents_db.Agent.admin_state_up))
+ dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
+
+ for binding in self._filter_bindings(context, down_bindings):
+ LOG.warn(_LW("Removing network %(network)s from agent %(agent)s "
+ "because the agent did not report to the server in "
+ "the last %(dead_time)s seconds."),
+ {'network': binding.network_id,
+ 'agent': binding.dhcp_agent_id,
+ 'dead_time': agent_dead_limit})
+ try:
+ self.remove_network_from_dhcp_agent(context,
+ binding.dhcp_agent_id,
+ binding.network_id)
+ except dhcpagentscheduler.NetworkNotHostedByDhcpAgent:
+                # A measure against concurrent operations.
+ LOG.debug("Network %(net)s already removed from DHCP agent "
+ "%s(agent)",
+ {'net': binding.network_id,
+ 'agent': binding.dhcp_agent_id})
+ # still continue and allow concurrent scheduling attempt
+
+ if cfg.CONF.network_auto_schedule:
+ self._schedule_network(
+ context, binding.network_id, dhcp_notifier)
+
def get_dhcp_agents_hosting_networks(
self, context, network_ids, active=None):
if not network_ids:
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import datetime
-import random
-import time
-
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo import messaging
-from oslo.utils import timeutils
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy import or_
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
from neutron.openstack.common import log as logging
-from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
router_scheduler = None
- def start_periodic_agent_status_check(self):
+ def start_periodic_l3_agent_status_check(self):
if not cfg.CONF.allow_automatic_l3agent_failover:
LOG.info(_LI("Skipping period L3 agent status check because "
"automatic router rescheduling is disabled."))
return
- self.periodic_agent_loop = loopingcall.FixedIntervalLoopingCall(
+ self.setup_agent_status_check(
self.reschedule_routers_from_down_agents)
- interval = max(cfg.CONF.agent_down_time / 2, 1)
- # add random initial delay to allow agents to check in after the
- # neutron server first starts. random to offset multiple servers
- self.periodic_agent_loop.start(interval=interval,
- initial_delay=random.randint(interval, interval * 2))
def reschedule_routers_from_down_agents(self):
"""Reschedule routers from down l3 agents if admin state is up."""
-
- # give agents extra time to handle transient failures
- agent_dead_limit = cfg.CONF.agent_down_time * 2
-
- # check for an abrupt clock change since last check. if a change is
- # detected, sleep for a while to let the agents check in.
- tdelta = timeutils.utcnow() - getattr(self, '_clock_jump_canary',
- timeutils.utcnow())
- if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
- LOG.warn(_LW("Time since last L3 agent reschedule check has "
- "exceeded the interval between checks. Waiting "
- "before check to allow agents to send a heartbeat "
- "in case there was a clock adjustment."))
- time.sleep(agent_dead_limit)
- self._clock_jump_canary = timeutils.utcnow()
+ agent_dead_limit = self.agent_dead_limit_seconds()
+ self.wait_down_agents('L3', agent_dead_limit)
+ cutoff = self.get_cutoff_time(agent_dead_limit)
context = n_ctx.get_admin_context()
- cutoff = timeutils.utcnow() - datetime.timedelta(
- seconds=agent_dead_limit)
down_bindings = (
context.session.query(RouterL3AgentBinding).
join(agents_db.Agent).
if cfg.CONF.RESTPROXY.sync_data:
self._send_all_data()
+ self.start_periodic_dhcp_agent_status_check()
LOG.debug("NeutronRestProxyV2: initialization done")
def _setup_rpc(self):
cfg.CONF.router_scheduler_driver
)
self.brocade_init()
+ self.start_periodic_dhcp_agent_status_check()
def brocade_init(self):
"""Brocade specific initialization."""
self.network_scheduler = importutils.import_object(
q_conf.CONF.network_scheduler_driver
)
+ self.start_periodic_dhcp_agent_status_check()
def _setup_rpc(self):
# RPC support
cfg.CONF.network_scheduler_driver
)
+ self.start_periodic_dhcp_agent_status_check()
LOG.info(_LI("Modular L2 Plugin initialization complete"))
def _setup_rpc(self):
'default': self.deactivate_port,
}
}
+ self.start_periodic_dhcp_agent_status_check()
def setup_rpc(self):
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
cfg.CONF.network_scheduler_driver)
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
+ self.start_periodic_dhcp_agent_status_check()
def oneconvergence_init(self):
"""Initialize the connections and set the log levels for the plugin."""
self.nsx_sync_opts.min_sync_req_delay,
self.nsx_sync_opts.min_chunk_size,
self.nsx_sync_opts.max_random_sync_delay)
+ self.start_periodic_dhcp_agent_status_check()
def _ensure_default_network_gateway(self):
if self._is_default_net_gw_in_sync:
self.setup_rpc()
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
- self.start_periodic_agent_status_check()
+ self.start_periodic_l3_agent_status_check()
super(L3RouterPlugin, self).__init__()
def setup_rpc(self):
cfg.CONF.set_override('add_meta_server_route', False, 'RESTPROXY')
def setup_patches(self):
+ self.dhcp_periodic_p = mock.patch(
+ 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
+ 'start_periodic_dhcp_agent_status_check')
+ self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
self.plugin_notifier_p = mock.patch(NOTIFIER)
# prevent any greenthreads from spawning
self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)
self.patched_l3_notify = self.l3_notify_p.start()
self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
'L3AgentSchedulerDbMixin.'
- 'start_periodic_agent_status_check')
+ 'start_periodic_l3_agent_status_check')
self.patched_l3_periodic = self.l3_periodic_p.start()
self.dhcp_notify_p = mock.patch(
'neutron.extensions.dhcpagentscheduler.notify')
self.net_create_status = 'ACTIVE'
self.port_create_status = 'ACTIVE'
+ self.dhcp_periodic_p = mock.patch(
+ 'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
+ 'start_periodic_dhcp_agent_status_check')
+ self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
+
def _is_native_bulk_supported():
plugin_obj = manager.NeutronManager.get_plugin()
native_bulk_attr_name = ("_%s__native_bulk_support"
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
import datetime
import mock
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
-from neutron.db import agentschedulers_db
+from neutron.db import agentschedulers_db as sched_db
from neutron.db import models_v2
from neutron.scheduler import dhcp_agent_scheduler
from neutron.tests.unit import testlib_api
old_time = agent['heartbeat_timestamp']
hour_old = old_time - datetime.timedelta(hours=1)
agent['heartbeat_timestamp'] = hour_old
+ agent['started_at'] = hour_old
self._save_agents(dhcp_agents)
return dhcp_agents
scheduler = dhcp_agent_scheduler.ChanceScheduler()
scheduler._schedule_bind_network(self.ctx, agents, network_id)
results = self.ctx.session.query(
- agentschedulers_db.NetworkDhcpAgentBinding).filter_by(
+ sched_db.NetworkDhcpAgentBinding).filter_by(
network_id=network_id).all()
self.assertEqual(len(agents), len(results))
for result in results:
plugin, self.ctx, host)
self.assertEqual(expected_result, observed_ret_value)
hosted_agents = self.ctx.session.query(
- agentschedulers_db.NetworkDhcpAgentBinding).all()
+ sched_db.NetworkDhcpAgentBinding).all()
self.assertEqual(expected_hosted_agents, len(hosted_agents))
+
+
+class TestNetworksFailover(TestDhcpSchedulerBaseTestCase,
+ sched_db.DhcpAgentSchedulerDbMixin):
+ def test_reschedule_network_from_down_agent(self):
+ plugin = mock.MagicMock()
+ plugin.get_subnets.return_value = [{"network_id": self.network_id,
+ "enable_dhcp": True}]
+ agents = self._create_and_set_agents_down(['host-a', 'host-b'], 1)
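+        # The first agent is down; its network is expected to be
+        # rescheduled to the second agent.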
+ self._test_schedule_bind_network([agents[0]], self.network_id)
+ self._save_networks(["foo-network-2"])
+ self._test_schedule_bind_network([agents[1]], "foo-network-2")
+ with contextlib.nested(
+ mock.patch.object(self, 'remove_network_from_dhcp_agent'),
+ mock.patch.object(self, 'schedule_network',
+ return_value=[agents[1]]),
+ mock.patch.object(self, 'get_network', create=True,
+ return_value={'id': self.network_id})
+ ) as (rn, sch, getn):
+ notifier = mock.MagicMock()
+ self.agent_notifiers[constants.AGENT_TYPE_DHCP] = notifier
+ self.remove_networks_from_down_agents()
+ rn.assert_called_with(mock.ANY, agents[0].id, self.network_id)
+ sch.assert_called_with(mock.ANY, {'id': self.network_id})
+ notifier.network_added_to_agent.assert_called_with(
+ mock.ANY, self.network_id, agents[1].host)
+
+ def test_reschedule_network_from_down_agent_failed(self):
+ plugin = mock.MagicMock()
+ plugin.get_subnets.return_value = [{"network_id": self.network_id,
+ "enable_dhcp": True}]
+ agents = self._create_and_set_agents_down(['host-a'], 1)
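+        # Scheduling is mocked to fail below, so the network is removed
+        # from the down agent but no agent is notified.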
+ self._test_schedule_bind_network([agents[0]], self.network_id)
+ with contextlib.nested(
+ mock.patch.object(self, 'remove_network_from_dhcp_agent'),
+ mock.patch.object(self, 'schedule_network',
+ return_value=None),
+ mock.patch.object(self, 'get_network', create=True,
+ return_value={'id': self.network_id})
+ ) as (rn, sch, getn):
+ notifier = mock.MagicMock()
+ self.agent_notifiers[constants.AGENT_TYPE_DHCP] = notifier
+ self.remove_networks_from_down_agents()
+ rn.assert_called_with(mock.ANY, agents[0].id, self.network_id)
+ sch.assert_called_with(mock.ANY, {'id': self.network_id})
+ self.assertFalse(notifier.network_added_to_agent.called)
+
+ def test_filter_bindings(self):
+ bindings = [
+ sched_db.NetworkDhcpAgentBinding(network_id='foo1',
+ dhcp_agent={'id': 'id1'}),
+ sched_db.NetworkDhcpAgentBinding(network_id='foo2',
+ dhcp_agent={'id': 'id1'}),
+ sched_db.NetworkDhcpAgentBinding(network_id='foo3',
+ dhcp_agent={'id': 'id2'}),
+ sched_db.NetworkDhcpAgentBinding(network_id='foo4',
+ dhcp_agent={'id': 'id2'})]
+ with mock.patch.object(self, '_agent_starting_up',
+ side_effect=[True, False]):
+ res = [b for b in self._filter_bindings(None, bindings)]
+            # _agent_starting_up is consulted once per agent (id1, id2);
+            # only bindings of the agent that is not starting up remain.
+ self.assertEqual(2, len(res))
+ res_ids = [b.network_id for b in res]
+ self.assertIn('foo3', res_ids)
+ self.assertIn('foo4', res_ids)
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
+ dhcp_periodic_p = mock.patch('neutron.db.agentschedulers_db.'
+ 'DhcpAgentSchedulerDbMixin.'
+ 'start_periodic_dhcp_agent_status_check')
+ dhcp_periodic_p.start()
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nsx_controllers, ['fake_1:443', 'fake_2:443'])
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
+ dhcp_periodic_p = mock.patch('neutron.db.agentschedulers_db.'
+ 'DhcpAgentSchedulerDbMixin.'
+ 'start_periodic_dhcp_agent_status_check')
+ dhcp_periodic_p.start()
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nsx_controllers, ['fake_1:443', 'fake_2:443'])
'--config-file', vmware.get_fake_conf('nsx.ini.test')]
self.config_parse(args=args)
cfg.CONF.set_override('allow_overlapping_ips', True)
+ dhcp_periodic_p = mock.patch('neutron.db.agentschedulers_db.'
+ 'DhcpAgentSchedulerDbMixin.'
+ 'start_periodic_dhcp_agent_status_check')
+ dhcp_periodic_p.start()
self._plugin = plugin.NsxPlugin()
# Mock neutron manager plugin load functions to speed up tests
mock_nm_get_plugin = mock.patch('neutron.manager.NeutronManager.'