def get_nondvr_active_network_ports(self, session, network_id):
query = self._get_active_network_ports(session, network_id)
- return query.filter(models_v2.Port.device_owner !=
- const.DEVICE_OWNER_DVR_INTERFACE)
+ query = query.filter(models_v2.Port.device_owner !=
+ const.DEVICE_OWNER_DVR_INTERFACE)
+ return [(bind, agent) for bind, agent in query.all()
+ if self.get_agent_ip(agent)]
def get_dvr_active_network_ports(self, session, network_id):
with session.begin(subtransactions=True):
const.PORT_STATUS_ACTIVE,
models_v2.Port.device_owner ==
const.DEVICE_OWNER_DVR_INTERFACE)
- return query
+ return [(bind, agent) for bind, agent in query.all()
+ if self.get_agent_ip(agent)]
def get_agent_network_active_port_count(self, session, agent_host,
network_id):
session = db_api.get_session()
agent = self.get_agent_by_host(session, agent_host)
if not agent:
+ LOG.warning(_LW("Unable to retrieve active L2 agent on host %s"),
+ agent_host)
return
agent_ip = self.get_agent_ip(agent)
- if not agent_ip:
- LOG.warning(_LW("Unable to retrieve the agent ip, check the agent "
- "configuration."))
- return
segment = context.bottom_bound_segment
if not segment:
'network_type': segment['network_type'],
'ports': {}}}
tunnel_network_ports = (
- self.get_dvr_active_network_ports(session, network_id).all())
+ self.get_dvr_active_network_ports(session, network_id))
fdb_network_ports = (
- self.get_nondvr_active_network_ports(session, network_id).all())
+ self.get_nondvr_active_network_ports(session, network_id))
ports = agent_fdb_entries[network_id]['ports']
ports.update(self._get_tunnels(
fdb_network_ports + tunnel_network_ports,
from neutron.common import constants
from neutron import context
+from neutron.db import models_v2
+from neutron.extensions import portbindings
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
+from neutron.plugins.ml2 import models
from neutron.tests.common import helpers
from neutron.tests.unit import testlib_api
def setUp(self):
super(TestL2PopulationDBTestCase, self).setUp()
self.db_mixin = l2pop_db.L2populationDbMixin()
+ self.ctx = context.get_admin_context()
def test_get_agent_by_host(self):
# Register a L2 agent + A bunch of other agents on the same host
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
agent = self.db_mixin.get_agent_by_host(
- context.get_admin_context().session, helpers.HOST)
+ self.ctx.session, helpers.HOST)
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
def test_get_agent_by_host_no_candidate(self):
helpers.register_l3_agent()
helpers.register_dhcp_agent()
agent = self.db_mixin.get_agent_by_host(
- context.get_admin_context().session, helpers.HOST)
+ self.ctx.session, helpers.HOST)
self.assertIsNone(agent)
+
+ def _setup_port_binding(self, network_id='network_id', dvr=True):
+ with self.ctx.session.begin(subtransactions=True):
+ self.ctx.session.add(models_v2.Network(id=network_id))
+ device_owner = constants.DEVICE_OWNER_DVR_INTERFACE if dvr else ''
+ self.ctx.session.add(models_v2.Port(
+ id='port_id',
+ network_id=network_id,
+ mac_address='00:11:22:33:44:55',
+ admin_state_up=True,
+ status=constants.PORT_STATUS_ACTIVE,
+ device_id='',
+ device_owner=device_owner))
+ port_binding_cls = (models.DVRPortBinding if dvr
+ else models.PortBinding)
+ binding_kwarg = {
+ 'port_id': 'port_id',
+ 'host': helpers.HOST,
+ 'vif_type': portbindings.VIF_TYPE_UNBOUND,
+ 'vnic_type': portbindings.VNIC_NORMAL
+ }
+ if dvr:
+ binding_kwarg['router_id'] = 'router_id'
+ binding_kwarg['status'] = constants.PORT_STATUS_DOWN
+
+ self.ctx.session.add(port_binding_cls(**binding_kwarg))
+
+ def test_get_dvr_active_network_ports(self):
+ self._setup_port_binding()
+ # Register a L2 agent + A bunch of other agents on the same host
+ helpers.register_l3_agent()
+ helpers.register_dhcp_agent()
+ helpers.register_ovs_agent()
+ tunnel_network_ports = self.db_mixin.get_dvr_active_network_ports(
+ self.ctx.session, 'network_id')
+ self.assertEqual(1, len(tunnel_network_ports))
+ _, agent = tunnel_network_ports[0]
+ self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
+
+ def test_get_dvr_active_network_ports_no_candidate(self):
+ self._setup_port_binding()
+ # Register a bunch of non-L2 agents on the same host
+ helpers.register_l3_agent()
+ helpers.register_dhcp_agent()
+ tunnel_network_ports = self.db_mixin.get_dvr_active_network_ports(
+ self.ctx.session, 'network_id')
+ self.assertEqual(0, len(tunnel_network_ports))
+
+ def test_get_nondvr_active_network_ports(self):
+ self._setup_port_binding(dvr=False)
+ # Register a L2 agent + A bunch of other agents on the same host
+ helpers.register_l3_agent()
+ helpers.register_dhcp_agent()
+ helpers.register_ovs_agent()
+ fdb_network_ports = self.db_mixin.get_nondvr_active_network_ports(
+ self.ctx.session, 'network_id')
+ self.assertEqual(1, len(fdb_network_ports))
+ _, agent = fdb_network_ports[0]
+ self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
+
+ def test_get_nondvr_active_network_ports_no_candidate(self):
+ self._setup_port_binding(dvr=False)
+ # Register a bunch of non-L2 agents on the same host
+ helpers.register_l3_agent()
+ helpers.register_dhcp_agent()
+ fdb_network_ports = self.db_mixin.get_nondvr_active_network_ports(
+ self.ctx.session, 'network_id')
+ self.assertEqual(0, len(fdb_network_ports))
tunnels = self._test_get_tunnels(None, exclude_host=False)
self.assertEqual(0, len(tunnels))
- def _test_create_agent_fdb(self, fdb_network_ports_query, agent_ips):
+ def _test_create_agent_fdb(self, fdb_network_ports, agent_ips):
mech_driver = l2pop_mech_driver.L2populationMechanismDriver()
- tunnel_network_ports_query, tunnel_agent = (
- self._mock_network_ports_query(HOST + '1', None))
+ tunnel_network_ports, tunnel_agent = (
+ self._mock_network_ports(HOST + '1', None))
agent_ips[tunnel_agent] = '10.0.0.1'
def agent_ip_side_effect(agent):
side_effect=agent_ip_side_effect),\
mock.patch.object(l2pop_db.L2populationDbMixin,
'get_nondvr_active_network_ports',
- new=fdb_network_ports_query),\
+ return_value=fdb_network_ports),\
mock.patch.object(l2pop_db.L2populationDbMixin,
'get_dvr_active_network_ports',
- new=tunnel_network_ports_query):
+ return_value=tunnel_network_ports):
session = mock.Mock()
agent = mock.Mock()
agent.host = HOST
segment,
'network_id')
- def _mock_network_ports_query(self, host_name, binding):
+ def _mock_network_ports(self, host_name, binding):
agent = mock.Mock()
agent.host = host_name
- result = [(binding, agent)]
- all_mock = mock.Mock()
- all_mock.all = mock.Mock(return_value=result)
- mock_query = mock.Mock(return_value=all_mock)
- return mock_query, agent
+ return [(binding, agent)], agent
def test_create_agent_fdb(self):
binding = mock.Mock()
binding.port = {'mac_address': '00:00:DE:AD:BE:EF',
'fixed_ips': [{'ip_address': '1.1.1.1'}]}
- fdb_network_ports_query, fdb_agent = (
- self._mock_network_ports_query(HOST + '2', binding))
+ fdb_network_ports, fdb_agent = (
+ self._mock_network_ports(HOST + '2', binding))
agent_ips = {fdb_agent: '20.0.0.1'}
- agent_fdb = self._test_create_agent_fdb(fdb_network_ports_query,
+ agent_fdb = self._test_create_agent_fdb(fdb_network_ports,
agent_ips)
result = agent_fdb['network_id']
self.assertEqual(expected_result, result)
def test_create_agent_fdb_only_tunnels(self):
- all_mock = mock.Mock()
- all_mock.all = mock.Mock(return_value=[])
- fdb_network_ports_query = mock.Mock(return_value=all_mock)
- agent_fdb = self._test_create_agent_fdb(fdb_network_ports_query, {})
+ agent_fdb = self._test_create_agent_fdb([], {})
result = agent_fdb['network_id']
expected_result = {'segment_id': 1,