]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Allow nvp_api to load balance requests
authorAaron Rosen <arosen@nicira.com>
Wed, 31 Oct 2012 04:35:42 +0000 (21:35 -0700)
committerAaron Rosen <arosen@nicira.com>
Tue, 8 Jan 2013 05:39:32 +0000 (00:39 -0500)
The current version of the nvp_api client does not load balance
requests across multiple controllers. Instead, it just sends all the requests
to one controller and if there is a controller failure it will failover
to use another controller. This blueprint implements the ablility to
utilize all controllers at once.

blueprint nvp-api-client-loadbalance-request

Change-Id: I331be2a23ae360a95786152d5f116359f690d9f3

quantum/plugins/nicira/nicira_nvp_plugin/NvpApiClient.py
quantum/plugins/nicira/nicira_nvp_plugin/QuantumPlugin.py
quantum/plugins/nicira/nicira_nvp_plugin/README
quantum/plugins/nicira/nicira_nvp_plugin/api_client/__init__.py
quantum/plugins/nicira/nicira_nvp_plugin/api_client/client.py
quantum/plugins/nicira/nicira_nvp_plugin/api_client/client_eventlet.py
quantum/plugins/nicira/nicira_nvp_plugin/api_client/common.py
quantum/plugins/nicira/nicira_nvp_plugin/api_client/request.py
quantum/plugins/nicira/nicira_nvp_plugin/api_client/request_eventlet.py
quantum/plugins/nicira/nicira_nvp_plugin/common/config.py
quantum/tests/unit/nicira/test_nvp_api_request_eventlet.py

index 0c3a65eeb8a7da4539a5ed653bd6475a3f309656..48a8d60e922dcab92e081491d734ad51ec8ea291 100644 (file)
@@ -36,8 +36,8 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
     '''
 
     def __init__(self, api_providers, user, password, request_timeout,
-                 http_timeout, retries, redirects, failover_time,
-                 concurrent_connections=3):
+                 http_timeout, retries, redirects,
+                 concurrent_connections=3, nvp_gen_timeout=-1):
         '''Constructor.
 
         :param api_providers: a list of tuples in the form:
@@ -53,12 +53,10 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
             controller in the cluster)
         :param retries: the number of concurrent connections.
         :param redirects: the number of concurrent connections.
-        :param failover_time: minimum time between controller failover and new
-            connections allowed.
         '''
         client_eventlet.NvpApiClientEventlet.__init__(
             self, api_providers, user, password, concurrent_connections,
-            failover_time=failover_time)
+            nvp_gen_timeout)
 
         self._request_timeout = request_timeout
         self._http_timeout = http_timeout
@@ -84,7 +82,7 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet):
         if password:
             self._password = password
 
-        return client_eventlet.NvpApiClientEventlet.login(self)
+        return client_eventlet.NvpApiClientEventlet._login(self)
 
     def request(self, method, url, body="", content_type="application/json"):
         '''Issues request to controller.'''
index 19b7a6380e206ff108eac10104011fd4315d2a38..9daefca066dc3e6e011b69eb419f640b8cd3eebe 100644 (file)
@@ -27,7 +27,6 @@ import webob.exc
 
 # FIXME(salvatore-orlando): get rid of relative imports
 from common import config
-from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client_eventlet
 from nvp_plugin_version import PLUGIN_VERSION
 
 from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models
@@ -157,12 +156,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2):
                 http_timeout=cluster.http_timeout,
                 retries=cluster.retries,
                 redirects=cluster.redirects,
-                failover_time=self.nvp_opts.failover_time,
-                concurrent_connections=self.nvp_opts.concurrent_connections)
+                concurrent_connections=self.nvp_opts['concurrent_connections'],
+                nvp_gen_timeout=self.nvp_opts['nvp_gen_timeout'])
 
-            # TODO(salvatore-orlando): do login at first request,
-            # not when plugin is instantiated
-            cluster.api_client.login()
+            if len(self.clusters) == 0:
+                first_cluster = cluster
             self.clusters[c_opts['name']] = cluster
 
         def_cluster_name = self.nvp_opts.default_cluster_name
index caefb7cbead563c5efd581bf8f2154c397d57f53..ca4eed63c13e52b77f8cffc4ab3f4966f78313a1 100644 (file)
@@ -21,8 +21,8 @@ NVP Plugin configuration
     bridged transport zone (default 64)
     - concurrent_connections: Number of connects to each controller node
     (default 3)
-    - failover_time: Time from when a connection pool is switched to another
-    controller during failures.
+    - nvp_gen_timout: Number of seconds a generation id should be valid for
+    (default -1 meaning do not time out)
     3) NVP cluster
     The Quantum NVP plugin allow for configuring multiple clusters.
     Each cluster configuration section must be declared in the following way
index f76d4a9610e2c7bd901dd3ff35b9ce76808b87ff..87f79ba0559b02038226bc92ab6285bab164b69f 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
 # All Rights Reserved
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,3 +14,5 @@
 #    under the License.
 #
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
index e3cc9d1d0d86f3515382fcc26cd99e84135e5f4a..a0cdb81b090cfa68176214f9054a285cb38102c9 100644 (file)
@@ -1,25 +1,40 @@
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
 # All Rights Reserved
 #
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
 #
-#         http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
 #
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 #
-# Author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
+# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
 
 from abc import ABCMeta
-from abc import abstractmethod
-from abc import abstractproperty
+import httplib
+import time
+import logging
+
+
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
+    _conn_str)
+
+
+logging.basicConfig(level=logging.INFO)
+LOG = logging.getLogger(__name__)
+#Default parameters.
+GENERATION_ID_TIMEOUT = -1
+DEFAULT_CONCURRENT_CONNECTIONS = 3
+DEFAULT_CONNECT_TIMEOUT = 5
 
 
 class NvpApiClient(object):
@@ -33,38 +48,230 @@ class NvpApiClient(object):
 
     CONN_IDLE_TIMEOUT = 60 * 15
 
-    @abstractmethod
-    def update_providers(self, api_providers):
-        pass
+    def _create_connection(self, host, port, is_ssl):
+        if is_ssl:
+            return httplib.HTTPSConnection(host, port,
+                                           timeout=self._connect_timeout)
+        return httplib.HTTPConnection(host, port,
+                                      timeout=self._connect_timeout)
+
+    @staticmethod
+    def _conn_params(http_conn):
+        is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
+        return (http_conn.host, http_conn.port, is_ssl)
 
-    @abstractproperty
+    @property
     def user(self):
-        pass
+        return self._user
 
-    @abstractproperty
+    @property
     def password(self):
-        pass
+        return self._password
+
+    @property
+    def nvp_config_gen(self):
+        # If nvp_gen_timeout is not -1 then:
+        # Maintain a timestamp along with the generation ID.  Hold onto the
+        # ID long enough to be useful and block on sequential requests but
+        # not long enough to persist when Onix db is cleared, which resets
+        # the generation ID, causing the DAL to block indefinitely with some
+        # number that's higher than the cluster's value.
+        if self._nvp_gen_timeout != -1:
+            ts = self._nvp_config_gen_ts
+            if ts is not None:
+                if (time.time() - ts) > self._nvp_gen_timeout:
+                    return None
+        return self._nvp_config_gen
+
+    @nvp_config_gen.setter
+    def nvp_config_gen(self, value):
+        if self._nvp_config_gen != value:
+            if self._nvp_gen_timeout != -1:
+                self._nvp_config_gen_ts = time.time()
+        self._nvp_config_gen = value
+
+    def auth_cookie(self, conn):
+        cookie = None
+        data = self._get_provider_data(conn)
+        if data:
+            cookie = data[1]
+        return cookie
+
+    def set_auth_cookie(self, conn, cookie):
+        data = self._get_provider_data(conn)
+        if data:
+            self._set_provider_data(conn, (data[0], cookie))
+
+    def acquire_connection(self, auto_login=True, headers=None, rid=-1):
+        '''Check out an available HTTPConnection instance.
+
+        Blocks until a connection is available.
+        :auto_login: automatically logins before returning conn
+        :headers: header to pass on to login attempt
+        :param rid: request id passed in from request eventlet.
+        :returns: An available HTTPConnection instance or None if no
+                 api_providers are configured.
+        '''
+        if not self._api_providers:
+            LOG.warn(_("[%d] no API providers currently available."), rid)
+            return None
+        if self._conn_pool.empty():
+            LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
+        priority, conn = self._conn_pool.get()
+        now = time.time()
+        if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
+            LOG.info(_("[%d] Connection %s idle for %0.2f seconds; "
+                       "reconnecting."),
+                     rid, _conn_str(conn), now - conn.last_used)
+            conn = self._create_connection(*self._conn_params(conn))
 
-    @abstractproperty
-    def auth_cookie(self):
-        pass
+        conn.last_used = now
+        conn.priority = priority  # stash current priority for release
+        qsize = self._conn_pool.qsize()
+        LOG.debug(_("[%d] Acquired connection %s. %d connection(s) "
+                    "available."), rid, _conn_str(conn), qsize)
+        if auto_login and self.auth_cookie(conn) is None:
+            self._wait_for_login(conn, headers)
+        return conn
 
-    @abstractmethod
-    def acquire_connection(self):
-        pass
+    def release_connection(self, http_conn, bad_state=False,
+                           service_unavail=False, rid=-1):
+        '''Mark HTTPConnection instance as available for check-out.
 
