]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Removing code related to functional tests
authorSalvatore Orlando <salvatore.orlando@eu.citrix.com>
Thu, 14 Jul 2011 11:47:49 +0000 (12:47 +0100)
committerSalvatore Orlando <salvatore.orlando@eu.citrix.com>
Thu, 14 Jul 2011 11:47:49 +0000 (12:47 +0100)
tests/functional/__init__.py [deleted file]
tests/functional/miniclient.py [deleted file]
tests/functional/test_service.py [deleted file]

diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py
deleted file mode 100644 (file)
index 92e9598..0000000
+++ /dev/null
@@ -1,294 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 Somebody PLC
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-"""
-Base test class for running non-stubbed tests (functional tests)
-
-The FunctionalTest class contains helper methods for starting the API
-and Registry server, grabbing the logs of each, cleaning up pidfiles,
-and spinning down the servers.
-"""
-
-import datetime
-import functools
-import os
-import random
-import shutil
-import signal
-import socket
-import tempfile
-import time
-import unittest
-import urlparse
-
-from tests.utils import execute, get_unused_port
-
-from sqlalchemy import create_engine
-
-
-class Server(object):
-    """
-    Class used to easily manage starting and stopping
-    a server during functional test runs.
-    """
-    def __init__(self, test_dir, port):
-        """
-        Creates a new Server object.
-
-        :param test_dir: The directory where all test stuff is kept. This is
-                         passed from the FunctionalTestCase.
-        :param port: The port to start a server up on.
-        """
-        self.verbose = True
-        self.debug = True
-        self.test_dir = test_dir
-        self.bind_port = port
-        self.conf_file = None
-        self.conf_base = None
-
-    def start(self, **kwargs):
-        """
-        Starts the server.
-
-        Any kwargs passed to this method will override the configuration
-        value in the conf file used in starting the servers.
-        """
-        if self.conf_file:
-            raise RuntimeError("Server configuration file already exists!")
-        if not self.conf_base:
-            raise RuntimeError("Subclass did not populate config_base!")
-
-        conf_override = self.__dict__.copy()
-        if kwargs:
-            conf_override.update(**kwargs)
-
-        # Create temporary configuration file for Quantum Unit tests.
-
-        conf_file = tempfile.NamedTemporaryFile()
-        conf_file.write(self.conf_base % conf_override)
-        conf_file.flush()
-        self.conf_file = conf_file
-        self.conf_file_name = conf_file.name
-
-        cmd = ("./bin/quantum %(conf_file_name)s" % self.__dict__)
-        return execute(cmd)
-
-    def stop(self):
-        """
-        Spin down the server.
-        """
-        # The only way we can do that at the moment is by killing quantum
-        # TODO - find quantum PID and do a sudo kill
-
-
-class ApiServer(Server):
-
-    """
-    Server object that starts/stops/manages the API server
-    """
-
-    def __init__(self, test_dir, port, registry_port):
-        super(ApiServer, self).__init__(test_dir, port)
-        self.server_name = 'api'
-        self.default_store = 'file'
-        self.image_dir = os.path.join(self.test_dir,
-                                         "images")
-        self.pid_file = os.path.join(self.test_dir,
-                                         "api.pid")
-        self.log_file = os.path.join(self.test_dir, "api.log")
-        self.registry_port = registry_port
-        self.conf_base = """[DEFAULT]
-verbose = %(verbose)s
-debug = %(debug)s
-filesystem_store_datadir=%(image_dir)s
-default_store = %(default_store)s
-bind_host = 0.0.0.0
-bind_port = %(bind_port)s
-registry_host = 0.0.0.0
-registry_port = %(registry_port)s
-log_file = %(log_file)s
-
-[pipeline:glance-api]
-pipeline = versionnegotiation apiv1app
-
-[pipeline:versions]
-pipeline = versionsapp
-
-[app:versionsapp]
-paste.app_factory = glance.api.versions:app_factory
-
-[app:apiv1app]
-paste.app_factory = glance.api.v1:app_factory
-
-[filter:versionnegotiation]
-paste.filter_factory = glance.api.middleware.version_negotiation:filter_factory
-"""
-
-
-class QuantumAPIServer(Server):
-
-    """
-    Server object that starts/stops/manages the Quantum API Server
-    """
-
-    def __init__(self, test_dir, port):
-        super(QuantumAPIServer, self).__init__(test_dir, port)
-
-        self.db_file = os.path.join(self.test_dir, ':memory:')
-        self.sql_connection = 'sqlite:///%s' % self.db_file
-        self.conf_base = """[DEFAULT]
-# Show more verbose log output (sets INFO log level output)
-verbose = %(verbose)s
-# Show debugging output in logs (sets DEBUG log level output)
-debug = %(debug)s
-# Address to bind the API server
-bind_host = 0.0.0.0
-# Port for test API server
-bind_port = %(bind_port)s
-
-[composite:quantum]
-use = egg:Paste#urlmap
-/: quantumversions
-/v0.1: quantumapi
-
-[app:quantumversions]
-paste.app_factory = quantum.api.versions:Versions.factory
-
-[app:quantumapi]
-paste.app_factory = quantum.api:APIRouterV01.factory
-"""
-
-
-class FunctionalTest(unittest.TestCase):
-
-    """
-    Base test class for any test that wants to test the actual
-    servers and clients and not just the stubbed out interfaces
-    """
-
-    def setUp(self):
-
-        self.test_id = random.randint(0, 100000)
-        self.test_port = get_unused_port()
-
-        self.quantum_server = QuantumAPIServer(self.test_dir,
-                                               self.test_port)
-
-    def tearDown(self):
-        self.cleanup()
-        # We destroy the test data store between each test case,
-        # and recreate it, which ensures that we have no side-effects
-        # from the tests
-        self._reset_database()
-
-    def _reset_database(self):
-        conn_string = self.registry_server.sql_connection
-        conn_pieces = urlparse.urlparse(conn_string)
-        if conn_string.startswith('sqlite'):
-            # We can just delete the SQLite database, which is
-            # the easiest and cleanest solution
-            db_path = conn_pieces.path.strip('/')
-            if db_path and os.path.exists(db_path):
-                os.unlink(db_path)
-            # No need to recreate the SQLite DB. SQLite will
-            # create it for us if it's not there...
-        elif conn_string.startswith('mysql'):
-            # We can execute the MySQL client to destroy and re-create
-            # the MYSQL database, which is easier and less error-prone
-            # than using SQLAlchemy to do this via MetaData...trust me.
-            database = conn_pieces.path.strip('/')
-            loc_pieces = conn_pieces.netloc.split('@')
-            host = loc_pieces[1]
-            auth_pieces = loc_pieces[0].split(':')
-            user = auth_pieces[0]
-            password = ""
-            if len(auth_pieces) > 1:
-                if auth_pieces[1].strip():
-                    password = "-p%s" % auth_pieces[1]
-            sql = ("drop database if exists %(database)s; "
-                   "create database %(database)s;") % locals()
-            cmd = ("mysql -u%(user)s %(password)s -h%(host)s "
-                   "-e\"%(sql)s\"") % locals()
-            exitcode, out, err = execute(cmd)
-            self.assertEqual(0, exitcode)
-
-    def start_servers(self, **kwargs):
-        """
-        Starts the Quantum API server on an unused port.
-
-        Any kwargs passed to this method will override the configuration
-        value in the conf file used in starting the server.
-        """
-
-        exitcode, out, err = self.quantum_server.start(**kwargs)
-
-        self.assertEqual(0, exitcode,
-                         "Failed to spin up the Quantum server. "
-                         "Got: %s" % err)
-        #self.assertTrue("Starting quantum with" in out)
-        #TODO: replace with appropriate assert
-
-        self.wait_for_servers()
-
-    def ping_server(self, port):
-        """
-        Simple ping on the port. If responsive, return True, else
-        return False.
-
-        :note We use raw sockets, not ping here, since ping uses ICMP and
-        has no concept of ports...
-        """
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            s.connect(("127.0.0.1", port))
-            s.close()
-            return True
-        except socket.error, e:
-            return False
-
-    def wait_for_servers(self, timeout=3):
-        """
-        Tight loop, waiting for both API and registry server to be
-        available on the ports. Returns when both are pingable. There
-        is a timeout on waiting for the servers to come up.
-
-        :param timeout: Optional, defaults to 3 seconds
-        """
-        now = datetime.datetime.now()
-        timeout_time = now + datetime.timedelta(seconds=timeout)
-        while (timeout_time > now):
-            if self.ping_server(self.api_port) and\
-               self.ping_server(self.registry_port):
-                return
-            now = datetime.datetime.now()
-            time.sleep(0.05)
-        self.assertFalse(True, "Failed to start servers.")
-
-    def stop_servers(self):
-        """
-        Called to stop the started servers in a normal fashion. Note
-        that cleanup() will stop the servers using a fairly draconian
-        method of sending a SIGTERM signal to the servers. Here, we use
-        the glance-control stop method to gracefully shut the server down.
-        This method also asserts that the shutdown was clean, and so it
-        is meant to be called during a normal test case sequence.
-        """
-
-        exitcode, out, err = self.quantum_server.stop()
-        self.assertEqual(0, exitcode,
-                         "Failed to spin down the Quantum server. "
-                         "Got: %s" % err)
diff --git a/tests/functional/miniclient.py b/tests/functional/miniclient.py
deleted file mode 100644 (file)
index be49867..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 Citrix Systems
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import httplib
-import socket
-import urllib
-
-
-class MiniClient(object):
-
-    """A base client class - derived from Glance.BaseClient"""
-
-    action_prefix = '/v0.1/tenants/{tenant_id}'
-
-    def __init__(self, host, port, use_ssl):
-        """
-        Creates a new client to some service.
-
-        :param host: The host where service resides
-        :param port: The port where service resides
-        :param use_ssl: Should we use HTTPS?
-        """
-        self.host = host
-        self.port = port
-        self.use_ssl = use_ssl
-        self.connection = None
-
-    def get_connection_type(self):
-        """
-        Returns the proper connection type
-        """
-        if self.use_ssl:
-            return httplib.HTTPSConnection
-        else:
-            return httplib.HTTPConnection
-
-    def do_request(self, tenant, method, action, body=None,
-                   headers=None, params=None):
-        """
-        Connects to the server and issues a request.
-        Returns the result data, or raises an appropriate exception if
-        HTTP status code is not 2xx
-
-        :param method: HTTP method ("GET", "POST", "PUT", etc...)
-        :param body: string of data to send, or None (default)
-        :param headers: mapping of key/value pairs to add as headers
-        :param params: dictionary of key/value pairs to add to append
-                             to action
-
-        """
-        action = MiniClient.action_prefix + action
-        action = action.replace('{tenant_id}', tenant)
-        if type(params) is dict:
-            action += '?' + urllib.urlencode(params)
-
-        try:
-            connection_type = self.get_connection_type()
-            headers = headers or {}
-
-            # Open connection and send request
-            c = connection_type(self.host, self.port)
-            c.request(method, action, body, headers)
-            res = c.getresponse()
-            status_code = self.get_status_code(res)
-            if status_code in (httplib.OK,
-                               httplib.CREATED,
-                               httplib.ACCEPTED,
-                               httplib.NO_CONTENT):
-                return res
-            else:
-                raise Exception("Server returned error: %s" % res.read())
-
-        except (socket.error, IOError), e:
-            raise Exception("Unable to connect to "
-                            "server. Got error: %s" % e)
-
-    def get_status_code(self, response):
-        """
-        Returns the integer status code from the response, which
-        can be either a Webob.Response (used in testing) or httplib.Response
-        """
-        if hasattr(response, 'status_int'):
-            return response.status_int
-        else:
-            return response.status
diff --git a/tests/functional/test_service.py b/tests/functional/test_service.py
deleted file mode 100644 (file)
index bdd926f..0000000
+++ /dev/null
@@ -1,221 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 Citrix Systems
-# Copyright 2011 Nicira Networks
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import gettext
-import unittest
-
-gettext.install('quantum', unicode=1)
-
-from miniclient import MiniClient
-from quantum.common.wsgi import Serializer
-
-HOST = '127.0.0.1'
-PORT = 9696
-USE_SSL = False
-
-TENANT_ID = 'totore'
-FORMAT = "json"
-
-test_network1_data = \
-    {'network': {'network-name': 'test1'}}
-test_network2_data = \
-    {'network': {'network-name': 'test2'}}
-
-
-def print_response(res):
-    content = res.read()
-    print "Status: %s" % res.status
-    print "Content: %s" % content
-    return content
-
-
-class QuantumTest(unittest.TestCase):
-    def setUp(self):
-        self.client = MiniClient(HOST, PORT, USE_SSL)
-
-    #def create_network(self, data, tenant_id=TENANT_ID):
-    #    content_type = "application/%s" % FORMAT
-    #    body = Serializer().serialize(data, content_type)
-    #    res = self.client.do_request(tenant_id, 'POST', "/networks." + FORMAT,
-    #      body=body)
-    #    self.assertEqual(res.status, 200, "bad response: %s" % res.read())
-
-    #def test_listNetworks(self):
-    #    self.create_network(test_network1_data)
-    #    self.create_network(test_network2_data)
-    #    res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
-    #    self.assertEqual(res.status, 200, "bad response: %s" % res.read())
-
-    #def test_getNonexistentNetwork(self):
-        # TODO(bgh): parse exception and make sure it is NetworkNotFound
-        #try:
-        #    res = self.client.do_request(TENANT_ID, 'GET',
-        #      "/networks/%s.%s" % ("8675309", "xml"))
-        #    self.assertEqual(res.status, 400)
-        #except Exception, e:
-        #    print "Caught exception: %s" % (str(e))
-
-    #def test_deleteNonexistentNetwork(self):
-        # TODO(bgh): parse exception and make sure it is NetworkNotFound
-        #try:
-        #    res = self.client.do_request(TENANT_ID, 'DELETE',
-        #      "/networks/%s.%s" % ("8675309", "xml"))
-        #    self.assertEqual(res.status, 400)
-        #except Exception, e:
-        #    print "Caught exception: %s" % (str(e))
-
-    #def test_createNetwork(self):
-        #self.create_network(test_network1_data)
-
-    #def test_createPort(self):
-        #self.create_network(test_network1_data)
-        #res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
-        #resdict = json.loads(res.read())
-        #for n in resdict["networks"]:
-        #    net_id = n["id"]
-
-            # Step 1 - List Ports for network (should not find any)
-            #res = self.client.do_request(TENANT_ID, 'GET',
-            #  "/networks/%s/ports.%s" % (net_id, FORMAT))
-            #output = res.read()
-            #self.assertEqual(res.status, 200, "Bad response: %s" % output)
-            #if len(output) > 0:
-            #    resdict = json.loads(output)
-            #    self.assertTrue(len(resdict["ports"]) == 0,
-            #      "Found unexpected ports: %s" % output)
-            #else:
-            #    self.assertTrue(len(output) == 0,
-            #      "Found unexpected ports: %s" % output)
-
-            # Step 2 - Create Port for network
-            #res = self.client.do_request(TENANT_ID, 'POST',
-            #  "/networks/%s/ports.%s" % (net_id, FORMAT))
-            #self.assertEqual(res.status, 200, "Bad response: %s" % output)
-
-            # Step 3 - List Ports for network (again); should find one
-            #res = self.client.do_request(TENANT_ID, 'GET',
-            #  "/networks/%s/ports.%s" % (net_id, FORMAT))
-            #output = res.read()
-            #self.assertEqual(res.status, 200, "Bad response: %s" % output)
-            #resdict = json.loads(output)
-            #ids = []
-            #for p in resdict["ports"]:
-            #    ids.append(p["id"])
-            #self.assertTrue(len(ids) == 1,
-            #  "Didn't find expected # of ports (1): %s" % ids)
-
-    #def test_getAttachment(self):
-        #self.create_network(test_network1_data)
-        #res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
-        #resdict = json.loads(res.read())
-        #for n in resdict["networks"]:
-        #    net_id = n["id"]
-
-            # Step 1 - Create Port for network and attempt to get the
-            # attachment (even though there isn't one)
-            #res = self.client.do_request(TENANT_ID, 'POST',
-            #  "/networks/%s/ports.%s" % (net_id, FORMAT))
-            #output = res.read()
-            #self.assertEqual(res.status, 200, "Bad response: %s" % output)
-            #resdict = json.loads(output)
-            #port_id = resdict["ports"]["port"]["id"]
-
-            #res = self.client.do_request(TENANT_ID, 'GET',
-            #  "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
-            #    FORMAT))
-            #output = res.read()
-            #self.assertEqual(res.status, 200, "Bad response: %s" % output)
-
-            # Step 2 - Add an attachment
-            #data = {'port': {'attachment-id': 'fudd'}}
-            #content_type = "application/" + FORMAT
-            #body = Serializer().serialize(data, content_type)
-            #res = self.client.do_request(TENANT_ID, 'PUT',
-            #  "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
-            #    FORMAT), body=body)
-            #output = res.read()
-            #self.assertEqual(res.status, 202, "Bad response: %s" % output)
-
-            # Step 3 - Fetch the attachment
-            #res = self.client.do_request(TENANT_ID, 'GET',
-            #  "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
-            #    FORMAT))
-            #output = res.read()
-            #self.assertEqual(res.status, 200, "Bad response: %s" % output)
-            #resdict = json.loads(output)
-            #attachment = resdict["attachment"]
-            #self.assertEqual(attachment, "fudd", "Attachment: %s"
-            #% attachment)
-
-    #def test_renameNetwork(self):
-        #self.create_network(test_network1_data)
-        #res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
-        #resdict = json.loads(res.read())
-        #net_id = resdict["networks"][0]["id"]
-
-        #data = test_network1_data.copy()
-        #data['network']['network-name'] = 'test_renamed'
-        #content_type = "application/" + FORMAT
-        #body = Serializer().serialize(data, content_type)
-        #res = self.client.do_request(TENANT_ID, 'PUT',
-          #"/networks/%s.%s" % (net_id, FORMAT), body=body)
-        #resdict = json.loads(res.read())
-        #self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
-          #"Network_rename: renamed network has a different uuid")
-        #self.assertTrue(
-            #resdict["networks"]["network"]["name"] == "test_renamed",
-            #"Network rename didn't take effect")
-
-    #def test_createNetworkOnMultipleTenants(self):
-        # Create the same network on multiple tenants
-        #self.create_network(test_network1_data, "tenant1")
-        #self.create_network(test_network1_data, "tenant2")
-
-    #def delete_networks(self, tenant_id=TENANT_ID):
-        # Remove all the networks created on the tenant (including ports and
-        # attachments)
-        #res = self.client.do_request(tenant_id, 'GET',
-        #  "/networks." + FORMAT)
-        #resdict = json.loads(res.read())
-        #for n in resdict["networks"]:
-        #    net_id = n["id"]
-        #    # Delete all the ports
-        #    res = self.client.do_request(tenant_id, 'GET',
-        #      "/networks/%s/ports.%s" % (net_id, FORMAT))
-        #    output = res.read()
-        #    self.assertEqual(res.status, 200, "Bad response: %s" % output)
-        #    resdict = json.loads(output)
-        #    ids = []
-        #    for p in resdict["ports"]:
-        #        res = self.client.do_request(tenant_id, 'DELETE',
-        #          "/networks/%s/ports/%s/attachment.%s" % (net_id, p["id"],
-        #            FORMAT))
-        #        res = self.client.do_request(tenant_id, 'DELETE',
-        #          "/networks/%s/ports/%s.%s" % (net_id, p["id"], FORMAT))
-            # Now, remove the network
-        #    res = self.client.do_request(tenant_id, 'DELETE',
-        #      "/networks/" + net_id + "." + FORMAT)
-        #    self.assertEqual(res.status, 202)
-
-    def tearDown(self):
-        self.delete_networks()
-
-# Standard boilerplate to call the main() function.
-if __name__ == '__main__':
-    suite = unittest.TestLoader().loadTestsFromTestCase(QuantumTest)
-    unittest.TextTestRunner(verbosity=2).run(suite)