self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
# NOTE: callback_sg is referred to from the sg unit test.
+ self.callback_nec = NECPluginV2RPCCallbacks(self)
self.callback_sg = SecurityGroupServerRpcCallback()
- callbacks = [NECPluginV2RPCCallbacks(self),
+ callbacks = [self.callback_nec,
DhcpRpcCallback(), L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 NEC Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import copy
+import itertools
+import time
+
+import mock
+from oslo.config import cfg
+import testtools
+
+from quantum.agent.linux import ovs_lib
+from quantum.extensions import securitygroup as ext_sg
+from quantum.plugins.nec.agent import nec_quantum_agent
+from quantum.tests import base
+
+DAEMON_LOOP_COUNT = 10
+OVS_DPID = '00000629355b6943'
+OVS_DPID_0X = '0x' + OVS_DPID
+
+
+class TestNecAgentBase(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestNecAgentBase, self).setUp()
+ self.addCleanup(cfg.CONF.reset)
+ self.addCleanup(mock.patch.stopall)
+ cfg.CONF.set_override('rpc_backend',
+ 'quantum.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_override('host', 'dummy-host')
+ with contextlib.nested(
+ mock.patch.object(ovs_lib.OVSBridge, 'get_datapath_id',
+ return_value=OVS_DPID),
+ mock.patch('socket.gethostname', return_value='dummy-host'),
+ mock.patch('quantum.openstack.common.loopingcall.'
+ 'FixedIntervalLoopingCall'),
+ mock.patch('quantum.agent.rpc.PluginReportStateAPI')
+ ) as (get_datapath_id, gethostname,
+ loopingcall, state_rpc_api):
+ kwargs = {'integ_br': 'integ_br',
+ 'root_helper': 'dummy_wrapper',
+ 'polling_interval': 1}
+ self.agent = nec_quantum_agent.NECQuantumAgent(**kwargs)
+ self.loopingcall = loopingcall
+ self.state_rpc_api = state_rpc_api
+
+
+class TestNecAgent(TestNecAgentBase):
+
+ def test_daemon_loop(self):
+
+ def state_check(index):
+ self.assertEqual(len(self.vif_ports_scenario[index]),
+ len(self.agent.cur_ports))
+
+ # Fake time.sleep to stop the infinite loop in daemon_loop()
+ self.sleep_count = 0
+
+ def sleep_mock(*args, **kwargs):
+ state_check(self.sleep_count)
+ self.sleep_count += 1
+ if self.sleep_count >= DAEMON_LOOP_COUNT:
+ raise RuntimeError()
+
+ vif_ports = [ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
+ self.agent.int_br),
+ ovs_lib.VifPort('port2', '2', 'id-2', 'mac-2',
+ self.agent.int_br)]
+
+ self.vif_ports_scenario = [[], [], vif_ports[0:1], vif_ports[0:2],
+ vif_ports[1:2], []]
+
+ # Ensure vif_ports_scenario is longer than DAEMON_LOOP_COUNT
+ if len(self.vif_ports_scenario) < DAEMON_LOOP_COUNT:
+ self.vif_ports_scenario.extend(
+ [] for _i in xrange(DAEMON_LOOP_COUNT -
+ len(self.vif_ports_scenario)))
+
+ with contextlib.nested(
+ mock.patch.object(time, 'sleep', side_effect=sleep_mock),
+ mock.patch.object(ovs_lib.OVSBridge, 'get_vif_ports'),
+ mock.patch.object(nec_quantum_agent.NECPluginApi, 'update_ports'),
+ mock.patch.object(self.agent.sg_agent, 'prepare_devices_filter'),
+ mock.patch.object(self.agent.sg_agent, 'remove_devices_filter')
+ ) as (sleep, get_vif_potrs, update_ports,
+ prepare_devices_filter, remove_devices_filter):
+ get_vif_potrs.side_effect = self.vif_ports_scenario
+
+ with testtools.ExpectedException(RuntimeError):
+ self.agent.daemon_loop()
+ self.assertEqual(update_ports.call_count, 4)
+ self.assertEqual(sleep.call_count, DAEMON_LOOP_COUNT)
+
+ agent_id = 'nec-q-agent.dummy-host'
+ expected = [
+ mock.call(mock.ANY, agent_id, OVS_DPID_0X,
+ [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'}],
+ []),
+ mock.call(mock.ANY, agent_id, OVS_DPID_0X,
+ [{'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
+ []),
+ mock.call(mock.ANY, agent_id, OVS_DPID_0X,
+ [], ['id-1']),
+ mock.call(mock.ANY, agent_id, OVS_DPID_0X,
+ [], ['id-2'])
+ ]
+ update_ports.assert_has_calls(expected)
+
+ expected = [mock.call(['id-1']),
+ mock.call(['id-2'])]
+ self.assertEqual(prepare_devices_filter.call_count, 2)
+ prepare_devices_filter.assert_has_calls(expected)
+ self.assertEqual(remove_devices_filter.call_count, 2)
+ remove_devices_filter.assert_has_calls(expected)
+
+ sleep.assert_called_with(self.agent.polling_interval)
+
+ def test_report_state_installed(self):
+ self.loopingcall.assert_called_once_with(self.agent._report_state)
+ instance = self.loopingcall.return_value
+ instance.start.assert_called_once_with(interval=4)
+
+ def _check_report_state(self, cur_ports, num_ports, fail_mode,
+ first=False):
+ self.assertEqual(first or fail_mode,
+ 'start_flag' in self.agent.agent_state)
+ self.agent.cur_ports = cur_ports
+
+ self.agent._report_state()
+
+ self.assertEqual(fail_mode,
+ 'start_flag' in self.agent.agent_state)
+ self.assertEqual(self.agent.
+ agent_state['configurations']['devices'],
+ num_ports)
+ self.num_ports_hist.append(num_ports)
+
+ def _test_report_state(self, fail_mode):
+ log_mocked = mock.patch.object(nec_quantum_agent, 'LOG')
+ log_patched = log_mocked.start()
+
+ def record_state(*args, **kwargs):
+ self.record_calls.append(copy.deepcopy(args))
+ if fail_mode:
+ raise Exception()
+
+ self.record_calls = []
+ self.num_ports_hist = []
+ state_rpc = self.state_rpc_api.return_value
+ state_rpc.report_state.side_effect = record_state
+ dummy_vif = ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1', None)
+
+ self.state_rpc_api.assert_called_once_with('q-plugin')
+ self.assertTrue('start_flag' in self.agent.agent_state)
+
+ self._check_report_state([], 0, fail_mode, first=True)
+ self._check_report_state([dummy_vif] * 2, 2, fail_mode)
+ self._check_report_state([dummy_vif] * 5, 5, fail_mode)
+ self._check_report_state([], 0, fail_mode)
+
+ print 'record_state', self.record_calls
+ print 'num_ports_hist', self.num_ports_hist
+
+ # Since loopingcall start is mocked, call_count is same as
+ # the call count of check_report_state.
+ self.assertEqual(state_rpc.report_state.call_count, 4)
+ self.assertEqual(len(self.record_calls), 4)
+
+ for i, x in enumerate(itertools.izip(self.record_calls,
+ self.num_ports_hist)):
+ rec, num_ports = x
+ expected_state = {
+ 'binary': 'quantum-nec-agent',
+ 'host': 'dummy-host',
+ 'topic': 'N/A',
+ 'configurations': {'devices': 0},
+ 'agent_type': 'NEC plugin agent'}
+ expected_state['configurations']['devices'] = num_ports
+ if i == 0 or fail_mode:
+ expected_state['start_flag'] = True
+ self.assertEqual(expected_state, rec[1])
+
+ self.assertEqual(fail_mode, log_patched.exception.called)
+
+ def test_report_state(self):
+ self._test_report_state(fail_mode=False)
+
+ def test_report_state_fail(self):
+ self._test_report_state(fail_mode=True)
+
+
+class TestNecAgentCallback(TestNecAgentBase):
+
+ def test_port_update(self):
+ with contextlib.nested(
+ mock.patch.object(ovs_lib.OVSBridge, 'get_vif_port_by_id'),
+ mock.patch.object(self.agent.sg_agent, 'refresh_firewall')
+ ) as (get_vif_port_by_id, refresh_firewall):
+ context = mock.Mock()
+ vifport = ovs_lib.VifPort('port1', '1', 'id-1', 'mac-1',
+ self.agent.int_br)
+
+ # The OVS port does not exist.
+ get_vif_port_by_id.return_value = None
+ port = {'id': 'update-port-1'}
+ self.agent.callback_nec.port_update(context, port=port)
+ self.assertEqual(get_vif_port_by_id.call_count, 1)
+ self.assertFalse(refresh_firewall.call_count)
+
+ # The OVS port exists but no security group is associated.
+ get_vif_port_by_id.return_value = vifport
+ port = {'id': 'update-port-1'}
+ self.agent.callback_nec.port_update(context, port=port)
+ self.assertEqual(get_vif_port_by_id.call_count, 2)
+ self.assertFalse(refresh_firewall.call_count)
+
+ # The OVS port exists but a security group is associated.
+ get_vif_port_by_id.return_value = vifport
+ port = {'id': 'update-port-1',
+ ext_sg.SECURITYGROUPS: ['default']}
+ self.agent.callback_nec.port_update(context, port=port)
+ self.assertEqual(get_vif_port_by_id.call_count, 3)
+ self.assertEqual(refresh_firewall.call_count, 1)
+
+ get_vif_port_by_id.return_value = None
+ port = {'id': 'update-port-1',
+ ext_sg.SECURITYGROUPS: ['default']}
+ self.agent.callback_nec.port_update(context, port=port)
+ self.assertEqual(get_vif_port_by_id.call_count, 4)
+ self.assertEqual(refresh_firewall.call_count, 1)
+
+
+class TestNecAgentPluginApi(TestNecAgentBase):
+
+ def _test_plugin_api(self, expected_failure=False):
+ with contextlib.nested(
+ mock.patch.object(nec_quantum_agent.NECPluginApi, 'make_msg'),
+ mock.patch.object(nec_quantum_agent.NECPluginApi, 'call'),
+ mock.patch.object(nec_quantum_agent, 'LOG')
+ ) as (make_msg, apicall, log):
+ agent_id = 'nec-q-agent.dummy-host'
+ if expected_failure:
+ apicall.side_effect = Exception()
+
+ self.agent.plugin_rpc.update_ports(
+ mock.sentinel.ctx, agent_id, OVS_DPID_0X,
+ # port_added
+ [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
+ {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
+ # port_removed
+ ['id-3', 'id-4', 'id-5'])
+
+ make_msg.assert_called_once_with(
+ 'update_ports', topic='q-agent-notifier',
+ agent_id=agent_id, datapath_id=OVS_DPID_0X,
+ port_added=[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
+ {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
+ port_removed=['id-3', 'id-4', 'id-5'])
+
+ apicall.assert_called_once_with(mock.sentinel.ctx,
+ make_msg.return_value)
+
+ self.assertTrue(log.info.called)
+ if expected_failure:
+ self.assertTrue(log.warn.called)
+
+ def test_plugin_api(self):
+ self._test_plugin_api()
+
+ def test_plugin_api_fail(self):
+ self._test_plugin_api(expected_failure=True)
+
+
+class TestNecAgentMain(base.BaseTestCase):
+ def test_main(self):
+ with contextlib.nested(
+ mock.patch.object(nec_quantum_agent, 'NECQuantumAgent'),
+ mock.patch('eventlet.monkey_patch'),
+ mock.patch('quantum.common.config'),
+ mock.patch.object(nec_quantum_agent, 'config')
+ ) as (agent, eventlet, logging_config, cfg):
+ cfg.CONF.ovs.integration_bridge = 'br-int-x'
+ cfg.CONF.AGENT.root_helper = 'dummy-helper'
+ cfg.CONF.AGENT.polling_interval = 10
+
+ nec_quantum_agent.main()
+
+ self.assertTrue(eventlet.called)
+ self.assertTrue(logging_config.setup_logging.called)
+ agent.assert_has_calls([
+ mock.call('br-int-x', 'dummy-helper', 10),
+ mock.call().daemon_loop()
+ ])
from quantum import context as q_context
from quantum.extensions import portbindings
from quantum import manager
+from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec import nec_plugin
from quantum.tests.unit import _test_extension_portbindings as test_bindings
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
-OFC_MANAGER = 'quantum.plugins.nec.nec_plugin.ofc_manager.OFCManager'
PLUGIN_NAME = 'quantum.plugins.nec.nec_plugin.NECPluginV2'
+OFC_MANAGER = 'quantum.plugins.nec.nec_plugin.ofc_manager.OFCManager'
+OFC_DRIVER = 'quantum.tests.unit.nec.stub_ofc_driver.StubOFCDriver'
class NecPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
_plugin_name = PLUGIN_NAME
+ PACKET_FILTER_ENABLE = False
def setUp(self):
+ self.addCleanup(mock.patch.stopall)
+ ofc_manager_cls = mock.patch(OFC_MANAGER).start()
+ ofc_driver = ofc_manager_cls.return_value.driver
+ ofc_driver.filter_supported.return_value = self.PACKET_FILTER_ENABLE
super(NecPluginV2TestCase, self).setUp(self._plugin_name)
+ self.context = q_context.get_admin_context()
+ self.plugin = manager.QuantumManager.get_plugin()
class TestNecBasicGet(test_plugin.TestBasicGet, NecPluginV2TestCase):
class TestNecPortsV2Callback(NecPluginV2TestCase):
def setUp(self):
- self.addCleanup(mock.patch.stopall)
- ofc_manager_p = mock.patch(OFC_MANAGER)
- ofc_manager_cls = ofc_manager_p.start()
- self.ofc = mock.Mock()
- ofc_manager_cls.return_value = self.ofc
- self.ofc_port_exists = False
- self._setup_side_effects()
-
super(TestNecPortsV2Callback, self).setUp()
- self.context = q_context.get_admin_context()
- self.plugin = manager.QuantumManager.get_plugin()
self.callbacks = nec_plugin.NECPluginV2RPCCallbacks(self.plugin)
+ self.ofc = self.plugin.ofc
+ self.ofc_port_exists = False
+ self._setup_side_effects()
+
def _setup_side_effects(self):
def _create_ofc_port_called(*args, **kwargs):
self.ofc_port_exists = True
self.ofc.assert_has_calls(expected)
self.assertEqual(2, self.ofc.create_ofc_port.call_count)
self.assertEqual(1, self.ofc.delete_ofc_port.call_count)
+
+
+class TestNecPluginDbTest(NecPluginV2TestCase):
+
+ def test_update_resource(self):
+ with self.network() as network:
+ self.assertEqual("ACTIVE", network['network']['status'])
+ net_id = network['network']['id']
+ for status in ["DOWN", "BUILD", "ERROR", "ACTIVE"]:
+ self.plugin._update_resource_status(
+ self.context, 'network', net_id,
+ getattr(nec_plugin.OperationalStatus, status))
+ n = self.plugin._get_network(self.context, net_id)
+ self.assertEqual(status, n.status)
+
+
+class TestNecPluginOfcManager(NecPluginV2TestCase):
+ def setUp(self):
+ super(TestNecPluginOfcManager, self).setUp()
+ self.ofc = self.plugin.ofc
+
+ def _update_resource(self, resource, id, data):
+ collection = resource + 's'
+ data = {resource: data}
+ req = self.new_update_request(collection, data, id)
+ res = self.deserialize(self.fmt, req.get_response(self.api))
+ return res[resource]
+
+ def _show_resource(self, resource, id):
+ collection = resource + 's'
+ req = self.new_show_request(collection, id)
+ res = self.deserialize(self.fmt, req.get_response(self.api))
+ return res[resource]
+
+ def _list_resource(self, resource):
+ collection = resource + 's'
+ req = self.new_list_request(collection)
+ res = req.get_response(self.api)
+ return res[collection]
+
+ def _delete_resource(self, resource, id):
+ collection = resource + 's'
+ req = self.new_delete_request(collection, id)
+ res = req.get_response(self.api)
+ return res.status_int
+
+ def test_create_network(self):
+ self.ofc.exists_ofc_tenant.return_value = False
+ net = None
+ ctx = mock.ANY
+ with self.network() as network:
+ net = network['network']
+ self.assertEqual(network['network']['status'], 'ACTIVE')
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def test_create_network_with_admin_state_down(self):
+ self.ofc.exists_ofc_tenant.return_value = False
+ net = None
+ ctx = mock.ANY
+ with self.network(admin_state_up=False) as network:
+ net = network['network']
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def test_create_two_network(self):
+ self.ofc.exists_ofc_tenant.side_effect = [False, True]
+ nets = []
+ ctx = mock.ANY
+ with self.network() as net1:
+ nets.append(net1['network'])
+ self.assertEqual(net1['network']['status'], 'ACTIVE')
+ with self.network() as net2:
+ nets.append(net2['network'])
+ self.assertEqual(net2['network']['status'], 'ACTIVE')
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, nets[0]['id'],
+ nets[0]['name']),
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, nets[1]['id'],
+ nets[1]['name']),
+ mock.call.delete_ofc_network(ctx, nets[1]['id'], mock.ANY),
+ mock.call.delete_ofc_network(ctx, nets[0]['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def test_create_network_fail(self):
+ self.ofc.exists_ofc_tenant.return_value = False
+ self.ofc.create_ofc_network.side_effect = nexc.OFCException(
+ reason='hoge')
+
+ net = None
+ ctx = mock.ANY
+ with self.network() as network:
+ net = network['network']
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def test_update_network(self):
+ self.ofc.exists_ofc_tenant.return_value = False
+
+ net = None
+ ctx = mock.ANY
+ with self.network() as network:
+ net = network['network']
+ self.assertEqual(network['network']['status'], 'ACTIVE')
+
+ # Set admin_state_up to False
+ res = self._update_resource('network', net['id'],
+ {'admin_state_up': False})
+ self.assertFalse(res['admin_state_up'])
+
+ # Set admin_state_up to True
+ res = self._update_resource('network', net['id'],
+ {'admin_state_up': True})
+ self.assertTrue(res['admin_state_up'])
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def _rpcapi_update_ports(self, agent_id='nec-q-agent.fake',
+ datapath_id="0xabc", added=[], removed=[]):
+ kwargs = {'topic': topics.AGENT,
+ 'agent_id': agent_id,
+ 'datapath_id': datapath_id,
+ 'port_added': added, 'port_removed': removed}
+ self.plugin.callback_nec.update_ports(self.context, **kwargs)
+
+ def test_create_port_no_ofc_creation(self):
+ self.ofc.exists_ofc_tenant.return_value = False
+ self.ofc.exists_ofc_port.return_value = False
+
+ net = None
+ p1 = None
+ ctx = mock.ANY
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ p1 = port['port']
+ net_id = port['port']['network_id']
+ net = self._show_resource('network', net_id)
+ self.assertEqual(net['status'], 'ACTIVE')
+ self.assertEqual(p1['status'], 'ACTIVE')
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+
+ mock.call.exists_ofc_port(ctx, p1['id']),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def test_create_port_with_ofc_creation(self):
+ self.ofc.exists_ofc_tenant.return_value = False
+ self.ofc.exists_ofc_port.side_effect = [False, True]
+
+ net = None
+ p1 = None
+ ctx = mock.ANY
+ with self.subnet() as subnet:
+ with self.port(subnet=subnet) as port:
+ p1 = port['port']
+ net_id = port['port']['network_id']
+ net = self._show_resource('network', net_id)
+ self.assertEqual(net['status'], 'ACTIVE')
+ self.assertEqual(p1['status'], 'ACTIVE')
+
+ # Check the port is not created on OFC
+ self.assertFalse(self.ofc.create_ofc_port.call_count)
+
+ # Register portinfo, then the port is created on OFC
+ portinfo = {'id': p1['id'], 'port_no': 123}
+ self._rpcapi_update_ports(added=[portinfo])
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+
+ mock.call.exists_ofc_port(ctx, p1['id']),
+ mock.call.create_ofc_port(ctx, p1['id'], mock.ANY),
+
+ mock.call.exists_ofc_port(ctx, p1['id']),
+ mock.call.delete_ofc_port(ctx, p1['id'], mock.ANY),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
+
+ def test_update_port(self):
+ self._test_update_port_with_admin_state(resource='port')
+
+ def test_update_network_with_ofc_port(self):
+ self._test_update_port_with_admin_state(resource='network')
+
+ def _test_update_port_with_admin_state(self, resource='port'):
+ self.ofc.exists_ofc_tenant.return_value = False
+ self.ofc.exists_ofc_port.side_effect = [False, True, False]
+
+ net = None
+ p1 = None
+ ctx = mock.ANY
+
+ if resource == 'network':
+ net_ini_admin_state = False
+ port_ini_admin_state = True
+ else:
+ net_ini_admin_state = True
+ port_ini_admin_state = False
+
+ with self.network(admin_state_up=net_ini_admin_state) as network:
+ with self.subnet(network=network) as subnet:
+ with self.port(subnet=subnet,
+ admin_state_up=port_ini_admin_state) as port:
+ p1 = port['port']
+ net_id = port['port']['network_id']
+ res_id = net_id if resource == 'network' else p1['id']
+
+ net = self._show_resource('network', net_id)
+
+ # Check the port is not created on OFC
+ self.assertFalse(self.ofc.create_ofc_port.call_count)
+
+ # Register portinfo, then the port is created on OFC
+ portinfo = {'id': p1['id'], 'port_no': 123}
+ self._rpcapi_update_ports(added=[portinfo])
+ self.assertFalse(self.ofc.create_ofc_port.call_count)
+
+ self._update_resource(resource, res_id,
+ {'admin_state_up': True})
+ self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
+ self.assertFalse(self.ofc.delete_ofc_port.call_count)
+
+ self._update_resource(resource, res_id,
+ {'admin_state_up': False})
+ self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
+
+ expected = [
+ mock.call.exists_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_tenant(ctx, self._tenant_id),
+ mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
+ net['name']),
+
+ mock.call.exists_ofc_port(ctx, p1['id']),
+ mock.call.create_ofc_port(ctx, p1['id'], mock.ANY),
+
+ mock.call.exists_ofc_port(ctx, p1['id']),
+ mock.call.delete_ofc_port(ctx, p1['id'], mock.ANY),
+
+ mock.call.exists_ofc_port(ctx, p1['id']),
+ mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
+ mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ ]
+ self.ofc.assert_has_calls(expected)
self.driver.delete_port(port_path)
self.mox.VerifyAll()
+ def test_filter_supported(self):
+ self.assertFalse(self.driver.filter_supported())
+
class PFCDriverBaseTest(PFCDriverTestBase):
pass
self.assertEqual(ofc_t_path, ret)
def testc_delete_tenant(self):
- pass
+ t, n, p = self.get_ofc_item_random_params()
+
+ path = "/tenants/%s" % _ofc(t)
+ # There is no API call.
+ self.mox.ReplayAll()
+
+ self.driver.delete_tenant(path)
+ self.mox.VerifyAll()
class PFCV4DriverTest(PFCDriverTestBase):
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.nec.db import api as ndb # noqa
+from quantum.tests.unit.nec import test_nec_plugin
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
-PLUGIN_NAME = ('quantum.plugins.nec.nec_plugin.NECPluginV2')
+PLUGIN_NAME = test_nec_plugin.PLUGIN_NAME
+OFC_MANAGER = test_nec_plugin.OFC_MANAGER
AGENT_NAME = ('quantum.plugins.nec.agent.nec_quantum_agent.NECQuantumAgent')
NOTIFIER = ('quantum.plugins.nec.nec_plugin.NECPluginV2AgentNotifierApi')
class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
- _plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_HYBRID_DRIVER)
self.addCleanup(mock.patch.stopall)
- notifier_p = mock.patch(NOTIFIER)
- notifier_cls = notifier_p.start()
- self.notifier = mock.Mock()
- notifier_cls.return_value = self.notifier
+ mock.patch(NOTIFIER).start()
+ mock.patch(OFC_MANAGER).start()
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(NecSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
+ self.notifier = manager.QuantumManager.get_plugin().notifier
def tearDown(self):
super(NecSecurityGroupsTestCase, self).tearDown()
# under the License.
# @author: Ryota MIBU
+import random
+
import mox
from quantum import context
tenant_id = uuidutils.generate_uuid()
network_id = uuidutils.generate_uuid()
port_id = uuidutils.generate_uuid()
+ mac = ':'.join(['%x' % random.randint(0, 255) for i in xrange(6)])
portinfo = nmodels.PortInfo(id=port_id, datapath_id="0x123456789",
port_no=1234, vlan_id=321,
- mac="11:22:33:44:55:66")
+ mac=mac)
return tenant_id, network_id, portinfo
class TremaDriverNetworkTestBase(TremaDriverTestBase):
+ def test_create_tenant(self):
+ t, n, p = self.get_ofc_item_random_params()
+ # There is no API call.
+ self.mox.ReplayAll()
+ ret = self.driver.create_tenant('dummy_desc', t)
+ self.mox.VerifyAll()
+ ofc_t_path = "/tenants/%s" % t
+ self.assertEqual(ofc_t_path, ret)
+
+ def test_update_tenant(self):
+ t, n, p = self.get_ofc_item_random_params()
+ path = "/tenants/%s" % t
+ # There is no API call.
+ self.mox.ReplayAll()
+ self.driver.update_tenant(path, 'dummy_desc')
+ self.mox.VerifyAll()
+
+ def testc_delete_tenant(self):
+ t, n, p = self.get_ofc_item_random_params()
+ path = "/tenants/%s" % t
+ # There is no API call.
+ self.mox.ReplayAll()
+ self.driver.delete_tenant(path)
+ self.mox.VerifyAll()
+
def testa_create_network(self):
t, n, p = self.get_ofc_item_random_params()
description = "desc of %s" % n
driver_name = "trema_port"
+ def test_filter_supported(self):
+ self.assertTrue(self.driver.filter_supported())
+
def testd_create_port(self):
_t, n, p = self.get_ofc_item_random_params()
driver_name = "trema_portmac"
+ def test_filter_supported(self):
+ self.assertTrue(self.driver.filter_supported())
+
def testd_create_port(self):
t, n, p = self.get_ofc_item_random_params()
dummy_port = "dummy-%s" % p.id
driver_name = "trema_mac"
+ def test_filter_supported(self):
+ self.assertFalse(self.driver.filter_supported())
+
def testd_create_port(self):
t, n, p = self.get_ofc_item_random_params()
class TremaFilterDriverTest(TremaDriverTestBase):
-
- def get_ofc_item_random_params(self):
- """create random parameters for ofc_item test."""
- t, n, p = (super(TremaFilterDriverTest, self).
- get_ofc_item_random_params())
- filter_id = uuidutils.generate_uuid()
- filter_dict = {'tenant_id': t,
- 'id': filter_id,
- 'network_id': n,
- 'priority': 123,
- 'action': "ACCEPT",
- 'in_port': p.id,
- 'src_mac': p.mac,
- 'dst_mac': "",
- 'eth_type': 0,
- 'src_cidr': "",
- 'dst_cidr': "",
- 'src_port': 0,
- 'dst_port': 0,
- 'protocol': "TCP",
- 'admin_state_up': True,
- 'status': "ACTIVE"}
- filter_item = nmodels.PacketFilter(**filter_dict)
- return t, n, p, filter_item
-
- def testa_create_filter(self):
- t, n, p, f = self.get_ofc_item_random_params()
+ def _test_create_filter(self, filter_dict=None, filter_post=None,
+ filter_wildcards=None, no_portinfo=False):
+ t, n, p = self.get_ofc_item_random_params()
+ src_mac = ':'.join(['%x' % random.randint(0, 255) for i in xrange(6)])
+ if filter_wildcards is None:
+ filter_wildcards = []
+
+ f = {'tenant_id': t,
+ 'id': uuidutils.generate_uuid(),
+ 'network_id': n,
+ 'priority': 123,
+ 'action': "ACCEPT",
+ 'in_port': p.id,
+ 'src_mac': src_mac,
+ 'dst_mac': "",
+ 'eth_type': 0,
+ 'src_cidr': "",
+ 'dst_cidr': "",
+ 'src_port': 0,
+ 'dst_port': 0,
+ 'protocol': "TCP",
+ 'admin_state_up': True,
+ 'status': "ACTIVE"}
+ if filter_dict:
+ f.update(filter_dict)
+ print 'filter=%s' % f
net_path = "/networks/%s" % n
- ofp_wildcards = 'dl_vlan,dl_vlan_pcp,nw_tos,dl_dst,' + \
- 'nw_src:32,nw_dst:32,tp_src,tp_dst'
- body = {'id': f.id,
+
+ all_wildcards_ofp = ['dl_vlan', 'dl_vlan_pcp', 'nw_tos',
+ 'in_port', 'dl_src', 'dl_dst',
+ 'nw_src', 'nw_dst',
+ 'dl_type', 'nw_proto',
+ 'tp_src', 'tp_dst']
+ all_wildcards_non_ofp = ['in_datapath_id', 'slice']
+
+ body = {'id': f['id'],
'action': 'ALLOW',
'priority': 123,
'slice': n,
'in_port': 1234,
'nw_proto': '0x6',
'dl_type': '0x800',
- 'dl_src': p.mac,
- 'ofp_wildcards': ofp_wildcards}
+ 'dl_src': src_mac}
+ if filter_post:
+ body.update(filter_post)
+
+ if no_portinfo:
+ filter_wildcards += ['in_datapath_id', 'in_port']
+ p = None
+
+ for field in filter_wildcards:
+ if field in body:
+ del body[field]
+
+ ofp_wildcards = ["%s:32" % _f if _f in ['nw_src', 'nw_dst'] else _f
+ for _f in all_wildcards_ofp if _f not in body]
+ body['ofp_wildcards'] = ','.join(ofp_wildcards)
+
+ non_ofp_wildcards = [_f for _f in all_wildcards_non_ofp
+ if _f not in body]
+ if non_ofp_wildcards:
+ body['wildcards'] = ','.join(non_ofp_wildcards)
+
ofc_client.OFCClient.do_request("POST", "/filters", body=body)
self.mox.ReplayAll()
- ret = self.driver.create_filter(net_path, f, p, f.id)
+ ret = self.driver.create_filter(net_path, f, p, f['id'])
self.mox.VerifyAll()
- self.assertEqual(ret, '/filters/%s' % f.id)
+ self.assertEqual(ret, '/filters/%s' % f['id'])
+
+ def test_create_filter_accept(self):
+ self._test_create_filter(filter_dict={'action': 'ACCEPT'})
+
+ def test_create_filter_allow(self):
+ self._test_create_filter(filter_dict={'action': 'ALLOW'})
+
+ def test_create_filter_deny(self):
+ self._test_create_filter(filter_dict={'action': 'DENY'},
+ filter_post={'action': 'DENY'})
+
+ def test_create_filter_drop(self):
+ self._test_create_filter(filter_dict={'action': 'DROP'},
+ filter_post={'action': 'DENY'})
+
+ def test_create_filter_no_port(self):
+ self._test_create_filter(no_portinfo=True)
+
+ def test_create_filter_src_mac_wildcard(self):
+ self._test_create_filter(filter_dict={'src_mac': ''},
+ filter_wildcards=['dl_src'])
+
+ def test_create_filter_dst_mac(self):
+ dst_mac = ':'.join(['%x' % random.randint(0, 255) for i in xrange(6)])
+ self._test_create_filter(filter_dict={'dst_mac': dst_mac},
+ filter_post={'dl_dst': dst_mac})
+
+ def test_create_filter_src_cidr(self):
+ src_cidr = '10.2.0.0/24'
+ self._test_create_filter(filter_dict={'src_cidr': src_cidr},
+ filter_post={'nw_src': src_cidr})
+
+ def test_create_filter_dst_cidr(self):
+ dst_cidr = '192.168.10.0/24'
+ self._test_create_filter(filter_dict={'dst_cidr': dst_cidr},
+ filter_post={'nw_dst': dst_cidr})
+
+ def test_create_filter_proto_icmp(self):
+ self._test_create_filter(
+ filter_dict={'protocol': 'icmp'},
+ filter_post={'dl_type': '0x800', 'nw_proto': '0x1'})
+
+ def test_create_filter_proto_tcp(self):
+ self._test_create_filter(
+ filter_dict={'protocol': 'tcp'},
+ filter_post={'dl_type': '0x800', 'nw_proto': '0x6'})
+
+ def test_create_filter_proto_udp(self):
+ self._test_create_filter(
+ filter_dict={'protocol': 'udp'},
+ filter_post={'dl_type': '0x800', 'nw_proto': '0x11'})
+
+ def test_create_filter_proto_arp(self):
+ self._test_create_filter(
+ filter_dict={'protocol': 'arp'},
+ filter_post={'dl_type': '0x806'},
+ filter_wildcards=['nw_proto'])
+
+ def test_create_filter_proto_misc(self):
+ self._test_create_filter(
+ filter_dict={'protocol': '0x33', 'eth_type': '0x900'},
+ filter_post={'dl_type': '0x900', 'nw_proto': '0x33'})
+
+ def test_create_filter_proto_misc_dl_type_wildcard(self):
+ self._test_create_filter(
+ filter_dict={'protocol': '0x33', 'ether_type': ''},
+ filter_post={'nw_proto': '0x33'},
+ filter_wildcards=['dl_type'])
+
+ def test_create_filter_proto_wildcard(self):
+ self._test_create_filter(
+ filter_dict={'protocol': ''},
+ filter_wildcards=['dl_type', 'nw_proto'])
+
+ def test_create_filter_src_dst_port(self):
+ self._test_create_filter(filter_dict={'src_port': 8192,
+ 'dst_port': 4096},
+ filter_post={'tp_src': '0x2000',
+ 'tp_dst': '0x1000'})
def testb_delete_filter(self):
- t, n, p, f = self.get_ofc_item_random_params()
+ t, n, p = self.get_ofc_item_random_params()
- f_path = "/filters/%s" % f.id
+ f_path = "/filters/%s" % uuidutils.generate_uuid()
ofc_client.OFCClient.do_request("DELETE", f_path)
self.mox.ReplayAll()