1 # Copyright (C) 2014 VA Linux Systems Japan K.K.
2 # Copyright (C) 2014 Fumihiko Kakuma <kakuma at valinux co jp>
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
19 from neutron.common import constants as n_const
20 from neutron.tests.unit.plugins.ml2.drivers.l2pop.rpc_manager \
21 import l2population_rpc_base
24 class TestL2populationRpcCallBackTunnelMixin(
25 l2population_rpc_base.TestL2populationRpcCallBackTunnelMixinBase):
27 def test_get_agent_ports_no_data(self):
29 list(self.fakeagent.get_agent_ports(self.fdb_entries1, {})))
31 def test_get_agent_ports_non_existence_key_in_lvm(self):
33 del self.local_vlan_map1[self.lvms[1].net]
34 for lvm, agent_ports in self.fakeagent.get_agent_ports(
35 self.fdb_entries1, self.local_vlan_map1):
36 results[lvm] = agent_ports
39 self.ports[0].ip: [(self.lvms[0].mac, self.lvms[0].ip)],
42 self.ports[2].ip: [(self.lvms[2].mac, self.lvms[2].ip)],
45 self.assertEqual(expected, results)
47 def test_get_agent_ports_no_agent_ports(self):
49 self.fdb_entries1[self.lvms[1].net]['ports'] = {}
50 for lvm, agent_ports in self.fakeagent.get_agent_ports(
51 self.fdb_entries1, self.local_vlan_map1):
52 results[lvm] = agent_ports
55 self.ports[0].ip: [(self.lvms[0].mac, self.lvms[0].ip)],
59 self.ports[2].ip: [(self.lvms[2].mac, self.lvms[2].ip)],
62 self.assertEqual(expected, results)
64 def test_fdb_add_tun(self):
65 with mock.patch.object(self.fakeagent, 'setup_tunnel_port'),\
66 mock.patch.object(self.fakeagent, 'add_fdb_flow'
67 ) as mock_add_fdb_flow:
68 self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
70 self._tunnel_port_lookup)
72 mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
73 self.ports[0].ip, self.lvm1, self.ports[0].ofport),
74 mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
75 self.ports[1].ip, self.lvm1, self.ports[1].ofport),
76 mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
77 self.ports[2].ip, self.lvm1, self.ports[2].ofport),
79 self.assertEqual(sorted(expected),
80 sorted(mock_add_fdb_flow.call_args_list))
82 def test_fdb_add_tun_non_existence_key_in_ofports(self):
83 ofport = self.lvm1.network_type + '0a0a0a0a'
84 del self.ofports[self.type_gre][self.ports[1].ip]
85 with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
87 ) as mock_setup_tunnel_port,\
88 mock.patch.object(self.fakeagent, 'add_fdb_flow'
89 ) as mock_add_fdb_flow:
90 self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
92 self._tunnel_port_lookup)
93 mock_setup_tunnel_port.assert_called_once_with(
94 self.fakebr, self.ports[1].ip, self.lvm1.network_type)
96 mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
97 self.ports[0].ip, self.lvm1, self.ports[0].ofport),
98 mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
99 self.ports[1].ip, self.lvm1, ofport),
100 mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
101 self.ports[2].ip, self.lvm1, self.ports[2].ofport),
103 self.assertEqual(sorted(expected),
104 sorted(mock_add_fdb_flow.call_args_list))
106 def test_fdb_add_tun_unavailable_ofport(self):
107 del self.ofports[self.type_gre][self.ports[1].ip]
108 with mock.patch.object(self.fakeagent, 'setup_tunnel_port',
110 ) as mock_setup_tunnel_port,\
111 mock.patch.object(self.fakeagent, 'add_fdb_flow'
112 ) as mock_add_fdb_flow:
113 self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
115 self._tunnel_port_lookup)
116 mock_setup_tunnel_port.assert_called_once_with(
117 self.fakebr, self.ports[1].ip, self.lvm1.network_type)
119 mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
120 self.ports[0].ip, self.lvm1, self.ports[0].ofport),
121 mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
122 self.ports[2].ip, self.lvm1, self.ports[2].ofport),
124 self.assertEqual(sorted(expected),
125 sorted(mock_add_fdb_flow.call_args_list))
127 def test_fdb_remove_tun(self):
128 with mock.patch.object(
129 self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
130 self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
132 self._tunnel_port_lookup)
134 mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
135 self.ports[0].ip, self.lvm1, self.ports[0].ofport),
136 mock.call(self.fakebr, (self.lvms[1].mac, self.lvms[1].ip),
137 self.ports[1].ip, self.lvm1, self.ports[1].ofport),
138 mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
139 self.ports[2].ip, self.lvm1, self.ports[2].ofport),
141 self.assertEqual(sorted(expected),
142 sorted(mock_del_fdb_flow.call_args_list))
144 def test_fdb_remove_tun_flooding_entry(self):
145 self.agent_ports[self.ports[1].ip] = [n_const.FLOODING_ENTRY]
146 with mock.patch.object(self.fakeagent, 'del_fdb_flow'
147 ) as mock_del_fdb_flow,\
148 mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'
149 ) as mock_cleanup_tunnel_port:
150 self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
152 self._tunnel_port_lookup)
154 mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
155 self.ports[0].ip, self.lvm1, self.ports[0].ofport),
156 mock.call(self.fakebr,
157 (n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]),
158 self.ports[1].ip, self.lvm1, self.ports[1].ofport),
159 mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
160 self.ports[2].ip, self.lvm1, self.ports[2].ofport),
162 self.assertEqual(sorted(expected),
163 sorted(mock_del_fdb_flow.call_args_list))
164 mock_cleanup_tunnel_port.assert_called_once_with(
165 self.fakebr, self.ports[1].ofport, self.lvm1.network_type)
167 def test_fdb_remove_tun_non_existence_key_in_ofports(self):
168 del self.ofports[self.type_gre][self.ports[1].ip]
169 with mock.patch.object(
170 self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
171 self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
173 self._tunnel_port_lookup)
175 mock.call(self.fakebr, (self.lvms[0].mac, self.lvms[0].ip),
176 self.ports[0].ip, self.lvm1, self.ports[0].ofport),
177 mock.call(self.fakebr, (self.lvms[2].mac, self.lvms[2].ip),
178 self.ports[2].ip, self.lvm1, self.ports[2].ofport),
180 self.assertEqual(sorted(expected),
181 sorted(mock_del_fdb_flow.call_args_list))
183 def test_fdb_update(self):
184 fake__fdb_chg_ip = mock.Mock()
185 self.fakeagent._fdb_chg_ip = fake__fdb_chg_ip
186 self.fakeagent.fdb_update('context', self.upd_fdb_entry1)
187 fake__fdb_chg_ip.assert_called_once_with(
188 'context', self.upd_fdb_entry1_val)
190 def test_fdb_update_non_existence_method(self):
191 self.assertRaises(NotImplementedError,
192 self.fakeagent.fdb_update,
193 'context', self.upd_fdb_entry1)
195 def test__fdb_chg_ip(self):
196 m_setup_entry_for_arp_reply = mock.Mock()
197 self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
198 self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
199 self.upd_fdb_entry1_val, self.local_ip,
200 self.local_vlan_map1)
202 mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
204 mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
206 mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
208 mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
210 mock.call(self.fakebr, 'remove', self.lvm2.vlan, self.lvms[0].mac,
212 mock.call(self.fakebr, 'add', self.lvm2.vlan, self.lvms[2].mac,
215 m_setup_entry_for_arp_reply.assert_has_calls(expected, any_order=True)
217 def test__fdb_chg_ip_no_lvm(self):
218 m_setup_entry_for_arp_reply = mock.Mock()
219 self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
220 self.fakeagent.fdb_chg_ip_tun(
221 'context', self.fakebr, self.upd_fdb_entry1, self.local_ip, {})
222 self.assertFalse(m_setup_entry_for_arp_reply.call_count)
224 def test__fdb_chg_ip_ip_is_local_ip(self):
225 upd_fdb_entry_val = {
228 'before': [(self.lvms[0].mac, self.lvms[0].ip)],
229 'after': [(self.lvms[1].mac, self.lvms[1].ip)],
233 m_setup_entry_for_arp_reply = mock.Mock()
234 self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
235 self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
236 upd_fdb_entry_val, self.local_ip,
237 self.local_vlan_map1)
238 self.assertFalse(m_setup_entry_for_arp_reply.call_count)
240 def test_fdb_chg_ip_tun_empty_before_after(self):
241 upd_fdb_entry_val = {
246 m_setup_entry_for_arp_reply = mock.Mock()
247 self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
248 # passing non-local ip
249 self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
250 upd_fdb_entry_val, "8.8.8.8",
251 self.local_vlan_map1)
252 self.assertFalse(m_setup_entry_for_arp_reply.call_count)