--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+
+OFC_MANAGER = 'neutron.plugins.nec.nec_plugin.ofc_manager.OFCManager'
+
+
+def patch_ofc_manager():
+ m = mock.patch(OFC_MANAGER).start()
+ f = FakeOFCManager()
+
+ m.create_ofc_tenant.side_effect = f.create_ofc_tenant
+ m.delete_ofc_tenant.side_effect = f.delete_ofc_tenant
+ m.exists_ofc_tenant.side_effect = f.exists_ofc_tenant
+ m.create_ofc_network.side_effect = f.create_ofc_net
+ m.delete_ofc_network.side_effect = f.delete_ofc_net
+ m.exists_ofc_network.side_effect = f.exists_ofc_net
+ m.create_ofc_port.side_effect = f.create_ofc_port
+ m.delete_ofc_port.side_effect = f.delete_ofc_port
+ m.exists_ofc_port.side_effect = f.exists_ofc_port
+ m.create_ofc_packet_filter.side_effect = f.create_ofc_pf
+ m.delete_ofc_packet_filter.side_effect = f.delete_ofc_pf
+ m.exists_ofc_packet_filter.side_effect = f.exists_ofc_pf
+ m.set_raise_exc = f.set_raise_exc
+
+ return m
+
+
+class FakeOFCManager(object):
+
+ def __init__(self):
+ self.ofc_tenants = {}
+ self.ofc_nets = {}
+ self.ofc_ports = {}
+ self.ofc_pfs = {}
+ self.raise_exc_map = {}
+
+ def set_raise_exc(self, func, raise_exc):
+ self.raise_exc_map.update({func: raise_exc})
+
+ def _raise_exc(self, func):
+ exc = self.raise_exc_map.get(func)
+ if exc:
+ raise exc
+
+ def create_ofc_tenant(self, context, tenant_id):
+ self._raise_exc('create_ofc_tenant')
+ self.ofc_tenants.update({tenant_id: True})
+
+ def exists_ofc_tenant(self, context, tenant_id):
+ self._raise_exc('exists_ofc_tenant')
+ return self.ofc_tenants.get(tenant_id, False)
+
+ def delete_ofc_tenant(self, context, tenant_id):
+ self._raise_exc('delete_ofc_tenant')
+ del self.ofc_tenants[tenant_id]
+
+ def create_ofc_net(self, context, tenant_id, net_id, net_name=None):
+ self._raise_exc('create_ofc_network')
+ self.ofc_nets.update({net_id: True})
+
+ def exists_ofc_net(self, context, net_id):
+ self._raise_exc('exists_ofc_network')
+ return self.ofc_nets.get(net_id, False)
+
+ def delete_ofc_net(self, context, net_id, net):
+ self._raise_exc('delete_ofc_network')
+ del self.ofc_nets[net_id]
+
+ def create_ofc_port(self, context, port_id, port):
+ self._raise_exc('create_ofc_port')
+ self.ofc_ports.update({port_id: True})
+
+ def exists_ofc_port(self, context, port_id):
+ self._raise_exc('exists_ofc_port')
+ return self.ofc_ports.get(port_id, False)
+
+ def delete_ofc_port(self, context, port_id, port):
+ self._raise_exc('delete_ofc_port')
+ del self.ofc_ports[port_id]
+
+ def create_ofc_pf(self, context, pf_id, pf_dict):
+ self._raise_exc('create_ofc_packet_filter')
+ self.ofc_pfs.update({pf_id: True})
+
+ def exists_ofc_pf(self, context, pf_id):
+ self._raise_exc('exists_ofc_packet_filter')
+ return self.ofc_pfs.get(pf_id, False)
+
+ def delete_ofc_pf(self, context, pf_id):
+ self._raise_exc('delete_ofc_packet_filter')
+ del self.ofc_pfs[pf_id]
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+
+import fixtures
import mock
+from neutron.common.test_lib import test_config
from neutron.common import topics
-from neutron import context as q_context
+from neutron import context
from neutron.extensions import portbindings
from neutron import manager
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec import nec_plugin
from neutron.tests.unit import _test_extension_portbindings as test_bindings
+from neutron.tests.unit.nec import fake_ofc_manager
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = 'neutron.plugins.nec.nec_plugin.NECPluginV2'
-OFC_MANAGER = 'neutron.plugins.nec.nec_plugin.ofc_manager.OFCManager'
-OFC_DRIVER = 'neutron.tests.unit.nec.stub_ofc_driver.StubOFCDriver'
+NEC_PLUGIN_INI = """
+[DEFAULT]
+api_extensions_path = neutron/plugins/nec/extensions
+[OFC]
+driver = neutron.tests.unit.nec.stub_ofc_driver.StubOFCDriver
+enable_packet_filter = False
+"""
class NecPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
_plugin_name = PLUGIN_NAME
- PACKET_FILTER_ENABLE = False
+ _nec_ini = NEC_PLUGIN_INI
+
+ def _set_nec_ini(self):
+ self.nec_ini_file = self.useFixture(fixtures.TempDir()).join("nec.ini")
+ with open(self.nec_ini_file, 'w') as f:
+ f.write(self._nec_ini)
+ if 'config_files' in test_config.keys():
+ for c in test_config['config_files']:
+ if c.rfind("/nec.ini") > -1:
+ test_config['config_files'].remove(c)
+ test_config['config_files'].append(self.nec_ini_file)
+ else:
+ test_config['config_files'] = [self.nec_ini_file]
+
+ def _clean_nec_ini(self):
+ test_config['config_files'].remove(self.nec_ini_file)
+ os.remove(self.nec_ini_file)
+ self.nec_ini_file = None
+
+ def rpcapi_update_ports(self, agent_id='nec-q-agent.fake',
+ datapath_id="0xabc", added=[], removed=[]):
+ kwargs = {'topic': topics.AGENT,
+ 'agent_id': agent_id,
+ 'datapath_id': datapath_id,
+ 'port_added': added, 'port_removed': removed}
+ self.callback_nec.update_ports(self.context, **kwargs)
def setUp(self):
self.addCleanup(mock.patch.stopall)
- ofc_manager_cls = mock.patch(OFC_MANAGER).start()
- ofc_driver = ofc_manager_cls.return_value.driver
- ofc_driver.filter_supported.return_value = self.PACKET_FILTER_ENABLE
+
+ self._set_nec_ini()
super(NecPluginV2TestCase, self).setUp(self._plugin_name)
- self.context = q_context.get_admin_context()
+ # NOTE: `test_config' is global, and most tests don't set
+ # test_config['config_files'] but read this in setUp().
+ # So clean test_config['config_files'] ASAP, to avoid side effects
+ # on other tests which are running at the same time.
+ self._clean_nec_ini()
+
self.plugin = manager.NeutronManager.get_plugin()
+ self.plugin.ofc = fake_ofc_manager.patch_ofc_manager()
+ self.ofc = self.plugin.ofc
+ self.callback_nec = nec_plugin.NECPluginV2RPCCallbacks(self.plugin)
+ self.context = context.get_admin_context()
class TestNecBasicGet(test_plugin.TestBasicGet, NecPluginV2TestCase):
class TestNecPortsV2Callback(NecPluginV2TestCase):
- def setUp(self):
- super(TestNecPortsV2Callback, self).setUp()
- self.callbacks = nec_plugin.NECPluginV2RPCCallbacks(self.plugin)
-
- self.ofc = self.plugin.ofc
- self.ofc_port_exists = False
- self._setup_side_effects()
-
- def _setup_side_effects(self):
- def _create_ofc_port_called(*args, **kwargs):
- self.ofc_port_exists = True
-
- def _delete_ofc_port_called(*args, **kwargs):
- self.ofc_port_exists = False
-
- def _exists_ofc_port_called(*args, **kwargs):
- return self.ofc_port_exists
-
- self.ofc.create_ofc_port.side_effect = _create_ofc_port_called
- self.ofc.delete_ofc_port.side_effect = _delete_ofc_port_called
- self.ofc.exists_ofc_port.side_effect = _exists_ofc_port_called
-
- def _rpcapi_update_ports(self, agent_id='nec-q-agent.fake',
- datapath_id="0xabc", added=[], removed=[]):
- kwargs = {'topic': topics.AGENT,
- 'agent_id': agent_id,
- 'datapath_id': datapath_id,
- 'port_added': added, 'port_removed': removed}
- self.callbacks.update_ports(self.context, **kwargs)
-
def _get_portinfo(self, port_id):
return ndb.get_portinfo(self.context.session, port_id)
self.assertIsNone(self._get_portinfo(port_id))
portinfo = {'id': port_id, 'port_no': 123}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
sport = self.plugin.get_port(self.context, port_id)
self.assertEqual(sport['status'], 'ACTIVE')
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
self.assertIsNone(self._get_portinfo(port_id))
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
self.assertIsNotNone(self._get_portinfo(port_id))
# Before port-deletion, switch port removed message is sent.
if portinfo_delete_first:
- self._rpcapi_update_ports(removed=[port_id])
+ self.rpcapi_update_ports(removed=[port_id])
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertIsNone(self._get_portinfo(port_id))
if not portinfo_delete_first:
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertIsNotNone(self._get_portinfo(port_id))
- self._rpcapi_update_ports(removed=[port_id])
+ self.rpcapi_update_ports(removed=[port_id])
# Ensure port deletion is called once.
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
def test_portinfo_added_unknown_port(self):
portinfo = {'id': 'dummy-p1', 'port_no': 123}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
self.assertIsNotNone(ndb.get_portinfo(self.context.session,
'dummy-p1'))
self.assertEqual(self.ofc.exists_ofc_port.call_count, 0)
self.assertEqual(self.ofc.create_ofc_port.call_count, 0)
portinfo = {'id': port_id, 'port_no': 123}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 0)
self.assertEqual(ndb.get_portinfo(self.context.session,
if portinfo_change_first:
portinfo = {'id': port_id, 'port_no': 456}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
# OFC port is recreated.
self.assertEqual(self.ofc.create_ofc_port.call_count, 2)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
portinfo = {'id': port_id, 'port_no': 456}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
# No OFC operations are expected.
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertEqual(sport['status'], 'DOWN')
portinfo_a = {'id': port_id, 'port_no': port_no_a}
- self._rpcapi_update_ports(agent_id=agent_id_a,
- datapath_id=datapath_id_a,
- added=[portinfo_a])
+ self.rpcapi_update_ports(agent_id=agent_id_a,
+ datapath_id=datapath_id_a,
+ added=[portinfo_a])
portinfo_b = {'id': port_id, 'port_no': port_no_b}
- self._rpcapi_update_ports(agent_id=agent_id_b,
- datapath_id=datapath_id_b,
- added=[portinfo_b])
+ self.rpcapi_update_ports(agent_id=agent_id_b,
+ datapath_id=datapath_id_b,
+ added=[portinfo_b])
- self._rpcapi_update_ports(agent_id=agent_id_a,
- datapath_id=datapath_id_a,
- removed=[port_id])
+ self.rpcapi_update_ports(agent_id=agent_id_a,
+ datapath_id=datapath_id_a,
+ removed=[port_id])
sport = self.plugin.get_port(self.context, port_id)
self.assertEqual(sport['status'], 'ACTIVE')
- self.assertTrue(self.ofc_port_exists)
+ self.assertTrue(self.ofc.ofc_ports[port_id])
expected = [
mock.call.exists_ofc_port(mock.ANY, port_id),
return res.status_int
def test_create_network(self):
- self.ofc.exists_ofc_tenant.return_value = False
net = None
ctx = mock.ANY
with self.network() as network:
self.ofc.assert_has_calls(expected)
def test_create_network_with_admin_state_down(self):
- self.ofc.exists_ofc_tenant.return_value = False
net = None
ctx = mock.ANY
with self.network(admin_state_up=False) as network:
self.ofc.assert_has_calls(expected)
def test_create_two_network(self):
- self.ofc.exists_ofc_tenant.side_effect = [False, True]
nets = []
ctx = mock.ANY
with self.network() as net1:
self.ofc.assert_has_calls(expected)
def test_create_network_fail(self):
- self.ofc.exists_ofc_tenant.return_value = False
self.ofc.create_ofc_network.side_effect = nexc.OFCException(
reason='hoge')
net = None
ctx = mock.ANY
- with self.network() as network:
+ # NOTE: We don't delete network through api, but db will be cleaned in
+ # tearDown(). When OFCManager has failed to create a network on OFC,
+ # it does not keeps ofc_network entry and will fail to delete this
+ # network from OFC. Deletion of network is not the scope of this test.
+ with self.network(do_delete=False) as network:
net = network['network']
+ net_ref = self._show('networks', net['id'])
+ self.assertEqual(net_ref['network']['status'], 'ERROR')
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_network(ctx, self._tenant_id, net['id'],
- net['name']),
- mock.call.delete_ofc_network(ctx, net['id'], mock.ANY),
- mock.call.delete_ofc_tenant(ctx, self._tenant_id)
+ net['name'])
]
self.ofc.assert_has_calls(expected)
def test_update_network(self):
- self.ofc.exists_ofc_tenant.return_value = False
-
net = None
ctx = mock.ANY
with self.network() as network:
]
self.ofc.assert_has_calls(expected)
- def _rpcapi_update_ports(self, agent_id='nec-q-agent.fake',
- datapath_id="0xabc", added=[], removed=[]):
- kwargs = {'topic': topics.AGENT,
- 'agent_id': agent_id,
- 'datapath_id': datapath_id,
- 'port_added': added, 'port_removed': removed}
- self.plugin.callback_nec.update_ports(self.context, **kwargs)
-
def test_create_port_no_ofc_creation(self):
- self.ofc.exists_ofc_tenant.return_value = False
- self.ofc.exists_ofc_port.return_value = False
-
net = None
p1 = None
ctx = mock.ANY
self.ofc.assert_has_calls(expected)
def test_create_port_with_ofc_creation(self):
- self.ofc.exists_ofc_tenant.return_value = False
- self.ofc.exists_ofc_port.side_effect = [False, True]
-
net = None
p1 = None
ctx = mock.ANY
# Register portinfo, then the port is created on OFC
portinfo = {'id': p1['id'], 'port_no': 123}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
self.assertEqual(self.ofc.create_ofc_port.call_count, 1)
expected = [
self.ofc.assert_has_calls(expected)
def test_delete_network_with_dhcp_port(self):
- self.ofc.exists_ofc_tenant.return_value = False
- self.ofc.exists_ofc_port.side_effect = [False, True]
-
ctx = mock.ANY
with self.network() as network:
with self.subnet(network=network):
'device_id': 'dhcp-port1'})
# Make sure that the port is created on OFC.
portinfo = {'id': p['id'], 'port_no': 123}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
# In a case of dhcp port, the port is deleted automatically
# when delete_network.
self._test_update_port_with_admin_state(resource='network')
def _test_update_port_with_admin_state(self, resource='port'):
- self.ofc.exists_ofc_tenant.return_value = False
- self.ofc.exists_ofc_port.side_effect = [False, True, False]
-
net = None
p1 = None
ctx = mock.ANY
# Register portinfo, then the port is created on OFC
portinfo = {'id': p1['id'], 'port_no': 123}
- self._rpcapi_update_ports(added=[portinfo])
+ self.rpcapi_update_ports(added=[portinfo])
self.assertFalse(self.ofc.create_ofc_port.call_count)
self._update_resource(resource, res_id,