import random
import StringIO
import time
- import unittest
import eventlet
+ import testtools
import webob.exc
import quantum.api.networks
submitted bug fix does have a unit test, be sure to add a new one that fails
without the patch and passes with the patch.
+All unittest classes must ultimately inherit from testtools.TestCase.
+All setUp and tearDown methods must upcall using the super() method.
+tearDown methods should be avoided and addCleanup calls should be preferred.
+Never manually create tempfiles. Always use the tempfile fixtures from
+the fixture library to ensure that they are cleaned up.
+
openstack-common
----------------
# The code below enables nosetests to work with i18n _() blocks
import __builtin__
-import unittest
-
-
setattr(__builtin__, '_', lambda x: x)
-
-
-class BaseTest(unittest.TestCase):
-
- def setUp(self):
- pass
-
-
-def setUp():
- pass
import logging
import os.path
-import unittest
import routes
+import testtools
import webob
from webtest import TestApp
self._delete_port(net_id, port_id)
self._delete_network(net_id)
- def tearDown(self):
-
- """ Tear down """
-
- db.clear_db()
-
-class QosExtensionTest(unittest.TestCase):
+class QosExtensionTest(testtools.TestCase):
def setUp(self):
""" Set up function """
+ super(QosExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = qos.QosController(QuantumManager.get_plugin())
db.clear_db()
-class CredentialExtensionTest(unittest.TestCase):
+class CredentialExtensionTest(testtools.TestCase):
def setUp(self):
""" Set up function """
+ super(CredentialExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = credential.CredentialController(QuantumManager.
"""
import logging as LOG
-import unittest
+
+import testtools
from quantum.openstack.common import log as logging
from quantum.plugins.cisco.common import cisco_constants as const
raise Exception("Failed to unplug interface: %s" % str(exc))
-class NexusDBTest(unittest.TestCase):
+class NexusDBTest(testtools.TestCase):
"""Class conisting of nexus DB unit tests"""
def setUp(self):
+ super(NexusDBTest, self).setUp()
"""Setup for nexus db tests"""
l2network_db.initialize()
+ self.addCleanup(db.clear_db)
self.dbtest = NexusDB()
LOG.debug("Setup")
- def tearDown(self):
- """Tear Down"""
- db.clear_db()
-
def testa_create_nexusportbinding(self):
"""create nexus port binding"""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.delete_nexusportbinding(vlan_id)
-class L2networkDBTest(unittest.TestCase):
+class L2networkDBTest(testtools.TestCase):
"""Class conisting of L2network DB unit tests"""
def setUp(self):
"""Setup for tests"""
+ super(L2networkDBTest, self).setUp()
l2network_db.initialize()
self.dbtest = L2networkDB()
self.quantum = QuantumDB()
+ self.addCleanup(db.clear_db)
LOG.debug("Setup")
- def tearDown(self):
- """Tear Down"""
- db.clear_db()
-
def testa_create_vlanbinding(self):
"""test add vlan binding"""
net1 = self.quantum.create_network("t1", "netid1")
self.dbtest.delete_vlan_binding(netid)
-class QuantumDBTest(unittest.TestCase):
+class QuantumDBTest(testtools.TestCase):
"""Class conisting of Quantum DB unit tests"""
def setUp(self):
"""Setup for tests"""
+ super(QuantumDBTest, self).setUp()
l2network_db.initialize()
+ self.addCleanup(db.clear_db)
self.dbtest = QuantumDB()
self.tenant_id = "t1"
LOG.debug("Setup")
- def tearDown(self):
- """Tear Down"""
- db.clear_db()
-
def testa_create_network(self):
"""test to create network"""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
# under the License.
import __builtin__
-import unittest
-
-
setattr(__builtin__, '_', lambda x: x)
-
-
-class BaseTest(unittest.TestCase):
-
- def setUp(self):
- pass
-
-
-def setUp():
- pass
# under the License.
import __builtin__
-import unittest
-
-
setattr(__builtin__, '_', lambda x: x)
-
-
-class BaseTest(unittest.TestCase):
-
- def setUp(self):
- pass
-
-
-def setUp():
- pass
class APIv2TestCase(test_api_v2.APIv2TestCase):
def setUp(self):
+ super(APIv2TestCase, self).setUp()
plugin = 'quantum.plugins.cisco.network_plugin.PluginV2'
# Ensure 'stale' patched copies of the plugin are never returned
QuantumManager._instance = None
import __builtin__
import os
-import unittest
+
+import testtools
setattr(__builtin__, '_', lambda x: x)
# An empty lock path forces lockutils.synchronized to use a temporary
# location for lock files that will be cleaned up automatically.
cfg.CONF.lock_path = ''
-
-
-class BaseTest(unittest.TestCase):
-
- def setUp(self):
- pass
-
-
-def setUp():
- pass
# under the License.
import os
-import unittest
+
+import testtools
from quantum.agent.linux import utils
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-class RootwrapTestExec(unittest.TestCase):
+class RootwrapTestExec(testtools.TestCase):
"""Simple unit test to test the basic rootwrap mechanism
Essentially hello-world. Just run a command as root and check that
"""
def setUp(self):
+ super(RootwrapTestExec, self).setUp()
self.cwd = os.getcwd() + "/../../.."
# stuff a stupid bash script into /tmp, so that the next
# method can execute it.
def tearDown(self):
os.remove(self.test_file)
os.remove(self.conf_file)
+ super(RootwrapTestExec, self).tearDown()
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
+ self.addCleanup(self.httpPatch.stop)
MockHTTPConnection = self.httpPatch.start()
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name)
- def tearDown(self):
- super(BigSwitchProxyPluginV2TestCase, self).tearDown()
- self.httpPatch.stop()
-
class TestBigSwitchProxyBasicGet(test_plugin.TestBasicGet,
BigSwitchProxyPluginV2TestCase):
"""
Test vlans alloc/dealloc.
"""
-import unittest2 as unittest
+import testtools
from quantum.db import api as db
from quantum.openstack.common import context
from quantum.plugins.brocade import vlanbm as vlan_bitmap
-class TestVlanBitmap(unittest.TestCase):
+class TestVlanBitmap(testtools.TestCase):
"""exercise Vlan bitmap ."""
def setUp(self):
+ super(TestVlanBitmap, self).setUp()
db.configure_db()
+ self.addCleanup(db.clear_db)
self.context = context.get_admin_context()
self.context.session = db.get_session()
self.vbm_.release_vlan(4)
vlan_id = self.vbm_.get_next_vlan(None)
self.assertEqual(vlan_id, 4)
-
- def tearDown(self):
- db.clear_db()
# limitations under the License.
import mock
-import unittest
+import testtools
from quantum.db import api as db
from quantum.openstack.common import importutils
'fake_nexus_driver.CiscoNEXUSFakeDriver')
-class TestCiscoNexusPlugin(unittest.TestCase):
+class TestCiscoNexusPlugin(testtools.TestCase):
def setUp(self):
"""
Set up function
"""
+ super(TestCiscoNexusPlugin, self).setUp()
self.tenant_id = "test_tenant_cisco1"
self.net_name = "test_network_cisco1"
self.net_id = 000007
)
self.assertEqual(expected_instance_id, INSTANCE)
-
- def tearDown(self):
- """Clear the test environment"""
- pass
- # Remove database contents
- #db.clear_db(network_models_v2.model_base.BASEV2)
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.plugins.hyperv.agent import hyperv_quantum_agent
-class TestHyperVQuantumAgent(unittest.TestCase):
+class TestHyperVQuantumAgent(testtools.TestCase):
def setUp(self):
+ super(TestHyperVQuantumAgent, self).setUp()
self.addCleanup(cfg.CONF.reset)
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
self.agent.agent_id = mock.Mock()
self.agent._utils = mock.Mock()
- def tearDown(self):
- cfg.CONF.reset()
-
def test_port_bound(self):
port = mock.Mock()
net_uuid = 'my-net-uuid'
"""
import mock
-import unittest2
+import testtools
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.plugins.hyperv.common import constants
-class rpcHyperVApiTestCase(unittest2.TestCase):
+class rpcHyperVApiTestCase(testtools.TestCase):
def _test_hyperv_quantum_api(
self, rpcapi, topic, method, rpc_method, **kwargs):
# limitations under the License.
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
#NOTE this import loads tests required options
from quantum.plugins.linuxbridge.common import config
-class ConfigurationTest(unittest.TestCase):
+class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual(-1,
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest2
+import testtools
+from testtools import matchers
from quantum.common import exceptions as q_exc
from quantum.db import api as db
PHYS_NET_2: [(VLAN_MIN + 20, VLAN_MAX + 20)]}
-class NetworkStatesTest(unittest2.TestCase):
+class NetworkStatesTest(testtools.TestCase):
def setUp(self):
+ super(NetworkStatesTest, self).setUp()
lb_db.initialize()
lb_db.sync_network_states(VLAN_RANGES)
self.session = db.get_session()
-
- def tearDown(self):
- db.clear_db()
+ self.addCleanup(db.clear_db)
def test_sync_network_states(self):
self.assertIsNone(lb_db.get_network_state(PHYS_NET,
for x in xrange(VLAN_MIN, VLAN_MAX + 1):
physical_network, vlan_id = lb_db.reserve_network(self.session)
self.assertEqual(physical_network, PHYS_NET)
- self.assertGreaterEqual(vlan_id, VLAN_MIN)
- self.assertLessEqual(vlan_id, VLAN_MAX)
+ self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
+ self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
- with self.assertRaises(q_exc.NoNetworkAvailable):
+ with testtools.ExpectedException(q_exc.NoNetworkAvailable):
physical_network, vlan_id = lb_db.reserve_network(self.session)
for vlan_id in vlan_ids:
self.assertTrue(lb_db.get_network_state(PHYS_NET,
vlan_id).allocated)
- with self.assertRaises(q_exc.VlanIdInUse):
+ with testtools.ExpectedException(q_exc.VlanIdInUse):
lb_db.reserve_specific_network(self.session, PHYS_NET, vlan_id)
lb_db.release_network(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
self.assertTrue(lb_db.get_network_state(PHYS_NET,
vlan_id).allocated)
- with self.assertRaises(q_exc.VlanIdInUse):
+ with testtools.ExpectedException(q_exc.VlanIdInUse):
lb_db.reserve_specific_network(self.session, PHYS_NET, vlan_id)
lb_db.release_network(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent
from quantum.plugins.linuxbridge.common import constants as lconst
-class TestLinuxBridge(unittest.TestCase):
+class TestLinuxBridge(testtools.TestCase):
def setUp(self):
+ super(TestLinuxBridge, self).setUp()
self.addCleanup(cfg.CONF.reset)
interface_mappings = {'physnet1': 'eth1'}
root_helper = cfg.CONF.AGENT.root_helper
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
- self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(LinuxBridgeSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
+ self.addCleanup(mock.patch.stopall)
def tearDown(self):
- super(LinuxBridgeSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
+ super(LinuxBridgeSecurityGroupsTestCase, self).tearDown()
class TestLinuxBridgeSecurityGroups(LinuxBridgeSecurityGroupsTestCase,
"""
import stubout
-import unittest2
+import testtools
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
-class rpcApiTestCase(unittest2.TestCase):
+class rpcApiTestCase(testtools.TestCase):
def _test_lb_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
import mox
from oslo.config import cfg
import stubout
-import unittest2 as unittest
+import testtools
from quantum import context
from quantum.db import api as db
cfg.CONF.set_override('max_dns_nameservers', 10)
-class MetaQuantumPluginV2Test(unittest.TestCase):
+class MetaQuantumPluginV2Test(testtools.TestCase):
"""Class conisting of MetaQuantumPluginV2 unit tests"""
def setUp(self):
self.plugin.delete_router(self.context, router_ret1['id'])
self.plugin.delete_router(self.context, router_ret2['id'])
- with self.assertRaises(FlavorNotFound):
+ with testtools.ExpectedException(FlavorNotFound):
self.plugin.get_router(self.context, router_ret1['id'])
def test_extension_method(self):
self.stubs.SmartUnsetAll()
self.mox.VerifyAll()
db.clear_db()
+ super(MetaQuantumPluginV2Test, self).tearDown()
# @author: Ryu Ishimoto, Midokura Japan KK
# @author: Tomoe Sugihara, Midokura Japan KK
-import unittest2 as unittest
+import testtools
import uuid
import mock
from quantum.plugins.midonet import midonet_lib
-class MidonetLibTestCase(unittest.TestCase):
+class MidonetLibTestCase(testtools.TestCase):
def setUp(self):
+ super(MidonetLibTestCase, self).setUp()
self.mock_api = mock.Mock()
- def tearDown(self):
- self.mock_api = None
-
def _create_mock_chains(self, sg_id, sg_name):
mock_in_chain = mock.Mock()
mock_in_chain.get_name.return_value = "OS_SG_%s_%s_IN" % (sg_id,
super(MidonetChainManagerTestCase, self).setUp()
self.mgr = midonet_lib.ChainManager(self.mock_api)
- def tearDown(self):
- self.mgr = None
- super(MidonetChainManagerTestCase, self).tearDown()
-
def test_create_for_sg(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
super(MidonetPortGroupManagerTestCase, self).setUp()
self.mgr = midonet_lib.PortGroupManager(self.mock_api)
- def tearDown(self):
- self.mgr = None
- super(MidonetPortGroupManagerTestCase, self).tearDown()
-
def test_create(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
self.mgr.chain_manager = mock.Mock()
self.mgr.pg_manager = mock.Mock()
- def tearDown(self):
- self.mgr = None
- super(MidonetRuleManagerTestCase, self).tearDown()
-
def _create_test_rule(self, tenant_id, sg_id, rule_id, direction="egress",
protocol="tcp", port_min=1, port_max=65535,
src_ip='192.168.1.0/24', src_group_id=None,
# under the License.
# @author: Ryota MIBU
-import unittest
+import testtools
from quantum.plugins.nec.common import config
-class ConfigurationTest(unittest.TestCase):
+class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual(-1, config.CONF.DATABASE.sql_max_retries)
# @author: Ryota MIBU
import random
-import unittest
+import testtools
from quantum.db import api as db_api
from quantum.openstack.common import uuidutils
from quantum.plugins.nec.db import models as nmodels
-class NECPluginV2DBTestBase(object):
+class NECPluginV2DBTestBase(testtools.TestCase):
"""Class conisting of NECPluginV2 DB unit tests"""
def setUp(self):
"""Setup for tests"""
+ super(NECPluginV2DBTestBase, self).setUp()
ndb.initialize()
self.session = db_api.get_session()
-
- def tearDown(self):
- """Tear Down"""
- ndb.clear_db()
+ self.addCleanup(ndb.clear_db)
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
return port_id, datapath_id, port_no, vlan_id, mac, none
-class NECPluginV2DBTest(NECPluginV2DBTestBase,
- unittest.TestCase):
+class NECPluginV2DBTest(NECPluginV2DBTestBase):
def testa_add_ofc_item(self):
"""test add OFC item"""
self.assertEqual(None, portinfo_none)
-class NECPluginV2DBOldMappingTest(NECPluginV2DBTestBase,
- unittest.TestCase):
+class NECPluginV2DBOldMappingTest(NECPluginV2DBTestBase):
"""Test related to old ID mapping"""
# Mapping Table mode
# under the License.
# @author: Ryota MIBU
-import unittest
+import testtools
from quantum import context
from quantum.openstack.common import uuidutils
from quantum.plugins.nec import ofc_manager
-class OFCManagerTestBase(object):
+class OFCManagerTestBase(testtools.TestCase):
"""Class conisting of OFCManager unit tests"""
def setUp(self):
+ super(OFCManagerTestBase, self).setUp()
driver = "quantum.tests.unit.nec.stub_ofc_driver.StubOFCDriver"
config.CONF.set_override('driver', driver, 'OFC')
ndb.initialize()
+ self.addCleanup(ndb.clear_db)
self.ofc = ofc_manager.OFCManager()
self.ctx = context.get_admin_context()
- def tearDown(self):
- ndb.clear_db()
-
def get_random_params(self):
"""create random parameters for portinfo test"""
tenant = uuidutils.generate_uuid()
return tenant, network, port, _filter, none
-class OFCManagerTest(OFCManagerTestBase, unittest.TestCase):
+class OFCManagerTest(OFCManagerTestBase):
def testa_create_ofc_tenant(self):
"""test create ofc_tenant"""
t, n, p, f, none = self.get_random_params()
'ofc_packet_filter', f))
-class OFCManagerTestWithOldMapping(OFCManagerTestBase, unittest.TestCase):
+class OFCManagerTestWithOldMapping(OFCManagerTestBase):
def test_exists_ofc_tenant(self):
t, n, p, f, none = self.get_random_params()
import string
import mox
-import unittest
+import testtools
from quantum import context
from quantum.openstack.common import uuidutils
return "ofc-%s" % id
-class PFCDriverTestBase():
+class PFCDriverTestBase(testtools.TestCase):
driver = 'quantum.plugins.nec.drivers.pfc.PFCDriverBase'
def setUp(self):
+ super(PFCDriverTestBase, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver)(TestConfig)
self.mox.StubOutWithMock(ofc.OFCClient, 'do_request')
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
self.mox.VerifyAll()
-class PFCDriverBaseTest(PFCDriverTestBase, unittest.TestCase):
+class PFCDriverBaseTest(PFCDriverTestBase):
pass
-class PFCV3DriverTest(PFCDriverTestBase, unittest.TestCase):
+class PFCV3DriverTest(PFCDriverTestBase):
driver = 'pfc_v3'
def testa_create_tenant(self):
pass
-class PFCV4DriverTest(PFCDriverTestBase, unittest.TestCase):
+class PFCV4DriverTest(PFCDriverTestBase):
driver = 'pfc_v4'
-class PFCDriverStringTest(unittest.TestCase):
+class PFCDriverStringTest(testtools.TestCase):
driver = 'quantum.plugins.nec.drivers.pfc.PFCDriverBase'
def setUp(self):
+ super(PFCDriverStringTest, self).setUp()
self.driver = drivers.get_driver(self.driver)(TestConfig)
- def tearDown(self):
- pass
-
def test_generate_pfc_id_uuid(self):
id_str = uuidutils.generate_uuid()
exp_str = (id_str[:14] + id_str[15:]).replace('-', '')[:31]
self.assertEqual(exp_str, ret_str)
-class PFCIdConvertTest(unittest.TestCase):
+class PFCIdConvertTest(testtools.TestCase):
driver = 'quantum.plugins.nec.drivers.pfc.PFCDriverBase'
def setUp(self):
+ super(PFCIdConvertTest, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver)(TestConfig)
self.ctx = self.mox.CreateMock(context.Context)
self.ctx.session = "session"
self.mox.StubOutWithMock(ndb, 'get_ofc_id_lookup_both')
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def generate_random_ids(self, count=1):
if count == 1:
# @author: Ryota MIBU
import mox
-import unittest
+import testtools
from quantum import context
from quantum.openstack.common import uuidutils
port = 8888
-class TremaDriverTestBase():
+class TremaDriverTestBase(testtools.TestCase):
driver_name = "trema"
def setUp(self):
+ super(TremaDriverTestBase, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver_name)(TestConfig)
self.mox.StubOutWithMock(ofc_client.OFCClient, 'do_request')
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
self.mox.VerifyAll()
-class TremaPortBaseDriverTest(TremaDriverNetworkTestBase, unittest.TestCase):
+class TremaPortBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_port"
self.mox.VerifyAll()
-class TremaPortMACBaseDriverTest(TremaDriverNetworkTestBase,
- unittest.TestCase):
+class TremaPortMACBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_portmac"
self.mox.VerifyAll()
-class TremaMACBaseDriverTest(TremaDriverNetworkTestBase, unittest.TestCase):
+class TremaMACBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_mac"
self.mox.VerifyAll()
-class TremaFilterDriverTest(TremaDriverTestBase, unittest.TestCase):
+class TremaFilterDriverTest(TremaDriverTestBase):
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
return [uuidutils.generate_uuid() for i in xrange(count)]
-class TremaIdConvertTest(unittest.TestCase):
+class TremaIdConvertTest(testtools.TestCase):
driver_name = 'trema'
def setUp(self):
+ super(TremaIdConvertTest, self).setUp()
self.driver = drivers.get_driver(self.driver_name)(TestConfig)
self.mox = mox.Mox()
self.ctx = self.mox.CreateMock(context.Context)
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def test_convert_tenant_id(self):
ofc_t_id = generate_random_ids(1)
self.assertEqual(ret, ofc_f_id)
-class TremaIdConvertTestBase(object):
+class TremaIdConvertTestBase(testtools.TestCase):
def setUp(self):
+ super(TremaIdConvertTestBase, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver_name)(TestConfig)
self.ctx = self.mox.CreateMock(context.Context)
self.ctx.session = "session"
self.mox.StubOutWithMock(ndb, 'get_ofc_id_lookup_both')
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def _test_convert_port_id(self, port_path_template):
t_id, n_id = generate_random_ids(2)
self.assertEqual(ret, ofc_p_id)
-class TremaIdConvertPortBaseTest(TremaIdConvertTestBase, unittest.TestCase):
+class TremaIdConvertPortBaseTest(TremaIdConvertTestBase):
driver_name = "trema_port"
def test_convert_port_id(self):
'/networs/%(network)s/ports/%(port)s')
-class TremaIdConvertPortMACBaseTest(TremaIdConvertTestBase, unittest.TestCase):
+class TremaIdConvertPortMACBaseTest(TremaIdConvertTestBase):
driver_name = "trema_portmac"
def test_convert_port_id(self):
'/networs/%(network)s/ports/dummy-%(port)s/attachments/%(port)s')
-class TremaIdConvertMACBaseTest(TremaIdConvertTestBase, unittest.TestCase):
+class TremaIdConvertMACBaseTest(TremaIdConvertTestBase):
driver_name = "trema_mac"
def test_convert_port_id(self):
# under the License.
#
-import unittest2 as unittest
+import testtools
from oslo.config import cfg
from quantum.plugins.nicira.nicira_nvp_plugin.common import config
-class ConfigurationTest(unittest.TestCase):
+class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual(-1, cfg.CONF.DATABASE.sql_max_retries)
import contextlib
import mock
-import unittest2 as unittest
+import testtools
from webob import exc
import webtest
return []
-class NetworkGatewayExtensionTestCase(unittest.TestCase):
+class NetworkGatewayExtensionTestCase(testtools.TestCase):
def setUp(self):
+ super(NetworkGatewayExtensionTestCase, self).setUp()
plugin = '%s.%s' % (networkgw.__name__,
networkgw.NetworkGatewayPluginBase.__name__)
self._resource = networkgw.RESOURCE_NAME.replace('-', '_')
# Update the plugin and extensions path
cfg.CONF.set_override('core_plugin', plugin)
+ self.addCleanup(cfg.CONF.reset)
- self._plugin_patcher = mock.patch(plugin, autospec=True)
- self.plugin = self._plugin_patcher.start()
+ _plugin_patcher = mock.patch(plugin, autospec=True)
+ self.plugin = _plugin_patcher.start()
+ self.addCleanup(_plugin_patcher.stop)
# Instantiate mock plugin and enable extensions
manager.QuantumManager.get_plugin().supported_extension_aliases = (
self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = webtest.TestApp(self.ext_mdw)
- def tearDown(self):
- self._plugin_patcher.stop()
- self.api = None
- self.plugin = None
- cfg.CONF.reset()
-
def test_network_gateway_create(self):
nw_gw_id = _uuid()
data = {self._resource: {'name': 'nw-gw',
import mock
import netaddr
from oslo.config import cfg
+import testtools
import webob.exc
from quantum.common import constants
instance.return_value.request.side_effect = _fake_request
super(NiciraPluginV2TestCase, self).setUp(self._plugin_name)
cfg.CONF.set_override('enable_metadata_access_network', False, 'NVP')
-
- def tearDown(self):
- self.fc.reset_all()
- super(NiciraPluginV2TestCase, self).tearDown()
- self.mock_nvpapi.stop()
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(self.mock_nvpapi.stop)
class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase):
self._test_create_bridge_network(vlan_id=123)
def test_create_bridge_vlan_network_outofrange_returns_400(self):
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_bridge_network(vlan_id=5000)
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
def test_list_networks_filter_by_id(self):
# We add this unit test to cover some logic specific to the
instance.return_value.request.side_effect = _fake_request
super(NiciraPortSecurityTestCase, self).setUp(self._plugin_name)
-
- def tearDown(self):
- super(NiciraPortSecurityTestCase, self).tearDown()
- self.mock_nvpapi.stop()
+ self.addCleanup(self.mock_nvpapi.stop)
class TestNiciraPortSecurity(psec.TestPortSecurity,
# System
import httplib
-import unittest
# Third party
+import testtools
+
# Local
import quantum.plugins.nicira.nicira_nvp_plugin.api_client.common as naco
-class NvpApiCommonTest(unittest.TestCase):
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
+class NvpApiCommonTest(testtools.TestCase):
def test_conn_str(self):
conn = httplib.HTTPSConnection('localhost', 4242, timeout=0)
import eventlet
eventlet.monkey_patch()
import logging
-import unittest
import urllib2
+import testtools
+
logging.basicConfig(level=logging.DEBUG)
lg = logging.getLogger("test_nvp_api_request")
return urllib2.urlopen(url).read()
-class NvpApiRequestTest(unittest.TestCase):
-
- def setUp(self):
- pass
+class NvpApiRequestTest(testtools.TestCase):
- def tearDown(self):
- pass
+ pass
import logging
import new
import random
-import unittest
import eventlet
from eventlet.green import urllib2
from mock import Mock
from mock import patch
+import testtools
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
client_eventlet as nace,
return urllib2.urlopen(url).read()
-class NvpApiRequestEventletTest(unittest.TestCase):
+class NvpApiRequestEventletTest(testtools.TestCase):
def setUp(self):
+ super(NvpApiRequestEventletTest, self).setUp()
self.client = nace.NvpApiClientEventlet(
[("127.0.0.1", 4401, True)], "admin", "admin")
self.url = "/ws.v1/_debug"
def tearDown(self):
self.client = None
self.req = None
+ super(NvpApiRequestEventletTest, self).tearDown()
def test_construct_eventlet_api_request(self):
e = nare.NvpApiRequestEventlet(self.client, self.url)
#
# @author: Salvatore Orlando, VMware
+import mock
import os
-import mock
-import unittest2 as unittest
+import testtools
from quantum.openstack.common import jsonutils as json
import quantum.plugins.nicira.nicira_nvp_plugin as nvp_plugin
_uuid = test_api_v2._uuid
-class NvplibTestCase(unittest.TestCase):
+class NvplibTestCase(testtools.TestCase):
def setUp(self):
# mock nvp api client
self.fake_cluster.retries, self.fake_cluster.redirects)
super(NvplibTestCase, self).setUp()
-
- def tearDown(self):
- self.fc.reset_all()
- self.mock_nvpapi.stop()
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(self.mock_nvpapi.stop)
class TestNvplibNatRules(NvplibTestCase):
gw_ids.append(self._create_gw_service(_uuid(), name)['uuid'])
results = nvplib.get_l2_gw_services(self.fake_cluster)
self.assertEqual(len(results), 2)
- self.assertItemsEqual(gw_ids, [r['uuid'] for r in results])
+ self.assertEqual(sorted(gw_ids), sorted([r['uuid'] for r in results]))
def test_delete_l2_gw_service(self):
display_name = 'fake-gateway'
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest2
+import testtools
+from testtools import matchers
from quantum.common import exceptions as q_exc
from quantum.db import api as db
UPDATED_TUNNEL_RANGES = [(TUN_MIN + 5, TUN_MAX + 5)]
-class VlanAllocationsTest(unittest2.TestCase):
+class VlanAllocationsTest(testtools.TestCase):
def setUp(self):
+ super(VlanAllocationsTest, self).setUp()
ovs_db_v2.initialize()
ovs_db_v2.sync_vlan_allocations(VLAN_RANGES)
self.session = db.get_session()
-
- def tearDown(self):
- db.clear_db()
+ self.addCleanup(db.clear_db)
def test_sync_vlan_allocations(self):
self.assertIsNone(ovs_db_v2.get_vlan_allocation(PHYS_NET,
for x in xrange(VLAN_MIN, VLAN_MAX + 1):
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
self.assertEqual(physical_network, PHYS_NET)
- self.assertGreaterEqual(vlan_id, VLAN_MIN)
- self.assertLessEqual(vlan_id, VLAN_MAX)
+ self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
+ self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
- with self.assertRaises(q_exc.NoNetworkAvailable):
+ with testtools.ExpectedException(q_exc.NoNetworkAvailable):
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_ids.pop(),
VLAN_RANGES)
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
self.assertEqual(physical_network, PHYS_NET)
- self.assertGreaterEqual(vlan_id, VLAN_MIN)
- self.assertLessEqual(vlan_id, VLAN_MAX)
+ self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
+ self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
for vlan_id in vlan_ids:
self.assertTrue(ovs_db_v2.get_vlan_allocation(PHYS_NET,
vlan_id).allocated)
- with self.assertRaises(q_exc.VlanIdInUse):
+ with testtools.ExpectedException(q_exc.VlanIdInUse):
ovs_db_v2.reserve_specific_vlan(self.session, PHYS_NET, vlan_id)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
self.assertTrue(ovs_db_v2.get_vlan_allocation(PHYS_NET,
vlan_id).allocated)
- with self.assertRaises(q_exc.VlanIdInUse):
+ with testtools.ExpectedException(q_exc.VlanIdInUse):
ovs_db_v2.reserve_specific_vlan(self.session, PHYS_NET, vlan_id)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
for x in xrange(VLAN_MIN, VLAN_MAX + 1):
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
self.assertEqual(physical_network, PHYS_NET)
- self.assertGreaterEqual(vlan_id, VLAN_MIN)
- self.assertLessEqual(vlan_id, VLAN_MAX)
+ self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
+ self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_ids.pop(),
ovs_db_v2.sync_vlan_allocations({})
-class TunnelAllocationsTest(unittest2.TestCase):
+class TunnelAllocationsTest(testtools.TestCase):
def setUp(self):
+ super(TunnelAllocationsTest, self).setUp()
ovs_db_v2.initialize()
ovs_db_v2.sync_tunnel_allocations(TUNNEL_RANGES)
self.session = db.get_session()
-
- def tearDown(self):
- db.clear_db()
+ self.addCleanup(db.clear_db)
def test_sync_tunnel_allocations(self):
self.assertIsNone(ovs_db_v2.get_tunnel_allocation(TUN_MIN - 1))
tunnel_ids = set()
for x in xrange(TUN_MIN, TUN_MAX + 1):
tunnel_id = ovs_db_v2.reserve_tunnel(self.session)
- self.assertGreaterEqual(tunnel_id, TUN_MIN)
- self.assertLessEqual(tunnel_id, TUN_MAX)
+ self.assertThat(tunnel_id, matchers.GreaterThan(TUN_MIN - 1))
+ self.assertThat(tunnel_id, matchers.LessThan(TUN_MAX + 1))
tunnel_ids.add(tunnel_id)
- with self.assertRaises(q_exc.NoNetworkAvailable):
+ with testtools.ExpectedException(q_exc.NoNetworkAvailable):
tunnel_id = ovs_db_v2.reserve_tunnel(self.session)
ovs_db_v2.release_tunnel(self.session, tunnel_ids.pop(), TUNNEL_RANGES)
tunnel_id = ovs_db_v2.reserve_tunnel(self.session)
- self.assertGreaterEqual(tunnel_id, TUN_MIN)
- self.assertLessEqual(tunnel_id, TUN_MAX)
+ self.assertThat(tunnel_id, matchers.GreaterThan(TUN_MIN - 1))
+ self.assertThat(tunnel_id, matchers.LessThan(TUN_MAX + 1))
tunnel_ids.add(tunnel_id)
for tunnel_id in tunnel_ids:
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
self.assertTrue(ovs_db_v2.get_tunnel_allocation(tunnel_id).allocated)
- with self.assertRaises(q_exc.TunnelIdInUse):
+ with testtools.ExpectedException(q_exc.TunnelIdInUse):
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
self.assertTrue(ovs_db_v2.get_tunnel_allocation(tunnel_id).allocated)
- with self.assertRaises(q_exc.TunnelIdInUse):
+ with testtools.ExpectedException(q_exc.TunnelIdInUse):
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
+import testtools
from oslo.config import cfg
from quantum.plugins.openvswitch.common import config
-class ConfigurationTest(unittest.TestCase):
+class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual('br-int', cfg.CONF.OVS.integration_bridge)
# @author: Dan Wendlandt, Nicira, Inc.
import mox
-import unittest2 as unittest
+import testtools
from quantum.agent.linux import ovs_lib, utils
from quantum.openstack.common import uuidutils
-class OVS_Lib_Test(unittest.TestCase):
+class OVS_Lib_Test(testtools.TestCase):
"""
A test suite to excercise the OVS libraries shared by Quantum agents.
Note: these tests do not actually execute ovs-* utilities, and thus
"""
def setUp(self):
+ super(OVS_Lib_Test, self).setUp()
self.BR_NAME = "br-int"
self.TO = "--timeout=2"
self.root_helper = 'sudo'
self.br = ovs_lib.OVSBridge(self.BR_NAME, self.root_helper)
self.mox.StubOutWithMock(utils, "execute")
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def test_vifport(self):
"""create and stringify vif port, confirm no exceptions"""
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
'ovs_quantum_plugin.AgentNotifierApi')
-class CreateAgentConfigMap(unittest.TestCase):
+class CreateAgentConfigMap(testtools.TestCase):
def test_create_agent_config_map_succeeds(self):
self.assertTrue(ovs_quantum_agent.create_agent_config_map(cfg.CONF))
self.addCleanup(cfg.CONF.reset)
# An ip address is required for tunneling but there is no default
cfg.CONF.set_override('enable_tunneling', True, group='OVS')
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
ovs_quantum_agent.create_agent_config_map(cfg.CONF)
-class TestOvsQuantumAgent(unittest.TestCase):
+class TestOvsQuantumAgent(testtools.TestCase):
def setUp(self):
+ super(TestOvsQuantumAgent, self).setUp()
self.addCleanup(cfg.CONF.reset)
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
"""
import stubout
-import unittest2
+import testtools
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
from quantum.plugins.openvswitch import ovs_quantum_plugin as povs
-class rpcApiTestCase(unittest2.TestCase):
+class rpcApiTestCase(testtools.TestCase):
def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
#
# @author: Dave Lapsley, Nicira Networks, Inc.
-import unittest
-
import mox
from oslo.config import cfg
+import testtools
from quantum.agent.linux import ip_lib
from quantum.agent.linux import ovs_lib
self.vlan_id = vlan_id
-class TunnelTest(unittest.TestCase):
+class TunnelTest(testtools.TestCase):
+
def setUp(self):
+ super(TunnelTest, self).setUp()
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('report_interval', 0, 'AGENT')
self.mox = mox.Mox()
+ self.addCleanup(self.mox.UnsetStubs)
+
self.INT_BRIDGE = 'integration_bridge'
self.TUN_BRIDGE = 'tunnel_bridge'
self.MAP_TUN_BRIDGE = 'tunnel_bridge_mapping'
self.mox.StubOutWithMock(utils, 'get_interface_mac')
utils.get_interface_mac(self.INT_BRIDGE).AndReturn('000000000001')
- def tearDown(self):
- self.mox.UnsetStubs()
-
def testConstruct(self):
self.mox.ReplayAll()
b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
# under the License.
from oslo.config import cfg
-import unittest2
+import testtools
#NOTE this import loads tests required options
from quantum.plugins.ryu.common import config
-class ConfigurationTest(unittest2.TestCase):
+class ConfigurationTest(testtools.TestCase):
"""Configuration file Tests"""
def test_defaults(self):
self.assertEqual('br-int', cfg.CONF.OVS.integration_bridge)
import httplib
import mock
-import unittest2 as unittest
+import testtools
from quantum.openstack.common import importutils
from quantum.tests.unit.ryu import fake_ryu
-class RyuAgentTestCase(unittest.TestCase):
+class RyuAgentTestCase(testtools.TestCase):
_AGENT_NAME = 'quantum.plugins.ryu.agent.ryu_quantum_agent'
def setUp(self):
+ super(RyuAgentTestCase, self).setUp()
self.addCleanup(mock.patch.stopall)
self.fake_ryu = fake_ryu.patch_fake_ryu_client().start()
self.mod_agent = importutils.import_module(self._AGENT_NAME)
self.ryu_patcher = fake_ryu.patch_fake_ryu_client()
self.ryu_patcher.start()
super(RyuPluginV2TestCase, self).setUp(self._plugin_name)
+ self.addCleanup(self.ryu_patcher.stop)
class TestRyuBasicGet(test_plugin.TestBasicGet, RyuPluginV2TestCase):
# License for the specific language governing permissions and limitations
# under the License.
-import unittest2
+import testtools
from quantum.agent.common import config
assert conf.state_path.endswith('/var/lib/quantum')
-class TestRootHelper(unittest2.TestCase):
+class TestRootHelper(testtools.TestCase):
def test_agent_root_helper(self):
conf = config.setup_conf()
# under the License.
# @author: Dan Wendlandt, Nicira, Inc.
-import unittest
-
import mock
+import testtools
from quantum.agent.linux import utils
-class AgentUtilsExecuteTest(unittest.TestCase):
+class AgentUtilsExecuteTest(testtools.TestCase):
def setUp(self):
+ super(AgentUtilsExecuteTest, self).setUp()
self.root_helper = "echo"
self.test_file = "/tmp/test_execute.tmp"
open(self.test_file, 'w').close()
self.assertEqual(result, "%s\n" % self.test_file)
-class AgentUtilsGetInterfaceMAC(unittest.TestCase):
+class AgentUtilsGetInterfaceMAC(testtools.TestCase):
def test_get_interface_mac(self):
expect_val = '01:02:03:04:05:06'
with mock.patch('fcntl.ioctl') as ioctl:
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.agent import netns_cleanup_util as util
-class TestNullDelegate(unittest.TestCase):
+class TestNullDelegate(testtools.TestCase):
def test_getattribute(self):
null_delegate = util.NullDelegate()
self.assertIsNone(null_delegate.test())
-class TestNetnsCleanup(unittest.TestCase):
- def tearDown(self):
- cfg.CONF.reset()
+class TestNetnsCleanup(testtools.TestCase):
+ def setUp(self):
+ super(TestNetnsCleanup, self).setUp()
+ self.addCleanup(cfg.CONF.reset)
def test_kill_dhcp(self, dhcp_active=True):
conf = mock.Mock()
import itertools
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.agent.linux import ip_lib
from quantum.agent.linux import ovs_lib
from quantum.openstack.common import uuidutils
-class TestOVSCleanup(unittest.TestCase):
- def tearDown(self):
- cfg.CONF.reset()
+class TestOVSCleanup(testtools.TestCase):
+ def setUp(self):
+ super(TestOVSCleanup, self).setUp()
+ self.addCleanup(cfg.CONF.reset)
def test_setup_conf(self):
conf = util.setup_conf()
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
-
import mock
from oslo.config import cfg
+import testtools
from quantum.agent import rpc
from quantum.openstack.common import context
-class AgentRPCPluginApi(unittest.TestCase):
+class AgentRPCPluginApi(testtools.TestCase):
def _test_rpc_call(self, method):
agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project')
self._test_rpc_call('tunnel_sync')
-class AgentRPCMethods(unittest.TestCase):
+class AgentRPCMethods(testtools.TestCase):
def test_create_consumers(self):
dispatcher = mock.Mock()
expected = [
# @author: Zhongyue Luo, Intel Corporation.
#
-import unittest2
+import testtools
+from testtools import matchers
from webob import exc
from quantum.api import api_common as common
_resource_name = 'fake'
-class APICommonTestCase(unittest2.TestCase):
+class APICommonTestCase(testtools.TestCase):
def setUp(self):
+ super(APICommonTestCase, self).setUp()
self.controller = FakeController(None)
def test_prepare_request_body(self):
}
}
actual = self.controller._prepare_request_body(body, params)
- self.assertDictEqual(expect, actual)
+ self.assertThat(expect, matchers.Equals(actual))
def test_prepare_request_body_none(self):
body = None
}
}
actual = self.controller._prepare_request_body(body, params)
- self.assertDictEqual(expect, actual)
+ self.assertThat(expect, matchers.Equals(actual))
def test_prepare_request_body_keyerror(self):
body = {'t2': {}}
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
+from testtools import matchers
import webob
from webob import exc
import webtest
return path
-class ResourceIndexTestCase(unittest.TestCase):
+class ResourceIndexTestCase(testtools.TestCase):
def test_index_json(self):
index = webtest.TestApp(router.Index({'foo': 'bar'}))
res = index.get('')
self.assertTrue(link['rel'] == 'self')
-class APIv2TestBase(unittest.TestCase):
+class APIv2TestBase(testtools.TestCase):
def setUp(self):
+ super(APIv2TestBase, self).setUp()
+
plugin = 'quantum.quantum_plugin_base_v2.QuantumPluginBaseV2'
# Ensure 'stale' patched copies of the plugin are never returned
QuantumManager._instance = None
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_pagination_support = True
instance._QuantumPluginBaseV2__native_sorting_support = True
+ self.addCleanup(self._plugin_patcher.stop)
+ self.addCleanup(cfg.CONF.reset)
+
api = router.APIRouter()
self.api = webtest.TestApp(api)
- super(APIv2TestBase, self).setUp()
-
- def tearDown(self):
- self._plugin_patcher.stop()
- self.api = None
- self.plugin = None
- cfg.CONF.reset()
class _ArgMatcher(object):
class APIv2TestCase(APIv2TestBase):
- # NOTE(jkoelker) This potentially leaks the mock object if the setUp
- # raises without being caught. Using unittest2
- # or dropping 2.6 support so we can use addCleanup
- # will get around this.
def _do_field_list(self, resource, base_fields):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[resource]
policy_attrs = [name for (name, info) in attr_info.items()
params=params).json
self.assertEqual(len(res['networks']), 2)
- self.assertItemsEqual([id1, id2],
- [res['networks'][0]['id'],
- res['networks'][1]['id']])
+ self.assertEqual(sorted([id1, id2]),
+ sorted([res['networks'][0]['id'],
+ res['networks'][1]['id']]))
self.assertIn('networks_links', res)
next_links = []
self.assertEqual(res.status_int, 400)
-class SubresourceTest(unittest.TestCase):
+class SubresourceTest(testtools.TestCase):
def setUp(self):
+ super(SubresourceTest, self).setUp()
+
plugin = 'quantum.tests.unit.test_api_v2.TestSubresourcePlugin'
QuantumManager._instance = None
PluginAwareExtensionManager._instance = None
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
+ self.addCleanup(self._plugin_patcher.stop)
+ self.addCleanup(cfg.CONF.reset)
router.SUB_RESOURCES['dummy'] = {
'collection_name': 'dummies',
self.api = webtest.TestApp(api)
def tearDown(self):
- self._plugin_patcher.stop()
- self.api = None
- self.plugin = None
router.SUB_RESOURCES = {}
- cfg.CONF.reset()
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+ super(SubresourceTest, self).tearDown()
def test_index_sub_resource(self):
instance = self.plugin.return_value
fmt = 'xml'
-class V2Views(unittest.TestCase):
+class V2Views(testtools.TestCase):
def _view(self, keys, collection, resource):
data = dict((key, 'value') for key in keys)
data['fake'] = 'value'
self.assertEqual(res.status_int, exc.HTTPCreated.code)
-class ExtensionTestCase(unittest.TestCase):
- # NOTE(jkoelker) This potentially leaks the mock object if the setUp
- # raises without being caught. Using unittest2
- # or dropping 2.6 support so we can use addCleanup
- # will get around this.
+class ExtensionTestCase(testtools.TestCase):
def setUp(self):
+ super(ExtensionTestCase, self).setUp()
plugin = 'quantum.quantum_plugin_base_v2.QuantumPluginBaseV2'
# Ensure 'stale' patched copies of the plugin are never returned
self.api = webtest.TestApp(api)
def tearDown(self):
+ super(ExtensionTestCase, self).tearDown()
self._plugin_patcher.stop()
self.api = None
self.plugin = None
return
-class ListArgsTestCase(unittest.TestCase):
+class ListArgsTestCase(testtools.TestCase):
def test_list_args(self):
path = '/?fields=4&foo=3&fields=2&bar=1'
request = webob.Request.blank(path)
expect_val = ['2', '4']
actual_val = api_common.list_args(request, 'fields')
- self.assertItemsEqual(actual_val, expect_val)
+ self.assertEqual(sorted(actual_val), expect_val)
def test_list_args_with_empty(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
self.assertEqual([], api_common.list_args(request, 'fields'))
-class FiltersTestCase(unittest.TestCase):
+class FiltersTestCase(testtools.TestCase):
def test_all_skip_args(self):
path = '/?fields=4&fields=3&fields=2&fields=1'
request = webob.Request.blank(path)
self.assertEqual(actual_val, expect_val)
-class CreateResourceTestCase(unittest.TestCase):
+class CreateResourceTestCase(testtools.TestCase):
def test_resource_creation(self):
resource = base.create_resource('fakes', 'fake', None, {})
self.assertIsInstance(resource, webob.dec.wsgify)
# @author: Zhongyue Luo, Intel Corporation.
#
-import unittest2 as unittest
-
import mock
+import testtools
from webob import exc
import webtest
from quantum import wsgi
-class RequestTestCase(unittest.TestCase):
+class RequestTestCase(testtools.TestCase):
def setUp(self):
+ super(RequestTestCase, self).setUp()
self.req = wsgi_resource.Request({'foo': 'bar'})
def test_content_type_missing(self):
self.assertTrue(self.req.context.is_admin)
-class ResourceTestCase(unittest.TestCase):
+class ResourceTestCase(testtools.TestCase):
def test_unmapped_quantum_error(self):
controller = mock.MagicMock()
controller.test.side_effect = q_exc.QuantumException()
# License for the specific language governing permissions and limitations
# under the License.
-import unittest2
+import testtools
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
-class TestAttributes(unittest2.TestCase):
+class TestAttributes(testtools.TestCase):
def _construct_dict_and_constraints(self):
""" Constructs a test dictionary and a definition of constraints.
del dictionary['key1']
msg = attributes._validate_dict(dictionary, constraints)
- self.assertIn('Expected keys:', msg, 'The error was not detected.')
+ self.assertIn('Expected keys:', msg)
def test_validate_dict_wrong_values(self):
dictionary, constraints = self._construct_dict_and_constraints()
del dictionary['key3']['k4']
dictionary['key3']['k5'] = 'a string value'
msg = attributes._validate_dict(dictionary, constraints)
- self.assertIn('Expected keys:', msg, 'The error was not detected.')
+ self.assertIn('Expected keys:', msg)
def test_validate_dict_or_none(self):
dictionary, constraints = self._construct_dict_and_constraints()
self.assertIsNone(msg)
-class TestConvertToBoolean(unittest2.TestCase):
+class TestConvertToBoolean(testtools.TestCase):
def test_convert_to_boolean_bool(self):
self.assertIs(attributes.convert_to_boolean(True), True)
'7')
-class TestConvertToInt(unittest2.TestCase):
+class TestConvertToInt(testtools.TestCase):
def test_convert_to_int_int(self):
self.assertEqual(attributes.convert_to_int(-1), -1)
value, attributes.convert_none_to_empty_list(value))
-class TestConvertKvp(unittest2.TestCase):
+class TestConvertKvp(testtools.TestCase):
def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
result = attributes.convert_kvp_list_to_dict(['True'])
self.assertEqual({'a': ['b'], 'c': ['d']}, result)
def test_convert_kvp_str_to_list_fails_for_missing_key(self):
- with self.assertRaises(q_exc.InvalidInput):
+ with testtools.ExpectedException(q_exc.InvalidInput):
attributes.convert_kvp_str_to_list('=a')
def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
- with self.assertRaises(q_exc.InvalidInput):
+ with testtools.ExpectedException(q_exc.InvalidInput):
attributes.convert_kvp_str_to_list('a')
def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
self.assertEqual(['a', 'a=a'], result)
-class TestConvertToList(unittest2.TestCase):
+class TestConvertToList(testtools.TestCase):
def test_convert_to_empty_list(self):
for item in (None, [], (), {}):
-import unittest
-
+import testtools
import webob
from quantum import auth
-class QuantumKeystoneContextTestCase(unittest.TestCase):
+class QuantumKeystoneContextTestCase(testtools.TestCase):
def setUp(self):
super(QuantumKeystoneContextTestCase, self).setUp()
# License for the specific language governing permissions and limitations
# under the License.
-import unittest2 as unittest
+import testtools
from quantum.common import utils
-class TestParseMappings(unittest.TestCase):
+class TestParseMappings(testtools.TestCase):
def parse(self, mapping_list, unique_values=True):
return utils.parse_mappings(mapping_list, unique_values)
def test_parse_mappings_fails_for_missing_separator(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
self.parse(['key'])
def test_parse_mappings_fails_for_missing_key(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
self.parse([':val'])
def test_parse_mappings_fails_for_missing_value(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
self.parse(['key:'])
def test_parse_mappings_fails_for_extra_separator(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
self.parse(['key:val:junk'])
def test_parse_mappings_fails_for_duplicate_key(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
self.parse(['key:val1', 'key:val2'])
def test_parse_mappings_fails_for_duplicate_value(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
self.parse(['key1:val', 'key2:val'])
def test_parse_mappings_succeeds_for_one_mapping(self):
# limitations under the License.
import os
-import unittest
+import testtools
from oslo.config import cfg
from quantum.common import config
-class ConfigurationTest(unittest.TestCase):
+class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual('0.0.0.0', cfg.CONF.bind_host)
"""Test of DB API"""
+import fixtures
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
import quantum.db.api as db
-class DBTestCase(unittest.TestCase):
+class DBTestCase(testtools.TestCase):
def setUp(self):
+ super(DBTestCase, self).setUp()
cfg.CONF.set_override('sql_max_retries', 1, 'DATABASE')
cfg.CONF.set_override('reconnect_interval', 0, 'DATABASE')
-
- def tearDown(self):
- db._ENGINE = None
- cfg.CONF.reset()
+ self.addCleanup(cfg.CONF.reset)
+ self.useFixture(fixtures.MonkeyPatch('quantum.db.api._ENGINE', None))
def test_db_reconnect(self):
with mock.patch.object(db, 'register_models') as mock_register:
import sys
import mock
-import unittest2 as unittest
+import testtools
from quantum.db import migration
from quantum.db.migration import cli
-class TestDbMigration(unittest.TestCase):
+class TestDbMigration(testtools.TestCase):
def test_should_run_plugin_in_list(self):
self.assertTrue(migration.should_run('foo', ['foo', 'bar']))
self.assertFalse(migration.should_run('foo', ['bar']))
self.assertTrue(migration.should_run('foo', ['*']))
-class TestCli(unittest.TestCase):
+class TestCli(testtools.TestCase):
def setUp(self):
+ super(TestCli, self).setUp()
self.do_alembic_cmd_p = mock.patch.object(cli, 'do_alembic_command')
self.do_alembic_cmd = self.do_alembic_cmd_p.start()
-
- def tearDown(self):
- self.do_alembic_cmd_p.stop()
- cli.CONF.reset()
+ self.addCleanup(self.do_alembic_cmd_p.stop)
+ self.addCleanup(cli.CONF.reset)
def _main_test_helper(self, argv, func_name, exp_args=(), exp_kwargs={}):
with mock.patch.object(sys, 'argv', argv):
import mock
from oslo.config import cfg
import sqlalchemy as sa
-import unittest2
+import testtools
+from testtools import matchers
import webob.exc
import quantum
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def tearDown(self):
- super(QuantumDbPluginV2TestCase, self).tearDown()
self.api = None
self._deserializers = None
self._skip_native_bulk = None
cfg.CONF.reset()
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
+ super(QuantumDbPluginV2TestCase, self).tearDown()
def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
action=None, subresource=None, sub_id=None):
quantum_context=quantum_context,
query_params=query_params)
resource = resource.replace('-', '_')
- self.assertItemsEqual([i['id'] for i in res['%ss' % resource]],
- [i[resource]['id'] for i in items])
+ self.assertEqual(sorted([i['id'] for i in res['%ss' % resource]]),
+ sorted([i[resource]['id'] for i in items]))
@contextlib.contextmanager
def network(self, name='net1',
res = self.deserialize(self.fmt, req.get_response(api))
collection = collection.replace('-', '_')
expected_res = [item[collection]['id'] for item in items]
- self.assertListEqual([n['id'] for n in res["%ss" % collection]],
- expected_res)
+ self.assertEqual(sorted([n['id'] for n in res["%ss" % collection]]),
+ sorted(expected_res))
def _test_list_with_pagination(self, collection, items, sort,
limit, expected_page_num, query_params='',
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
- self.assertLessEqual(len(res["%ss" % collection]), limit)
+ self.assertThat(len(res["%ss" % collection]),
+ matchers.LessThan(limit + 1))
items_res = items_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
- self.assertListEqual([n[verify_key] for n in items_res],
- [item[collection][verify_key] for item in items])
+ self.assertEqual(sorted([n[verify_key] for n in items_res]),
+ sorted([item[collection][verify_key]
+ for item in items]))
def _test_list_with_pagination_reverse(self, collection, items, sort,
limit, expected_page_num,
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
- self.assertLessEqual(len(res["%ss" % collection]), limit)
+ self.assertThat(len(res["%ss" % collection]),
+ matchers.LessThan(limit + 1))
res["%ss" % collection].reverse()
item_res = item_res + res["%ss" % collection]
req = None
self.assertEqual(page_num, expected_page_num)
expected_res = [item[collection]['id'] for item in items]
expected_res.reverse()
- self.assertListEqual([n['id'] for n in item_res],
- expected_res)
+ self.assertEqual(sorted([n['id'] for n in item_res]),
+ sorted(expected_res))
class TestBasicGet(QuantumDbPluginV2TestCase):
ip_allocation = q.one()
- self.assertGreater(
+ self.assertThat(
ip_allocation.expiration - timeutils.utcnow(),
- datetime.timedelta(seconds=10))
+ matchers.GreaterThan(datetime.timedelta(seconds=10)))
def test_port_delete_holds_ip(self):
plugin = QuantumManager.get_plugin()
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
shared=True,
tenant_id="another_tenant",
set_context=True):
pass
- self.assertEqual(ctx_manager.exception.code, 403)
+ self.assertEqual(ctx_manager.exception.code, 403)
def test_update_network(self):
with self.network() as network:
with self.subnet(network=network,
gateway_ip=gateway_ip_1,
cidr=cidr_1):
- with self.assertRaises(
+ with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.subnet(network=network,
gateway_ip=gateway_ip_2,
cidr=cidr_2):
pass
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_bad_V4_cidr(self):
with self.network() as network:
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', False)
- with self.assertRaises(
- webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.5'}]
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
allocation_pools=allocation_pools)
- self.assertEqual(ctx_manager.exception.code, 409)
+ self.assertEqual(ctx_manager.exception.code, 409)
def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
gateway_ip = '10.0.0.50'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.100'}]
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
- self.assertEqual(ctx_manager.exception.code, 409)
+ self.assertEqual(ctx_manager.exception.code, 409)
def test_create_subnet_overlapping_allocation_pools_returns_409(self):
gateway_ip = '10.0.0.1'
'end': '10.0.0.150'},
{'start': '10.0.0.140',
'end': '10.0.0.180'}]
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
- self.assertEqual(ctx_manager.exception.code, 409)
+ self.assertEqual(ctx_manager.exception.code, 409)
def test_create_subnet_invalid_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.256'}]
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.1.6'}]
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_shared_returns_400(self):
cidr = '10.0.0.0/24'
- with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
shared=True)
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_inconsistent_ipv6_cidrv4(self):
with self.network() as network:
self.assertEqual(res.status_int, 204)
-class DbModelTestCase(unittest2.TestCase):
+class DbModelTestCase(testtools.TestCase):
""" DB model tests """
def test_repr(self):
""" testing the string representation of 'model' classes """
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
-
import mock
+import testtools
from quantum.db import dhcp_rpc_base
-class TestDhcpRpcCallackMixin(unittest.TestCase):
+class TestDhcpRpcCallackMixin(testtools.TestCase):
def setUp(self):
+ super(TestDhcpRpcCallackMixin, self).setUp()
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
get_plugin = self.plugin_p.start()
self.plugin = mock.Mock()
def tearDown(self):
self.log_p.stop()
self.plugin_p.stop()
+ super(TestDhcpRpcCallackMixin, self).tearDown()
def test_get_active_networks(self):
plugin_retval = [dict(id='a'), dict(id='b')]
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.agent.common import config
from quantum.agent.linux import interface
self.stdout = _stdout
-class TestDebugCommands(unittest.TestCase):
+class TestDebugCommands(testtools.TestCase):
def setUp(self):
+ super(TestDebugCommands, self).setUp()
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF.register_opts(QuantumDebugAgent.OPTS)
cfg.CONF(args=[], project='quantum')
import eventlet
import mock
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
from quantum.agent.common import config
from quantum.agent import dhcp_agent
ports=[])
-class TestDhcpAgent(unittest.TestCase):
+class TestDhcpAgent(testtools.TestCase):
def setUp(self):
+ super(TestDhcpAgent, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
def tearDown(self):
self.driver_cls_p.stop()
cfg.CONF.reset()
+ super(TestDhcpAgent, self).tearDown()
def test_dhcp_agent_manager(self):
state_rpc_str = 'quantum.agent.rpc.PluginReportStateAPI'
dhcp.needs_resync = True
with mock.patch.object(dhcp, 'sync_state') as sync_state:
sync_state.side_effect = RuntimeError
- with self.assertRaises(RuntimeError):
+ with testtools.ExpectedException(RuntimeError):
dhcp._periodic_resync_helper()
sync_state.assert_called_once_with()
sleep.assert_called_once_with(dhcp.conf.resync_interval)
self.assertFalse(dhcp.needs_resync)
-class TestDhcpAgentEventHandler(unittest.TestCase):
+class TestDhcpAgentEventHandler(testtools.TestCase):
def setUp(self):
+ super(TestDhcpAgentEventHandler, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
cfg.CONF.set_override('interface_driver',
self.call_driver_p.stop()
self.cache_p.stop()
self.plugin_p.stop()
+ super(TestDhcpAgentEventHandler, self).tearDown()
def test_enable_dhcp_helper(self):
self.plugin.get_network_info.return_value = fake_network
self.assertEqual(self.call_driver.call_count, 0)
-class TestDhcpPluginApiProxy(unittest.TestCase):
+class TestDhcpPluginApiProxy(testtools.TestCase):
def setUp(self):
+ super(TestDhcpPluginApiProxy, self).setUp()
self.proxy = dhcp_agent.DhcpPluginApi('foo', {})
self.proxy.host = 'foo'
def tearDown(self):
self.make_msg_p.stop()
self.call_p.stop()
+ super(TestDhcpPluginApiProxy, self).tearDown()
def test_get_active_networks(self):
self.proxy.get_active_networks()
host='foo')
-class TestNetworkCache(unittest.TestCase):
+class TestNetworkCache(testtools.TestCase):
def test_put_network(self):
nc = dhcp_agent.NetworkCache()
nc.put(fake_network)
self.assertEqual(nc.get_port_by_id(fake_port1.id), fake_port1)
-class TestDeviceManager(unittest.TestCase):
+class TestDeviceManager(testtools.TestCase):
def setUp(self):
+ super(TestDeviceManager, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
cfg.CONF.set_override('interface_driver',
self.device_exists_p.stop()
self.iproute_cls_p.stop()
cfg.CONF.reset()
+ super(TestDeviceManager, self).tearDown()
def _test_setup_helper(self, device_exists, reuse_existing=False,
metadata_access_network=False,
self._test_setup_helper(False)
def test_setup_device_exists(self):
- with self.assertRaises(exceptions.PreexistingDeviceFailure):
+ with testtools.ExpectedException(exceptions.PreexistingDeviceFailure):
self._test_setup_helper(True)
def test_setup_device_exists_reuse(self):
self.assertEqual(dh.get_device_id(fake_network), expected)
-class TestDhcpLeaseRelay(unittest.TestCase):
+class TestDhcpLeaseRelay(testtools.TestCase):
def setUp(self):
+ super(TestDhcpLeaseRelay, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
self.unlink_p = mock.patch('os.unlink')
self.unlink = self.unlink_p.start()
def tearDown(self):
self.unlink_p.stop()
+ super(TestDhcpLeaseRelay, self).tearDown()
def test_init_relay_socket_path_no_prev_socket(self):
with mock.patch('os.path.exists') as exists:
self.unlink.side_effect = OSError
with mock.patch('os.path.exists') as exists:
exists.return_value = True
- with self.assertRaises(OSError):
+ with testtools.ExpectedException(OSError):
relay = dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
relay._handler)])
-class TestDictModel(unittest.TestCase):
+class TestDictModel(testtools.TestCase):
def test_basic_dict(self):
d = dict(a=1, b=2)
"""
from oslo.config import cfg
-import unittest2 as unittest
+import testtools
import webob.exc as webexc
import quantum
return self.objh[id]
-class ExtensionExtendedAttributeTestCase(unittest.TestCase):
+class ExtensionExtendedAttributeTestCase(testtools.TestCase):
def setUp(self):
+ super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (
"quantum.tests.unit.test_extension_extended_attribute."
"ExtensionExtendedAttributeTestPlugin"
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
-
- def tearDown(self):
- # restore original resource attribute map before
- self._api = None
- cfg.CONF.reset()
+ self.addCleanup(cfg.CONF.reset)
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
{'router': {'routes': routes}})
body = self._show('routers', r['router']['id'])
- self.assertItemsEqual(body['router']['routes'],
- routes)
+ self.assertEqual(sorted(body['router']['routes']),
+ sorted(routes))
# clean-up
self._update('routers', r['router']['id'],
routes_orig}})
body = self._show('routers', r['router']['id'])
- self.assertItemsEqual(body['router']['routes'],
- routes_orig)
+ self.assertEqual(sorted(body['router']['routes']),
+ sorted(routes_orig))
routes_left = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.1.3'},
routes_left}})
body = self._show('routers', r['router']['id'])
- self.assertItemsEqual(body['router']['routes'],
- routes_left)
+ self.assertEqual(sorted(body['router']['routes']),
+ sorted(routes_left))
body = self._update('routers', r['router']['id'],
{'router': {'routes': []}})
# under the License.
import os
-import unittest
import routes
+import testtools
import webob
import webtest
self._log("method_to_support_foxnsox_extension", context)
-class ResourceExtensionTest(unittest.TestCase):
+class ResourceExtensionTest(testtools.TestCase):
class ResourceExtensionController(wsgi.Controller):
self.assertEqual(404, response.status_int)
-class ActionExtensionTest(unittest.TestCase):
+class ActionExtensionTest(testtools.TestCase):
def setUp(self):
super(ActionExtensionTest, self).setUp()
self.assertEqual(404, response.status_int)
-class RequestExtensionTest(unittest.TestCase):
+class RequestExtensionTest(testtools.TestCase):
def test_headers_can_be_extended(self):
def extend_headers(req, res):
return _setup_extensions_test_app(manager)
-class ExtensionManagerTest(unittest.TestCase):
+class ExtensionManagerTest(testtools.TestCase):
def test_invalid_extensions_are_not_registered(self):
self.assertFalse('invalid_extension' in ext_mgr.extensions)
-class PluginAwareExtensionManagerTest(unittest.TestCase):
+class PluginAwareExtensionManagerTest(testtools.TestCase):
def test_unsupported_extensions_are_not_loaded(self):
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1", "e3"])
import mock
from mock import call
-import unittest2 as unittest
+import testtools
from quantum.agent.linux.iptables_firewall import IptablesFirewallDriver
from quantum.tests.unit import test_api_v2
'IPv6': 'fe80::1'}
-class IptablesFirewallTestCase(unittest.TestCase):
+class IptablesFirewallTestCase(testtools.TestCase):
def setUp(self):
+ super(IptablesFirewallTestCase, self).setUp()
self.utils_exec_p = mock.patch(
'quantum.agent.linux.utils.execute')
self.utils_exec = self.utils_exec_p.start()
+ self.addCleanup(self.utils_exec_p.stop)
self.iptables_cls_p = mock.patch(
'quantum.agent.linux.iptables_manager.IptablesManager')
iptables_cls = self.iptables_cls_p.start()
+ self.addCleanup(self.iptables_cls_p.stop)
self.iptables_inst = mock.Mock()
self.v4filter_inst = mock.Mock()
self.v6filter_inst = mock.Mock()
self.firewall = IptablesFirewallDriver()
self.firewall.iptables = self.iptables_inst
- def tearDown(self):
- self.iptables_cls_p.stop()
- self.utils_exec_p.stop()
-
def _fake_port(self):
return {'device': 'tapfake_dev',
'mac_address': 'ff:ff:ff:ff',
import inspect
import os
-import unittest
import mox
+import testtools
from quantum.agent.linux import iptables_manager
-class IptablesManagerStateFulTestCase(unittest.TestCase):
+class IptablesManagerStateFulTestCase(testtools.TestCase):
def setUp(self):
+ super(IptablesManagerStateFulTestCase, self).setUp()
self.mox = mox.Mox()
self.root_helper = 'sudo'
self.iptables = (iptables_manager.
IptablesManager(root_helper=self.root_helper))
self.mox.StubOutWithMock(self.iptables, "execute")
-
- def tearDown(self):
- self.mox.UnsetStubs()
+ self.addCleanup(self.mox.UnsetStubs)
def test_binary_name(self):
self.assertEqual(iptables_manager.binary_name,
self.mox.VerifyAll()
-class IptablesManagerStateLessTestCase(unittest.TestCase):
+class IptablesManagerStateLessTestCase(testtools.TestCase):
def setUp(self):
+ super(IptablesManagerStateLessTestCase, self).setUp()
self.iptables = (iptables_manager.IptablesManager(state_less=True))
def test_nat_not_found(self):
# under the License.
import copy
-import unittest2
import mock
from oslo.config import cfg
+import testtools
from quantum.agent.common import config as agent_config
from quantum.agent import l3_agent
HOSTNAME = 'myhost'
-class TestBasicRouterOperations(unittest2.TestCase):
+class TestBasicRouterOperations(testtools.TestCase):
def setUp(self):
+ super(TestBasicRouterOperations, self).setUp()
self.conf = cfg.ConfigOpts()
self.conf.register_opts(base_config.core_opts)
self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
self.dvr_cls_p.stop()
self.utils_exec_p.stop()
self.external_process_p.stop()
+ super(TestBasicRouterOperations, self).tearDown()
def testRouterInfoCreate(self):
id = _uuid()
import mock
from oslo.config import cfg
+import testtools
from webob import exc
import webtest
fmt = 'json'
def setUp(self):
+ super(L3NatExtensionTestCase, self).setUp()
plugin = 'quantum.extensions.l3.RouterPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+ super(L3NatExtensionTestCase, self).tearDown()
def test_router_create(self):
router_id = _uuid()
def test_create_port_external_network_non_admin_fails(self):
with self.network(router__external=True) as ext_net:
with self.subnet(network=ext_net) as ext_subnet:
- with self.assertRaises(exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ exc.HTTPClientError) as ctx_manager:
with self.port(subnet=ext_subnet,
set_context='True',
tenant_id='noadmin'):
ext_net['network']['id'])
def test_create_external_network_non_admin_fails(self):
- with self.assertRaises(exc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(exc.HTTPClientError) as ctx_manager:
with self.network(router__external=True,
set_context='True',
tenant_id='noadmin'):
import os
import mock
-import unittest2 as unittest
+import testtools
from quantum.agent.linux import daemon
FAKE_FD = 8
-class TestPidfile(unittest.TestCase):
+class TestPidfile(testtools.TestCase):
def setUp(self):
+ super(TestPidfile, self).setUp()
self.os_p = mock.patch.object(daemon, 'os')
self.os = self.os_p.start()
+ self.addCleanup(self.os_p.stop)
self.os.open.return_value = FAKE_FD
self.fcntl_p = mock.patch.object(daemon, 'fcntl')
self.fcntl = self.fcntl_p.start()
+ self.addCleanup(self.fcntl_p.stop)
self.fcntl.flock.return_value = 0
- def tearDown(self):
- self.fcntl_p.stop()
- self.os_p.stop()
-
def test_init(self):
self.os.O_CREAT = os.O_CREAT
self.os.O_RDWR = os.O_RDWR
self.os.open.side_effect = IOError
with mock.patch.object(daemon.sys, 'stderr') as stderr:
- with self.assertRaises(SystemExit):
+ with testtools.ExpectedException(SystemExit):
p = daemon.Pidfile('thefile', 'python')
sys.assert_has_calls([
mock.call.stderr.write(mock.ANY),
['cat', '/proc/34/cmdline'], 'sudo')
-class TestDaemon(unittest.TestCase):
+class TestDaemon(testtools.TestCase):
def setUp(self):
+ super(TestDaemon, self).setUp()
self.os_p = mock.patch.object(daemon, 'os')
self.os = self.os_p.start()
def tearDown(self):
self.pidfile_p.stop()
self.os_p.stop()
+ super(TestDaemon, self).tearDown()
def test_init(self):
d = daemon.Daemon('pidfile')
def test_fork_parent(self):
self.os.fork.return_value = 1
- with self.assertRaises(SystemExit):
+ with testtools.ExpectedException(SystemExit):
d = daemon.Daemon('pidfile')
d._fork()
def test_fork_error(self):
self.os.fork.side_effect = lambda: OSError(1)
with mock.patch.object(daemon.sys, 'stderr') as stderr:
- with self.assertRaises(SystemExit):
+ with testtools.ExpectedException(SystemExit):
d = daemon.Daemon('pidfile', 'stdin')
d._fork()
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with mock.patch.object(d, 'daemonize') as daemonize:
- with self.assertRaises(SystemExit):
+ with testtools.ExpectedException(SystemExit):
d.start()
self.assertFalse(daemonize.called)
import os
import socket
-import unittest2 as unittest
import mock
from oslo.config import cfg
+import testtools
from quantum.agent.common import config
from quantum.agent.linux import dhcp
ports = [FakePort1()]
-class TestDhcpBase(unittest.TestCase):
+class TestDhcpBase(testtools.TestCase):
def test_base_abc_error(self):
self.assertRaises(TypeError, dhcp.DhcpBase, None)
self.called.append('spawn')
-class TestBase(unittest.TestCase):
+class TestBase(testtools.TestCase):
def setUp(self):
+ super(TestBase, self).setUp()
root = os.path.dirname(os.path.dirname(__file__))
args = ['--config-file',
os.path.join(root, 'etc', 'quantum.conf.test')]
self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file')
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
+ self.addCleanup(self.execute_p.stop)
self.safe = self.replace_p.start()
+ self.addCleanup(self.replace_p.stop)
self.execute = self.execute_p.start()
- def tearDown(self):
- self.execute_p.stop()
- self.replace_p.stop()
-
class TestDhcpLocalProcess(TestBase):
def test_active(self):
# @author: Mark McClain, DreamHost
import mock
-import unittest2 as unittest
+import testtools
from quantum.agent.linux import external_process as ep
-class TestProcessManager(unittest.TestCase):
+class TestProcessManager(testtools.TestCase):
def setUp(self):
+ super(TestProcessManager, self).setUp()
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.execute = self.execute_p.start()
+ self.addCleanup(self.execute_p.stop)
self.conf = mock.Mock()
self.conf.external_pids = '/var/path'
- def tearDown(self):
- self.execute_p.stop()
-
def test_enable_no_namespace(self):
callback = mock.Mock()
callback.return_value = ['the', 'cmd']
# License for the specific language governing permissions and limitations
# under the License.
-import unittest
-
import mock
from oslo.config import cfg
+import testtools
from quantum.agent.common import config
from quantum.agent.dhcp_agent import DeviceManager
network_id = network.id
-class TestBase(unittest.TestCase):
+class TestBase(testtools.TestCase):
def setUp(self):
+ super(TestBase, self).setUp()
root_helper_opt = [
cfg.StrOpt('root_helper', default='sudo'),
]
config.register_root_helper(self.conf)
self.ip_dev_p = mock.patch.object(ip_lib, 'IPDevice')
self.ip_dev = self.ip_dev_p.start()
+ self.addCleanup(self.ip_dev_p.stop)
self.ip_p = mock.patch.object(ip_lib, 'IPWrapper')
self.ip = self.ip_p.start()
+ self.addCleanup(self.ip_p.stop)
self.device_exists_p = mock.patch.object(ip_lib, 'device_exists')
self.device_exists = self.device_exists_p.start()
-
- def tearDown(self):
- # sometimes a test may turn this off
- try:
- self.device_exists_p.stop()
- except RuntimeError, e:
- pass
- self.ip_dev_p.stop()
- self.ip_p.stop()
+ self.addCleanup(self.device_exists_p.stop)
class TestABCDriver(TestBase):
self.conf.register_opts(DeviceManager.OPTS)
self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
client_cls = self.client_cls_p.start()
+ self.addCleanup(self.client_cls_p.stop)
self.client_inst = mock.Mock()
client_cls.return_value = self.client_inst
'fake1:quantum.agent.linux.interface.OVSInterfaceDriver,'
'fake2:quantum.agent.linux.interface.BridgeInterfaceDriver')
- def tearDown(self):
- self.client_cls_p.stop()
- super(TestMetaInterfaceDriver, self).tearDown()
-
def test_get_driver_by_network_id(self):
meta_interface = interface.MetaInterfaceDriver(self.conf)
driver = meta_interface._get_driver_by_network_id('test')
# under the License.
import mock
-import unittest2 as unittest
+import testtools
from quantum.agent.linux import ip_lib
from quantum.common import exceptions
"10.0.0.0/24 dev qr-23380d11-d2 scope link src 10.0.0.1")
-class TestSubProcessBase(unittest.TestCase):
+class TestSubProcessBase(testtools.TestCase):
def setUp(self):
+ super(TestSubProcessBase, self).setUp()
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.execute = self.execute_p.start()
-
- def tearDown(self):
- self.execute_p.stop()
+ self.addCleanup(self.execute_p.stop)
def test_execute_wrapper(self):
ip_lib.SubProcessBase._execute('o', 'link', ('list',), 'sudo')
[], 'link', ('list',))
-class TestIpWrapper(unittest.TestCase):
+class TestIpWrapper(testtools.TestCase):
def setUp(self):
+ super(TestIpWrapper, self).setUp()
self.execute_p = mock.patch.object(ip_lib.IPWrapper, '_execute')
self.execute = self.execute_p.start()
-
- def tearDown(self):
- self.execute_p.stop()
+ self.addCleanup(self.execute_p.stop)
def test_get_devices(self):
self.execute.return_value = '\n'.join(LINK_SAMPLE)
self.assertEqual(dev.mock_calls, [])
-class TestIPDevice(unittest.TestCase):
+class TestIPDevice(testtools.TestCase):
def test_eq_same_name(self):
dev1 = ip_lib.IPDevice('tap0')
dev2 = ip_lib.IPDevice('tap0')
self.assertEqual(str(ip_lib.IPDevice('tap0')), 'tap0')
-class TestIPCommandBase(unittest.TestCase):
+class TestIPCommandBase(testtools.TestCase):
def setUp(self):
+ super(TestIPCommandBase, self).setUp()
self.ip = mock.Mock()
self.ip.root_helper = 'sudo'
self.ip.namespace = 'namespace'
[mock.call._as_root('o', 'foo', ('link', ), False)])
-class TestIPDeviceCommandBase(unittest.TestCase):
+class TestIPDeviceCommandBase(testtools.TestCase):
def setUp(self):
+ super(TestIPDeviceCommandBase, self).setUp()
self.ip_dev = mock.Mock()
self.ip_dev.name = 'eth0'
self.ip_dev.root_helper = 'sudo'
self.assertEqual(self.ip_cmd.name, 'eth0')
-class TestIPCmdBase(unittest.TestCase):
+class TestIPCmdBase(testtools.TestCase):
def setUp(self):
+ super(TestIPCmdBase, self).setUp()
self.parent = mock.Mock()
self.parent.name = 'eth0'
self.parent.root_helper = 'sudo'
root_helper='sudo', check_exit_code=True)
-class TestDeviceExists(unittest.TestCase):
+class TestDeviceExists(testtools.TestCase):
def test_device_exists(self):
with mock.patch.object(ip_lib.IPDevice, '_execute') as _execute:
_execute.return_value = LINK_SAMPLE[1]
import mock
from oslo.config import cfg
-import unittest2
+import testtools
from webob import exc
import webtest
fmt = 'json'
def setUp(self):
-
+ super(LoadBalancerExtensionTestCase, self).setUp()
plugin = 'quantum.extensions.loadbalancer.LoadBalancerPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned
manager.QuantumManager._instance = None
self.api = None
self.plugin = None
cfg.CONF.reset()
+ super(LoadBalancerExtensionTestCase, self).tearDown()
def test_vip_create(self):
vip_id = _uuid()
import socket
import mock
-import unittest2 as unittest
+import testtools
import webob
from quantum.agent.metadata import agent
metadata_proxy_shared_secret = 'secret'
-class TestMetadataProxyHandler(unittest.TestCase):
+class TestMetadataProxyHandler(testtools.TestCase):
def setUp(self):
+ super(TestMetadataProxyHandler, self).setUp()
self.qclient_p = mock.patch('quantumclient.v2_0.client.Client')
self.qclient = self.qclient_p.start()
+ self.addCleanup(self.qclient_p.stop)
self.log_p = mock.patch.object(agent, 'LOG')
self.log = self.log_p.start()
+ self.addCleanup(self.log_p.stop)
self.handler = agent.MetadataProxyHandler(FakeConf)
- def tearDown(self):
- self.log_p.stop()
- self.qclient_p.stop()
-
def test_call(self):
req = mock.Mock()
with mock.patch.object(self.handler, '_get_instance_id') as get_id:
webob.exc.HTTPInternalServerError)
def test_proxy_request_other_code(self):
- with self.assertRaises(Exception) as e:
+ with testtools.ExpectedException(Exception) as e:
self._proxy_request_test_helper(302)
def test_sign_instance_id(self):
)
-class TestUnixDomainHttpProtocol(unittest.TestCase):
+class TestUnixDomainHttpProtocol(testtools.TestCase):
def test_init_empty_client(self):
u = agent.UnixDomainHttpProtocol(mock.Mock(), '', mock.Mock())
self.assertEqual(u.client_address, ('<local>', 0))
self.assertEqual(u.client_address, 'foo')
-class TestUnixDomainWSGIServer(unittest.TestCase):
+class TestUnixDomainWSGIServer(testtools.TestCase):
def setUp(self):
+ super(TestUnixDomainWSGIServer, self).setUp()
self.eventlet_p = mock.patch.object(agent, 'eventlet')
self.eventlet = self.eventlet_p.start()
+ self.addCleanup(self.eventlet_p.stop)
self.server = agent.UnixDomainWSGIServer('test')
- def tearDown(self):
- self.eventlet_p.stop()
-
def test_start(self):
mock_app = mock.Mock()
with mock.patch.object(self.server, 'pool') as pool:
self.assertTrue(len(logging.mock_calls))
-class TestUnixDomainMetadataProxy(unittest.TestCase):
+class TestUnixDomainMetadataProxy(testtools.TestCase):
def setUp(self):
+ super(TestUnixDomainMetadataProxy, self).setUp()
self.cfg_p = mock.patch.object(agent, 'cfg')
self.cfg = self.cfg_p.start()
+ self.addCleanup(self.cfg_p.stop)
self.cfg.CONF.metadata_proxy_socket = '/the/path'
- def tearDown(self):
- self.cfg_p.stop()
-
def test_init_doesnot_exists(self):
with mock.patch('os.path.isdir') as isdir:
with mock.patch('os.makedirs') as makedirs:
exists.return_value = True
unlink.side_effect = OSError
- with self.assertRaises(OSError):
+ with testtools.ExpectedException(OSError):
p = agent.UnixDomainMetadataProxy(mock.Mock())
isdir.assert_called_once_with('/the')
import socket
import mock
-import unittest2 as unittest
+import testtools
import webob
from quantum.agent.metadata import namespace_proxy as ns_proxy
metadata_proxy_shared_secret = 'secret'
-class TestUnixDomainHttpConnection(unittest.TestCase):
+class TestUnixDomainHttpConnection(testtools.TestCase):
def test_connect(self):
with mock.patch.object(ns_proxy, 'cfg') as cfg:
cfg.CONF.metadata_proxy_socket = '/the/path'
self.assertEqual(conn.timeout, 3)
-class TestNetworkMetadataProxyHandler(unittest.TestCase):
+class TestNetworkMetadataProxyHandler(testtools.TestCase):
def setUp(self):
+ super(TestNetworkMetadataProxyHandler, self).setUp()
self.log_p = mock.patch.object(ns_proxy, 'LOG')
self.log = self.log_p.start()
+ self.addCleanup(self.log_p.stop)
self.handler = ns_proxy.NetworkMetadataProxyHandler('router_id')
- def tearDown(self):
- self.log_p.stop()
-
def test_call(self):
req = mock.Mock(headers={})
with mock.patch.object(self.handler, '_proxy_request') as proxy_req:
req.query_string)
def test_no_argument_passed_to_init(self):
- with self.assertRaises(ValueError):
+ with testtools.ExpectedException(ValueError):
ns_proxy.NetworkMetadataProxyHandler()
def test_call_internal_server_error(self):
with mock.patch('httplib2.Http') as mock_http:
mock_http.return_value.request.return_value = (resp, '')
- with self.assertRaises(Exception):
+ with testtools.ExpectedException(Exception):
self.handler._proxy_request('192.168.1.1',
'/latest/meta-data',
'')
with mock.patch('httplib2.Http') as mock_http:
mock_http.return_value.request.side_effect = Exception
- with self.assertRaises(Exception):
+ with testtools.ExpectedException(Exception):
self.handler._proxy_request('192.168.1.1',
'/latest/meta-data',
'')
)
-class TestProxyDaemon(unittest.TestCase):
+class TestProxyDaemon(testtools.TestCase):
def test_init(self):
with mock.patch('quantum.agent.linux.daemon.Pidfile') as pf:
pd = ns_proxy.ProxyDaemon('pidfile', 9697, 'net_id', 'router_id')
import shutil
import StringIO
import tempfile
-import unittest2 as unittest
import urllib2
import mock
+import testtools
import quantum
from quantum.common import exceptions
from quantum import policy
-class PolicyFileTestCase(unittest.TestCase):
+class PolicyFileTestCase(testtools.TestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
policy.reset()
+ self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake')
self.target = {}
- def tearDown(self):
- super(PolicyFileTestCase, self).tearDown()
- policy.reset()
-
@contextlib.contextmanager
def _tempdir(self, **kwargs):
tmpdir = tempfile.mkdtemp(**kwargs)
self.target)
-class PolicyTestCase(unittest.TestCase):
+class PolicyTestCase(testtools.TestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
policy.reset()
+ self.addCleanup(policy.reset)
# NOTE(vish): preload rules to circumvent reloading from file
policy.init()
rules = {
self.context = context.Context('fake', 'fake', roles=['member'])
self.target = {}
- def tearDown(self):
- policy.reset()
- super(PolicyTestCase, self).tearDown()
-
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
policy.enforce(admin_context, uppercase_action, self.target)
-class DefaultPolicyTestCase(unittest.TestCase):
+class DefaultPolicyTestCase(testtools.TestCase):
def setUp(self):
super(DefaultPolicyTestCase, self).setUp()
policy.reset()
policy.init()
+ self.addCleanup(policy.reset)
self.rules = {
"default": '',
for k, v in self.rules.items()), default_rule)
common_policy.set_rules(rules)
- def tearDown(self):
- super(DefaultPolicyTestCase, self).tearDown()
- policy.reset()
-
def test_policy_called(self):
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, "example:exist", {})
self.context, "example:noexist", {})
-class QuantumPolicyTestCase(unittest.TestCase):
+class QuantumPolicyTestCase(testtools.TestCase):
def setUp(self):
super(QuantumPolicyTestCase, self).setUp()
policy.reset()
policy.init()
+ self.addCleanup(policy.reset)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"admin_or_network_owner": "role:admin or "
"tenant_id:%(network_tenant_id)s",
'init',
new=fakepolicyinit)
self.patcher.start()
+ self.addCleanup(self.patcher.stop)
self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class(
"quantum.db.db_base_plugin_v2.QuantumDbPluginV2")
self.plugin = plugin_klass()
- def tearDown(self):
- self.patcher.stop()
- policy.reset()
-
def _test_action_on_attr(self, context, action, attr, value,
exception=None):
action = "%s_network" % action
# License for the specific language governing permissions and limitations
# under the License.
-import unittest2 as unittest
-
import mock
+import testtools
from quantum import context
-class TestQuantumContext(unittest.TestCase):
+class TestQuantumContext(testtools.TestCase):
def setUp(self):
+ super(TestQuantumContext, self).setUp()
db_api = 'quantum.db.api.get_session'
self._db_api_session_patcher = mock.patch(db_api)
self.db_api_session = self._db_api_session_patcher.start()
-
- def tearDown(self):
- self._db_api_session_patcher.stop()
+ self.addCleanup(self._db_api_session_patcher.stop)
def testQuantumContextCreate(self):
cxt = context.Context('user_id', 'tenant_id')
import os
import types
-import unittest2
+
+import fixtures
+import testtools
from oslo.config import cfg
return os.path.join(ETCDIR, *p)
-class QuantumManagerTestCase(unittest2.TestCase):
+class QuantumManagerTestCase(testtools.TestCase):
def setUp(self):
super(QuantumManagerTestCase, self).setUp()
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
config.parse(args=args)
-
- def tearDown(self):
- unittest2.TestCase.tearDown(self)
- cfg.CONF.reset()
- QuantumManager._instance = None
+ self.addCleanup(cfg.CONF.reset)
+ self.useFixture(
+ fixtures.MonkeyPatch('quantum.manager.QuantumManager._instance'))
def test_service_plugin_is_loaded(self):
cfg.CONF.set_override("core_plugin",
cfg.CONF.set_override("service_plugins",
["quantum.tests.unit.dummy_plugin."
"DummyServicePlugin"])
- QuantumManager._instance = None
mgr = QuantumManager.get_instance()
plugin = mgr.get_service_plugins()[constants.DUMMY]
"QuantumDummyPlugin",
"quantum.tests.unit.dummy_plugin."
"QuantumDummyPlugin"])
- QuantumManager._instance = None
try:
QuantumManager.get_instance().get_service_plugins()
-import unittest2 as unittest
-import webtest
-
import mock
from oslo.config import cfg
+import testtools
+import webtest
from quantum.api import extensions
from quantum.api.v2 import attributes
fmt = 'json'
def setUp(self):
+ super(QuotaExtensionTestCase, self).setUp()
db._ENGINE = None
db._MAKER = None
# Ensure 'stale' patched copies of the plugin are never returned
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+ super(QuotaExtensionTestCase, self).tearDown()
def test_quotas_loaded_right(self):
res = self.api.get(_get_path('quotas', fmt=self.fmt))
fmt=self.fmt),
self.serialize(quotas), extra_environ=env)
self.assertEqual(200, res.status_int)
- with self.assertRaises(exceptions.OverQuota):
+ with testtools.ExpectedException(exceptions.OverQuota):
quota.QUOTAS.limit_check(context.Context('', tenant_id),
tenant_id,
network=6)
def test_quotas_limit_check_with_invalid_quota_value(self):
tenant_id = 'tenant_id1'
- with self.assertRaises(exceptions.InvalidQuotaValue):
+ with testtools.ExpectedException(exceptions.InvalidQuotaValue):
quota.QUOTAS.limit_check(context.Context('', tenant_id),
tenant_id,
network=-1)
import os
import mock
-import unittest2 as unittest
+import testtools
from quantum.common import utils
from quantum.rootwrap import filters
from quantum.rootwrap import wrapper
-class RootwrapTestCase(unittest.TestCase):
+class RootwrapTestCase(testtools.TestCase):
def setUp(self):
super(RootwrapTestCase, self).setUp()
# License for the specific language governing permissions and limitations
# under the License.
-import unittest2 as unittest
+import testtools
import webob.exc as webexc
import quantum
pass
-class RouterServiceInsertionTestCase(unittest.TestCase):
+class RouterServiceInsertionTestCase(testtools.TestCase):
def setUp(self):
+ super(RouterServiceInsertionTestCase, self).setUp()
plugin = (
"quantum.tests.unit.test_routerserviceinsertion."
"RouterServiceInsertionTestPlugin"
cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override('service_plugins', [plugin])
cfg.CONF.set_override('quota_router', -1, group='QUOTAS')
+ self.addCleanup(cfg.CONF.reset)
# Ensure 'stale' patched copies of the plugin are never returned
quantum.manager.QuantumManager._instance = None
res = self._do_request('GET', _get_path('service-types'))
self._service_type_id = res['service_types'][0]['id']
- def tearDown(self):
- self._api = None
- cfg.CONF.reset()
-
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
body = None
}
if update_service_type_id:
data["router"]["service_type_id"] = _uuid()
- with self.assertRaises(webexc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webexc.HTTPClientError) as ctx_manager:
res = self._do_request(
'PUT', _get_path('routers/{0}'.format(router_id)), data)
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
else:
res = self._do_request(
'PUT', _get_path('routers/{0}'.format(router_id)), data)
data = {res: uattrs}
if update_router_id:
uattrs['router_id'] = self._router_id
- with self.assertRaises(webexc.HTTPClientError) as ctx_manager:
+ with testtools.ExpectedException(
+ webexc.HTTPClientError) as ctx_manager:
newobj = self._do_request(
'PUT',
_get_path('lb/{0}s/{1}'.format(res, obj['id'])), data)
- self.assertEqual(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
else:
newobj = self._do_request(
'PUT',
# under the License.
from contextlib import nested
+
import mock
from mock import call
-import unittest2 as unittest
-
import mox
from oslo.config import cfg
+import testtools
from quantum.agent import firewall as firewall_base
from quantum.agent.linux import iptables_manager
fmt = 'xml'
-class SGAgentRpcCallBackMixinTestCase(unittest.TestCase):
+class SGAgentRpcCallBackMixinTestCase(testtools.TestCase):
def setUp(self):
+ super(SGAgentRpcCallBackMixinTestCase, self).setUp()
self.rpc = sg_rpc.SecurityGroupAgentRpcCallbackMixin()
self.rpc.sg_agent = mock.Mock()
[call.security_groups_provider_updated()])
-class SecurityGroupAgentRpcTestCase(unittest.TestCase):
+class SecurityGroupAgentRpcTestCase(testtools.TestCase):
def setUp(self):
+ super(SecurityGroupAgentRpcTestCase, self).setUp()
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
self.addCleanup(mock.patch.stopall)
pass
-class SecurityGroupServerRpcApiTestCase(unittest.TestCase):
+class SecurityGroupServerRpcApiTestCase(testtools.TestCase):
def setUp(self):
+ super(SecurityGroupServerRpcApiTestCase, self).setUp()
self.rpc = FakeSGRpcApi('fake_topic')
self.rpc.call = mock.Mock()
pass
-class SecurityGroupAgentRpcApiTestCase(unittest.TestCase):
+class SecurityGroupAgentRpcApiTestCase(testtools.TestCase):
def setUp(self):
+ super(SecurityGroupAgentRpcApiTestCase, self).setUp()
self.notifier = FakeSGNotifierAPI(topic='fake',
default_version='1.0')
self.notifier.fanout_cast = mock.Mock()
FIREWALL_BASE_PACKAGE = 'quantum.agent.linux.iptables_firewall.'
-class TestSecurityGroupAgentWithIptables(unittest.TestCase):
+class TestSecurityGroupAgentWithIptables(testtools.TestCase):
FIREWALL_DRIVER = FIREWALL_BASE_PACKAGE + 'IptablesFirewallDriver'
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
def setUp(self):
+ super(TestSecurityGroupAgentWithIptables, self).setUp()
self.mox = mox.Mox()
agent_opts = [
cfg.StrOpt('root_helper', default='sudo'),
import contextlib
import logging
-import unittest2 as unittest
import mock
from oslo.config import cfg
+import testtools
import webob.exc as webexc
import webtest
cfg.CONF.set_override('service_plugins',
["%s.%s" % (dp.__name__,
dp.DummyServicePlugin.__name__)])
+ self.addCleanup(cfg.CONF.reset)
# Make sure at each test a new instance of the plugin is returned
manager.QuantumManager._instance = None
# Ensure existing ExtensionManager is not used
self.resource_name = servicetype.RESOURCE_NAME.replace('-', '_')
super(ServiceTypeTestCaseBase, self).setUp()
- def tearDown(self):
- self.api = None
- cfg.CONF.reset()
-
class ServiceTypeExtensionTestCase(ServiceTypeTestCaseBase):
"%s.%s" % (servicetype_db.__name__,
servicetype_db.ServiceTypeManager.__name__),
autospec=True)
+ self.addCleanup(self._patcher.stop)
self.mock_mgr = self._patcher.start()
self.mock_mgr.get_instance.return_value = self.mock_mgr.return_value
super(ServiceTypeExtensionTestCase, self).setUp()
- def tearDown(self):
- self._patcher.stop()
- super(ServiceTypeExtensionTestCase, self).tearDown()
-
def _test_service_type_create(self, env=None,
expected_status=webexc.HTTPCreated.code):
tenant_id = 'fake'
plugin_name = "%s.%s" % (dp.__name__, dp.DummyServicePlugin.__name__)
cfg.CONF.set_override('service_definition', ['dummy:%s' % plugin_name],
group='DEFAULT_SERVICETYPE')
+ self.addCleanup(db_api.clear_db)
super(ServiceTypeManagerTestCase, self).setUp()
- def tearDown(self):
- super(ServiceTypeManagerTestCase, self).tearDown()
- db_api.clear_db()
-
@contextlib.contextmanager
def service_type(self, name='svc_type',
default=True,
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-from tempfile import mkstemp
-import unittest
-
-from quantum.openstack.common.setup import canonicalize_emails
-from quantum.openstack.common.setup import parse_mailmap
-
-
-class SetupTest(unittest.TestCase):
-
- def setUp(self):
- (fd, self.mailmap) = mkstemp(prefix='openstack', suffix='.mailmap')
-
- def test_str_dict_replace(self):
- string = 'Johnnie T. Hozer'
- mapping = {'T.': 'The'}
- self.assertEqual('Johnnie The Hozer',
- canonicalize_emails(string, mapping))
-
- def test_mailmap_with_fullname(self):
- with open(self.mailmap, 'w') as mm_fh:
- mm_fh.write("Foo Bar <email@foo.com> Foo Bar <email@bar.com>\n")
- self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
- parse_mailmap(self.mailmap))
-
- def test_mailmap_with_firstname(self):
- with open(self.mailmap, 'w') as mm_fh:
- mm_fh.write("Foo <email@foo.com> Foo <email@bar.com>\n")
- self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
- parse_mailmap(self.mailmap))
-
- def test_mailmap_with_noname(self):
- with open(self.mailmap, 'w') as mm_fh:
- mm_fh.write("<email@foo.com> <email@bar.com>\n")
- self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
- parse_mailmap(self.mailmap))
-
- def tearDown(self):
- if os.path.exists(self.mailmap):
- os.remove(self.mailmap)
import socket
import mock
-import unittest2 as unittest
+import testtools
from quantum.api.v2 import attributes
from quantum.common import constants
from quantum import wsgi
-class TestWSGIServer(unittest.TestCase):
+class TestWSGIServer(testtools.TestCase):
"""WSGI server tests."""
def test_start_random_port(self):
])
-class SerializerTest(unittest.TestCase):
+class SerializerTest(testtools.TestCase):
def test_serialize_unknown_content_type(self):
"""
Test serialize verifies that exception InvalidContentType is raised
serializer.get_deserialize_handler, content_type)
-class RequestDeserializerTest(unittest.TestCase):
+class RequestDeserializerTest(testtools.TestCase):
def test_get_body_deserializer_unknown_content_type(self):
"""
Test get body deserializer verifies
deserializer.get_body_deserializer, content_type)
-class ResponseSerializerTest(unittest.TestCase):
+class ResponseSerializerTest(testtools.TestCase):
def setUp(self):
+ super(ResponseSerializerTest, self).setUp()
+
class JSONSerializer(object):
def serialize(self, data, action='default'):
return 'pew_json'
self.serializer.get_body_serializer, 'application/unknown')
-class XMLDeserializerTest(unittest.TestCase):
+class XMLDeserializerTest(testtools.TestCase):
def test_default_raise_Maiformed_Exception(self):
"""
Test verifies that exception MalformedRequestBody is raised
exception.MalformedRequestBody, deserializer.default, data_string)
-class JSONDeserializerTest(unittest.TestCase):
+class JSONDeserializerTest(testtools.TestCase):
def test_default_raise_Maiformed_Exception(self):
"""
Test verifies JsonDeserializer.default
exception.MalformedRequestBody, deserializer.default, data_string)
-class ResourceTest(unittest.TestCase):
+class ResourceTest(testtools.TestCase):
def test_dispatch_unknown_controller_action(self):
class Controller(object):
def index(self, request, pants=None):
self.assertEqual(400, result.status_int)
-class XMLDictSerializerTest(unittest.TestCase):
+class XMLDictSerializerTest(testtools.TestCase):
def test_xml(self):
NETWORK = {'network': {'test': None,
'tenant_id': 'test-tenant',
# License for the specific language governing permissions and limitations
# under the License.
-import unittest2 as unittest
+import testtools
from quantum.api.v2 import attributes
from quantum import wsgi
return req
-class WebTestCase(unittest.TestCase):
+class WebTestCase(testtools.TestCase):
fmt = 'json'
def setUp(self):
+ super(WebTestCase, self).setUp()
json_deserializer = wsgi.JSONDeserializer()
xml_deserializer = wsgi.XMLDeserializer(
attributes.get_attr_metadata())
'application/json': json_deserializer,
'application/xml': xml_deserializer,
}
- super(WebTestCase, self).setUp()
def deserialize(self, response):
ctype = 'application/%s' % self.fmt
import gettext
import os
-import unittest
import sys
from quantum.common.test_lib import run_tests
# tissue http://pypi.python.org/pypi/tissue (pep8 checker)
# openstack-nose https://github.com/jkoelker/openstack-nose
verbosity=2
-detailed-errors=1
cover-package = quantum
cover-html = true
cover-erase = true
cliff
coverage
distribute>=0.6.24
+fixtures>=0.3.12
mock>=1.0b1
mox==0.5.3
nose
openstack.nose_plugin
pep8
sphinx>=1.1.2
-unittest2
+testtools>=0.9.27
webtest==1.3.3
# Packages for the Cisco Plugin
###############################