# test __str__
str(port)
- def test_set_controller(self):
- controller_names = ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555']
- self.br.set_controller(controller_names)
- self._verify_vsctl_mock('set-controller', self.BR_NAME,
- 'tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555')
-
- def test_del_controller(self):
- self.br.del_controller()
- self._verify_vsctl_mock('del-controller', self.BR_NAME)
-
- def test_get_controller(self):
- self.execute.return_value = (
- 'tcp:127.0.0.1:6633\\ntcp:172.17.16.10:5555')
- names = self.br.get_controller()
- self.assertEqual(names,
- ['tcp:127.0.0.1:6633', 'tcp:172.17.16.10:5555'])
- self._verify_vsctl_mock('get-controller', self.BR_NAME)
-
- def test_set_secure_mode(self):
- self.br.set_secure_mode()
- self._verify_vsctl_mock('set-fail-mode', self.BR_NAME, 'secure')
-
- def test_set_standalone_mode(self):
- self.br.set_standalone_mode()
- self._verify_vsctl_mock('set-fail-mode', self.BR_NAME, 'standalone')
-
- def test_set_protocols(self):
- protocols = 'OpenFlow13'
- self.br.set_protocols(protocols)
- self._verify_vsctl_mock('set', 'Bridge', self.BR_NAME,
- "protocols=%s" % protocols)
-
- def test_create(self):
- self.br.add_bridge(self.BR_NAME)
-
- self.br.create()
-
- def test_destroy(self):
- self.br.delete_bridge(self.BR_NAME)
-
- self.br.destroy()
-
- def test_reset_bridge(self):
- self.br.destroy()
- self.br.create()
-
- self.br.reset_bridge()
-
def _build_timeout_opt(self, exp_timeout):
return "--timeout=%d" % exp_timeout if exp_timeout else self.TO
- def _test_delete_port(self, exp_timeout=None):
- pname = "tap5"
- self.br.delete_port(pname)
- self._verify_vsctl_mock("--if-exists", "del-port", self.BR_NAME, pname)
-
- def test_delete_port(self):
- self._test_delete_port()
-
- def test_call_command_non_default_timeput(self):
- # This test is only for verifying a non-default timeout
- # is correctly applied. Does not need to be repeated for
- # every ovs_lib method
- new_timeout = 5
- self.br.vsctl_timeout = new_timeout
- self._test_delete_port(new_timeout)
-
def test_add_flow(self):
ofport = "99"
vid = 4000
self._test_get_port_ofport(ovs_lib.INVALID_OFPORT,
ovs_lib.INVALID_OFPORT)
- def test_get_datapath_id(self):
- datapath_id = '"0000b67f4fbcc149"'
- self.execute.return_value = self._encode_ovs_json(['datapath_id'],
- [[datapath_id]])
- self.assertEqual(self.br.get_datapath_id(), datapath_id)
- self._verify_vsctl_mock("--columns=datapath_id", "list", "Bridge",
- self.BR_NAME)
-
def test_count_flows(self):
self.execute.return_value = 'ignore\nflow-1\n'
# counts the number of flows as total lines of output - 2
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def test_add_patch_port(self):
- pname = "tap99"
- peer = "bar10"
- ofport = 6
-
- # Each element is a tuple of (expected mock call, return_value)
- command = ["--may-exist", "add-port", self.BR_NAME, pname]
- command.extend(["--", "set", "Interface", pname])
- command.extend(["type=patch", "options:peer=" + peer])
- expected_calls_and_values = [
- (self._vsctl_mock(*command), None),
- (self._vsctl_mock("--columns=ofport", "list", "Interface", pname),
- self._encode_ovs_json(['ofport'], [[ofport]]))
- ]
- tools.setup_mock_calls(self.execute, expected_calls_and_values)
-
- self.assertEqual(self.br.add_patch_port(pname, peer), ofport)
- tools.verify_mock_calls(self.execute, expected_calls_and_values)
-
def _test_get_vif_ports(self, is_xen=False):
pname = "tap99"
ofport = 6
self.assertRaises(RuntimeError, self.br.delete_ports, all_ports=False)
tools.verify_mock_calls(self.execute, expected_calls_and_values)
- def _test_get_bridges(self, exp_timeout=None):
+ def test_get_bridges_not_default_timeout(self):
bridges = ['br-int', 'br-ex']
- if exp_timeout:
- self.br.vsctl_timeout = exp_timeout
+ self.br.vsctl_timeout = 5
self.execute.return_value = 'br-int\\nbr-ex\n'
self.assertEqual(self.br.get_bridges(), bridges)
self._verify_vsctl_mock("list-br")
- def test_get_bridges(self):
- self._test_get_bridges()
-
- def test_get_bridges_not_default_timeout(self):
- self._test_get_bridges(5)
-
def test_get_local_port_mac_succeeds(self):
with mock.patch('neutron.agent.linux.ip_lib.IpLinkCommand',
return_value=mock.Mock(address='foo')):