-    @abstractmethod
-    def release_connection(self, http_conn, bad_state=False):
-        pass
+        :param http_conn: An HTTPConnection instance obtained from this
+            instance.
+        :param bad_state: True if http_conn is known to be in a bad state
+                (e.g. connection fault.)
+        :service_unavail: True if http_conn returned 503 response.
+        :param rid: request id passed in from request eventlet.
+        '''
+        conn_params = self._conn_params(http_conn)
+        if self._conn_params(http_conn) not in self._api_providers:
+            LOG.debug(_("[%d] Released connection '%s' is not an API provider "
+                        "for the cluster"), rid, _conn_str(http_conn))
+            return
+        elif hasattr(http_conn, "no_release"):
+            return
 
-    @abstractproperty
-    def need_login(self):
-        pass
+        if bad_state:
+            # Reconnect to provider.
+            LOG.warn(_("[%d] Connection returned in bad state, reconnecting "
+                       "to %s"), rid, _conn_str(http_conn))
+            http_conn = self._create_connection(*self._conn_params(http_conn))
+            priority = self._next_conn_priority
+            self._next_conn_priority += 1
+        elif service_unavail:
+            # http_conn returned a service unaviable response, put other
+            # connections to the same controller at end of priority queue,
+            conns = []
+            while not self._conn_pool.empty():
+                priority, conn = self._conn_pool.get()
+                if self._conn_params(conn) == conn_params:
+                    priority = self._next_conn_priority
+                    self._next_conn_priority += 1
+                conns.append((priority, conn))
+            for priority, conn in conns:
+                self._conn_pool.put((priority, conn))
+            # put http_conn at end of queue also
+            priority = self._next_conn_priority
+            self._next_conn_priority += 1
+        else:
+            priority = http_conn.priority
+
+        self._conn_pool.put((priority, http_conn))
+        LOG.debug(_("[%d] Released connection %s. %d connection(s) "
+                    "available."),
+                  rid, _conn_str(http_conn), self._conn_pool.qsize())
+
+    def _wait_for_login(self, conn, headers=None):
+        '''Block until a login has occurred for the current API provider.'''
+
+        data = self._get_provider_data(conn)
+        if data is None:
+            LOG.error(_("Login request for an invalid connection: '%s'"),
+                      _conn_str(conn))
+            return
+        provider_sem = data[0]
+        if provider_sem.acquire(blocking=False):
+            try:
+                cookie = self._login(conn, headers)
+                self.set_auth_cookie(conn, cookie)
+            finally:
+                provider_sem.release()
+        else:
+            LOG.debug(_("Waiting for auth to complete"))
+            # Wait until we can aquire then release
+            provider_sem.acquire(blocking=True)
+            provider_sem.release()
+
+    def _get_provider_data(self, conn_or_conn_params, default=None):
+        """Get data for specified API provider.
+
+        Args:
+            conn_or_conn_params: either a HTTP(S)Connection object or the
+                resolved conn_params tuple returned by self._conn_params().
+            default: conn_params if ones passed aren't known
+        Returns: Data associated with specified provider
+        """
+        conn_params = self._normalize_conn_params(conn_or_conn_params)
+        return self._api_provider_data.get(conn_params, default)
+
+    def _set_provider_data(self, conn_or_conn_params, data):
+        """Set data for specified API provider.
+
+        Args:
+            conn_or_conn_params: either a HTTP(S)Connection object or the
+                resolved conn_params tuple returned by self._conn_params().
+            data: data to associate with API provider
+        """
+        conn_params = self._normalize_conn_params(conn_or_conn_params)
+        if data is None:
+            del self._api_provider_data[conn_params]
+        else:
+            self._api_provider_data[conn_params] = data
+
+    def _normalize_conn_params(self, conn_or_conn_params):
+        """Normalize conn_param tuple.
+
+        Args:
+            conn_or_conn_params: either a HTTP(S)Connection object or the
+                resolved conn_params tuple returned by self._conn_params().
+
+        Returns: Normalized conn_param tuple
+        """
+        if (not isinstance(conn_or_conn_params, tuple) and
+            not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
+            LOG.debug(_("Invalid conn_params value: '%s'"),
+                      str(conn_or_conn_params))
+            return conn_or_conn_params
+        if isinstance(conn_or_conn_params, httplib.HTTPConnection):
+            conn_params = self._conn_params(conn_or_conn_params)
+        else:
+            conn_params = conn_or_conn_params
+        host, port, is_ssl = conn_params
+        if port is None:
+            port = 443 if is_ssl else 80
+        return (host, port, is_ssl)
+
+    def update_providers(self, api_providers):
+        new_providers = set([tuple(p) for p in api_providers])
+        if new_providers != self._api_providers:
+            new_conns = []
+            while not self._conn_pool.empty():
+                priority, conn = self._conn_pool.get_nowait()
+                if self._conn_params(conn) in new_providers:
+                    new_conns.append((priority, conn))
 
-    @abstractmethod
-    def wait_for_login(self):
-        pass
+            to_subtract = self._api_providers - new_providers
+            for p in to_subtract:
+                self._set_provider_data(p, None)
+            to_add = new_providers - self._api_providers
+            for unused_i in range(self._concurrent_connections):
+                for host, port, is_ssl in to_add:
+                    conn = self._create_connection(host, port, is_ssl)
+                    new_conns.append((self._next_conn_priority, conn))
+                    self._next_conn_priority += 1
 
-    @abstractmethod
-    def login(self):
-        pass
+            for priority, conn in new_conns:
+                self._conn_pool.put((priority, conn))
+            self._api_providers = new_providers
index 75d0347669928cf9737b4dae7e8e93a48ebfef2d..8e10ee12855e2ca724661084f2169688d60db24e 100644 (file)
@@ -1,53 +1,44 @@
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
 # All Rights Reserved
 #
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
 #
-#         http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
 #
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 #
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
 
-import client
 import eventlet
-import httplib
 import logging
-import request_eventlet
 import time
 
-from common import _conn_str
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
+    request_eventlet)
 
 eventlet.monkey_patch()
 
 logging.basicConfig(level=logging.INFO)
 LOG = logging.getLogger(__name__)
 
-# Default parameters.
-DEFAULT_FAILOVER_TIME = 5
-DEFAULT_CONCURRENT_CONNECTIONS = 3
-DEFAULT_CONNECT_TIMEOUT = 5
-GENERATION_ID_TIMEOUT = -1  # if set to -1 then disabled
-
 
-class NvpApiClientEventlet(object):
+class NvpApiClientEventlet(client.NvpApiClient):
     '''Eventlet-based implementation of NvpApiClient ABC.'''
 
-    CONN_IDLE_TIMEOUT = 60 * 15
-
     def __init__(self, api_providers, user, password,
-                 concurrent_connections=DEFAULT_CONCURRENT_CONNECTIONS,
-                 use_https=True,
-                 connect_timeout=DEFAULT_CONNECT_TIMEOUT,
-                 failover_time=DEFAULT_FAILOVER_TIME,
-                 nvp_gen_timeout=GENERATION_ID_TIMEOUT):
+                 concurrent_connections=client.DEFAULT_CONCURRENT_CONNECTIONS,
+                 nvp_gen_timeout=client.GENERATION_ID_TIMEOUT, use_https=True,
+                 connect_timeout=client.DEFAULT_CONNECT_TIMEOUT):
         '''Constructor
 
         :param api_providers: a list of tuples of the form: (host, port,
@@ -57,242 +48,112 @@ class NvpApiClientEventlet(object):
         :param concurrent_connections: total number of concurrent connections.
         :param use_https: whether or not to use https for requests.
         :param connect_timeout: connection timeout in seconds.
-        :param failover_time: time from when a connection pool is switched to
-            the next connection released via acquire_connection().
         :param nvp_gen_timeout controls how long the generation id is kept
             if set to -1 the generation id is never timed out
         '''
         if not api_providers:
             api_providers = []
         self._api_providers = set([tuple(p) for p in api_providers])
+        self._api_provider_data = {}  # tuple(semaphore, nvp_session_cookie)
+        for p in self._api_providers:
+            self._set_provider_data(p, (eventlet.semaphore.Semaphore(1), None))
         self._user = user
         self._password = password
         self._concurrent_connections = concurrent_connections
         self._use_https = use_https
         self._connect_timeout = connect_timeout
-        self._failover_time = failover_time
         self._nvp_config_gen = None
         self._nvp_config_gen_ts = None
         self._nvp_gen_timeout = nvp_gen_timeout
 
         # Connection pool is a list of queues.
-        self._conn_pool = list()
-        conn_pool_idx = 0
+        self._conn_pool = eventlet.queue.PriorityQueue()
+        self._next_conn_priority = 1
         for host, port, is_ssl in api_providers:
-            provider_conn_pool = eventlet.queue.Queue(
-                maxsize=concurrent_connections)
             for i in range(concurrent_connections):
-                # All connections in a provider_conn_poool have the
-                # same priority (they connect to the same server).
                 conn = self._create_connection(host, port, is_ssl)
-                conn.idx = conn_pool_idx
-                provider_conn_pool.put(conn)
-
-            self._conn_pool.append(provider_conn_pool)
-            conn_pool_idx += 1
-
-        self._active_conn_pool_idx = 0
-
-        self._cookie = None
-        self._need_login = True
-        self._doing_login_sem = eventlet.semaphore.Semaphore(1)
-
-    def _create_connection(self, host, port, is_ssl):
-        if is_ssl:
-            return httplib.HTTPSConnection(host, port,
-                                           timeout=self._connect_timeout)
-        return httplib.HTTPConnection(host, port,
-                                      timeout=self._connect_timeout)
-
-    @staticmethod
-    def _conn_params(http_conn):
-        is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
-        return (http_conn.host, http_conn.port, is_ssl)
-
-    def update_providers(self, api_providers):
-        raise Exception(_('update_providers() not implemented.'))
-
-    @property
-    def user(self):
-        return self._user
-
-    @property
-    def password(self):
-        return self._password
-
-    @property
-    def nvp_config_gen(self):
-        # If nvp_gen_timeout is not -1 then:
-        # Maintain a timestamp along with the generation ID.  Hold onto the
-        # ID long enough to be useful and block on sequential requests but
-        # not long enough to persist when Onix db is cleared, which resets
-        # the generation ID, causing the DAL to block indefinitely with some
-        # number that's higher than the cluster's value.
-        if self._nvp_gen_timeout != -1:
-            ts = self._nvp_config_gen_ts
-            if ts is not None:
-                if (time.time() - ts) > self._nvp_gen_timeout:
-                    return None
-        return self._nvp_config_gen
-
-    @nvp_config_gen.setter
-    def nvp_config_gen(self, value):
-        if self._nvp_config_gen != value:
-            if self._nvp_gen_timeout != -1:
-                self._nvp_config_gen_ts = time.time()
-        self._nvp_config_gen = value
-
-    @property
-    def auth_cookie(self):
-        return self._cookie
-
-    def acquire_connection(self, rid=-1):
-        '''Check out an available HTTPConnection instance.
-
-        Blocks until a connection is available.
-
-        :param rid: request id passed in from request eventlet.
-        :returns: An available HTTPConnection instance or None if no
-                 api_providers are configured.
-        '''
-        if not self._api_providers:
-            LOG.warn(_("[%d] no API providers currently available."), rid)
-            return None
-
-        # The sleep time is to give controllers time to become consistent after
-        # there has been a change in the controller used as the api_provider.
-        now = time.time()
-        if now < getattr(self, '_issue_conn_barrier', now):
-            LOG.warn(_("[%d] Waiting for failover timer to expire."), rid)
-            time.sleep(self._issue_conn_barrier - now)
-
-        # Print out a warning if all connections are in use.
-        if self._conn_pool[self._active_conn_pool_idx].empty():
-            LOG.debug(_("[%d] Waiting to acquire client connection."), rid)
-
-        # Try to acquire a connection (block in get() until connection
-        # available or timeout occurs).
-        active_conn_pool_idx = self._active_conn_pool_idx
-        conn = self._conn_pool[active_conn_pool_idx].get()
-
-        if active_conn_pool_idx != self._active_conn_pool_idx:
-            # active_conn_pool became inactive while we were waiting.
-            # Put connection back on old pool and try again.
-            LOG.warn(_("[%(rid)d] Active pool expired while waiting for "
-                       "connection: %(conn)s"),
-                     {'rid': rid, 'conn': _conn_str(conn)})
-            self._conn_pool[active_conn_pool_idx].put(conn)
-            return self.acquire_connection(rid=rid)
-
-        # Check if the connection has been idle too long.
-        now = time.time()
-        if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
-            LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
-                       "seconds; reconnecting."),
-                     {'rid': rid, 'conn': _conn_str(conn),
-                      'sec': now - conn.last_used})
-            conn = self._create_connection(*self._conn_params(conn))
-
-            # Stash conn pool so conn knows where to go when it releases.
-            conn.idx = self._active_conn_pool_idx
-
-        conn.last_used = now
-        qsize = self._conn_pool[self._active_conn_pool_idx].qsize()
-        LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
-                    "connection(s) available."),
-                  {'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
-        return conn
-
-    def release_connection(self, http_conn, bad_state=False, rid=-1):
-        '''Mark HTTPConnection instance as available for check-out.
-
-        :param http_conn: An HTTPConnection instance obtained from this
-            instance.
-        :param bad_state: True if http_conn is known to be in a bad state
-                (e.g. connection fault.)
-        :param rid: request id passed in from request eventlet.
-        '''
-        if self._conn_params(http_conn) not in self._api_providers:
-            LOG.warn(_("[%(rid)d] Released connection '%(conn)s' is not an "
-                       "API provider for the cluster"),
-                     {'rid': rid, 'conn': _conn_str(http_conn)})
-            return
-
-        # Retrieve "home" connection pool.
-        conn_pool_idx = http_conn.idx
-        conn_pool = self._conn_pool[conn_pool_idx]
-        if bad_state:
-            # Reconnect to provider.
-            LOG.warn(_("[%(rid)d] Connection returned in bad state, "
-                       "reconnecting to %(conn)s"),
-                     {'rid': rid, 'conn': _conn_str(http_conn)})
-            http_conn = self._create_connection(*self._conn_params(http_conn))
-            http_conn.idx = conn_pool_idx
-
-            if self._active_conn_pool_idx == http_conn.idx:
-                # This pool is no longer in a good state. Switch to next pool.
-                self._active_conn_pool_idx += 1
-                self._active_conn_pool_idx %= len(self._conn_pool)
-                LOG.warn(_("[%(rid)d] Switched active_conn_pool from "
-                           "%(idx)d to %(pool_idx)d."),
-                         {'rid': rid, 'idx': http_conn.idx,
-                          'pool_idx': self._active_conn_pool_idx})
-
-                # No connections to the new provider allowed until after this
-                # timer has expired (allow time for synchronization).
-                self._issue_conn_barrier = time.time() + self._failover_time
-
-        conn_pool.put(http_conn)
-        LOG.debug(_("[%(rid)d] Released connection %(conn)s. "
-                    "%(qsize)d connection(s) available."),
-                  {'rid': rid, 'conn': _conn_str(http_conn),
-                   'qsize': conn_pool.qsize()})
-
-    @property
-    def need_login(self):
-        return self._need_login
-
-    @need_login.setter
-    def need_login(self, val=True):
-        self._need_login = val
-
-    def wait_for_login(self):
-        '''Block until a login has occurred for the current API provider.'''
-        if self._need_login:
-            if self._doing_login_sem.acquire(blocking=False):
-                self.login()
-                self._doing_login_sem.release()
-            else:
-                LOG.debug(_("Waiting for auth to complete"))
-                self._doing_login_sem.acquire()
-                self._doing_login_sem.release()
-        return self._cookie
-
-    def login(self):
+                self._conn_pool.put((self._next_conn_priority, conn))
+                self._next_conn_priority += 1
+
+    def acquire_redirect_connection(self, conn_params, auto_login=True,
+                                    headers=None):
+        """Check out or create connection to redirected NVP API server.
+
+        Args:
+            conn_params: tuple specifying target of redirect, see
+                self._conn_params()
+            auto_login: returned connection should have valid session cookie
+            headers: headers to pass on if auto_login
+
+        Returns: An available HTTPConnection instance corresponding to the
+                 specified conn_params. If a connection did not previously
+                 exist, new connections are created with the highest prioity
+                 in the connection pool and one of these new connections
+                 returned.
+        """
+        result_conn = None
+        data = self._get_provider_data(conn_params)
+        if data:
+            # redirect target already exists in provider data and connections
+            # to the provider have been added to the connection pool. Try to
+            # obtain a connection from the pool, note that it's possible that
+            # all connection to the provider are currently in use.
+            conns = []
+            while not self._conn_pool.empty():
+                priority, conn = self._conn_pool.get_nowait()
+                if not result_conn and self._conn_params(conn) == conn_params:
+                    conn.priority = priority
+                    result_conn = conn
+                else:
+                    conns.append((priority, conn))
+            for priority, conn in conns:
+                self._conn_pool.put((priority, conn))
+            # hack: if no free connections available, create new connection
+            # and stash "no_release" attribute (so that we only exceed
+            # self._concurrent_connections temporarily)
+            if not result_conn:
+                conn = self._create_connection(*conn_params)
+                conn.priority = 0  # redirect connections ahve highest priority
+                conn.no_release = True
+                result_conn = conn
+        else:
+            #redirect target not already known, setup provider lists
+            self._api_providers.update([conn_params])
+            self._set_provider_data(conn_params,
+                                    (eventlet.semaphore.Semaphore(1), None))
+            # redirects occur during cluster upgrades, i.e. results to old
+            # redirects to new, so give redirect targets highest priority
+            priority = 0
+            for i in range(self._concurrent_connections):
+                conn = self._create_connection(*conn_params)
+                conn.priority = priority
+                if i == self._concurrent_connections - 1:
+                    break
+                self._conn_pool.put((priority, conn))
+            result_conn = conn
+        if result_conn:
+            result_conn.last_used = time.time()
+            if auto_login and self.auth_cookie(conn) is None:
+                self._wait_for_login(result_conn, headers)
+        return result_conn
+
+    def _login(self, conn=None, headers=None):
         '''Issue login request and update authentication cookie.'''
+        cookie = None
         g = request_eventlet.NvpLoginRequestEventlet(
-            self, self._user, self._password)
+            self, self._user, self._password, conn, headers)
         g.start()
         ret = g.join()
-
         if ret:
             if isinstance(ret, Exception):
                 LOG.error(_('NvpApiClient: login error "%s"'), ret)
                 raise ret
 
-            self._cookie = None
             cookie = ret.getheader("Set-Cookie")
             if cookie:
                 LOG.debug(_("Saving new authentication cookie '%s'"), cookie)
-                self._cookie = cookie
-                self._need_login = False
-
-        # TODO: or ret is an error.
-        if not ret:
-            return None
-
-        return self._cookie
 
+        return cookie
 
 # Register as subclass.
 client.NvpApiClient.register(NvpApiClientEventlet)
index 9cf39da6cc56f2d66ec17480282a0d87269f6ba3..bd61ca7b3a9b77d0100a43cba2d20997d590a3ba 100644 (file)
@@ -1,23 +1,24 @@
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
 # All Rights Reserved
 #
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
 #
-#         http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
 #
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 #
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
 
 import httplib
-import mock
 
 
 def _conn_str(conn):
@@ -25,8 +26,6 @@ def _conn_str(conn):
         proto = "https://"
     elif isinstance(conn, httplib.HTTPConnection):
         proto = "http://"
-    elif isinstance(conn, mock.Mock):
-        proto = "http://"
     else:
         raise TypeError(_('_conn_str() invalid connection type: %s') %
                         type(conn))
index fb18b5a158da8d42c4d81f61a4ef05c1ee874f7b..a398a31c551d5a6692f35892f3ed427228364f85 100644 (file)
@@ -1,27 +1,49 @@
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
 # All Rights Reserved
 #
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
 #
-#         http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
 #
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 #
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
 
 from abc import ABCMeta
 from abc import abstractmethod
-from abc import abstractproperty
+import copy
+import httplib
+import logging
+import time
+import urlparse
+
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
+    _conn_str)
+
+logging.basicConfig(level=logging.INFO)
+LOG = logging.getLogger(__name__)
 
+# Default parameters.
+DEFAULT_REQUEST_TIMEOUT = 30
+DEFAULT_HTTP_TIMEOUT = 10
+DEFAULT_RETRIES = 2
+DEFAULT_REDIRECTS = 2
+DEFAULT_API_REQUEST_POOL_SIZE = 1000
+DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
+DOWNLOAD_TIMEOUT = 180  # The UI code has a coorespoind 190 sec timeout
+                        # for downloads, see: django/nvp_console/views.py
 
-class NvpApiRequest:
+
+class NvpApiRequest(object):
     '''An abstract baseclass for all ApiRequest implementations.
 
     This defines the interface and property structure for both eventlet and
@@ -30,6 +52,22 @@ class NvpApiRequest:
 
     __metaclass__ = ABCMeta
 
+    # List of allowed status codes.
+    ALLOWED_STATUS_CODES = [
+        httplib.OK,
+        httplib.CREATED,
+        httplib.NO_CONTENT,
+        httplib.MOVED_PERMANENTLY,
+        httplib.TEMPORARY_REDIRECT,
+        httplib.BAD_REQUEST,
+        httplib.UNAUTHORIZED,
+        httplib.FORBIDDEN,
+        httplib.NOT_FOUND,
+        httplib.CONFLICT,
+        httplib.INTERNAL_SERVER_ERROR,
+        httplib.SERVICE_UNAVAILABLE
+    ]
+
     @abstractmethod
     def start(self):
         pass
@@ -42,6 +80,200 @@ class NvpApiRequest:
     def copy(self):
         pass
 
-    @abstractproperty
+    def _issue_request(self):
+        '''Issue a request to a provider.'''
+        conn = (self._client_conn or
+                self._api_client.acquire_connection(True,
+                                                    copy.copy(self._headers),
+                                                    rid=self._rid()))
+        if conn is None:
+            error = Exception("No API connections available")
+            self._request_error = error
+            return error
+
+        url = self._url
+        LOG.debug(_("[%d] Issuing - request '%s'"),
+                  self._rid(), self._request_str(conn, url))
+        issued_time = time.time()
+        is_conn_error = False
+        is_conn_service_unavail = False
+        try:
+            redirects = 0
+            while (redirects <= self._redirects):
+                # Update connection with user specified request timeout,
+                # the connect timeout is usually smaller so we only set
+                # the request timeout after a connection is established
+                if conn.sock is None:
+                    conn.connect()
+                    conn.sock.settimeout(self._http_timeout)
+                elif conn.sock.gettimeout() != self._http_timeout:
+                    conn.sock.settimeout(self._http_timeout)
+
+                headers = copy.copy(self._headers)
+                cookie = self._api_client.auth_cookie(conn)
+                if cookie:
+                    headers["Cookie"] = cookie
+
+                gen = self._api_client.nvp_config_gen
+                if gen:
+                    headers["X-Nvp-Wait-For-Config-Generation"] = gen
+                    LOG.debug(_("Setting %s request header: '%s'"),
+                              'X-Nvp-Wait-For-Config-Generation', gen)
+                try:
+                    conn.request(self._method, url, self._body, headers)
+                except Exception as e:
+                    LOG.warn(_("[%d] Exception issuing request: '%s'"),
+                             self._rid(), e)
+                    raise e
+
+                response = conn.getresponse()
+                response.body = response.read()
+                response.headers = response.getheaders()
+                LOG.debug(_("[%d] Completed request '%s': %s (%0.2f seconds)"),
+                          self._rid(), self._request_str(conn, url),
+                          response.status, time.time() - issued_time)
+
+                new_gen = response.getheader('X-Nvp-Config-Generation', None)
+                if new_gen:
+                    LOG.debug(_("Reading '%s' response header: '%s'"),
+                              'X-Nvp-config-Generation', new_gen)
+                    if (self._api_client.nvp_config_gen is None or
+                        self._api_client.nvp_config_gen < int(new_gen)):
+                        self._api_client.nvp_config_gen = int(new_gen)
+
+                if response.status == httplib.UNAUTHORIZED:
+
+                    if cookie is None and self._url != "/ws.v1/login":
+                        # The connection still has no valid cookie despite
+                        # attemps to authenticate and the request has failed
+                        # with unauthorized status code. If this isn't a
+                        # a request to authenticate, we should abort the
+                        # request since there is no point in retrying.
+                        self._abort = True
+                    else:
+                        # If request is unauthorized, clear the session cookie
+                        # for the current provider so that subsequent requests
+                        # to the same provider triggers re-authentication.
+                        self._api_client.set_auth_cookie(conn, None)
+
+                    self._api_client.set_auth_cookie(conn, None)
+                elif response.status == httplib.SERVICE_UNAVAILABLE:
+                    is_conn_service_unavail = True
+
+                if response.status not in [httplib.MOVED_PERMANENTLY,
+                                           httplib.TEMPORARY_REDIRECT]:
+                    break
+                elif redirects >= self._redirects:
+                    LOG.info(_("[%d] Maximum redirects exceeded, aborting "
+                               "request"), self._rid())
+                    break
+                redirects += 1
+
+                conn, url = self._redirect_params(conn, response.headers,
+                                                  self._client_conn is None)
+                if url is None:
+                    response.status = httplib.INTERNAL_SERVER_ERROR
+                    break
+                LOG.info(_("[%d] Redirecting request to: '%s'"),
+                         self._rid(), self._request_str(conn, url))
+
+            # If we receive any of these responses, then
+            # our server did not process our request and may be in an
+            # errored state. Raise an exception, which will cause the
+            # the conn to be released with is_conn_error == True
+            # which puts the conn on the back of the client's priority
+            # queue.
+            if response.status >= 500:
+                LOG.warn(_("[%d] Request '%s %s' received: %s"),
+                         self._rid(), self._method, self._url,
+                         response.status)
+                raise Exception('Server error return: %s' %
+                                response.status)
+            return response
+        except Exception as e:
+            if isinstance(e, httplib.BadStatusLine):
+                msg = "Invalid server response"
+            else:
+                msg = unicode(e)
+            LOG.warn(_("[%d] Failed request '%s': '%s' (%0.2f seconds)"),
+                     self._rid(), self._request_str(conn, url), msg,
+                     time.time() - issued_time)
+            self._request_error = e
+            is_conn_error = True
+            return e
+        finally:
+            # Make sure we release the original connection provided by the
+            # acquire_connection() call above.
+            if self._client_conn is None:
+                self._api_client.release_connection(conn, is_conn_error,
+                                                    is_conn_service_unavail,
+                                                    rid=self._rid())
+
+    def _redirect_params(self, conn, headers, allow_release_conn=False):
+        """Process redirect response, create new connection if necessary.
+
+        Args:
+            conn: connection that returned the redirect response
+            headers: response headers of the redirect response
+            allow_release_conn: if redirecting to a different server,
+                release existing connection back to connection pool.
+
+        Returns: Return tuple(conn, url) where conn is a connection object
+            to the redirect target and url is the path of the API request
+        """
+
+        url = None
+        for name, value in headers:
+            if name.lower() == "location":
+                url = value
+                break
+        if not url:
+            LOG.warn(_("[%d] Received redirect status without location header"
+                       " field"), self._rid())
+            return (conn, None)
+        # Accept location with the following format:
+        # 1. /path, redirect to same node
+        # 2. scheme://hostname:[port]/path where scheme is https or http
+        # Reject others
+        # 3. e.g. relative paths, unsupported scheme, unspecified host
+        result = urlparse.urlparse(url)
+        if not result.scheme and not result.hostname and result.path:
+            if result.path[0] == "/":
+                if result.query:
+                    url = "%s?%s" % (result.path, result.query)
+                else:
+                    url = result.path
+                return (conn, url)      # case 1
+            else:
+                LOG.warn(_("[%d] Received invalid redirect location: '%s'"),
+                         self._rid(), url)
+                return (conn, None)     # case 3
+        elif result.scheme not in ["http", "https"] or not result.hostname:
+            LOG.warn(_("[%d] Received malformed redirect location: %s"),
+                     self._rid(), url)
+            return (conn, None)         # case 3
+        # case 2, redirect location includes a scheme
+        # so setup a new connection and authenticate
+        if allow_release_conn:
+            self._api_client.release_connection(conn)
+        conn_params = (result.hostname, result.port, result.scheme == "https")
+        conn = self._api_client.acquire_redirect_connection(conn_params, True,
+                                                            self._headers)
+        if result.query:
+            url = "%s?%s" % (result.path, result.query)
+        else:
+            url = result.path
+        return (conn, url)
+
+    def _rid(self):
+        '''Return current request id.'''
+        return self._request_id
+
+    @property
     def request_error(self):
-        pass
+        '''Return any errors associated with this instance.'''
+        return self._request_error
+
+    def _request_str(self, conn, url):
+        '''Return string representation of connection.'''
+        return "%s %s/%s" % (self._method, _conn_str(conn), url)
index e5f6f100d8be7197aae1d83cdf0da14df31c7d5a..1c878d3822d04c5dff1c95b11f96b369f3c97070 100644 (file)
@@ -1,75 +1,46 @@
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
 # All Rights Reserved
 #
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
 #
-#         http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
 #
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 #
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
 
-import copy
 import eventlet
 import httplib
 import json
 import logging
-import request
-import time
 import urllib
-import urlparse
 
-import client_eventlet
-from common import _conn_str
-from eventlet import timeout
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client import request
 
 eventlet.monkey_patch()
-
 logging.basicConfig(level=logging.INFO)
-lg = logging.getLogger("nvp_api_request")
+LOG = logging.getLogger(__name__)
 USER_AGENT = "NVP eventlet client/1.0"
 
-# Default parameters.
-DEFAULT_REQUEST_TIMEOUT = 30
-DEFAULT_HTTP_TIMEOUT = 10
-DEFAULT_RETRIES = 2
-DEFAULT_REDIRECTS = 2
-DEFAULT_API_REQUEST_POOL_SIZE = 1000
-DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
-
 
-class NvpApiRequestEventlet:
+class NvpApiRequestEventlet(request.NvpApiRequest):
     '''Eventlet-based ApiRequest class.
 
     This class will form the basis for eventlet-based ApiRequest classes
     (e.g. those used by the Quantum NVP Plugin).
     '''
 
-    # List of allowed status codes.
-    ALLOWED_STATUS_CODES = [
-        httplib.OK,
-        httplib.CREATED,
-        httplib.NO_CONTENT,
-        httplib.MOVED_PERMANENTLY,
-        httplib.TEMPORARY_REDIRECT,
-        httplib.BAD_REQUEST,
-        httplib.UNAUTHORIZED,
-        httplib.FORBIDDEN,
-        httplib.NOT_FOUND,
-        httplib.CONFLICT,
-        httplib.INTERNAL_SERVER_ERROR,
-        httplib.SERVICE_UNAVAILABLE
-    ]
-
     # Maximum number of green threads present in the system at one time.
-    API_REQUEST_POOL_SIZE = DEFAULT_API_REQUEST_POOL_SIZE
+    API_REQUEST_POOL_SIZE = request.DEFAULT_API_REQUEST_POOL_SIZE
 
     # Pool of green threads. One green thread is allocated per incoming
     # request. Incoming requests will block when the pool is empty.
@@ -77,18 +48,18 @@ class NvpApiRequestEventlet:
 
     # A unique id is assigned to each incoming request. When the current
     # request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0.
-    MAXIMUM_REQUEST_ID = DEFAULT_MAXIMUM_REQUEST_ID
+    MAXIMUM_REQUEST_ID = request.DEFAULT_MAXIMUM_REQUEST_ID
 
     # The request id for the next incoming request.
     CURRENT_REQUEST_ID = 0
 
     def __init__(self, nvp_api_client, url, method="GET", body=None,
                  headers=None,
-                 request_timeout=DEFAULT_REQUEST_TIMEOUT,
-                 retries=DEFAULT_RETRIES,
+                 request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
+                 retries=request.DEFAULT_RETRIES,
                  auto_login=True,
-                 redirects=DEFAULT_REDIRECTS,
-                 http_timeout=DEFAULT_HTTP_TIMEOUT):
+                 redirects=request.DEFAULT_REDIRECTS,
+                 http_timeout=request.DEFAULT_HTTP_TIMEOUT, client_conn=None):
         '''Constructor.'''
         self._api_client = nvp_api_client
         self._url = url
