self.set_db_attribute('Bridge', self.br_name, 'protocols', protocols,
check_error=True)
- def create(self):
- self.ovsdb.add_br(self.br_name).execute()
+ def create(self, secure_mode=False):
+ with self.ovsdb.transaction() as txn:
+ txn.add(self.ovsdb.add_br(self.br_name))
+ if secure_mode:
+ txn.add(self.ovsdb.set_fail_mode(self.br_name,
+ FAILMODE_SECURE))
# Don't return until vswitchd sets up the internal port
self.get_port_ofport(self.br_name)
if 'NXST' not in item)
return retval
+ def dump_all_flows(self):
+ return [f for f in self.run_ofctl("dump-flows", []).splitlines()
+ if 'NXST' not in f]
+
def deferred(self, **kwargs):
return DeferredOVSBridge(self, **kwargs)
cfg.IntOpt('quitting_rpc_timeout', default=10,
help=_("Set new timeout in seconds for new rpc calls after "
"agent receives SIGTERM. If value is set to 0, rpc "
- "timeout won't be changed"))
+ "timeout won't be changed")),
+ cfg.BoolOpt('drop_flows_on_start', default=False,
+ help=_("Reset flow table on start. Setting this to True will "
+ "cause brief traffic interruption."))
]
"""openvswitch agent br-int specific logic."""
def setup_default_table(self):
- self.delete_flows()
self.install_normal()
self.setup_canary_table()
self.install_drop(table_id=constants.ARP_SPOOF_TABLE)
# to dynamically set-up flows in UCAST_TO_TUN corresponding to
# remote mac addresses (assumes that lvid has already been set by
# a previous flow)
- learned_flow = ("table=%s,"
+ learned_flow = ("cookie=%(cookie)s,"
+ "table=%(table)s,"
"priority=1,"
"hard_timeout=300,"
"NXM_OF_VLAN_TCI[0..11],"
"load:0->NXM_OF_VLAN_TCI[],"
"load:NXM_NX_TUN_ID[]->NXM_NX_TUN_ID[],"
"output:NXM_OF_IN_PORT[]" %
- constants.UCAST_TO_TUN)
+ {'cookie': self.agent_uuid_stamp,
+ 'table': constants.UCAST_TO_TUN})
# Once remote mac addresses are learnt, output packet to patch_int
deferred_br.add_flow(table=constants.LEARN_FROM_TUN,
priority=1,
# License for the specific language governing permissions and limitations
# under the License.
+import re
+
+from oslo_log import log as logging
+
+from neutron.i18n import _LW
+
+LOG = logging.getLogger(__name__)
+
# Field name mappings (from Ryu to ovs-ofctl)
_keywords = {
'eth_src': 'dl_src',
class OpenFlowSwitchMixin(object):
"""Mixin to provide common convenient routines for an openflow switch."""
+ agent_uuid_stamp = '0x0'
+
+ def set_agent_uuid_stamp(self, val):
+ self.agent_uuid_stamp = val
@staticmethod
def _conv_args(kwargs):
def dump_flows(self, table_id):
return self.dump_flows_for_table(table_id)
+ def dump_flows_all_tables(self):
+ return self.dump_all_flows()
+
def install_goto_next(self, table_id):
self.install_goto(table_id=table_id, dest_table_id=table_id + 1)
**self._conv_args(kwargs))
else:
super(OpenFlowSwitchMixin, self).remove_all_flows()
+
+ def add_flow(self, **kwargs):
+ kwargs['cookie'] = self.agent_uuid_stamp
+ super(OpenFlowSwitchMixin, self).add_flow(**self._conv_args(kwargs))
+
+ def mod_flow(self, **kwargs):
+ kwargs['cookie'] = self.agent_uuid_stamp
+ super(OpenFlowSwitchMixin, self).mod_flow(**self._conv_args(kwargs))
+
+ def _filter_flows(self, flows):
+ LOG.debug("Agent uuid stamp used to filter flows: %s",
+ self.agent_uuid_stamp)
+ cookie_re = re.compile('cookie=(0x[A-Fa-f0-9]*)')
+ table_re = re.compile('table=([0-9]*)')
+ for flow in flows:
+ fl_cookie = cookie_re.search(flow)
+ if not fl_cookie:
+ continue
+ fl_cookie = fl_cookie.group(1)
+ if int(fl_cookie, 16) != self.agent_uuid_stamp:
+ fl_table = table_re.search(flow)
+ if not fl_table:
+ continue
+ fl_table = fl_table.group(1)
+ yield flow, fl_cookie, fl_table
+
+ def cleanup_flows(self):
+ flows = self.dump_flows_all_tables()
+ for flow, cookie, table in self._filter_flows(flows):
+ # deleting a stale flow should be rare.
+ # it might deserve some attention
+ LOG.warning(_LW("Deleting flow %s"), flow)
+ self.delete_flows(cookie=cookie + '/-1', table=table)
import signal
import sys
import time
+import uuid
import netaddr
from oslo_config import cfg
# A placeholder for dead vlans.
DEAD_VLAN_TAG = p_const.MAX_VLAN_TAG + 1
+UINT64_BITMASK = (1 << 64) - 1
class _mac_mydialect(netaddr.mac_unix):
# Keep track of int_br's device count for use by _report_state()
self.int_br_device_count = 0
+ self.agent_uuid_stamp = uuid.uuid4().int & UINT64_BITMASK
+
self.int_br = self.br_int_cls(integ_br)
self.setup_integration_br()
# Stores port update notifications for processing in main rpc loop
self.patch_tun_ofport = constants.OFPORT_INVALID
if self.enable_tunneling:
# The patch_int_ofport and patch_tun_ofport are updated
- # here inside the call to reset_tunnel_br()
- self.reset_tunnel_br(tun_br)
+ # here inside the call to setup_tunnel_br()
+ self.setup_tunnel_br(tun_br)
self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
self.context,
heartbeat.start(interval=report_interval)
if self.enable_tunneling:
- self.setup_tunnel_br()
+ self.setup_tunnel_br_flows()
self.dvr_agent.setup_dvr_flows()
def setup_integration_br(self):
'''Setup the integration bridge.
- Delete patch ports and remove all existing flows.
'''
+ self.int_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
# Ensure the integration bridge is created.
# ovs_lib.OVSBridge.create() will run
# ovs-vsctl -- --may-exist add-br BRIDGE_NAME
self.int_br.setup_controllers(self.conf)
self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
-
+ if self.conf.AGENT.drop_flows_on_start:
+ self.int_br.delete_flows()
self.int_br.setup_default_table()
def setup_ancillary_bridges(self, integ_br, tun_br):
ancillary_bridges.append(br)
return ancillary_bridges
- def reset_tunnel_br(self, tun_br_name=None):
+ def setup_tunnel_br(self, tun_br_name=None):
'''(re)initialize the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
'''
if not self.tun_br:
self.tun_br = self.br_tun_cls(tun_br_name)
+ self.tun_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
- self.tun_br.reset_bridge(secure_mode=True)
+ if not self.tun_br.bridge_exists('br-tun'):
+ self.tun_br.create(secure_mode=True)
self.tun_br.setup_controllers(self.conf)
- self.patch_tun_ofport = self.int_br.add_patch_port(
- self.conf.OVS.int_peer_patch_port,
- self.conf.OVS.tun_peer_patch_port)
- self.patch_int_ofport = self.tun_br.add_patch_port(
- self.conf.OVS.tun_peer_patch_port,
- self.conf.OVS.int_peer_patch_port)
+ if (not self.int_br.port_exists(self.conf.OVS.int_peer_patch_port) or
+ self.patch_tun_ofport == ovs_lib.INVALID_OFPORT):
+ self.patch_tun_ofport = self.int_br.add_patch_port(
+ self.conf.OVS.int_peer_patch_port,
+ self.conf.OVS.tun_peer_patch_port)
+ if (not self.tun_br.port_exists(self.conf.OVS.tun_peer_patch_port) or
+ self.patch_int_ofport == ovs_lib.INVALID_OFPORT):
+ self.patch_int_ofport = self.tun_br.add_patch_port(
+ self.conf.OVS.tun_peer_patch_port,
+ self.conf.OVS.int_peer_patch_port)
if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
self.patch_int_ofport):
LOG.error(_LE("Failed to create OVS patch port. Cannot have "
"version of OVS does not support tunnels or patch "
"ports. Agent terminated!"))
exit(1)
- self.tun_br.delete_flows()
+ if self.conf.AGENT.drop_flows_on_start:
+ self.tun_br.delete_flows()
- def setup_tunnel_br(self):
+ def setup_tunnel_br_flows(self):
'''Setup the tunnel bridge.
Add all flows to the tunnel bridge.
bridge)
phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX,
bridge)
- self.int_br.delete_port(int_if_name)
- br.delete_port(phys_if_name)
+ # Interface type of port for physical and integration bridges must
+ # be same, so check only one of them.
+ int_type = self.int_br.db_get_val("Interface", int_if_name, "type")
if self.use_veth_interconnection:
+ # Drop ports if the interface types doesn't match the
+ # configuration value.
+ if int_type == 'patch':
+ self.int_br.delete_port(int_if_name)
+ br.delete_port(phys_if_name)
if ip_lib.device_exists(int_if_name):
ip_lib.IPDevice(int_if_name).link.delete()
# Give udev a chance to process its rules here, to avoid
int_ofport = self.int_br.add_port(int_veth)
phys_ofport = br.add_port(phys_veth)
else:
+ # Drop ports if the interface type doesn't match the
+ # configuration value
+ if int_type == 'veth':
+ self.int_br.delete_port(int_if_name)
+ br.delete_port(phys_if_name)
# Create patch ports without associating them in order to block
# untranslated traffic before association
int_ofport = self.int_br.add_patch_port(
'removed': len(ancillary_port_info.get('removed', []))}
return port_stats
+ def cleanup_stale_flows(self):
+ if self.iter_num == 0:
+ bridges = [self.int_br]
+ if self.enable_tunneling:
+ bridges.append(self.tun_br)
+ for bridge in bridges:
+ LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
+ bridge.cleanup_flows()
+
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
self.setup_integration_br()
self.setup_physical_bridges(self.bridge_mappings)
if self.enable_tunneling:
- self.reset_tunnel_br()
self.setup_tunnel_br()
+ self.setup_tunnel_br_flows()
tunnel_sync = True
if self.enable_distributed_routing:
self.dvr_agent.reset_ovs_parameters(self.int_br,
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info,
ovs_restarted)
+ self.cleanup_stale_flows()
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
#
import abc
+from concurrent import futures
+import contextlib
import functools
import os
import random
dst_ip])
+@contextlib.contextmanager
+def async_ping(namespace, ips):
+ with futures.ThreadPoolExecutor(max_workers=len(ips)) as executor:
+ fs = [executor.submit(assert_ping, namespace, ip, count=10)
+ for ip in ips]
+ yield lambda: all(f.done() for f in fs)
+ futures.wait(fs)
+ for f in fs:
+ f.result()
+
+
def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1):
try:
assert_ping(src_namespace, dst_ip, timeout, count)
import br_tun
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
as ovs_agent
+from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import base
LOG = logging.getLogger(__name__)
self.ovs = ovs_lib.BaseOVS()
self.config = self._configure_agent()
self.driver = interface.OVSInterfaceDriver(self.config)
+ self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
def _get_config_opts(self):
config = cfg.ConfigOpts()
self.driver.plug(
network.get('id'), port.get('id'), port.get('vif_name'),
port.get('mac_address'),
- agent.int_br.br_name, namespace=None)
+ agent.int_br.br_name, namespace=self.namespace)
ip_cidrs = ["%s/%s" % (port.get('fixed_ips')[0][
'ip_address'], ip_len)]
- self.driver.init_l3(port.get('vif_name'), ip_cidrs, namespace=None)
+ self.driver.init_l3(port.get('vif_name'), ip_cidrs,
+ namespace=self.namespace)
def _get_device_details(self, port, network):
dev = {'device': port['id'],
lambda: self._expected_plugin_rpc_call(
self.agent.plugin_rpc.update_device_list, port_ids, up))
- def setup_agent_and_ports(self, port_dicts, trigger_resync=False):
- self.agent = self.create_agent()
+ def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
+ trigger_resync=False):
+ self.agent = self.create_agent(create_tunnels=create_tunnels)
self.start_agent(self.agent)
self.network = self._create_test_network_dict()
self.ports = port_dicts
# License for the specific language governing permissions and limitations
# under the License.
+import time
+from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.l2 import base
self.create_agent(create_tunnels=False)
self.assertTrue(self.ovs.bridge_exists(self.br_int))
self.assertFalse(self.ovs.bridge_exists(self.br_tun))
+
+ def test_assert_pings_during_br_int_setup_not_lost(self):
+ self.setup_agent_and_ports(port_dicts=self.create_test_ports(),
+ create_tunnels=False)
+ self.wait_until_ports_state(self.ports, up=True)
+ ips = [port['fixed_ips'][0]['ip_address'] for port in self.ports]
+ with net_helpers.async_ping(self.namespace, ips) as running:
+ while running():
+ self.agent.setup_integration_br()
+ time.sleep(0.25)
cidr = '192.168.1.0/24'
flow_dict_1 = collections.OrderedDict([
+ ('cookie', 1234),
('priority', 2),
('dl_src', 'ca:fe:de:ad:be:ef'),
('actions', 'strip_vlan,output:0')])
flow_dict_2 = collections.OrderedDict([
+ ('cookie', 1254),
('priority', 1),
('actions', 'normal')])
flow_dict_3 = collections.OrderedDict([
+ ('cookie', 1257),
('priority', 2),
('actions', 'drop')])
flow_dict_4 = collections.OrderedDict([
+ ('cookie', 1274),
('priority', 2),
('in_port', ofport),
('actions', 'drop')])
flow_dict_5 = collections.OrderedDict([
+ ('cookie', 1284),
('priority', 4),
('in_port', ofport),
('dl_vlan', vid),
('actions', "strip_vlan,set_tunnel:%s,normal" % (lsw_id))])
flow_dict_6 = collections.OrderedDict([
+ ('cookie', 1754),
('priority', 3),
('tun_id', lsw_id),
('actions', "mod_vlan_vid:%s,output:%s" % (vid, ofport))])
flow_dict_7 = collections.OrderedDict([
+ ('cookie', 1256),
('priority', 4),
('nw_src', cidr),
('proto', 'arp'),
expected_calls = [
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,"
+ "hard_timeout=0,idle_timeout=0,cookie=1234,"
"priority=2,dl_src=ca:fe:de:ad:be:ef,"
"actions=strip_vlan,output:0")),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,"
+ "hard_timeout=0,idle_timeout=0,cookie=1254,"
"priority=1,actions=normal")),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,"
+ "hard_timeout=0,idle_timeout=0,cookie=1257,"
"priority=2,actions=drop")),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,priority=2,"
- "in_port=%s,actions=drop" % ofport)),
+ "hard_timeout=0,idle_timeout=0,cookie=1274,"
+ "priority=2,in_port=%s,actions=drop" % ofport
+ )),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,"
+ "hard_timeout=0,idle_timeout=0,cookie=1284,"
"priority=4,dl_vlan=%s,in_port=%s,"
"actions=strip_vlan,set_tunnel:%s,normal" %
(vid, ofport, lsw_id))),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,priority=3,"
- "tun_id=%s,actions=mod_vlan_vid:%s,"
- "output:%s" % (lsw_id, vid, ofport))),
+ "hard_timeout=0,idle_timeout=0,cookie=1754,"
+ "priority=3,"
+ "tun_id=%s,actions=mod_vlan_vid:%s,output:%s"
+ % (lsw_id, vid, ofport))),
self._ofctl_mock("add-flows", self.BR_NAME, '-',
process_input=OFCTLParamListMatcher(
- "hard_timeout=0,idle_timeout=0,priority=4,"
- "nw_src=%s,arp,actions=drop" % cidr)),
+ "hard_timeout=0,idle_timeout=0,cookie=1256,"
+ "priority=4,nw_src=%s,arp,actions=drop"
+ % cidr)),
]
self.execute.assert_has_calls(expected_calls)
def test_add_flow_timeout_set(self):
flow_dict = collections.OrderedDict([
+ ('cookie', 1234),
('priority', 1),
('hard_timeout', 1000),
('idle_timeout', 2000),
self.br.add_flow(**flow_dict)
self._verify_ofctl_mock(
"add-flows", self.BR_NAME, '-',
- process_input="hard_timeout=1000,idle_timeout=2000,priority=1,"
- "actions=normal")
+ process_input="hard_timeout=1000,idle_timeout=2000,"
+ "priority=1,cookie=1234,actions=normal")
def test_add_flow_default_priority(self):
- flow_dict = collections.OrderedDict([('actions', 'normal')])
+ flow_dict = collections.OrderedDict([('actions', 'normal'),
+ ('cookie', 1234)])
self.br.add_flow(**flow_dict)
self._verify_ofctl_mock(
"add-flows", self.BR_NAME, '-',
process_input="hard_timeout=0,idle_timeout=0,priority=1,"
- "actions=normal")
+ "cookie=1234,actions=normal")
def _test_get_port_ofport(self, ofport, expected_result):
pname = "tap99"
]
self.assertEqual(expected, self.mock.mock_calls)
+ def test_dump_flows_for_table(self):
+ table = 23
+ with mock.patch.object(self.br, 'run_ofctl') as run_ofctl:
+ self.br.dump_flows(table)
+ run_ofctl.assert_has_calls([mock.call("dump-flows", mock.ANY)])
+
+ def test_dump_all_flows(self):
+ with mock.patch.object(self.br, 'run_ofctl') as run_ofctl:
+ self.br.dump_flows_all_tables()
+ run_ofctl.assert_has_calls([mock.call("dump-flows", [])])
+
class OVSDVRProcessTestMixin(object):
def test_install_dvr_process_ipv4(self):
def test_setup_default_table(self):
self.br.setup_default_table()
expected = [
- call.delete_flows(),
call.add_flow(priority=0, table=0, actions='normal'),
call.add_flow(priority=0, table=23, actions='drop'),
call.add_flow(priority=0, table=24, actions='drop'),
{'priority': 0, 'table': 3, 'actions': 'drop'},
{'priority': 0, 'table': 4, 'actions': 'drop'},
{'priority': 1, 'table': 10,
- 'actions': 'learn(table=20,priority=1,'
+ 'actions': 'learn(cookie=0x0,table=20,priority=1,'
'hard_timeout=300,NXM_OF_VLAN_TCI[0..11],'
'NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[],'
'load:0->NXM_OF_VLAN_TCI[],'
{'priority': 0, 'table': 3, 'actions': 'drop'},
{'priority': 0, 'table': 4, 'actions': 'drop'},
{'priority': 1, 'table': 10,
- 'actions': 'learn(table=20,priority=1,'
+ 'actions': 'learn(cookie=0x0,table=20,priority=1,'
'hard_timeout=300,NXM_OF_VLAN_TCI[0..11],'
'NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[],'
'load:0->NXM_OF_VLAN_TCI[],'
'devices_down': details,
'failed_devices_up': [],
'failed_devices_down': []}),\
+ mock.patch.object(self.agent.int_br,
+ 'get_port_tag_dict',
+ return_value={}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices = (
self.agent.treat_devices_added_or_updated([{}], False))
'get_devices_details_list_and_failed_devices',
return_value={'devices': [dev_mock],
'failed_devices': None}),\
+ mock.patch.object(self.agent.int_br,
+ 'get_port_tag_dict',
+ return_value={}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={'xxx': mock.MagicMock()}),\
+ mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
+ return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs, need_bound_devices = (
mock.call.phys_br_cls('br-eth'),
mock.call.phys_br.setup_controllers(mock.ANY),
mock.call.phys_br.setup_default_table(),
- mock.call.int_br.delete_port('int-br-eth'),
- mock.call.phys_br.delete_port('phy-br-eth'),
+ mock.call.int_br.db_get_val('Interface', 'int-br-eth',
+ 'type'),
+ # Have to use __getattr__ here to avoid mock._Call.__eq__
+ # method being called
+ mock.call.int_br.db_get_val().__getattr__('__eq__')('veth'),
mock.call.int_br.add_patch_port('int-br-eth',
constants.NONEXISTENT_PEER),
mock.call.phys_br.add_patch_port('phy-br-eth',
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phys_veth_ofport")
+ def test_setup_physical_bridges_change_from_veth_to_patch_conf(self):
+ with mock.patch.object(sys, "exit"),\
+ mock.patch.object(utils, "execute"),\
+ mock.patch.object(self.agent, 'br_phys_cls') as phys_br_cls,\
+ mock.patch.object(self.agent, 'int_br') as int_br,\
+ mock.patch.object(self.agent.int_br, 'db_get_val',
+ return_value='veth'):
+ phys_br = phys_br_cls()
+ parent = mock.MagicMock()
+ parent.attach_mock(phys_br_cls, 'phys_br_cls')
+ parent.attach_mock(phys_br, 'phys_br')
+ parent.attach_mock(int_br, 'int_br')
+ phys_br.add_patch_port.return_value = "phy_ofport"
+ int_br.add_patch_port.return_value = "int_ofport"
+ self.agent.setup_physical_bridges({"physnet1": "br-eth"})
+ expected_calls = [
+ mock.call.phys_br_cls('br-eth'),
+ mock.call.phys_br.setup_controllers(mock.ANY),
+ mock.call.phys_br.setup_default_table(),
+ mock.call.int_br.delete_port('int-br-eth'),
+ mock.call.phys_br.delete_port('phy-br-eth'),
+ mock.call.int_br.add_patch_port('int-br-eth',
+ constants.NONEXISTENT_PEER),
+ mock.call.phys_br.add_patch_port('phy-br-eth',
+ constants.NONEXISTENT_PEER),
+ mock.call.int_br.drop_port(in_port='int_ofport'),
+ mock.call.phys_br.drop_port(in_port='phy_ofport'),
+ mock.call.int_br.set_db_attribute('Interface', 'int-br-eth',
+ 'options:peer',
+ 'phy-br-eth'),
+ mock.call.phys_br.set_db_attribute('Interface', 'phy-br-eth',
+ 'options:peer',
+ 'int-br-eth'),
+ ]
+ parent.assert_has_calls(expected_calls)
+ self.assertEqual(self.agent.int_ofports["physnet1"],
+ "int_ofport")
+ self.assertEqual(self.agent.phys_ofports["physnet1"],
+ "phy_ofport")
+
def test_get_peer_name(self):
bridge1 = "A_REALLY_LONG_BRIDGE_NAME1"
bridge2 = "A_REALLY_LONG_BRIDGE_NAME2"
self.tun_br = mock.Mock()
with mock.patch.object(self.agent.int_br,
"add_patch_port",
- return_value=1) as intbr_patch_fn,\
- mock.patch.object(self.agent,
- 'tun_br',
- autospec=True) as tun_br,\
+ return_value=1) as int_patch_port,\
+ mock.patch.object(self.agent.tun_br,
+ "add_patch_port",
+ return_value=1) as tun_patch_port,\
+ mock.patch.object(self.agent.tun_br, 'bridge_exists',
+ return_value=False),\
+ mock.patch.object(self.agent.tun_br, 'create') as create_tun,\
+ mock.patch.object(self.agent.tun_br,
+ 'setup_controllers') as setup_controllers,\
+ mock.patch.object(self.agent.tun_br, 'port_exists',
+ return_value=False),\
+ mock.patch.object(self.agent.int_br, 'port_exists',
+ return_value=False),\
mock.patch.object(sys, "exit"):
- tun_br.add_patch_port.return_value = 2
- self.agent.reset_tunnel_br(None)
+ self.agent.setup_tunnel_br(None)
self.agent.setup_tunnel_br()
- self.assertTrue(intbr_patch_fn.called)
+ self.assertTrue(create_tun.called)
+ self.assertTrue(setup_controllers.called)
+ self.assertTrue(int_patch_port.called)
+ self.assertTrue(tun_patch_port.called)
+
+ def test_setup_tunnel_br_ports_exits_drop_flows(self):
+ cfg.CONF.set_override('drop_flows_on_start', True, 'AGENT')
+ with mock.patch.object(self.agent.tun_br, 'port_exists',
+ return_value=True),\
+ mock.patch.object(self.agent, 'tun_br'),\
+ mock.patch.object(self.agent.int_br, 'port_exists',
+ return_value=True),\
+ mock.patch.object(self.agent.tun_br, 'setup_controllers'),\
+ mock.patch.object(self.agent, 'patch_tun_ofport', new=2),\
+ mock.patch.object(self.agent, 'patch_int_ofport', new=2),\
+ mock.patch.object(self.agent.tun_br,
+ 'delete_flows') as delete,\
+ mock.patch.object(self.agent.int_br,
+ "add_patch_port") as int_patch_port,\
+ mock.patch.object(self.agent.tun_br,
+ "add_patch_port") as tun_patch_port,\
+ mock.patch.object(sys, "exit"):
+ self.agent.setup_tunnel_br(None)
+ self.agent.setup_tunnel_br()
+ self.assertFalse(int_patch_port.called)
+ self.assertFalse(tun_patch_port.called)
+ self.assertTrue(delete.called)
def test_setup_tunnel_port(self):
self.agent.tun_br = mock.Mock()
return_value=fake_tunnel_details),\
mock.patch.object(
self.agent,
- '_setup_tunnel_port') as _setup_tunnel_port_fn:
+ '_setup_tunnel_port') as _setup_tunnel_port_fn,\
+ mock.patch.object(self.agent,
+ 'cleanup_stale_flows') as cleanup:
self.agent.tunnel_types = ['vxlan']
self.agent.tunnel_sync()
expected_calls = [mock.call(self.agent.tun_br, 'vxlan-64651f0f',
'100.101.31.15', 'vxlan')]
_setup_tunnel_port_fn.assert_has_calls(expected_calls)
+ self.assertEqual([], cleanup.mock_calls)
def test_tunnel_sync_invalid_ip_address(self):
fake_tunnel_details = {'tunnels': [{'ip_address': '300.300.300.300'},
return_value=fake_tunnel_details),\
mock.patch.object(
self.agent,
- '_setup_tunnel_port') as _setup_tunnel_port_fn:
+ '_setup_tunnel_port') as _setup_tunnel_port_fn,\
+ mock.patch.object(self.agent,
+ 'cleanup_stale_flows') as cleanup:
self.agent.tunnel_types = ['vxlan']
self.agent.tunnel_sync()
_setup_tunnel_port_fn.assert_called_once_with(self.agent.tun_br,
'vxlan-64646464',
'100.100.100.100',
'vxlan')
+ self.assertEqual([], cleanup.mock_calls)
def test_tunnel_update(self):
kwargs = {'tunnel_ip': '10.10.10.10',
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'setup_physical_bridges') as setup_phys_br,\
mock.patch.object(time, 'sleep'),\
+ mock.patch.object(
+ self.mod_agent.OVSNeutronAgent,
+ 'update_stale_ofport_rules') as update_stale, \
mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'update_stale_ofport_rules') as update_stale:
+ 'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
mock.call(reply2, False),
mock.call(reply3, True)
])
+ cleanup.assert_called_once_with()
self.assertTrue(update_stale.called)
# Verify the OVS restart we triggered in the loop
# re-setup the bridges
self.agent.state_rpc.client):
self.assertEqual(10, rpc_client.timeout)
+ def test_cleanup_stale_flows_iter_0(self):
+ with mock.patch.object(self.agent, 'agent_uuid_stamp', new=1234),\
+ mock.patch.object(self.agent.int_br,
+ 'dump_flows_all_tables') as dump_flows,\
+ mock.patch.object(self.agent.int_br,
+ 'delete_flows') as del_flow:
+ dump_flows.return_value = [
+ 'cookie=0x4d2, duration=50.156s, table=0,actions=drop',
+ 'cookie=0x4321, duration=54.143s, table=2, priority=0',
+ 'cookie=0x2345, duration=50.125s, table=2, priority=0',
+ 'cookie=0x4d2, duration=52.112s, table=3, actions=drop',
+ ]
+ self.agent.cleanup_stale_flows()
+ del_flow.assert_has_calls([mock.call(cookie='0x4321/-1',
+ table='2'),
+ mock.call(cookie='0x2345/-1',
+ table='2')])
+
def test_set_rpc_timeout_no_value(self):
self.agent.quitting_rpc_timeout = None
with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
# block RPC calls and bridge calls
self.agent.setup_physical_bridges = mock.Mock()
self.agent.setup_integration_br = mock.Mock()
- self.agent.reset_tunnel_br = mock.Mock()
+ self.agent.setup_tunnel_br = mock.Mock()
self.agent.state_rpc = mock.Mock()
try:
self.agent.rpc_loop(polling_manager=mock.Mock())
self.mock_int_bridge = self.ovs_bridges[self.INT_BRIDGE]
self.mock_int_bridge_expected = [
+ mock.call.set_agent_uuid_stamp(mock.ANY),
mock.call.create(),
mock.call.set_secure_mode(),
mock.call.setup_controllers(mock.ANY),
self.mock_map_tun_bridge_expected = [
mock.call.setup_controllers(mock.ANY),
mock.call.setup_default_table(),
- mock.call.delete_port('phy-%s' % self.MAP_TUN_BRIDGE),
mock.call.add_patch_port('phy-%s' % self.MAP_TUN_BRIDGE,
constants.NONEXISTENT_PEER), ]
self.mock_int_bridge_expected += [
- mock.call.delete_port('int-%s' % self.MAP_TUN_BRIDGE),
+ mock.call.db_get_val('Interface', 'int-%s' % self.MAP_TUN_BRIDGE,
+ 'type'),
mock.call.add_patch_port('int-%s' % self.MAP_TUN_BRIDGE,
constants.NONEXISTENT_PEER),
]
]
self.mock_tun_bridge_expected = [
- mock.call.reset_bridge(secure_mode=True),
+ mock.call.set_agent_uuid_stamp(mock.ANY),
+ mock.call.bridge_exists('br-tun'),
+ mock.call.bridge_exists().__nonzero__(),
mock.call.setup_controllers(mock.ANY),
+ mock.call.port_exists('patch-int'),
+ mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-int', 'patch-tun'),
]
self.mock_int_bridge_expected += [
+ mock.call.port_exists('patch-tun'),
+ mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-tun', 'patch-int'),
]
self.mock_int_bridge_expected += [
]
self.mock_tun_bridge_expected += [
- mock.call.delete_flows(),
mock.call.setup_default_table(self.INT_OFPORT, arp_responder),
]
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'tunnel_sync'),\
mock.patch.object(time, 'sleep'),\
- mock.patch.object(self.mod_agent.OVSNeutronAgent,
- 'update_stale_ofport_rules') as update_stale:
+ mock.patch.object(
+ self.mod_agent.OVSNeutronAgent,
+ 'update_stale_ofport_rules') as update_stale,\
+ mock.patch.object(
+ self.mod_agent.OVSNeutronAgent,
+ 'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
'removed': set(['tap0']),
'added': set([])}, False)
])
+
+ cleanup.assert_called_once_with()
self.assertTrue(update_stale.called)
self._verify_mock_calls()
]
self.mock_int_bridge_expected = [
+ mock.call.set_agent_uuid_stamp(mock.ANY),
mock.call.create(),
mock.call.set_secure_mode(),
mock.call.setup_controllers(mock.ANY),
self.mock_map_tun_bridge_expected = [
mock.call.setup_controllers(mock.ANY),
mock.call.setup_default_table(),
- mock.call.delete_port('phy-%s' % self.MAP_TUN_BRIDGE),
mock.call.add_port(self.intb),
]
self.mock_int_bridge_expected += [
- mock.call.delete_port('int-%s' % self.MAP_TUN_BRIDGE),
+ mock.call.db_get_val('Interface', 'int-%s' % self.MAP_TUN_BRIDGE,
+ 'type'),
mock.call.add_port(self.inta)
]
]
self.mock_tun_bridge_expected = [
- mock.call.reset_bridge(secure_mode=True),
+ mock.call.set_agent_uuid_stamp(mock.ANY),
+ mock.call.bridge_exists('br-tun'),
+ mock.call.bridge_exists().__nonzero__(),
mock.call.setup_controllers(mock.ANY),
+ mock.call.port_exists('patch-int'),
+ mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-int', 'patch-tun'),
]
self.mock_int_bridge_expected += [
+ mock.call.port_exists('patch-tun'),
+ mock.call.port_exists().__nonzero__(),
mock.call.add_patch_port('patch-tun', 'patch-int')
]
self.mock_int_bridge_expected += [
'Port', columns=['name', 'other_config', 'tag'], ports=[])
]
self.mock_tun_bridge_expected += [
- mock.call.delete_flows(),
mock.call.setup_default_table(self.INT_OFPORT, arp_responder),
]