1 # Copyright (C) 2014,2015 VA Linux Systems Japan K.K.
2 # Copyright (C) 2014,2015 YAMAMOTO Takashi <yamamoto at valinux co jp>
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
19 from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent.openflow.native \
20 import ovs_bridge_test_base
23 call = mock.call # short hand
26 class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase):
28 super(OVSIntegrationBridgeTest, self).setUp()
29 self.setup_bridge_mock('br-int', self.br_int_cls)
31 def test_setup_default_table(self):
32 self.br.setup_default_table()
33 (dp, ofp, ofpp) = self._get_dp()
35 call._send_msg(ofpp.OFPFlowMod(dp,
38 ofpp.OFPInstructionActions(
39 ofp.OFPIT_APPLY_ACTIONS, [
40 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0)
43 match=ofpp.OFPMatch(),
46 call._send_msg(ofpp.OFPFlowMod(dp,
49 match=ofpp.OFPMatch(),
52 call._send_msg(ofpp.OFPFlowMod(dp,
55 match=ofpp.OFPMatch(),
59 self.assertEqual(expected, self.mock.mock_calls)
61 def test_provision_local_vlan(self):
65 self.br.provision_local_vlan(port=port, lvid=lvid,
66 segmentation_id=segmentation_id)
67 (dp, ofp, ofpp) = self._get_dp()
69 call._send_msg(ofpp.OFPFlowMod(dp,
72 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
73 ofpp.OFPActionSetField(
74 vlan_vid=lvid | ofp.OFPVID_PRESENT),
75 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0)
80 vlan_vid=segmentation_id | ofp.OFPVID_PRESENT),
84 self.assertEqual(expected, self.mock.mock_calls)
86 def test_provision_local_vlan_novlan(self):
89 segmentation_id = None
90 self.br.provision_local_vlan(port=port, lvid=lvid,
91 segmentation_id=segmentation_id)
92 (dp, ofp, ofpp) = self._get_dp()
94 call._send_msg(ofpp.OFPFlowMod(dp,
97 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
98 ofpp.OFPActionPushVlan(),
99 ofpp.OFPActionSetField(
100 vlan_vid=lvid | ofp.OFPVID_PRESENT),
101 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
106 vlan_vid=ofp.OFPVID_NONE),
110 self.assertEqual(expected, self.mock.mock_calls)
112 def test_reclaim_local_vlan(self):
114 segmentation_id = 777
115 self.br.reclaim_local_vlan(port=port, segmentation_id=segmentation_id)
116 (dp, ofp, ofpp) = self._get_dp()
121 vlan_vid=segmentation_id | ofp.OFPVID_PRESENT)),
123 self.assertEqual(expected, self.mock.mock_calls)
125 def test_reclaim_local_vlan_novlan(self):
127 segmentation_id = None
128 self.br.reclaim_local_vlan(port=port, segmentation_id=segmentation_id)
129 (dp, ofp, ofpp) = self._get_dp()
134 vlan_vid=ofp.OFPVID_NONE)),
136 self.assertEqual(expected, self.mock.mock_calls)
138 def test_install_dvr_to_src_mac(self):
139 network_type = 'vxlan'
141 gateway_mac = '08:60:6e:7f:74:e7'
142 dst_mac = '00:02:b3:13:fe:3d'
144 self.br.install_dvr_to_src_mac(network_type=network_type,
146 gateway_mac=gateway_mac,
149 (dp, ofp, ofpp) = self._get_dp()
151 call._send_msg(ofpp.OFPFlowMod(dp,
154 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
155 ofpp.OFPActionPopVlan(),
156 ofpp.OFPActionSetField(eth_src=gateway_mac),
157 ofpp.OFPActionOutput(6666, 0),
162 vlan_vid=vlan_tag | ofp.OFPVID_PRESENT),
166 self.assertEqual(expected, self.mock.mock_calls)
168 def test_delete_dvr_to_src_mac(self):
169 network_type = 'vxlan'
171 dst_mac = '00:02:b3:13:fe:3d'
172 self.br.delete_dvr_to_src_mac(network_type=network_type,
175 (dp, ofp, ofpp) = self._get_dp()
177 call.delete_flows(table_id=1,
180 vlan_vid=vlan_tag | ofp.OFPVID_PRESENT)),
182 self.assertEqual(expected, self.mock.mock_calls)
184 def test_install_dvr_to_src_mac_vlan(self):
185 network_type = 'vlan'
187 gateway_mac = '08:60:6e:7f:74:e7'
188 dst_mac = '00:02:b3:13:fe:3d'
190 self.br.install_dvr_to_src_mac(network_type=network_type,
192 gateway_mac=gateway_mac,
195 (dp, ofp, ofpp) = self._get_dp()
197 call._send_msg(ofpp.OFPFlowMod(dp,
200 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
201 ofpp.OFPActionPopVlan(),
202 ofpp.OFPActionSetField(eth_src=gateway_mac),
203 ofpp.OFPActionOutput(dst_port, 0),
208 vlan_vid=vlan_tag | ofp.OFPVID_PRESENT),
212 self.assertEqual(expected, self.mock.mock_calls)
214 def test_delete_dvr_to_src_mac_vlan(self):
215 network_type = 'vlan'
217 dst_mac = '00:02:b3:13:fe:3d'
218 self.br.delete_dvr_to_src_mac(network_type=network_type,
221 (dp, ofp, ofpp) = self._get_dp()
223 call.delete_flows(table_id=2,
226 vlan_vid=vlan_tag | ofp.OFPVID_PRESENT)),
228 self.assertEqual(expected, self.mock.mock_calls)
230 def test_add_dvr_mac_vlan(self):
231 mac = '00:02:b3:13:fe:3d'
233 self.br.add_dvr_mac_vlan(mac=mac, port=port)
234 (dp, ofp, ofpp) = self._get_dp()
236 call._send_msg(ofpp.OFPFlowMod(dp,
239 ofpp.OFPInstructionGotoTable(table_id=2),
247 self.assertEqual(expected, self.mock.mock_calls)
249 def test_remove_dvr_mac_vlan(self):
250 mac = '00:02:b3:13:fe:3d'
251 self.br.remove_dvr_mac_vlan(mac=mac)
252 (dp, ofp, ofpp) = self._get_dp()
254 call.delete_flows(eth_src=mac, table_id=0),
256 self.assertEqual(expected, self.mock.mock_calls)
258 def test_add_dvr_mac_tun(self):
259 mac = '00:02:b3:13:fe:3d'
261 self.br.add_dvr_mac_tun(mac=mac, port=port)
262 (dp, ofp, ofpp) = self._get_dp()
264 call._send_msg(ofpp.OFPFlowMod(dp,
267 ofpp.OFPInstructionGotoTable(table_id=1),
275 self.assertEqual(expected, self.mock.mock_calls)
277 def test_remove_dvr_mac_tun(self):
278 mac = '00:02:b3:13:fe:3d'
280 self.br.remove_dvr_mac_tun(mac=mac, port=port)
282 call.delete_flows(eth_src=mac, in_port=port, table_id=0),
284 self.assertEqual(expected, self.mock.mock_calls)
286 def test_install_icmpv6_na_spoofing_protection(self):
288 ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128']
289 self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses)
290 (dp, ofp, ofpp) = self._get_dp()
292 call._send_msg(ofpp.OFPFlowMod(dp,
295 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
296 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
300 eth_type=self.ether_types.ETH_TYPE_IPV6,
301 icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
302 ip_proto=self.in_proto.IPPROTO_ICMPV6,
303 ipv6_nd_target='2001:db8::1',
308 call._send_msg(ofpp.OFPFlowMod(dp,
311 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
312 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
316 eth_type=self.ether_types.ETH_TYPE_IPV6,
317 icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
318 ip_proto=self.in_proto.IPPROTO_ICMPV6,
319 ipv6_nd_target='fdf8:f53b:82e4::1',
324 call._send_msg(ofpp.OFPFlowMod(dp,
327 ofpp.OFPInstructionGotoTable(table_id=24),
330 eth_type=self.ether_types.ETH_TYPE_IPV6,
331 icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
332 ip_proto=self.in_proto.IPPROTO_ICMPV6,
338 self.assertEqual(expected, self.mock.mock_calls)
340 def test_install_arp_spoofing_protection(self):
342 ip_addresses = ['192.0.2.1', '192.0.2.2/32']
343 self.br.install_arp_spoofing_protection(port, ip_addresses)
344 (dp, ofp, ofpp) = self._get_dp()
346 call._send_msg(ofpp.OFPFlowMod(dp,
349 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
350 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
354 eth_type=self.ether_types.ETH_TYPE_ARP,
360 call._send_msg(ofpp.OFPFlowMod(dp,
363 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [
364 ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
368 eth_type=self.ether_types.ETH_TYPE_ARP,
374 call._send_msg(ofpp.OFPFlowMod(dp,
377 ofpp.OFPInstructionGotoTable(table_id=24),
380 eth_type=self.ether_types.ETH_TYPE_ARP,
386 self.assertEqual(expected, self.mock.mock_calls)
388 def test_delete_arp_spoofing_protection(self):
390 self.br.delete_arp_spoofing_protection(port)
391 (dp, ofp, ofpp) = self._get_dp()
393 call.delete_flows(table_id=0, match=ofpp.OFPMatch(
394 eth_type=self.ether_types.ETH_TYPE_ARP,
396 call.delete_flows(table_id=0, match=ofpp.OFPMatch(
397 eth_type=self.ether_types.ETH_TYPE_IPV6,
398 icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT,
400 ip_proto=self.in_proto.IPPROTO_ICMPV6)),
401 call.delete_flows(table_id=24, in_port=port),
403 self.assertEqual(expected, self.mock.mock_calls)