@@ -100,6 +71,8 @@ class NvpApiRequestEventlet:
         self._auto_login = auto_login
         self._redirects = redirects
         self._http_timeout = http_timeout
+        self._client_conn = client_conn
+        self._abort = False
 
         self._request_error = None
 
@@ -126,10 +99,6 @@ class NvpApiRequestEventlet:
         '''Spawn a new green thread with the supplied function and args.'''
         return self.__class__._spawn(func, *args, **kwargs)
 
-    def _rid(self):
-        '''Return current request id.'''
-        return self._request_id
-
     @classmethod
     def joinall(cls):
         '''Wait for all outstanding requests to complete.'''
@@ -152,197 +121,19 @@ class NvpApiRequestEventlet:
             self._headers, self._request_timeout, self._retries,
             self._auto_login, self._redirects, self._http_timeout)
 
-    @property
-    def request_error(self):
-        '''Return any errors associated with this instance.'''
-        return self._request_error
-
     def _run(self):
         '''Method executed within green thread.'''
         if self._request_timeout:
             # No timeout exception escapes the with block.
-            with timeout.Timeout(self._request_timeout, False):
+            with eventlet.timeout.Timeout(self._request_timeout, False):
                 return self._handle_request()
 
-            lg.info(_('[%d] Request timeout.'), self._rid())
+            LOG.info(_('[%d] Request timeout.'), self._rid())
             self._request_error = Exception(_('Request timeout'))
             return None
         else:
             return self._handle_request()
 
