From: Aaron Rosen Date: Wed, 31 Oct 2012 04:35:42 +0000 (-0700) Subject: Allow nvp_api to load balance requests X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=76b872266665b070663e8eef9b4778ff8b523cb5;p=openstack-build%2Fneutron-build.git Allow nvp_api to load balance requests The current version of the nvp_api client does not load balance requests across multiple controllers. Instead, it just sends all the requests to one controller and if there is a controller failure it will failover to use another controller. This blueprint implements the ablility to utilize all controllers at once. blueprint nvp-api-client-loadbalance-request Change-Id: I331be2a23ae360a95786152d5f116359f690d9f3 --- diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/NvpApiClient.py b/quantum/plugins/nicira/nicira_nvp_plugin/NvpApiClient.py index 0c3a65eeb..48a8d60e9 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/NvpApiClient.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/NvpApiClient.py @@ -36,8 +36,8 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet): ''' def __init__(self, api_providers, user, password, request_timeout, - http_timeout, retries, redirects, failover_time, - concurrent_connections=3): + http_timeout, retries, redirects, + concurrent_connections=3, nvp_gen_timeout=-1): '''Constructor. :param api_providers: a list of tuples in the form: @@ -53,12 +53,10 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet): controller in the cluster) :param retries: the number of concurrent connections. :param redirects: the number of concurrent connections. - :param failover_time: minimum time between controller failover and new - connections allowed. ''' client_eventlet.NvpApiClientEventlet.__init__( self, api_providers, user, password, concurrent_connections, - failover_time=failover_time) + nvp_gen_timeout) self._request_timeout = request_timeout self._http_timeout = http_timeout @@ -84,7 +82,7 @@ class NVPApiHelper(client_eventlet.NvpApiClientEventlet): if password: self._password = password - return client_eventlet.NvpApiClientEventlet.login(self) + return client_eventlet.NvpApiClientEventlet._login(self) def request(self, method, url, body="", content_type="application/json"): '''Issues request to controller.''' diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/QuantumPlugin.py b/quantum/plugins/nicira/nicira_nvp_plugin/QuantumPlugin.py index 19b7a6380..9daefca06 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/QuantumPlugin.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/QuantumPlugin.py @@ -27,7 +27,6 @@ import webob.exc # FIXME(salvatore-orlando): get rid of relative imports from common import config -from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client_eventlet from nvp_plugin_version import PLUGIN_VERSION from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models @@ -157,12 +156,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2): http_timeout=cluster.http_timeout, retries=cluster.retries, redirects=cluster.redirects, - failover_time=self.nvp_opts.failover_time, - concurrent_connections=self.nvp_opts.concurrent_connections) + concurrent_connections=self.nvp_opts['concurrent_connections'], + nvp_gen_timeout=self.nvp_opts['nvp_gen_timeout']) - # TODO(salvatore-orlando): do login at first request, - # not when plugin is instantiated - cluster.api_client.login() + if len(self.clusters) == 0: + first_cluster = cluster self.clusters[c_opts['name']] = cluster def_cluster_name = self.nvp_opts.default_cluster_name diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/README b/quantum/plugins/nicira/nicira_nvp_plugin/README index caefb7cbe..ca4eed63c 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/README +++ b/quantum/plugins/nicira/nicira_nvp_plugin/README @@ -21,8 +21,8 @@ NVP Plugin configuration bridged transport zone (default 64) - concurrent_connections: Number of connects to each controller node (default 3) - - failover_time: Time from when a connection pool is switched to another - controller during failures. + - nvp_gen_timout: Number of seconds a generation id should be valid for + (default -1 meaning do not time out) 3) NVP cluster The Quantum NVP plugin allow for configuring multiple clusters. Each cluster configuration section must be declared in the following way diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/__init__.py b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/__init__.py index f76d4a961..87f79ba05 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/__init__.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2012 Nicira Networks, Inc. +# Copyright 2012 Nicira, Inc. # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,3 +14,5 @@ # under the License. # # vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# @author: Aaron Rosen, Nicira Networks, Inc. diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client.py b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client.py index e3cc9d1d0..a0cdb81b0 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client.py @@ -1,25 +1,40 @@ -# Copyright 2009-2012 Nicira Networks, Inc. +# Copyright 2012 Nicira, Inc. # All Rights Reserved # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # # vim: tabstop=4 shiftwidth=4 softtabstop=4 # -# Author: David Lapsley , Nicira Networks, Inc. +# @author: David Lapsley , Nicira Networks, Inc. +# @author: Aaron Rosen, Nicira Networks, Inc. + from abc import ABCMeta -from abc import abstractmethod -from abc import abstractproperty +import httplib +import time +import logging + + +from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import ( + _conn_str) + + +logging.basicConfig(level=logging.INFO) +LOG = logging.getLogger(__name__) +#Default parameters. +GENERATION_ID_TIMEOUT = -1 +DEFAULT_CONCURRENT_CONNECTIONS = 3 +DEFAULT_CONNECT_TIMEOUT = 5 class NvpApiClient(object): @@ -33,38 +48,230 @@ class NvpApiClient(object): CONN_IDLE_TIMEOUT = 60 * 15 - @abstractmethod - def update_providers(self, api_providers): - pass + def _create_connection(self, host, port, is_ssl): + if is_ssl: + return httplib.HTTPSConnection(host, port, + timeout=self._connect_timeout) + return httplib.HTTPConnection(host, port, + timeout=self._connect_timeout) + + @staticmethod + def _conn_params(http_conn): + is_ssl = isinstance(http_conn, httplib.HTTPSConnection) + return (http_conn.host, http_conn.port, is_ssl) - @abstractproperty + @property def user(self): - pass + return self._user - @abstractproperty + @property def password(self): - pass + return self._password + + @property + def nvp_config_gen(self): + # If nvp_gen_timeout is not -1 then: + # Maintain a timestamp along with the generation ID. Hold onto the + # ID long enough to be useful and block on sequential requests but + # not long enough to persist when Onix db is cleared, which resets + # the generation ID, causing the DAL to block indefinitely with some + # number that's higher than the cluster's value. + if self._nvp_gen_timeout != -1: + ts = self._nvp_config_gen_ts + if ts is not None: + if (time.time() - ts) > self._nvp_gen_timeout: + return None + return self._nvp_config_gen + + @nvp_config_gen.setter + def nvp_config_gen(self, value): + if self._nvp_config_gen != value: + if self._nvp_gen_timeout != -1: + self._nvp_config_gen_ts = time.time() + self._nvp_config_gen = value + + def auth_cookie(self, conn): + cookie = None + data = self._get_provider_data(conn) + if data: + cookie = data[1] + return cookie + + def set_auth_cookie(self, conn, cookie): + data = self._get_provider_data(conn) + if data: + self._set_provider_data(conn, (data[0], cookie)) + + def acquire_connection(self, auto_login=True, headers=None, rid=-1): + '''Check out an available HTTPConnection instance. + + Blocks until a connection is available. + :auto_login: automatically logins before returning conn + :headers: header to pass on to login attempt + :param rid: request id passed in from request eventlet. + :returns: An available HTTPConnection instance or None if no + api_providers are configured. + ''' + if not self._api_providers: + LOG.warn(_("[%d] no API providers currently available."), rid) + return None + if self._conn_pool.empty(): + LOG.debug(_("[%d] Waiting to acquire API client connection."), rid) + priority, conn = self._conn_pool.get() + now = time.time() + if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT: + LOG.info(_("[%d] Connection %s idle for %0.2f seconds; " + "reconnecting."), + rid, _conn_str(conn), now - conn.last_used) + conn = self._create_connection(*self._conn_params(conn)) - @abstractproperty - def auth_cookie(self): - pass + conn.last_used = now + conn.priority = priority # stash current priority for release + qsize = self._conn_pool.qsize() + LOG.debug(_("[%d] Acquired connection %s. %d connection(s) " + "available."), rid, _conn_str(conn), qsize) + if auto_login and self.auth_cookie(conn) is None: + self._wait_for_login(conn, headers) + return conn - @abstractmethod - def acquire_connection(self): - pass + def release_connection(self, http_conn, bad_state=False, + service_unavail=False, rid=-1): + '''Mark HTTPConnection instance as available for check-out. - @abstractmethod - def release_connection(self, http_conn, bad_state=False): - pass + :param http_conn: An HTTPConnection instance obtained from this + instance. + :param bad_state: True if http_conn is known to be in a bad state + (e.g. connection fault.) + :service_unavail: True if http_conn returned 503 response. + :param rid: request id passed in from request eventlet. + ''' + conn_params = self._conn_params(http_conn) + if self._conn_params(http_conn) not in self._api_providers: + LOG.debug(_("[%d] Released connection '%s' is not an API provider " + "for the cluster"), rid, _conn_str(http_conn)) + return + elif hasattr(http_conn, "no_release"): + return - @abstractproperty - def need_login(self): - pass + if bad_state: + # Reconnect to provider. + LOG.warn(_("[%d] Connection returned in bad state, reconnecting " + "to %s"), rid, _conn_str(http_conn)) + http_conn = self._create_connection(*self._conn_params(http_conn)) + priority = self._next_conn_priority + self._next_conn_priority += 1 + elif service_unavail: + # http_conn returned a service unaviable response, put other + # connections to the same controller at end of priority queue, + conns = [] + while not self._conn_pool.empty(): + priority, conn = self._conn_pool.get() + if self._conn_params(conn) == conn_params: + priority = self._next_conn_priority + self._next_conn_priority += 1 + conns.append((priority, conn)) + for priority, conn in conns: + self._conn_pool.put((priority, conn)) + # put http_conn at end of queue also + priority = self._next_conn_priority + self._next_conn_priority += 1 + else: + priority = http_conn.priority + + self._conn_pool.put((priority, http_conn)) + LOG.debug(_("[%d] Released connection %s. %d connection(s) " + "available."), + rid, _conn_str(http_conn), self._conn_pool.qsize()) + + def _wait_for_login(self, conn, headers=None): + '''Block until a login has occurred for the current API provider.''' + + data = self._get_provider_data(conn) + if data is None: + LOG.error(_("Login request for an invalid connection: '%s'"), + _conn_str(conn)) + return + provider_sem = data[0] + if provider_sem.acquire(blocking=False): + try: + cookie = self._login(conn, headers) + self.set_auth_cookie(conn, cookie) + finally: + provider_sem.release() + else: + LOG.debug(_("Waiting for auth to complete")) + # Wait until we can aquire then release + provider_sem.acquire(blocking=True) + provider_sem.release() + + def _get_provider_data(self, conn_or_conn_params, default=None): + """Get data for specified API provider. + + Args: + conn_or_conn_params: either a HTTP(S)Connection object or the + resolved conn_params tuple returned by self._conn_params(). + default: conn_params if ones passed aren't known + Returns: Data associated with specified provider + """ + conn_params = self._normalize_conn_params(conn_or_conn_params) + return self._api_provider_data.get(conn_params, default) + + def _set_provider_data(self, conn_or_conn_params, data): + """Set data for specified API provider. + + Args: + conn_or_conn_params: either a HTTP(S)Connection object or the + resolved conn_params tuple returned by self._conn_params(). + data: data to associate with API provider + """ + conn_params = self._normalize_conn_params(conn_or_conn_params) + if data is None: + del self._api_provider_data[conn_params] + else: + self._api_provider_data[conn_params] = data + + def _normalize_conn_params(self, conn_or_conn_params): + """Normalize conn_param tuple. + + Args: + conn_or_conn_params: either a HTTP(S)Connection object or the + resolved conn_params tuple returned by self._conn_params(). + + Returns: Normalized conn_param tuple + """ + if (not isinstance(conn_or_conn_params, tuple) and + not isinstance(conn_or_conn_params, httplib.HTTPConnection)): + LOG.debug(_("Invalid conn_params value: '%s'"), + str(conn_or_conn_params)) + return conn_or_conn_params + if isinstance(conn_or_conn_params, httplib.HTTPConnection): + conn_params = self._conn_params(conn_or_conn_params) + else: + conn_params = conn_or_conn_params + host, port, is_ssl = conn_params + if port is None: + port = 443 if is_ssl else 80 + return (host, port, is_ssl) + + def update_providers(self, api_providers): + new_providers = set([tuple(p) for p in api_providers]) + if new_providers != self._api_providers: + new_conns = [] + while not self._conn_pool.empty(): + priority, conn = self._conn_pool.get_nowait() + if self._conn_params(conn) in new_providers: + new_conns.append((priority, conn)) - @abstractmethod - def wait_for_login(self): - pass + to_subtract = self._api_providers - new_providers + for p in to_subtract: + self._set_provider_data(p, None) + to_add = new_providers - self._api_providers + for unused_i in range(self._concurrent_connections): + for host, port, is_ssl in to_add: + conn = self._create_connection(host, port, is_ssl) + new_conns.append((self._next_conn_priority, conn)) + self._next_conn_priority += 1 - @abstractmethod - def login(self): - pass + for priority, conn in new_conns: + self._conn_pool.put((priority, conn)) + self._api_providers = new_providers diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client_eventlet.py b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client_eventlet.py index 75d034766..8e10ee128 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client_eventlet.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/client_eventlet.py @@ -1,53 +1,44 @@ -# Copyright 2009-2012 Nicira Networks, Inc. +# Copyright 2012 Nicira, Inc. # All Rights Reserved # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # # vim: tabstop=4 shiftwidth=4 softtabstop=4 # +# @author: Aaron Rosen, Nicira Networks, Inc. + -import client import eventlet -import httplib import logging -import request_eventlet import time -from common import _conn_str +from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client +from quantum.plugins.nicira.nicira_nvp_plugin.api_client import ( + request_eventlet) eventlet.monkey_patch() logging.basicConfig(level=logging.INFO) LOG = logging.getLogger(__name__) -# Default parameters. -DEFAULT_FAILOVER_TIME = 5 -DEFAULT_CONCURRENT_CONNECTIONS = 3 -DEFAULT_CONNECT_TIMEOUT = 5 -GENERATION_ID_TIMEOUT = -1 # if set to -1 then disabled - -class NvpApiClientEventlet(object): +class NvpApiClientEventlet(client.NvpApiClient): '''Eventlet-based implementation of NvpApiClient ABC.''' - CONN_IDLE_TIMEOUT = 60 * 15 - def __init__(self, api_providers, user, password, - concurrent_connections=DEFAULT_CONCURRENT_CONNECTIONS, - use_https=True, - connect_timeout=DEFAULT_CONNECT_TIMEOUT, - failover_time=DEFAULT_FAILOVER_TIME, - nvp_gen_timeout=GENERATION_ID_TIMEOUT): + concurrent_connections=client.DEFAULT_CONCURRENT_CONNECTIONS, + nvp_gen_timeout=client.GENERATION_ID_TIMEOUT, use_https=True, + connect_timeout=client.DEFAULT_CONNECT_TIMEOUT): '''Constructor :param api_providers: a list of tuples of the form: (host, port, @@ -57,242 +48,112 @@ class NvpApiClientEventlet(object): :param concurrent_connections: total number of concurrent connections. :param use_https: whether or not to use https for requests. :param connect_timeout: connection timeout in seconds. - :param failover_time: time from when a connection pool is switched to - the next connection released via acquire_connection(). :param nvp_gen_timeout controls how long the generation id is kept if set to -1 the generation id is never timed out ''' if not api_providers: api_providers = [] self._api_providers = set([tuple(p) for p in api_providers]) + self._api_provider_data = {} # tuple(semaphore, nvp_session_cookie) + for p in self._api_providers: + self._set_provider_data(p, (eventlet.semaphore.Semaphore(1), None)) self._user = user self._password = password self._concurrent_connections = concurrent_connections self._use_https = use_https self._connect_timeout = connect_timeout - self._failover_time = failover_time self._nvp_config_gen = None self._nvp_config_gen_ts = None self._nvp_gen_timeout = nvp_gen_timeout # Connection pool is a list of queues. - self._conn_pool = list() - conn_pool_idx = 0 + self._conn_pool = eventlet.queue.PriorityQueue() + self._next_conn_priority = 1 for host, port, is_ssl in api_providers: - provider_conn_pool = eventlet.queue.Queue( - maxsize=concurrent_connections) for i in range(concurrent_connections): - # All connections in a provider_conn_poool have the - # same priority (they connect to the same server). conn = self._create_connection(host, port, is_ssl) - conn.idx = conn_pool_idx - provider_conn_pool.put(conn) - - self._conn_pool.append(provider_conn_pool) - conn_pool_idx += 1 - - self._active_conn_pool_idx = 0 - - self._cookie = None - self._need_login = True - self._doing_login_sem = eventlet.semaphore.Semaphore(1) - - def _create_connection(self, host, port, is_ssl): - if is_ssl: - return httplib.HTTPSConnection(host, port, - timeout=self._connect_timeout) - return httplib.HTTPConnection(host, port, - timeout=self._connect_timeout) - - @staticmethod - def _conn_params(http_conn): - is_ssl = isinstance(http_conn, httplib.HTTPSConnection) - return (http_conn.host, http_conn.port, is_ssl) - - def update_providers(self, api_providers): - raise Exception(_('update_providers() not implemented.')) - - @property - def user(self): - return self._user - - @property - def password(self): - return self._password - - @property - def nvp_config_gen(self): - # If nvp_gen_timeout is not -1 then: - # Maintain a timestamp along with the generation ID. Hold onto the - # ID long enough to be useful and block on sequential requests but - # not long enough to persist when Onix db is cleared, which resets - # the generation ID, causing the DAL to block indefinitely with some - # number that's higher than the cluster's value. - if self._nvp_gen_timeout != -1: - ts = self._nvp_config_gen_ts - if ts is not None: - if (time.time() - ts) > self._nvp_gen_timeout: - return None - return self._nvp_config_gen - - @nvp_config_gen.setter - def nvp_config_gen(self, value): - if self._nvp_config_gen != value: - if self._nvp_gen_timeout != -1: - self._nvp_config_gen_ts = time.time() - self._nvp_config_gen = value - - @property - def auth_cookie(self): - return self._cookie - - def acquire_connection(self, rid=-1): - '''Check out an available HTTPConnection instance. - - Blocks until a connection is available. - - :param rid: request id passed in from request eventlet. - :returns: An available HTTPConnection instance or None if no - api_providers are configured. - ''' - if not self._api_providers: - LOG.warn(_("[%d] no API providers currently available."), rid) - return None - - # The sleep time is to give controllers time to become consistent after - # there has been a change in the controller used as the api_provider. - now = time.time() - if now < getattr(self, '_issue_conn_barrier', now): - LOG.warn(_("[%d] Waiting for failover timer to expire."), rid) - time.sleep(self._issue_conn_barrier - now) - - # Print out a warning if all connections are in use. - if self._conn_pool[self._active_conn_pool_idx].empty(): - LOG.debug(_("[%d] Waiting to acquire client connection."), rid) - - # Try to acquire a connection (block in get() until connection - # available or timeout occurs). - active_conn_pool_idx = self._active_conn_pool_idx - conn = self._conn_pool[active_conn_pool_idx].get() - - if active_conn_pool_idx != self._active_conn_pool_idx: - # active_conn_pool became inactive while we were waiting. - # Put connection back on old pool and try again. - LOG.warn(_("[%(rid)d] Active pool expired while waiting for " - "connection: %(conn)s"), - {'rid': rid, 'conn': _conn_str(conn)}) - self._conn_pool[active_conn_pool_idx].put(conn) - return self.acquire_connection(rid=rid) - - # Check if the connection has been idle too long. - now = time.time() - if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT: - LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f " - "seconds; reconnecting."), - {'rid': rid, 'conn': _conn_str(conn), - 'sec': now - conn.last_used}) - conn = self._create_connection(*self._conn_params(conn)) - - # Stash conn pool so conn knows where to go when it releases. - conn.idx = self._active_conn_pool_idx - - conn.last_used = now - qsize = self._conn_pool[self._active_conn_pool_idx].qsize() - LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d " - "connection(s) available."), - {'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize}) - return conn - - def release_connection(self, http_conn, bad_state=False, rid=-1): - '''Mark HTTPConnection instance as available for check-out. - - :param http_conn: An HTTPConnection instance obtained from this - instance. - :param bad_state: True if http_conn is known to be in a bad state - (e.g. connection fault.) - :param rid: request id passed in from request eventlet. - ''' - if self._conn_params(http_conn) not in self._api_providers: - LOG.warn(_("[%(rid)d] Released connection '%(conn)s' is not an " - "API provider for the cluster"), - {'rid': rid, 'conn': _conn_str(http_conn)}) - return - - # Retrieve "home" connection pool. - conn_pool_idx = http_conn.idx - conn_pool = self._conn_pool[conn_pool_idx] - if bad_state: - # Reconnect to provider. - LOG.warn(_("[%(rid)d] Connection returned in bad state, " - "reconnecting to %(conn)s"), - {'rid': rid, 'conn': _conn_str(http_conn)}) - http_conn = self._create_connection(*self._conn_params(http_conn)) - http_conn.idx = conn_pool_idx - - if self._active_conn_pool_idx == http_conn.idx: - # This pool is no longer in a good state. Switch to next pool. - self._active_conn_pool_idx += 1 - self._active_conn_pool_idx %= len(self._conn_pool) - LOG.warn(_("[%(rid)d] Switched active_conn_pool from " - "%(idx)d to %(pool_idx)d."), - {'rid': rid, 'idx': http_conn.idx, - 'pool_idx': self._active_conn_pool_idx}) - - # No connections to the new provider allowed until after this - # timer has expired (allow time for synchronization). - self._issue_conn_barrier = time.time() + self._failover_time - - conn_pool.put(http_conn) - LOG.debug(_("[%(rid)d] Released connection %(conn)s. " - "%(qsize)d connection(s) available."), - {'rid': rid, 'conn': _conn_str(http_conn), - 'qsize': conn_pool.qsize()}) - - @property - def need_login(self): - return self._need_login - - @need_login.setter - def need_login(self, val=True): - self._need_login = val - - def wait_for_login(self): - '''Block until a login has occurred for the current API provider.''' - if self._need_login: - if self._doing_login_sem.acquire(blocking=False): - self.login() - self._doing_login_sem.release() - else: - LOG.debug(_("Waiting for auth to complete")) - self._doing_login_sem.acquire() - self._doing_login_sem.release() - return self._cookie - - def login(self): + self._conn_pool.put((self._next_conn_priority, conn)) + self._next_conn_priority += 1 + + def acquire_redirect_connection(self, conn_params, auto_login=True, + headers=None): + """Check out or create connection to redirected NVP API server. + + Args: + conn_params: tuple specifying target of redirect, see + self._conn_params() + auto_login: returned connection should have valid session cookie + headers: headers to pass on if auto_login + + Returns: An available HTTPConnection instance corresponding to the + specified conn_params. If a connection did not previously + exist, new connections are created with the highest prioity + in the connection pool and one of these new connections + returned. + """ + result_conn = None + data = self._get_provider_data(conn_params) + if data: + # redirect target already exists in provider data and connections + # to the provider have been added to the connection pool. Try to + # obtain a connection from the pool, note that it's possible that + # all connection to the provider are currently in use. + conns = [] + while not self._conn_pool.empty(): + priority, conn = self._conn_pool.get_nowait() + if not result_conn and self._conn_params(conn) == conn_params: + conn.priority = priority + result_conn = conn + else: + conns.append((priority, conn)) + for priority, conn in conns: + self._conn_pool.put((priority, conn)) + # hack: if no free connections available, create new connection + # and stash "no_release" attribute (so that we only exceed + # self._concurrent_connections temporarily) + if not result_conn: + conn = self._create_connection(*conn_params) + conn.priority = 0 # redirect connections ahve highest priority + conn.no_release = True + result_conn = conn + else: + #redirect target not already known, setup provider lists + self._api_providers.update([conn_params]) + self._set_provider_data(conn_params, + (eventlet.semaphore.Semaphore(1), None)) + # redirects occur during cluster upgrades, i.e. results to old + # redirects to new, so give redirect targets highest priority + priority = 0 + for i in range(self._concurrent_connections): + conn = self._create_connection(*conn_params) + conn.priority = priority + if i == self._concurrent_connections - 1: + break + self._conn_pool.put((priority, conn)) + result_conn = conn + if result_conn: + result_conn.last_used = time.time() + if auto_login and self.auth_cookie(conn) is None: + self._wait_for_login(result_conn, headers) + return result_conn + + def _login(self, conn=None, headers=None): '''Issue login request and update authentication cookie.''' + cookie = None g = request_eventlet.NvpLoginRequestEventlet( - self, self._user, self._password) + self, self._user, self._password, conn, headers) g.start() ret = g.join() - if ret: if isinstance(ret, Exception): LOG.error(_('NvpApiClient: login error "%s"'), ret) raise ret - self._cookie = None cookie = ret.getheader("Set-Cookie") if cookie: LOG.debug(_("Saving new authentication cookie '%s'"), cookie) - self._cookie = cookie - self._need_login = False - - # TODO: or ret is an error. - if not ret: - return None - - return self._cookie + return cookie # Register as subclass. client.NvpApiClient.register(NvpApiClientEventlet) diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/common.py b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/common.py index 9cf39da6c..bd61ca7b3 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/common.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/common.py @@ -1,23 +1,24 @@ -# Copyright 2009-2012 Nicira Networks, Inc. +# Copyright 2012 Nicira, Inc. # All Rights Reserved # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # # vim: tabstop=4 shiftwidth=4 softtabstop=4 # +# @author: Aaron Rosen, Nicira Networks, Inc. + import httplib -import mock def _conn_str(conn): @@ -25,8 +26,6 @@ def _conn_str(conn): proto = "https://" elif isinstance(conn, httplib.HTTPConnection): proto = "http://" - elif isinstance(conn, mock.Mock): - proto = "http://" else: raise TypeError(_('_conn_str() invalid connection type: %s') % type(conn)) diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request.py b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request.py index fb18b5a15..a398a31c5 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request.py @@ -1,27 +1,49 @@ -# Copyright 2009-2012 Nicira Networks, Inc. +# Copyright 2012 Nicira, Inc. # All Rights Reserved # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # # vim: tabstop=4 shiftwidth=4 softtabstop=4 # +# @author: Aaron Rosen, Nicira Networks, Inc. + from abc import ABCMeta from abc import abstractmethod -from abc import abstractproperty +import copy +import httplib +import logging +import time +import urlparse + +from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import ( + _conn_str) + +logging.basicConfig(level=logging.INFO) +LOG = logging.getLogger(__name__) +# Default parameters. +DEFAULT_REQUEST_TIMEOUT = 30 +DEFAULT_HTTP_TIMEOUT = 10 +DEFAULT_RETRIES = 2 +DEFAULT_REDIRECTS = 2 +DEFAULT_API_REQUEST_POOL_SIZE = 1000 +DEFAULT_MAXIMUM_REQUEST_ID = 4294967295 +DOWNLOAD_TIMEOUT = 180 # The UI code has a coorespoind 190 sec timeout + # for downloads, see: django/nvp_console/views.py -class NvpApiRequest: + +class NvpApiRequest(object): '''An abstract baseclass for all ApiRequest implementations. This defines the interface and property structure for both eventlet and @@ -30,6 +52,22 @@ class NvpApiRequest: __metaclass__ = ABCMeta + # List of allowed status codes. + ALLOWED_STATUS_CODES = [ + httplib.OK, + httplib.CREATED, + httplib.NO_CONTENT, + httplib.MOVED_PERMANENTLY, + httplib.TEMPORARY_REDIRECT, + httplib.BAD_REQUEST, + httplib.UNAUTHORIZED, + httplib.FORBIDDEN, + httplib.NOT_FOUND, + httplib.CONFLICT, + httplib.INTERNAL_SERVER_ERROR, + httplib.SERVICE_UNAVAILABLE + ] + @abstractmethod def start(self): pass @@ -42,6 +80,200 @@ class NvpApiRequest: def copy(self): pass - @abstractproperty + def _issue_request(self): + '''Issue a request to a provider.''' + conn = (self._client_conn or + self._api_client.acquire_connection(True, + copy.copy(self._headers), + rid=self._rid())) + if conn is None: + error = Exception("No API connections available") + self._request_error = error + return error + + url = self._url + LOG.debug(_("[%d] Issuing - request '%s'"), + self._rid(), self._request_str(conn, url)) + issued_time = time.time() + is_conn_error = False + is_conn_service_unavail = False + try: + redirects = 0 + while (redirects <= self._redirects): + # Update connection with user specified request timeout, + # the connect timeout is usually smaller so we only set + # the request timeout after a connection is established + if conn.sock is None: + conn.connect() + conn.sock.settimeout(self._http_timeout) + elif conn.sock.gettimeout() != self._http_timeout: + conn.sock.settimeout(self._http_timeout) + + headers = copy.copy(self._headers) + cookie = self._api_client.auth_cookie(conn) + if cookie: + headers["Cookie"] = cookie + + gen = self._api_client.nvp_config_gen + if gen: + headers["X-Nvp-Wait-For-Config-Generation"] = gen + LOG.debug(_("Setting %s request header: '%s'"), + 'X-Nvp-Wait-For-Config-Generation', gen) + try: + conn.request(self._method, url, self._body, headers) + except Exception as e: + LOG.warn(_("[%d] Exception issuing request: '%s'"), + self._rid(), e) + raise e + + response = conn.getresponse() + response.body = response.read() + response.headers = response.getheaders() + LOG.debug(_("[%d] Completed request '%s': %s (%0.2f seconds)"), + self._rid(), self._request_str(conn, url), + response.status, time.time() - issued_time) + + new_gen = response.getheader('X-Nvp-Config-Generation', None) + if new_gen: + LOG.debug(_("Reading '%s' response header: '%s'"), + 'X-Nvp-config-Generation', new_gen) + if (self._api_client.nvp_config_gen is None or + self._api_client.nvp_config_gen < int(new_gen)): + self._api_client.nvp_config_gen = int(new_gen) + + if response.status == httplib.UNAUTHORIZED: + + if cookie is None and self._url != "/ws.v1/login": + # The connection still has no valid cookie despite + # attemps to authenticate and the request has failed + # with unauthorized status code. If this isn't a + # a request to authenticate, we should abort the + # request since there is no point in retrying. + self._abort = True + else: + # If request is unauthorized, clear the session cookie + # for the current provider so that subsequent requests + # to the same provider triggers re-authentication. + self._api_client.set_auth_cookie(conn, None) + + self._api_client.set_auth_cookie(conn, None) + elif response.status == httplib.SERVICE_UNAVAILABLE: + is_conn_service_unavail = True + + if response.status not in [httplib.MOVED_PERMANENTLY, + httplib.TEMPORARY_REDIRECT]: + break + elif redirects >= self._redirects: + LOG.info(_("[%d] Maximum redirects exceeded, aborting " + "request"), self._rid()) + break + redirects += 1 + + conn, url = self._redirect_params(conn, response.headers, + self._client_conn is None) + if url is None: + response.status = httplib.INTERNAL_SERVER_ERROR + break + LOG.info(_("[%d] Redirecting request to: '%s'"), + self._rid(), self._request_str(conn, url)) + + # If we receive any of these responses, then + # our server did not process our request and may be in an + # errored state. Raise an exception, which will cause the + # the conn to be released with is_conn_error == True + # which puts the conn on the back of the client's priority + # queue. + if response.status >= 500: + LOG.warn(_("[%d] Request '%s %s' received: %s"), + self._rid(), self._method, self._url, + response.status) + raise Exception('Server error return: %s' % + response.status) + return response + except Exception as e: + if isinstance(e, httplib.BadStatusLine): + msg = "Invalid server response" + else: + msg = unicode(e) + LOG.warn(_("[%d] Failed request '%s': '%s' (%0.2f seconds)"), + self._rid(), self._request_str(conn, url), msg, + time.time() - issued_time) + self._request_error = e + is_conn_error = True + return e + finally: + # Make sure we release the original connection provided by the + # acquire_connection() call above. + if self._client_conn is None: + self._api_client.release_connection(conn, is_conn_error, + is_conn_service_unavail, + rid=self._rid()) + + def _redirect_params(self, conn, headers, allow_release_conn=False): + """Process redirect response, create new connection if necessary. + + Args: + conn: connection that returned the redirect response + headers: response headers of the redirect response + allow_release_conn: if redirecting to a different server, + release existing connection back to connection pool. + + Returns: Return tuple(conn, url) where conn is a connection object + to the redirect target and url is the path of the API request + """ + + url = None + for name, value in headers: + if name.lower() == "location": + url = value + break + if not url: + LOG.warn(_("[%d] Received redirect status without location header" + " field"), self._rid()) + return (conn, None) + # Accept location with the following format: + # 1. /path, redirect to same node + # 2. scheme://hostname:[port]/path where scheme is https or http + # Reject others + # 3. e.g. relative paths, unsupported scheme, unspecified host + result = urlparse.urlparse(url) + if not result.scheme and not result.hostname and result.path: + if result.path[0] == "/": + if result.query: + url = "%s?%s" % (result.path, result.query) + else: + url = result.path + return (conn, url) # case 1 + else: + LOG.warn(_("[%d] Received invalid redirect location: '%s'"), + self._rid(), url) + return (conn, None) # case 3 + elif result.scheme not in ["http", "https"] or not result.hostname: + LOG.warn(_("[%d] Received malformed redirect location: %s"), + self._rid(), url) + return (conn, None) # case 3 + # case 2, redirect location includes a scheme + # so setup a new connection and authenticate + if allow_release_conn: + self._api_client.release_connection(conn) + conn_params = (result.hostname, result.port, result.scheme == "https") + conn = self._api_client.acquire_redirect_connection(conn_params, True, + self._headers) + if result.query: + url = "%s?%s" % (result.path, result.query) + else: + url = result.path + return (conn, url) + + def _rid(self): + '''Return current request id.''' + return self._request_id + + @property def request_error(self): - pass + '''Return any errors associated with this instance.''' + return self._request_error + + def _request_str(self, conn, url): + '''Return string representation of connection.''' + return "%s %s/%s" % (self._method, _conn_str(conn), url) diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request_eventlet.py b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request_eventlet.py index e5f6f100d..1c878d382 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request_eventlet.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/api_client/request_eventlet.py @@ -1,75 +1,46 @@ -# Copyright 2009-2012 Nicira Networks, Inc. +# Copyright 2012 Nicira, Inc. # All Rights Reserved # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. # # vim: tabstop=4 shiftwidth=4 softtabstop=4 # +# @author: Aaron Rosen, Nicira Networks, Inc. + -import copy import eventlet import httplib import json import logging -import request -import time import urllib -import urlparse -import client_eventlet -from common import _conn_str -from eventlet import timeout +from quantum.plugins.nicira.nicira_nvp_plugin.api_client import request eventlet.monkey_patch() - logging.basicConfig(level=logging.INFO) -lg = logging.getLogger("nvp_api_request") +LOG = logging.getLogger(__name__) USER_AGENT = "NVP eventlet client/1.0" -# Default parameters. -DEFAULT_REQUEST_TIMEOUT = 30 -DEFAULT_HTTP_TIMEOUT = 10 -DEFAULT_RETRIES = 2 -DEFAULT_REDIRECTS = 2 -DEFAULT_API_REQUEST_POOL_SIZE = 1000 -DEFAULT_MAXIMUM_REQUEST_ID = 4294967295 - -class NvpApiRequestEventlet: +class NvpApiRequestEventlet(request.NvpApiRequest): '''Eventlet-based ApiRequest class. This class will form the basis for eventlet-based ApiRequest classes (e.g. those used by the Quantum NVP Plugin). ''' - # List of allowed status codes. - ALLOWED_STATUS_CODES = [ - httplib.OK, - httplib.CREATED, - httplib.NO_CONTENT, - httplib.MOVED_PERMANENTLY, - httplib.TEMPORARY_REDIRECT, - httplib.BAD_REQUEST, - httplib.UNAUTHORIZED, - httplib.FORBIDDEN, - httplib.NOT_FOUND, - httplib.CONFLICT, - httplib.INTERNAL_SERVER_ERROR, - httplib.SERVICE_UNAVAILABLE - ] - # Maximum number of green threads present in the system at one time. - API_REQUEST_POOL_SIZE = DEFAULT_API_REQUEST_POOL_SIZE + API_REQUEST_POOL_SIZE = request.DEFAULT_API_REQUEST_POOL_SIZE # Pool of green threads. One green thread is allocated per incoming # request. Incoming requests will block when the pool is empty. @@ -77,18 +48,18 @@ class NvpApiRequestEventlet: # A unique id is assigned to each incoming request. When the current # request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0. - MAXIMUM_REQUEST_ID = DEFAULT_MAXIMUM_REQUEST_ID + MAXIMUM_REQUEST_ID = request.DEFAULT_MAXIMUM_REQUEST_ID # The request id for the next incoming request. CURRENT_REQUEST_ID = 0 def __init__(self, nvp_api_client, url, method="GET", body=None, headers=None, - request_timeout=DEFAULT_REQUEST_TIMEOUT, - retries=DEFAULT_RETRIES, + request_timeout=request.DEFAULT_REQUEST_TIMEOUT, + retries=request.DEFAULT_RETRIES, auto_login=True, - redirects=DEFAULT_REDIRECTS, - http_timeout=DEFAULT_HTTP_TIMEOUT): + redirects=request.DEFAULT_REDIRECTS, + http_timeout=request.DEFAULT_HTTP_TIMEOUT, client_conn=None): '''Constructor.''' self._api_client = nvp_api_client self._url = url @@ -100,6 +71,8 @@ class NvpApiRequestEventlet: self._auto_login = auto_login self._redirects = redirects self._http_timeout = http_timeout + self._client_conn = client_conn + self._abort = False self._request_error = None @@ -126,10 +99,6 @@ class NvpApiRequestEventlet: '''Spawn a new green thread with the supplied function and args.''' return self.__class__._spawn(func, *args, **kwargs) - def _rid(self): - '''Return current request id.''' - return self._request_id - @classmethod def joinall(cls): '''Wait for all outstanding requests to complete.''' @@ -152,197 +121,19 @@ class NvpApiRequestEventlet: self._headers, self._request_timeout, self._retries, self._auto_login, self._redirects, self._http_timeout) - @property - def request_error(self): - '''Return any errors associated with this instance.''' - return self._request_error - def _run(self): '''Method executed within green thread.''' if self._request_timeout: # No timeout exception escapes the with block. - with timeout.Timeout(self._request_timeout, False): + with eventlet.timeout.Timeout(self._request_timeout, False): return self._handle_request() - lg.info(_('[%d] Request timeout.'), self._rid()) + LOG.info(_('[%d] Request timeout.'), self._rid()) self._request_error = Exception(_('Request timeout')) return None else: return self._handle_request() - def _request_str(self, conn, url): - '''Return string representation of connection.''' - return "%s %s/%s" % (self._method, _conn_str(conn), url) - - def _issue_request(self): - '''Issue a request to a provider.''' - conn = self._api_client.acquire_connection(rid=self._rid()) - if conn is None: - error = Exception(_("No API connections available")) - self._request_error = error - return error - - # Preserve the acquired connection as conn may be over-written by - # redirects below. - acquired_conn = conn - - url = self._url - lg.debug(_("[%(rid)d] Issuing - request '%(req)s'"), - {'rid': self._rid(), - 'req': self._request_str(conn, url)}) - issued_time = time.time() - is_conn_error = False - try: - redirects = 0 - while (redirects <= self._redirects): - # Update connection with user specified request timeout, - # the connect timeout is usually smaller so we only set - # the request timeout after a connection is established - if conn.sock is None: - conn.connect() - conn.sock.settimeout(self._http_timeout) - elif conn.sock.gettimeout() != self._http_timeout: - conn.sock.settimeout(self._http_timeout) - - headers = copy.copy(self._headers) - gen = self._api_client.nvp_config_gen - if gen: - headers["X-Nvp-Wait-For-Config-Generation"] = gen - lg.debug(_("Setting %(header)s request header: %(gen)s"), - {'header': 'X-Nvp-Wait-For-Config-Generation', - 'gen': gen}) - try: - conn.request(self._method, url, self._body, headers) - except Exception as e: - lg.warn(_('[%(rid)d] Exception issuing request: %(e)s'), - {'rid': self._rid(), 'e': e}) - raise e - - response = conn.getresponse() - response.body = response.read() - response.headers = response.getheaders() - lg.debug(_("[%(rid)d] Completed request '%(req)s': %(status)s " - "(%(time)0.2f seconds)"), - {'rid': self._rid(), - 'req': self._request_str(conn, url), - 'status': response.status, - 'time': time.time() - issued_time}) - - new_gen = response.getheader('X-Nvp-Config-Generation', None) - if new_gen: - lg.debug(_("Reading %(header)s response header: %(gen)s"), - {'header': 'X-Nvp-config-Generation', - 'gen': new_gen}) - if (self._api_client.nvp_config_gen is None or - self._api_client.nvp_config_gen < int(new_gen)): - self._api_client.nvp_config_gen = int(new_gen) - - if response.status not in [httplib.MOVED_PERMANENTLY, - httplib.TEMPORARY_REDIRECT]: - break - elif redirects >= self._redirects: - lg.info(_("[%d] Maximum redirects exceeded, aborting " - "request"), self._rid()) - break - redirects += 1 - - # In the following call, conn is replaced by the connection - # specified in the redirect response from the server. - conn, url = self._redirect_params(conn, response.headers) - if url is None: - response.status = httplib.INTERNAL_SERVER_ERROR - break - lg.info(_("[%(rid)d] Redirecting request to: %(req)s"), - {'rid': self._rid(), - 'req': self._request_str(conn, url)}) - - # FIX for #9415. If we receive any of these responses, then - # our server did not process our request and may be in an - # errored state. Raise an exception, which will cause the - # the conn to be released with is_conn_error == True - # which puts the conn on the back of the client's priority - # queue. - if response.status >= 500: - lg.warn(_("[%(rid)d] Request '%(method)s %(url)s' " - "received: %(status)s"), - {'rid': self._rid(), 'method': self._method, - 'url': self._url, - 'status': response.status}) - raise Exception(_('Server error return: %s') % - response.status) - return response - except Exception as e: - if isinstance(e, httplib.BadStatusLine): - msg = _("Invalid server response") - else: - msg = unicode(e) - lg.warn(_("[%(rid)d] Failed request '%(req)s': %(msg)s " - "(%(time)0.2f seconds)"), - {'rid': self._rid(), 'req': self._request_str(conn, url), - 'msg': msg, - 'time': time.time() - issued_time}) - self._request_error = e - is_conn_error = True - return e - finally: - # Make sure we release the original connection provided by the - # acquire_connection() call above. - self._api_client.release_connection(acquired_conn, is_conn_error, - rid=self._rid()) - - def _redirect_params(self, conn, headers): - '''Process redirect params from a server response.''' - url = None - for name, value in headers: - if name.lower() == "location": - url = value - break - if not url: - lg.warn(_("[%d] Received redirect status without location header " - "field"), self._rid()) - return (conn, None) - # Accept location with the following format: - # 1. /path, redirect to same node - # 2. scheme://hostname:[port]/path where scheme is https or http - # Reject others - # 3. e.g. relative paths, unsupported scheme, unspecified host - result = urlparse.urlparse(url) - if not result.scheme and not result.hostname and result.path: - if result.path[0] == "/": - if result.query: - url = "%s?%s" % (result.path, result.query) - else: - url = result.path - return (conn, url) # case 1 - else: - lg.warn(_("[%(rid)d] Received invalid redirect location: " - "%(url)s"), - {'rid': self._rid(), 'url': url}) - return (conn, None) # case 3 - elif result.scheme not in ["http", "https"] or not result.hostname: - lg.warn(_("[%(rid)d] Received malformed redirect location: " - "%(url)s"), - {'rid': self._rid(), 'url': url}) - return (conn, None) # case 3 - # case 2, redirect location includes a scheme - # so setup a new connection and authenticate - use_https = result.scheme == "https" - api_providers = [(result.hostname, result.port, use_https)] - api_client = client_eventlet.NvpApiClientEventlet( - api_providers, self._api_client.user, self._api_client.password, - use_https=use_https) - api_client.wait_for_login() - if api_client.auth_cookie: - self._headers["Cookie"] = api_client.auth_cookie - else: - self._headers["Cookie"] = "" - conn = api_client.acquire_connection(rid=self._rid()) - if result.query: - url = "%s?%s" % (result.path, result.query) - else: - url = result.path - return (conn, url) - def _handle_request(self): '''First level request handling.''' attempt = 0 @@ -350,46 +141,41 @@ class NvpApiRequestEventlet: while response is None and attempt <= self._retries: attempt += 1 - if self._auto_login and self._api_client.need_login: - self._api_client.wait_for_login() - - if self._api_client.auth_cookie: - self._headers["Cookie"] = self._api_client.auth_cookie - req = self.spawn(self._issue_request).wait() # automatically raises any exceptions returned. if isinstance(req, httplib.HTTPResponse): - if (req.status == httplib.UNAUTHORIZED + if attempt <= self._retries and not self._abort: + if (req.status == httplib.UNAUTHORIZED or req.status == httplib.FORBIDDEN): - self._api_client.need_login = True - if attempt <= self._retries: continue # else fall through to return the error code - lg.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'" - ": %(status)s"), - {'rid': self._rid(), 'method': self._method, - 'url': self._url, 'status': req.status}) + LOG.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'" + ": %(status)s"), + {'rid': self._rid(), 'method': self._method, + 'url': self._url, 'status': req.status}) self._request_error = None response = req else: - lg.info(_('[%(rid)d] Error while handling request: %(req)s'), - {'rid': self._rid(), 'req': req}) + LOG.info(_('[%(rid)d] Error while handling request: %(req)s'), + {'rid': self._rid(), 'req': req}) self._request_error = req response = None - return response class NvpLoginRequestEventlet(NvpApiRequestEventlet): '''Process a login request.''' - def __init__(self, nvp_client, user, password): - headers = {"Content-Type": "application/x-www-form-urlencoded"} + def __init__(self, nvp_client, user, password, client_conn=None, + headers=None): + if headers is None: + headers = {} + headers.update({"Content-Type": "application/x-www-form-urlencoded"}) body = urllib.urlencode({"username": user, "password": password}) NvpApiRequestEventlet.__init__( self, nvp_client, "/ws.v1/login", "POST", body, headers, - auto_login=False) + auto_login=False, client_conn=client_conn) def session_cookie(self): if self.successful(): @@ -398,7 +184,7 @@ class NvpLoginRequestEventlet(NvpApiRequestEventlet): class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet): - '''Get a list of API providers.''' + '''Gej a list of API providers.''' def __init__(self, nvp_client): url = "/ws.v1/control-cluster/node?fields=roles" @@ -427,8 +213,8 @@ class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet): ret.append(_provider_from_listen_addr(addr)) return ret except Exception as e: - lg.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"), - {'rid': self._rid(), 'e': e}) + LOG.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"), + {'rid': self._rid(), 'e': e}) # intentionally fall through return None @@ -438,12 +224,11 @@ class NvpGenericRequestEventlet(NvpApiRequestEventlet): def __init__(self, nvp_client, method, url, body, content_type, auto_login=False, - request_timeout=DEFAULT_REQUEST_TIMEOUT, - http_timeout=DEFAULT_HTTP_TIMEOUT, - retries=DEFAULT_RETRIES, - redirects=DEFAULT_REDIRECTS): + request_timeout=request.DEFAULT_REQUEST_TIMEOUT, + http_timeout=request.DEFAULT_HTTP_TIMEOUT, + retries=request.DEFAULT_RETRIES, + redirects=request.DEFAULT_REDIRECTS): headers = {"Content-Type": content_type} - NvpApiRequestEventlet.__init__( self, nvp_client, url, method, body, headers, request_timeout=request_timeout, retries=retries, diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/common/config.py b/quantum/plugins/nicira/nicira_nvp_plugin/common/config.py index 23459f7a7..41db47920 100644 --- a/quantum/plugins/nicira/nicira_nvp_plugin/common/config.py +++ b/quantum/plugins/nicira/nicira_nvp_plugin/common/config.py @@ -21,7 +21,7 @@ nvp_opts = [ cfg.IntOpt('max_lp_per_bridged_ls', default=64), cfg.IntOpt('max_lp_per_overlay_ls', default=256), cfg.IntOpt('concurrent_connections', default=5), - cfg.IntOpt('failover_time', default=240), + cfg.IntOpt('nvp_gen_timeout', default=-1), cfg.StrOpt('default_cluster_name') ] diff --git a/quantum/tests/unit/nicira/test_nvp_api_request_eventlet.py b/quantum/tests/unit/nicira/test_nvp_api_request_eventlet.py index f06aabe35..3e7ec54a2 100644 --- a/quantum/tests/unit/nicira/test_nvp_api_request_eventlet.py +++ b/quantum/tests/unit/nicira/test_nvp_api_request_eventlet.py @@ -134,7 +134,6 @@ class NvpApiRequestEventletTest(unittest.TestCase): myconn.__str__.return_value = 'myconn string' req = self.req - req._request_timeout = REQUEST_TIMEOUT = 1 req._redirect_params = Mock() req._redirect_params.return_value = (myconn, 'url') req._request_str = Mock() @@ -214,60 +213,48 @@ class NvpApiRequestEventletTest(unittest.TestCase): with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' 'client_eventlet.NvpApiClientEventlet') as mock: api_client = mock.return_value - api_client.wait_for_login.return_value = None - api_client.auth_cookie = 'mycookie' - api_client.acquire_connection.return_value = True + self.req._api_client = api_client myconn = Mock() (conn, retval) = self.req._redirect_params( myconn, [('location', 'https://host:1/path')]) self.assertTrue(retval is not None) - self.assertTrue(api_client.wait_for_login.called) - self.assertTrue(api_client.acquire_connection.called) + self.assertTrue(api_client.acquire_redirect_connection.called) def test_redirect_params_setup_htttps_and_query(self): with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' 'client_eventlet.NvpApiClientEventlet') as mock: api_client = mock.return_value - api_client.wait_for_login.return_value = None - api_client.auth_cookie = 'mycookie' - api_client.acquire_connection.return_value = True + self.req._api_client = api_client myconn = Mock() (conn, retval) = self.req._redirect_params(myconn, [ ('location', 'https://host:1/path?q=1')]) self.assertTrue(retval is not None) - self.assertTrue(api_client.wait_for_login.called) - self.assertTrue(api_client.acquire_connection.called) + self.assertTrue(api_client.acquire_redirect_connection.called) def test_redirect_params_setup_https_connection_no_cookie(self): with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' 'client_eventlet.NvpApiClientEventlet') as mock: api_client = mock.return_value - api_client.wait_for_login.return_value = None - api_client.auth_cookie = None - api_client.acquire_connection.return_value = True + self.req._api_client = api_client myconn = Mock() (conn, retval) = self.req._redirect_params(myconn, [ ('location', 'https://host:1/path')]) self.assertTrue(retval is not None) - self.assertTrue(api_client.wait_for_login.called) - self.assertTrue(api_client.acquire_connection.called) + self.assertTrue(api_client.acquire_redirect_connection.called) def test_redirect_params_setup_https_and_query_no_cookie(self): with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.' 'client_eventlet.NvpApiClientEventlet') as mock: api_client = mock.return_value - api_client.wait_for_login.return_value = None - api_client.auth_cookie = None - api_client.acquire_connection.return_value = True + self.req._api_client = api_client myconn = Mock() (conn, retval) = self.req._redirect_params( myconn, [('location', 'https://host:1/path?q=1')]) self.assertTrue(retval is not None) - self.assertTrue(api_client.wait_for_login.called) - self.assertTrue(api_client.acquire_connection.called) + self.assertTrue(api_client.acquire_redirect_connection.called) def test_redirect_params_path_only_with_query(self): with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'