self._test_port_exists(None, False)
+class OFCTLParamListMatcher(object):
+
+ def _parse(self, params):
+ actions_pos = params.find('actions')
+ return set(params[:actions_pos].split(',')), params[actions_pos:]
+
+ def __init__(self, params):
+ self.expected = self._parse(params)
+
+ def __eq__(self, other):
+ return self.expected == self._parse(other)
+
+ def __str__(self):
+ return 'ovs-ofctl parameters: %s, "%s"' % self.expected
+
+ __repr__ = __str__
+
+
class OVS_Lib_Test(base.BaseTestCase):
"""A test suite to exercise the OVS libraries shared by Neutron agents.
self.br.add_flow(**flow_dict_7)
expected_calls = [
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,"
- "priority=2,dl_src=ca:fe:de:ad:be:ef"
- ",actions=strip_vlan,output:0",
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,"
+ "priority=2,dl_src=ca:fe:de:ad:be:ef,"
+ "actions=strip_vlan,output:0"),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,"
- "priority=1,actions=normal",
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,"
+ "priority=1,actions=normal"),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,"
- "priority=2,actions=drop",
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,"
+ "priority=2,actions=drop"),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,priority=2,"
- "in_port=%s,actions=drop" % ofport,
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,priority=2,"
+ "in_port=%s,actions=drop" % ofport),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,"
- "priority=4,dl_vlan=%s,in_port=%s,"
- "actions=strip_vlan,set_tunnel:%s,normal"
- % (vid, ofport, lsw_id),
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,"
+ "priority=4,dl_vlan=%s,in_port=%s,"
+ "actions=strip_vlan,set_tunnel:%s,normal"
+ % (vid, ofport, lsw_id)),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,priority=3,"
- "tun_id=%s,actions=mod_vlan_vid:%s,"
- "output:%s" % (lsw_id, vid, ofport),
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,priority=3,"
+ "tun_id=%s,actions=mod_vlan_vid:%s,"
+ "output:%s" % (lsw_id, vid, ofport)),
root_helper=self.root_helper),
mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
- process_input="hard_timeout=0,idle_timeout=0,priority=4,"
- "nw_src=%s,arp,actions=drop" % cidr,
+ process_input=OFCTLParamListMatcher(
+ "hard_timeout=0,idle_timeout=0,priority=4,"
+ "nw_src=%s,arp,actions=drop" % cidr),
root_helper=self.root_helper),
]
self.execute.assert_has_calls(expected_calls)