-    def _request_str(self, conn, url):
-        '''Return string representation of connection.'''
-        return "%s %s/%s" % (self._method, _conn_str(conn), url)
-
-    def _issue_request(self):
-        '''Issue a request to a provider.'''
-        conn = self._api_client.acquire_connection(rid=self._rid())
-        if conn is None:
-            error = Exception(_("No API connections available"))
-            self._request_error = error
-            return error
-
-        # Preserve the acquired connection as conn may be over-written by
-        # redirects below.
-        acquired_conn = conn
-
-        url = self._url
-        lg.debug(_("[%(rid)d] Issuing - request '%(req)s'"),
-                 {'rid': self._rid(),
-                  'req': self._request_str(conn, url)})
-        issued_time = time.time()
-        is_conn_error = False
-        try:
-            redirects = 0
-            while (redirects <= self._redirects):
-                # Update connection with user specified request timeout,
-                # the connect timeout is usually smaller so we only set
-                # the request timeout after a connection is established
-                if conn.sock is None:
-                    conn.connect()
-                    conn.sock.settimeout(self._http_timeout)
-                elif conn.sock.gettimeout() != self._http_timeout:
-                    conn.sock.settimeout(self._http_timeout)
-
-                headers = copy.copy(self._headers)
-                gen = self._api_client.nvp_config_gen
-                if gen:
-                    headers["X-Nvp-Wait-For-Config-Generation"] = gen
-                    lg.debug(_("Setting %(header)s request header: %(gen)s"),
-                             {'header': 'X-Nvp-Wait-For-Config-Generation',
-                              'gen': gen})
-                try:
-                    conn.request(self._method, url, self._body, headers)
-                except Exception as e:
-                    lg.warn(_('[%(rid)d] Exception issuing request: %(e)s'),
-                            {'rid': self._rid(), 'e': e})
-                    raise e
-
-                response = conn.getresponse()
-                response.body = response.read()
-                response.headers = response.getheaders()
-                lg.debug(_("[%(rid)d] Completed request '%(req)s': %(status)s "
-                           "(%(time)0.2f seconds)"),
-                         {'rid': self._rid(),
-                          'req': self._request_str(conn, url),
-                          'status': response.status,
-                          'time': time.time() - issued_time})
-
-                new_gen = response.getheader('X-Nvp-Config-Generation', None)
-                if new_gen:
-                    lg.debug(_("Reading %(header)s response header: %(gen)s"),
-                             {'header': 'X-Nvp-config-Generation',
-                              'gen': new_gen})
-                    if (self._api_client.nvp_config_gen is None or
-                            self._api_client.nvp_config_gen < int(new_gen)):
-                        self._api_client.nvp_config_gen = int(new_gen)
-
-                if response.status not in [httplib.MOVED_PERMANENTLY,
-                                           httplib.TEMPORARY_REDIRECT]:
-                    break
-                elif redirects >= self._redirects:
-                    lg.info(_("[%d] Maximum redirects exceeded, aborting "
-                              "request"), self._rid())
-                    break
-                redirects += 1
-
-                # In the following call, conn is replaced by the connection
-                # specified in the redirect response from the server.
-                conn, url = self._redirect_params(conn, response.headers)
-                if url is None:
-                    response.status = httplib.INTERNAL_SERVER_ERROR
-                    break
-                lg.info(_("[%(rid)d] Redirecting request to: %(req)s"),
-                        {'rid': self._rid(),
-                         'req': self._request_str(conn, url)})
-
-            # FIX for #9415. If we receive any of these responses, then
-            # our server did not process our request and may be in an
-            # errored state. Raise an exception, which will cause the
-            # the conn to be released with is_conn_error == True
-            # which puts the conn on the back of the client's priority
-            # queue.
-            if response.status >= 500:
-                lg.warn(_("[%(rid)d] Request '%(method)s %(url)s' "
-                          "received: %(status)s"),
-                        {'rid': self._rid(), 'method': self._method,
-                         'url': self._url,
-                         'status': response.status})
-                raise Exception(_('Server error return: %s') %
-                                response.status)
-            return response
-        except Exception as e:
-            if isinstance(e, httplib.BadStatusLine):
-                msg = _("Invalid server response")
-            else:
-                msg = unicode(e)
-            lg.warn(_("[%(rid)d] Failed request '%(req)s': %(msg)s "
-                      "(%(time)0.2f seconds)"),
-                    {'rid': self._rid(), 'req': self._request_str(conn, url),
-                     'msg': msg,
-                     'time': time.time() - issued_time})
-            self._request_error = e
-            is_conn_error = True
-            return e
-        finally:
-            # Make sure we release the original connection provided by the
-            # acquire_connection() call above.
-            self._api_client.release_connection(acquired_conn, is_conn_error,
-                                                rid=self._rid())
-
-    def _redirect_params(self, conn, headers):
-        '''Process redirect params from a server response.'''
-        url = None
-        for name, value in headers:
-            if name.lower() == "location":
-                url = value
-                break
-        if not url:
-            lg.warn(_("[%d] Received redirect status without location header "
-                      "field"), self._rid())
-            return (conn, None)
-        # Accept location with the following format:
-        # 1. /path, redirect to same node
-        # 2. scheme://hostname:[port]/path where scheme is https or http
-        # Reject others
-        # 3. e.g. relative paths, unsupported scheme, unspecified host
-        result = urlparse.urlparse(url)
-        if not result.scheme and not result.hostname and result.path:
-            if result.path[0] == "/":
-                if result.query:
-                    url = "%s?%s" % (result.path, result.query)
-                else:
-                    url = result.path
-                return (conn, url)      # case 1
-            else:
-                lg.warn(_("[%(rid)d] Received invalid redirect location: "
-                          "%(url)s"),
-                        {'rid': self._rid(), 'url': url})
-                return (conn, None)     # case 3
-        elif result.scheme not in ["http", "https"] or not result.hostname:
-            lg.warn(_("[%(rid)d] Received malformed redirect location: "
-                      "%(url)s"),
-                    {'rid': self._rid(), 'url': url})
-            return (conn, None)         # case 3
-        # case 2, redirect location includes a scheme
-        # so setup a new connection and authenticate
-        use_https = result.scheme == "https"
-        api_providers = [(result.hostname, result.port, use_https)]
-        api_client = client_eventlet.NvpApiClientEventlet(
-            api_providers, self._api_client.user, self._api_client.password,
-            use_https=use_https)
-        api_client.wait_for_login()
-        if api_client.auth_cookie:
-            self._headers["Cookie"] = api_client.auth_cookie
-        else:
-            self._headers["Cookie"] = ""
-        conn = api_client.acquire_connection(rid=self._rid())
-        if result.query:
-            url = "%s?%s" % (result.path, result.query)
-        else:
-            url = result.path
-        return (conn, url)
-
     def _handle_request(self):
         '''First level request handling.'''
         attempt = 0
