seconds=agent_dead_limit)
down_bindings = (
context.session.query(RouterL3AgentBinding).
+ join(agents_db.Agent).
filter(agents_db.Agent.heartbeat_timestamp < cutoff,
agents_db.Agent.admin_state_up))
for binding in down_bindings:
agt_db.admin_state_up = state
self.adminContext.session.commit()
+ def test_router_is_not_rescheduled_from_alive_agent(self):
+ with self.router():
+ l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+ self._register_agent_states()
+
+ # schedule the router to host A
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+ with mock.patch('neutron.db.l3_agentschedulers_db.'
+ 'L3AgentSchedulerDbMixin.reschedule_router') as rr:
+ # take down some unrelated agent and run reschedule check
+ self._take_down_agent_and_run_reschedule(DHCP_HOSTC)
+ self.assertFalse(rr.called)
+
def test_router_reschedule_from_dead_agent(self):
with self.router():
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()