context.get_admin_context(),
agent_id,
{'agent': {'admin_state_up': admin_state_up}})
+
+
+def _get_ovs_agent_dict(host, agent_type, binary, tunnel_types,
+ tunneling_ip='20.0.0.1', interface_mappings=None,
+ l2pop_network_types=None):
+ agent = {
+ 'binary': binary,
+ 'host': host,
+ 'topic': constants.L2_AGENT_TOPIC,
+ 'configurations': {'tunneling_ip': tunneling_ip,
+ 'tunnel_types': tunnel_types},
+ 'agent_type': agent_type,
+ 'tunnel_type': [],
+ 'start_flag': True}
+
+ if interface_mappings is not None:
+ agent['configurations']['interface_mappings'] = interface_mappings
+ if l2pop_network_types is not None:
+ agent['configurations']['l2pop_network_types'] = l2pop_network_types
+ return agent
+
+
+def register_ovs_agent(host=HOST, agent_type=constants.AGENT_TYPE_OVS,
+ binary='neutron-openvswitch-agent',
+ tunnel_types=['vxlan'], tunneling_ip='20.0.0.1',
+ interface_mappings=None,
+ l2pop_network_types=None):
+ agent = _get_ovs_agent_dict(host, agent_type, binary, tunnel_types,
+ tunneling_ip, interface_mappings,
+ l2pop_network_types)
+ return _register_agent(agent)
import testtools
import mock
-from oslo_utils import timeutils
from neutron.agent import l2population_rpc
from neutron.common import constants
from neutron.common import topics
from neutron import context
-from neutron.db import agents_db
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron import manager
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc
from neutron.tests import base
+from neutron.tests.common import helpers
from neutron.tests.unit.plugins.ml2 import test_plugin
HOST = 'my_l2_host'
-L2_AGENT = {
- 'binary': 'neutron-openvswitch-agent',
- 'host': HOST,
- 'topic': constants.L2_AGENT_TOPIC,
- 'configurations': {'tunneling_ip': '20.0.0.1',
- 'tunnel_types': ['vxlan']},
- 'agent_type': constants.AGENT_TYPE_OVS,
- 'tunnel_type': [],
- 'start_flag': True
-}
-
-L2_AGENT_2 = {
- 'binary': 'neutron-openvswitch-agent',
- 'host': HOST + '_2',
- 'topic': constants.L2_AGENT_TOPIC,
- 'configurations': {'tunneling_ip': '20.0.0.2',
- 'tunnel_types': ['vxlan']},
- 'agent_type': constants.AGENT_TYPE_OVS,
- 'tunnel_type': [],
- 'start_flag': True
-}
-
-L2_AGENT_3 = {
- 'binary': 'neutron-openvswitch-agent',
- 'host': HOST + '_3',
- 'topic': constants.L2_AGENT_TOPIC,
- 'configurations': {'tunneling_ip': '20.0.0.3',
- 'tunnel_types': []},
- 'agent_type': constants.AGENT_TYPE_OVS,
- 'tunnel_type': [],
- 'start_flag': True
-}
-
-L2_AGENT_4 = {
- 'binary': 'neutron-openvswitch-agent',
- 'host': HOST + '_4',
- 'topic': constants.L2_AGENT_TOPIC,
- 'configurations': {'tunneling_ip': '20.0.0.4',
- 'tunnel_types': ['vxlan']},
- 'agent_type': constants.AGENT_TYPE_OVS,
- 'tunnel_type': [],
- 'start_flag': True
-}
-
-L2_AGENT_5 = {
- 'binary': 'neutron-fake-agent',
- 'host': HOST + '_5',
- 'topic': constants.L2_AGENT_TOPIC,
- 'configurations': {'tunneling_ip': '20.0.0.5',
- 'tunnel_types': [],
- 'interface_mappings': {'physnet1': 'eth9'},
- 'l2pop_network_types': ['vlan']},
- # NOTE(yamamoto): mech_fake_agent has a comment to explain why
- # OFA is used here.
- 'agent_type': constants.AGENT_TYPE_OFA,
- 'tunnel_type': [],
- 'start_flag': True
-}
+HOST_2 = HOST + '_2'
+HOST_3 = HOST + '_3'
+HOST_4 = HOST + '_4'
+HOST_5 = HOST + '_5'
+
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
DEVICE_OWNER_COMPUTE = 'compute:None'
uptime_patch.start()
def _register_ml2_agents(self):
- callback = agents_db.AgentExtRpcCallback()
- callback.report_state(self.adminContext,
- agent_state={'agent_state': L2_AGENT},
- time=timeutils.strtime())
- callback.report_state(self.adminContext,
- agent_state={'agent_state': L2_AGENT_2},
- time=timeutils.strtime())
- callback.report_state(self.adminContext,
- agent_state={'agent_state': L2_AGENT_3},
- time=timeutils.strtime())
- callback.report_state(self.adminContext,
- agent_state={'agent_state': L2_AGENT_4},
- time=timeutils.strtime())
- callback.report_state(self.adminContext,
- agent_state={'agent_state': L2_AGENT_5},
- time=timeutils.strtime())
+ helpers.register_ovs_agent(host=HOST, tunneling_ip='20.0.0.1')
+ helpers.register_ovs_agent(host=HOST_2, tunneling_ip='20.0.0.2')
+ helpers.register_ovs_agent(host=HOST_3, tunneling_ip='20.0.0.3',
+ tunnel_types=[])
+ helpers.register_ovs_agent(host=HOST_4, tunneling_ip='20.0.0.4')
+ helpers.register_ovs_agent(host=HOST_5, tunneling_ip='20.0.0.5',
+ binary='neutron-fake-agent',
+ tunnel_types=[],
+ interface_mappings={'physnet1': 'eth9'},
+ agent_type=constants.AGENT_TYPE_OFA,
+ l2pop_network_types=['vlan'])
def test_port_info_compare(self):
# An assumption the code makes is that PortInfo compares equal to
def test_get_device_details_port_id(self):
self._register_ml2_agents()
- host_arg = {portbindings.HOST_ID: L2_AGENT['host']}
+ host_arg = {portbindings.HOST_ID: HOST}
with self.port(arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
port_id = port['port']['id']
for device in formats:
details = self.callbacks.get_device_details(
self.adminContext, device=device,
- agent_id=L2_AGENT_2['host'])
+ agent_id=HOST_2)
self.assertEqual(port_id, details['port_id'])
def _update_and_check_portbinding(self, port_id, host_id):
def _test_host_changed(self, twice):
self._register_ml2_agents()
with self.subnet(network=self._network) as subnet:
- host_arg = {portbindings.HOST_ID: L2_AGENT['host']}
+ host_arg = {portbindings.HOST_ID: HOST}
with self.port(subnet=subnet, cidr='10.0.0.0/24',
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID,),
device1 = 'tap' + p1['id']
self.callbacks.update_device_up(
self.adminContext,
- agent_id=L2_AGENT['host'],
+ agent_id=HOST,
device=device1)
if twice:
- self._update_and_check_portbinding(p1['id'],
- L2_AGENT_4['host'])
- self._update_and_check_portbinding(p1['id'],
- L2_AGENT_2['host'])
+ self._update_and_check_portbinding(p1['id'], HOST_4)
+ self._update_and_check_portbinding(p1['id'], HOST_2)
self.mock_fanout.reset_mock()
# NOTE(yamamoto): see bug #1441488
self.adminContext.session.expire_all()
self.callbacks.get_device_details(
self.adminContext,
device=device1,
- agent_id=L2_AGENT_2['host'])
+ agent_id=HOST_2)
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
expected = {p1['network_id']:
{'ports':