@@ -350,46 +141,41 @@ class NvpApiRequestEventlet:
         while response is None and attempt <= self._retries:
             attempt += 1
 
-            if self._auto_login and self._api_client.need_login:
-                self._api_client.wait_for_login()
-
-            if self._api_client.auth_cookie:
-                self._headers["Cookie"] = self._api_client.auth_cookie
-
             req = self.spawn(self._issue_request).wait()
             # automatically raises any exceptions returned.
             if isinstance(req, httplib.HTTPResponse):
-                if (req.status == httplib.UNAUTHORIZED
+                if attempt <= self._retries and not self._abort:
+                    if (req.status == httplib.UNAUTHORIZED
                         or req.status == httplib.FORBIDDEN):
-                    self._api_client.need_login = True
-                    if attempt <= self._retries:
                         continue
                     # else fall through to return the error code
 
-                lg.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
-                           ": %(status)s"),
-                         {'rid': self._rid(), 'method': self._method,
-                          'url': self._url, 'status': req.status})
+                LOG.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
+                            ": %(status)s"),
+                          {'rid': self._rid(), 'method': self._method,
+                           'url': self._url, 'status': req.status})
                 self._request_error = None
                 response = req
             else:
-                lg.info(_('[%(rid)d] Error while handling request: %(req)s'),
-                        {'rid': self._rid(), 'req': req})
+                LOG.info(_('[%(rid)d] Error while handling request: %(req)s'),
+                         {'rid': self._rid(), 'req': req})
                 self._request_error = req
                 response = None
