# License for the specific language governing permissions and limitations
# under the License.
-import datetime
import mock
from oslo_config import cfg
-from oslo_utils import timeutils
from oslo_utils import uuidutils
from neutron.api.rpc.handlers import l3_rpc
self.agent2 = helpers.register_l3_agent(
'host_2', constants.L3_AGENT_MODE_DVR_SNAT)
- def _bring_down_agent(self, agent_id):
- update = {
- 'agent': {
- 'heartbeat_timestamp':
- timeutils.utcnow() - datetime.timedelta(hours=1)}}
- self.plugin.update_agent(self.admin_ctx, agent_id, update)
-
def _create_router(self, ha=True, tenant_id='tenant1', distributed=None,
ctx=None):
if ctx is None:
def test_get_number_of_agents_for_scheduling_not_enough_agents(self):
cfg.CONF.set_override('min_l3_agents_per_router', 3)
- agent_to_bring_down = helpers.register_l3_agent(host='l3host_3')
- self._bring_down_agent(agent_to_bring_down['id'])
+ helpers.kill_agent(helpers.register_l3_agent(host='l3host_3')['id'])
self.assertRaises(l3_ext_ha_mode.HANotEnoughAvailableAgents,
self.plugin.get_number_of_agents_for_scheduling,
self.admin_ctx)