# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import datetime
+import random
+import time
from oslo.config import cfg
import sqlalchemy as sa
from sqlalchemy.orm import joinedload
from neutron.common import constants
+from neutron import context as n_ctx
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import model_base
from neutron.extensions import l3agentscheduler
from neutron import manager
+from neutron.openstack.common.gettextutils import _LI, _LW
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
+from neutron.openstack.common import timeutils
+
+
+LOG = logging.getLogger(__name__)
L3_AGENTS_SCHEDULER_OPTS = [
cfg.StrOpt('router_scheduler_driver',
'router to a default L3 agent')),
cfg.BoolOpt('router_auto_schedule', default=True,
help=_('Allow auto scheduling of routers to L3 agent.')),
+ cfg.BoolOpt('allow_automatic_l3agent_failover', default=False,
+ help=_('Automatically reschedule routers from offline L3 '
+ 'agents to online L3 agents.')),
]
cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
router_scheduler = None
+ def start_periodic_agent_status_check(self):
+ if not cfg.CONF.allow_automatic_l3agent_failover:
+ LOG.info(_LI("Skipping period L3 agent status check because "
+ "automatic router rescheduling is disabled."))
+ return
+
+ self.periodic_agent_loop = loopingcall.FixedIntervalLoopingCall(
+ self.reschedule_routers_from_down_agents)
+ interval = max(cfg.CONF.agent_down_time / 2, 1)
+ # add random initial delay to allow agents to check in after the
+ # neutron server first starts. random to offset multiple servers
+ self.periodic_agent_loop.start(interval=interval,
+ initial_delay=random.randint(interval, interval * 2))
+
+ def reschedule_routers_from_down_agents(self):
+ """Reschedule routers from down l3 agents if admin state is up."""
+
+ # give agents extra time to handle transient failures
+ agent_dead_limit = cfg.CONF.agent_down_time * 2
+
+ # check for an abrupt clock change since last check. if a change is
+ # detected, sleep for a while to let the agents check in.
+ tdelta = timeutils.utcnow() - getattr(self, '_clock_jump_canary',
+ timeutils.utcnow())
+ if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
+ LOG.warn(_LW("Time since last L3 agent reschedule check has "
+ "exceeded the interval between checks. Waiting "
+ "before check to allow agents to send a heartbeat "
+ "in case there was a clock adjustment."))
+ time.sleep(agent_dead_limit)
+ self._clock_jump_canary = timeutils.utcnow()
+
+ context = n_ctx.get_admin_context()
+ cutoff = timeutils.utcnow() - datetime.timedelta(
+ seconds=agent_dead_limit)
+ down_bindings = (
+ context.session.query(RouterL3AgentBinding).
+ filter(agents_db.Agent.heartbeat_timestamp < cutoff,
+ agents_db.Agent.admin_state_up))
+ for binding in down_bindings:
+ LOG.warn(_LW("Rescheduling router %(router)s from agent %(agent)s "
+ "because the agent did not report to the server in "
+ "the last %(dead_time)s seconds."),
+ {'router': binding.router_id,
+ 'agent': binding.l3_agent_id,
+ 'dead_time': agent_dead_limit})
+ self.reschedule_router(context, binding.router_id)
+
def add_router_to_l3_agent(self, context, agent_id, router_id):
"""Add a l3 agent to host a router."""
router = self.get_router(context, router_id)
import contextlib
import copy
+import datetime
import mock
from oslo.config import cfg
from neutron import context
from neutron.db import agents_db
from neutron.db import dhcp_rpc_base
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_rpc_base
from neutron.extensions import agent
from neutron.extensions import dhcpagentscheduler
self.l3_notify_p = mock.patch(
'neutron.extensions.l3agentscheduler.notify')
self.patched_l3_notify = self.l3_notify_p.start()
+ self.l3_periodic_p = mock.patch('neutron.db.L3AgentSchedulerDbMixin.'
+ 'start_periodic_agent_status_check')
+ self.patched_l3_periodic = self.l3_notify_p.start()
self.dhcp_notify_p = mock.patch(
'neutron.extensions.dhcpagentscheduler.notify')
self.patched_dhcp_notify = self.dhcp_notify_p.start()
self.assertEqual(port_list['ports'][0]['device_id'],
constants.DEVICE_ID_RESERVED_DHCP_PORT)
+    def _take_down_agent_and_run_reschedule(self, host):
+        """Make the agent on `host` look dead, then run rescheduling.
+
+        Backdates the agent's heartbeat by one hour so it falls past the
+        dead-agent cutoff, then invokes the L3 plugin's rescheduling pass.
+        """
+        # backdate the heartbeat of the agent registered for `host`
+        self.adminContext.session.begin(subtransactions=True)
+        query = self.adminContext.session.query(agents_db.Agent)
+        agt = query.filter_by(host=host).first()
+        agt.heartbeat_timestamp = (
+            agt.heartbeat_timestamp - datetime.timedelta(hours=1))
+        self.adminContext.session.commit()
+
+        plugin = manager.NeutronManager.get_service_plugins().get(
+            service_constants.L3_ROUTER_NAT)
+
+        plugin.reschedule_routers_from_down_agents()
+
+ def _set_agent_admin_state_up(self, host, state):
+ self.adminContext.session.begin(subtransactions=True)
+ query = self.adminContext.session.query(agents_db.Agent)
+ agt_db = query.filter_by(host=host).first()
+ agt_db.admin_state_up = state
+ self.adminContext.session.commit()
+
+ def test_router_reschedule_from_dead_agent(self):
+ with self.router():
+ l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+ self._register_agent_states()
+
+ # schedule the router to host A
+ ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+ self._take_down_agent_and_run_reschedule(L3_HOSTA)
+
+ # B should now pick up the router
+ ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
+ self.assertEqual(ret_b, ret_a)
+
+    def test_router_no_reschedule_from_dead_admin_down_agent(self):
+        """A dead agent that is admin-down keeps its router binding.
+
+        Rescheduling only considers admin-up agents, so routers bound to
+        an administratively disabled agent must stay put even after the
+        agent stops reporting.
+        """
+        with self.router() as r:
+            l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+            self._register_agent_states()
+
+            # schedule the router to host A
+            l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+            self._set_agent_admin_state_up(L3_HOSTA, False)
+            self._take_down_agent_and_run_reschedule(L3_HOSTA)
+
+            # A should still have it even though it was inactive due to the
+            # admin_state being down
+            rab = l3_agentschedulers_db.RouterL3AgentBinding
+            binding = (self.adminContext.session.query(rab).
+                       filter(rab.router_id == r['router']['id']).first())
+            self.assertEqual(binding.l3_agent.host, L3_HOSTA)
+
+            # B should not pick up the router
+            ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
+            self.assertFalse(ret_b)
+
def test_router_auto_schedule_with_invalid_router(self):
with self.router() as router:
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()