-
         return response
 
 
 class NvpLoginRequestEventlet(NvpApiRequestEventlet):
     '''Process a login request.'''
 
-    def __init__(self, nvp_client, user, password):
-        headers = {"Content-Type": "application/x-www-form-urlencoded"}
+    def __init__(self, nvp_client, user, password, client_conn=None,
+                 headers=None):
+        if headers is None:
+            headers = {}
+        headers.update({"Content-Type": "application/x-www-form-urlencoded"})
         body = urllib.urlencode({"username": user, "password": password})
         NvpApiRequestEventlet.__init__(
             self, nvp_client, "/ws.v1/login", "POST", body, headers,
-            auto_login=False)
+            auto_login=False, client_conn=client_conn)
 
     def session_cookie(self):
         if self.successful():
@@ -398,7 +184,7 @@ class NvpLoginRequestEventlet(NvpApiRequestEventlet):
 
 
 class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
-    '''Get a list of API providers.'''
+    '''Gej a list of API providers.'''
 
     def __init__(self, nvp_client):
         url = "/ws.v1/control-cluster/node?fields=roles"
@@ -427,8 +213,8 @@ class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
                                 ret.append(_provider_from_listen_addr(addr))
                 return ret
         except Exception as e:
-            lg.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
-                    {'rid': self._rid(), 'e': e})
+            LOG.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
+                     {'rid': self._rid(), 'e': e})
             # intentionally fall through
         return None
 
@@ -438,12 +224,11 @@ class NvpGenericRequestEventlet(NvpApiRequestEventlet):
 
     def __init__(self, nvp_client, method, url, body, content_type,
                  auto_login=False,
-                 request_timeout=DEFAULT_REQUEST_TIMEOUT,
-                 http_timeout=DEFAULT_HTTP_TIMEOUT,
-                 retries=DEFAULT_RETRIES,
-                 redirects=DEFAULT_REDIRECTS):
+                 request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
+                 http_timeout=request.DEFAULT_HTTP_TIMEOUT,
+                 retries=request.DEFAULT_RETRIES,
+                 redirects=request.DEFAULT_REDIRECTS):
         headers = {"Content-Type": content_type}
-
         NvpApiRequestEventlet.__init__(
             self, nvp_client, url, method, body, headers,
             request_timeout=request_timeout, retries=retries,
index 23459f7a73794c7aba240caede139a815abefa80..41db479201eefb6f77ae34c9500092730e39ed96 100644 (file)
@@ -21,7 +21,7 @@ nvp_opts = [
     cfg.IntOpt('max_lp_per_bridged_ls', default=64),
     cfg.IntOpt('max_lp_per_overlay_ls', default=256),
     cfg.IntOpt('concurrent_connections', default=5),
-    cfg.IntOpt('failover_time', default=240),
+    cfg.IntOpt('nvp_gen_timeout', default=-1),
     cfg.StrOpt('default_cluster_name')
 ]
 
index f06aabe35dfa4dd0b12d8e2bade867a906008cbc..3e7ec54a290722a5049d0c239b8b414548be1947 100644 (file)
@@ -134,7 +134,6 @@ class NvpApiRequestEventletTest(unittest.TestCase):
         myconn.__str__.return_value = 'myconn string'
 
         req = self.req
-        req._request_timeout = REQUEST_TIMEOUT = 1
         req._redirect_params = Mock()
         req._redirect_params.return_value = (myconn, 'url')
         req._request_str = Mock()
@@ -214,60 +213,48 @@ class NvpApiRequestEventletTest(unittest.TestCase):
         with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
                    'client_eventlet.NvpApiClientEventlet') as mock:
             api_client = mock.return_value
-            api_client.wait_for_login.return_value = None
-            api_client.auth_cookie = 'mycookie'
-            api_client.acquire_connection.return_value = True
+            self.req._api_client = api_client
             myconn = Mock()
             (conn, retval) = self.req._redirect_params(
                 myconn, [('location', 'https://host:1/path')])
 
             self.assertTrue(retval is not None)
-            self.assertTrue(api_client.wait_for_login.called)
-            self.assertTrue(api_client.acquire_connection.called)
+            self.assertTrue(api_client.acquire_redirect_connection.called)
 
     def test_redirect_params_setup_htttps_and_query(self):
         with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
                    'client_eventlet.NvpApiClientEventlet') as mock:
             api_client = mock.return_value
-            api_client.wait_for_login.return_value = None
-            api_client.auth_cookie = 'mycookie'
-            api_client.acquire_connection.return_value = True
+            self.req._api_client = api_client
             myconn = Mock()
             (conn, retval) = self.req._redirect_params(myconn, [
                 ('location', 'https://host:1/path?q=1')])
 
             self.assertTrue(retval is not None)
-            self.assertTrue(api_client.wait_for_login.called)
-            self.assertTrue(api_client.acquire_connection.called)
+            self.assertTrue(api_client.acquire_redirect_connection.called)
 
     def test_redirect_params_setup_https_connection_no_cookie(self):
         with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
                    'client_eventlet.NvpApiClientEventlet') as mock:
             api_client = mock.return_value
-            api_client.wait_for_login.return_value = None
-            api_client.auth_cookie = None
-            api_client.acquire_connection.return_value = True
+            self.req._api_client = api_client
             myconn = Mock()
             (conn, retval) = self.req._redirect_params(myconn, [
                 ('location', 'https://host:1/path')])
 
             self.assertTrue(retval is not None)
-            self.assertTrue(api_client.wait_for_login.called)
-            self.assertTrue(api_client.acquire_connection.called)
+            self.assertTrue(api_client.acquire_redirect_connection.called)
 
     def test_redirect_params_setup_https_and_query_no_cookie(self):
         with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
                    'client_eventlet.NvpApiClientEventlet') as mock:
             api_client = mock.return_value
-            api_client.wait_for_login.return_value = None
-            api_client.auth_cookie = None
-            api_client.acquire_connection.return_value = True
+            self.req._api_client = api_client
             myconn = Mock()
             (conn, retval) = self.req._redirect_params(
                 myconn, [('location', 'https://host:1/path?q=1')])
             self.assertTrue(retval is not None)
-            self.assertTrue(api_client.wait_for_login.called)
-            self.assertTrue(api_client.acquire_connection.called)
+            self.assertTrue(api_client.acquire_redirect_connection.called)
 
     def test_redirect_params_path_only_with_query(self):
         with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'