'''
def __init__(self, api_providers, user, password, request_timeout,
- http_timeout, retries, redirects, failover_time,
- concurrent_connections=3):
+ http_timeout, retries, redirects,
+ concurrent_connections=3, nvp_gen_timeout=-1):
'''Constructor.
:param api_providers: a list of tuples in the form:
controller in the cluster)
:param retries: the number of concurrent connections.
:param redirects: the number of concurrent connections.
- :param failover_time: minimum time between controller failover and new
- connections allowed.
'''
client_eventlet.NvpApiClientEventlet.__init__(
self, api_providers, user, password, concurrent_connections,
- failover_time=failover_time)
+ nvp_gen_timeout)
self._request_timeout = request_timeout
self._http_timeout = http_timeout
if password:
self._password = password
- return client_eventlet.NvpApiClientEventlet.login(self)
+ return client_eventlet.NvpApiClientEventlet._login(self)
def request(self, method, url, body="", content_type="application/json"):
'''Issues request to controller.'''
# FIXME(salvatore-orlando): get rid of relative imports
from common import config
-from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client_eventlet
from nvp_plugin_version import PLUGIN_VERSION
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_models
http_timeout=cluster.http_timeout,
retries=cluster.retries,
redirects=cluster.redirects,
- failover_time=self.nvp_opts.failover_time,
- concurrent_connections=self.nvp_opts.concurrent_connections)
+ concurrent_connections=self.nvp_opts['concurrent_connections'],
+ nvp_gen_timeout=self.nvp_opts['nvp_gen_timeout'])
- # TODO(salvatore-orlando): do login at first request,
- # not when plugin is instantiated
- cluster.api_client.login()
+ if len(self.clusters) == 0:
+ first_cluster = cluster
self.clusters[c_opts['name']] = cluster
def_cluster_name = self.nvp_opts.default_cluster_name
bridged transport zone (default 64)
- concurrent_connections: Number of connects to each controller node
(default 3)
- - failover_time: Time from when a connection pool is switched to another
- controller during failures.
+ - nvp_gen_timout: Number of seconds a generation id should be valid for
+ (default -1 meaning do not time out)
3) NVP cluster
The Quantum NVP plugin allow for configuring multiple clusters.
Each cluster configuration section must be declared in the following way
-# Copyright 2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
# All Rights Reserved
#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
-# Author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
+# @author: David Lapsley <dlapsley@nicira.com>, Nicira Networks, Inc.
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
from abc import ABCMeta
-from abc import abstractmethod
-from abc import abstractproperty
+import httplib
+import time
+import logging
+
+
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
+ _conn_str)
+
+
+logging.basicConfig(level=logging.INFO)
+LOG = logging.getLogger(__name__)
+#Default parameters.
+GENERATION_ID_TIMEOUT = -1
+DEFAULT_CONCURRENT_CONNECTIONS = 3
+DEFAULT_CONNECT_TIMEOUT = 5
class NvpApiClient(object):
CONN_IDLE_TIMEOUT = 60 * 15
- @abstractmethod
- def update_providers(self, api_providers):
- pass
+ def _create_connection(self, host, port, is_ssl):
+ if is_ssl:
+ return httplib.HTTPSConnection(host, port,
+ timeout=self._connect_timeout)
+ return httplib.HTTPConnection(host, port,
+ timeout=self._connect_timeout)
+
+ @staticmethod
+ def _conn_params(http_conn):
+ is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
+ return (http_conn.host, http_conn.port, is_ssl)
- @abstractproperty
+ @property
def user(self):
- pass
+ return self._user
- @abstractproperty
+ @property
def password(self):
- pass
+ return self._password
+
+ @property
+ def nvp_config_gen(self):
+ # If nvp_gen_timeout is not -1 then:
+ # Maintain a timestamp along with the generation ID. Hold onto the
+ # ID long enough to be useful and block on sequential requests but
+ # not long enough to persist when Onix db is cleared, which resets
+ # the generation ID, causing the DAL to block indefinitely with some
+ # number that's higher than the cluster's value.
+ if self._nvp_gen_timeout != -1:
+ ts = self._nvp_config_gen_ts
+ if ts is not None:
+ if (time.time() - ts) > self._nvp_gen_timeout:
+ return None
+ return self._nvp_config_gen
+
+ @nvp_config_gen.setter
+ def nvp_config_gen(self, value):
+ if self._nvp_config_gen != value:
+ if self._nvp_gen_timeout != -1:
+ self._nvp_config_gen_ts = time.time()
+ self._nvp_config_gen = value
+
+ def auth_cookie(self, conn):
+ cookie = None
+ data = self._get_provider_data(conn)
+ if data:
+ cookie = data[1]
+ return cookie
+
+ def set_auth_cookie(self, conn, cookie):
+ data = self._get_provider_data(conn)
+ if data:
+ self._set_provider_data(conn, (data[0], cookie))
+
+ def acquire_connection(self, auto_login=True, headers=None, rid=-1):
+ '''Check out an available HTTPConnection instance.
+
+ Blocks until a connection is available.
+ :auto_login: automatically logins before returning conn
+ :headers: header to pass on to login attempt
+ :param rid: request id passed in from request eventlet.
+ :returns: An available HTTPConnection instance or None if no
+ api_providers are configured.
+ '''
+ if not self._api_providers:
+ LOG.warn(_("[%d] no API providers currently available."), rid)
+ return None
+ if self._conn_pool.empty():
+ LOG.debug(_("[%d] Waiting to acquire API client connection."), rid)
+ priority, conn = self._conn_pool.get()
+ now = time.time()
+ if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
+ LOG.info(_("[%d] Connection %s idle for %0.2f seconds; "
+ "reconnecting."),
+ rid, _conn_str(conn), now - conn.last_used)
+ conn = self._create_connection(*self._conn_params(conn))
- @abstractproperty
- def auth_cookie(self):
- pass
+ conn.last_used = now
+ conn.priority = priority # stash current priority for release
+ qsize = self._conn_pool.qsize()
+ LOG.debug(_("[%d] Acquired connection %s. %d connection(s) "
+ "available."), rid, _conn_str(conn), qsize)
+ if auto_login and self.auth_cookie(conn) is None:
+ self._wait_for_login(conn, headers)
+ return conn
- @abstractmethod
- def acquire_connection(self):
- pass
+ def release_connection(self, http_conn, bad_state=False,
+ service_unavail=False, rid=-1):
+ '''Mark HTTPConnection instance as available for check-out.
- @abstractmethod
- def release_connection(self, http_conn, bad_state=False):
- pass
+ :param http_conn: An HTTPConnection instance obtained from this
+ instance.
+ :param bad_state: True if http_conn is known to be in a bad state
+ (e.g. connection fault.)
+ :service_unavail: True if http_conn returned 503 response.
+ :param rid: request id passed in from request eventlet.
+ '''
+ conn_params = self._conn_params(http_conn)
+ if self._conn_params(http_conn) not in self._api_providers:
+ LOG.debug(_("[%d] Released connection '%s' is not an API provider "
+ "for the cluster"), rid, _conn_str(http_conn))
+ return
+ elif hasattr(http_conn, "no_release"):
+ return
- @abstractproperty
- def need_login(self):
- pass
+ if bad_state:
+ # Reconnect to provider.
+ LOG.warn(_("[%d] Connection returned in bad state, reconnecting "
+ "to %s"), rid, _conn_str(http_conn))
+ http_conn = self._create_connection(*self._conn_params(http_conn))
+ priority = self._next_conn_priority
+ self._next_conn_priority += 1
+ elif service_unavail:
+ # http_conn returned a service unaviable response, put other
+ # connections to the same controller at end of priority queue,
+ conns = []
+ while not self._conn_pool.empty():
+ priority, conn = self._conn_pool.get()
+ if self._conn_params(conn) == conn_params:
+ priority = self._next_conn_priority
+ self._next_conn_priority += 1
+ conns.append((priority, conn))
+ for priority, conn in conns:
+ self._conn_pool.put((priority, conn))
+ # put http_conn at end of queue also
+ priority = self._next_conn_priority
+ self._next_conn_priority += 1
+ else:
+ priority = http_conn.priority
+
+ self._conn_pool.put((priority, http_conn))
+ LOG.debug(_("[%d] Released connection %s. %d connection(s) "
+ "available."),
+ rid, _conn_str(http_conn), self._conn_pool.qsize())
+
+ def _wait_for_login(self, conn, headers=None):
+ '''Block until a login has occurred for the current API provider.'''
+
+ data = self._get_provider_data(conn)
+ if data is None:
+ LOG.error(_("Login request for an invalid connection: '%s'"),
+ _conn_str(conn))
+ return
+ provider_sem = data[0]
+ if provider_sem.acquire(blocking=False):
+ try:
+ cookie = self._login(conn, headers)
+ self.set_auth_cookie(conn, cookie)
+ finally:
+ provider_sem.release()
+ else:
+ LOG.debug(_("Waiting for auth to complete"))
+ # Wait until we can aquire then release
+ provider_sem.acquire(blocking=True)
+ provider_sem.release()
+
+ def _get_provider_data(self, conn_or_conn_params, default=None):
+ """Get data for specified API provider.
+
+ Args:
+ conn_or_conn_params: either a HTTP(S)Connection object or the
+ resolved conn_params tuple returned by self._conn_params().
+ default: conn_params if ones passed aren't known
+ Returns: Data associated with specified provider
+ """
+ conn_params = self._normalize_conn_params(conn_or_conn_params)
+ return self._api_provider_data.get(conn_params, default)
+
+ def _set_provider_data(self, conn_or_conn_params, data):
+ """Set data for specified API provider.
+
+ Args:
+ conn_or_conn_params: either a HTTP(S)Connection object or the
+ resolved conn_params tuple returned by self._conn_params().
+ data: data to associate with API provider
+ """
+ conn_params = self._normalize_conn_params(conn_or_conn_params)
+ if data is None:
+ del self._api_provider_data[conn_params]
+ else:
+ self._api_provider_data[conn_params] = data
+
+ def _normalize_conn_params(self, conn_or_conn_params):
+ """Normalize conn_param tuple.
+
+ Args:
+ conn_or_conn_params: either a HTTP(S)Connection object or the
+ resolved conn_params tuple returned by self._conn_params().
+
+ Returns: Normalized conn_param tuple
+ """
+ if (not isinstance(conn_or_conn_params, tuple) and
+ not isinstance(conn_or_conn_params, httplib.HTTPConnection)):
+ LOG.debug(_("Invalid conn_params value: '%s'"),
+ str(conn_or_conn_params))
+ return conn_or_conn_params
+ if isinstance(conn_or_conn_params, httplib.HTTPConnection):
+ conn_params = self._conn_params(conn_or_conn_params)
+ else:
+ conn_params = conn_or_conn_params
+ host, port, is_ssl = conn_params
+ if port is None:
+ port = 443 if is_ssl else 80
+ return (host, port, is_ssl)
+
+ def update_providers(self, api_providers):
+ new_providers = set([tuple(p) for p in api_providers])
+ if new_providers != self._api_providers:
+ new_conns = []
+ while not self._conn_pool.empty():
+ priority, conn = self._conn_pool.get_nowait()
+ if self._conn_params(conn) in new_providers:
+ new_conns.append((priority, conn))
- @abstractmethod
- def wait_for_login(self):
- pass
+ to_subtract = self._api_providers - new_providers
+ for p in to_subtract:
+ self._set_provider_data(p, None)
+ to_add = new_providers - self._api_providers
+ for unused_i in range(self._concurrent_connections):
+ for host, port, is_ssl in to_add:
+ conn = self._create_connection(host, port, is_ssl)
+ new_conns.append((self._next_conn_priority, conn))
+ self._next_conn_priority += 1
- @abstractmethod
- def login(self):
- pass
+ for priority, conn in new_conns:
+ self._conn_pool.put((priority, conn))
+ self._api_providers = new_providers
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
# All Rights Reserved
#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
-import client
import eventlet
-import httplib
import logging
-import request_eventlet
import time
-from common import _conn_str
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client import client
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
+ request_eventlet)
eventlet.monkey_patch()
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)
-# Default parameters.
-DEFAULT_FAILOVER_TIME = 5
-DEFAULT_CONCURRENT_CONNECTIONS = 3
-DEFAULT_CONNECT_TIMEOUT = 5
-GENERATION_ID_TIMEOUT = -1 # if set to -1 then disabled
-
-class NvpApiClientEventlet(object):
+class NvpApiClientEventlet(client.NvpApiClient):
'''Eventlet-based implementation of NvpApiClient ABC.'''
- CONN_IDLE_TIMEOUT = 60 * 15
-
def __init__(self, api_providers, user, password,
- concurrent_connections=DEFAULT_CONCURRENT_CONNECTIONS,
- use_https=True,
- connect_timeout=DEFAULT_CONNECT_TIMEOUT,
- failover_time=DEFAULT_FAILOVER_TIME,
- nvp_gen_timeout=GENERATION_ID_TIMEOUT):
+ concurrent_connections=client.DEFAULT_CONCURRENT_CONNECTIONS,
+ nvp_gen_timeout=client.GENERATION_ID_TIMEOUT, use_https=True,
+ connect_timeout=client.DEFAULT_CONNECT_TIMEOUT):
'''Constructor
:param api_providers: a list of tuples of the form: (host, port,
:param concurrent_connections: total number of concurrent connections.
:param use_https: whether or not to use https for requests.
:param connect_timeout: connection timeout in seconds.
- :param failover_time: time from when a connection pool is switched to
- the next connection released via acquire_connection().
:param nvp_gen_timeout controls how long the generation id is kept
if set to -1 the generation id is never timed out
'''
if not api_providers:
api_providers = []
self._api_providers = set([tuple(p) for p in api_providers])
+ self._api_provider_data = {} # tuple(semaphore, nvp_session_cookie)
+ for p in self._api_providers:
+ self._set_provider_data(p, (eventlet.semaphore.Semaphore(1), None))
self._user = user
self._password = password
self._concurrent_connections = concurrent_connections
self._use_https = use_https
self._connect_timeout = connect_timeout
- self._failover_time = failover_time
self._nvp_config_gen = None
self._nvp_config_gen_ts = None
self._nvp_gen_timeout = nvp_gen_timeout
# Connection pool is a list of queues.
- self._conn_pool = list()
- conn_pool_idx = 0
+ self._conn_pool = eventlet.queue.PriorityQueue()
+ self._next_conn_priority = 1
for host, port, is_ssl in api_providers:
- provider_conn_pool = eventlet.queue.Queue(
- maxsize=concurrent_connections)
for i in range(concurrent_connections):
- # All connections in a provider_conn_poool have the
- # same priority (they connect to the same server).
conn = self._create_connection(host, port, is_ssl)
- conn.idx = conn_pool_idx
- provider_conn_pool.put(conn)
-
- self._conn_pool.append(provider_conn_pool)
- conn_pool_idx += 1
-
- self._active_conn_pool_idx = 0
-
- self._cookie = None
- self._need_login = True
- self._doing_login_sem = eventlet.semaphore.Semaphore(1)
-
- def _create_connection(self, host, port, is_ssl):
- if is_ssl:
- return httplib.HTTPSConnection(host, port,
- timeout=self._connect_timeout)
- return httplib.HTTPConnection(host, port,
- timeout=self._connect_timeout)
-
- @staticmethod
- def _conn_params(http_conn):
- is_ssl = isinstance(http_conn, httplib.HTTPSConnection)
- return (http_conn.host, http_conn.port, is_ssl)
-
- def update_providers(self, api_providers):
- raise Exception(_('update_providers() not implemented.'))
-
- @property
- def user(self):
- return self._user
-
- @property
- def password(self):
- return self._password
-
- @property
- def nvp_config_gen(self):
- # If nvp_gen_timeout is not -1 then:
- # Maintain a timestamp along with the generation ID. Hold onto the
- # ID long enough to be useful and block on sequential requests but
- # not long enough to persist when Onix db is cleared, which resets
- # the generation ID, causing the DAL to block indefinitely with some
- # number that's higher than the cluster's value.
- if self._nvp_gen_timeout != -1:
- ts = self._nvp_config_gen_ts
- if ts is not None:
- if (time.time() - ts) > self._nvp_gen_timeout:
- return None
- return self._nvp_config_gen
-
- @nvp_config_gen.setter
- def nvp_config_gen(self, value):
- if self._nvp_config_gen != value:
- if self._nvp_gen_timeout != -1:
- self._nvp_config_gen_ts = time.time()
- self._nvp_config_gen = value
-
- @property
- def auth_cookie(self):
- return self._cookie
-
- def acquire_connection(self, rid=-1):
- '''Check out an available HTTPConnection instance.
-
- Blocks until a connection is available.
-
- :param rid: request id passed in from request eventlet.
- :returns: An available HTTPConnection instance or None if no
- api_providers are configured.
- '''
- if not self._api_providers:
- LOG.warn(_("[%d] no API providers currently available."), rid)
- return None
-
- # The sleep time is to give controllers time to become consistent after
- # there has been a change in the controller used as the api_provider.
- now = time.time()
- if now < getattr(self, '_issue_conn_barrier', now):
- LOG.warn(_("[%d] Waiting for failover timer to expire."), rid)
- time.sleep(self._issue_conn_barrier - now)
-
- # Print out a warning if all connections are in use.
- if self._conn_pool[self._active_conn_pool_idx].empty():
- LOG.debug(_("[%d] Waiting to acquire client connection."), rid)
-
- # Try to acquire a connection (block in get() until connection
- # available or timeout occurs).
- active_conn_pool_idx = self._active_conn_pool_idx
- conn = self._conn_pool[active_conn_pool_idx].get()
-
- if active_conn_pool_idx != self._active_conn_pool_idx:
- # active_conn_pool became inactive while we were waiting.
- # Put connection back on old pool and try again.
- LOG.warn(_("[%(rid)d] Active pool expired while waiting for "
- "connection: %(conn)s"),
- {'rid': rid, 'conn': _conn_str(conn)})
- self._conn_pool[active_conn_pool_idx].put(conn)
- return self.acquire_connection(rid=rid)
-
- # Check if the connection has been idle too long.
- now = time.time()
- if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
- LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
- "seconds; reconnecting."),
- {'rid': rid, 'conn': _conn_str(conn),
- 'sec': now - conn.last_used})
- conn = self._create_connection(*self._conn_params(conn))
-
- # Stash conn pool so conn knows where to go when it releases.
- conn.idx = self._active_conn_pool_idx
-
- conn.last_used = now
- qsize = self._conn_pool[self._active_conn_pool_idx].qsize()
- LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
- "connection(s) available."),
- {'rid': rid, 'conn': _conn_str(conn), 'qsize': qsize})
- return conn
-
- def release_connection(self, http_conn, bad_state=False, rid=-1):
- '''Mark HTTPConnection instance as available for check-out.
-
- :param http_conn: An HTTPConnection instance obtained from this
- instance.
- :param bad_state: True if http_conn is known to be in a bad state
- (e.g. connection fault.)
- :param rid: request id passed in from request eventlet.
- '''
- if self._conn_params(http_conn) not in self._api_providers:
- LOG.warn(_("[%(rid)d] Released connection '%(conn)s' is not an "
- "API provider for the cluster"),
- {'rid': rid, 'conn': _conn_str(http_conn)})
- return
-
- # Retrieve "home" connection pool.
- conn_pool_idx = http_conn.idx
- conn_pool = self._conn_pool[conn_pool_idx]
- if bad_state:
- # Reconnect to provider.
- LOG.warn(_("[%(rid)d] Connection returned in bad state, "
- "reconnecting to %(conn)s"),
- {'rid': rid, 'conn': _conn_str(http_conn)})
- http_conn = self._create_connection(*self._conn_params(http_conn))
- http_conn.idx = conn_pool_idx
-
- if self._active_conn_pool_idx == http_conn.idx:
- # This pool is no longer in a good state. Switch to next pool.
- self._active_conn_pool_idx += 1
- self._active_conn_pool_idx %= len(self._conn_pool)
- LOG.warn(_("[%(rid)d] Switched active_conn_pool from "
- "%(idx)d to %(pool_idx)d."),
- {'rid': rid, 'idx': http_conn.idx,
- 'pool_idx': self._active_conn_pool_idx})
-
- # No connections to the new provider allowed until after this
- # timer has expired (allow time for synchronization).
- self._issue_conn_barrier = time.time() + self._failover_time
-
- conn_pool.put(http_conn)
- LOG.debug(_("[%(rid)d] Released connection %(conn)s. "
- "%(qsize)d connection(s) available."),
- {'rid': rid, 'conn': _conn_str(http_conn),
- 'qsize': conn_pool.qsize()})
-
- @property
- def need_login(self):
- return self._need_login
-
- @need_login.setter
- def need_login(self, val=True):
- self._need_login = val
-
- def wait_for_login(self):
- '''Block until a login has occurred for the current API provider.'''
- if self._need_login:
- if self._doing_login_sem.acquire(blocking=False):
- self.login()
- self._doing_login_sem.release()
- else:
- LOG.debug(_("Waiting for auth to complete"))
- self._doing_login_sem.acquire()
- self._doing_login_sem.release()
- return self._cookie
-
- def login(self):
+ self._conn_pool.put((self._next_conn_priority, conn))
+ self._next_conn_priority += 1
+
+ def acquire_redirect_connection(self, conn_params, auto_login=True,
+ headers=None):
+ """Check out or create connection to redirected NVP API server.
+
+ Args:
+ conn_params: tuple specifying target of redirect, see
+ self._conn_params()
+ auto_login: returned connection should have valid session cookie
+ headers: headers to pass on if auto_login
+
+ Returns: An available HTTPConnection instance corresponding to the
+ specified conn_params. If a connection did not previously
+ exist, new connections are created with the highest prioity
+ in the connection pool and one of these new connections
+ returned.
+ """
+ result_conn = None
+ data = self._get_provider_data(conn_params)
+ if data:
+ # redirect target already exists in provider data and connections
+ # to the provider have been added to the connection pool. Try to
+ # obtain a connection from the pool, note that it's possible that
+ # all connection to the provider are currently in use.
+ conns = []
+ while not self._conn_pool.empty():
+ priority, conn = self._conn_pool.get_nowait()
+ if not result_conn and self._conn_params(conn) == conn_params:
+ conn.priority = priority
+ result_conn = conn
+ else:
+ conns.append((priority, conn))
+ for priority, conn in conns:
+ self._conn_pool.put((priority, conn))
+ # hack: if no free connections available, create new connection
+ # and stash "no_release" attribute (so that we only exceed
+ # self._concurrent_connections temporarily)
+ if not result_conn:
+ conn = self._create_connection(*conn_params)
+ conn.priority = 0 # redirect connections ahve highest priority
+ conn.no_release = True
+ result_conn = conn
+ else:
+ #redirect target not already known, setup provider lists
+ self._api_providers.update([conn_params])
+ self._set_provider_data(conn_params,
+ (eventlet.semaphore.Semaphore(1), None))
+ # redirects occur during cluster upgrades, i.e. results to old
+ # redirects to new, so give redirect targets highest priority
+ priority = 0
+ for i in range(self._concurrent_connections):
+ conn = self._create_connection(*conn_params)
+ conn.priority = priority
+ if i == self._concurrent_connections - 1:
+ break
+ self._conn_pool.put((priority, conn))
+ result_conn = conn
+ if result_conn:
+ result_conn.last_used = time.time()
+ if auto_login and self.auth_cookie(conn) is None:
+ self._wait_for_login(result_conn, headers)
+ return result_conn
+
+ def _login(self, conn=None, headers=None):
'''Issue login request and update authentication cookie.'''
+ cookie = None
g = request_eventlet.NvpLoginRequestEventlet(
- self, self._user, self._password)
+ self, self._user, self._password, conn, headers)
g.start()
ret = g.join()
-
if ret:
if isinstance(ret, Exception):
LOG.error(_('NvpApiClient: login error "%s"'), ret)
raise ret
- self._cookie = None
cookie = ret.getheader("Set-Cookie")
if cookie:
LOG.debug(_("Saving new authentication cookie '%s'"), cookie)
- self._cookie = cookie
- self._need_login = False
-
- # TODO: or ret is an error.
- if not ret:
- return None
-
- return self._cookie
+ return cookie
# Register as subclass.
client.NvpApiClient.register(NvpApiClientEventlet)
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
# All Rights Reserved
#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
import httplib
-import mock
def _conn_str(conn):
proto = "https://"
elif isinstance(conn, httplib.HTTPConnection):
proto = "http://"
- elif isinstance(conn, mock.Mock):
- proto = "http://"
else:
raise TypeError(_('_conn_str() invalid connection type: %s') %
type(conn))
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
# All Rights Reserved
#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
from abc import ABCMeta
from abc import abstractmethod
-from abc import abstractproperty
+import copy
+import httplib
+import logging
+import time
+import urlparse
+
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
+ _conn_str)
+
+logging.basicConfig(level=logging.INFO)
+LOG = logging.getLogger(__name__)
+# Default parameters.
+DEFAULT_REQUEST_TIMEOUT = 30
+DEFAULT_HTTP_TIMEOUT = 10
+DEFAULT_RETRIES = 2
+DEFAULT_REDIRECTS = 2
+DEFAULT_API_REQUEST_POOL_SIZE = 1000
+DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
+DOWNLOAD_TIMEOUT = 180 # The UI code has a coorespoind 190 sec timeout
+ # for downloads, see: django/nvp_console/views.py
-class NvpApiRequest:
+
+class NvpApiRequest(object):
'''An abstract baseclass for all ApiRequest implementations.
This defines the interface and property structure for both eventlet and
__metaclass__ = ABCMeta
+ # List of allowed status codes.
+ ALLOWED_STATUS_CODES = [
+ httplib.OK,
+ httplib.CREATED,
+ httplib.NO_CONTENT,
+ httplib.MOVED_PERMANENTLY,
+ httplib.TEMPORARY_REDIRECT,
+ httplib.BAD_REQUEST,
+ httplib.UNAUTHORIZED,
+ httplib.FORBIDDEN,
+ httplib.NOT_FOUND,
+ httplib.CONFLICT,
+ httplib.INTERNAL_SERVER_ERROR,
+ httplib.SERVICE_UNAVAILABLE
+ ]
+
@abstractmethod
def start(self):
pass
def copy(self):
pass
- @abstractproperty
+ def _issue_request(self):
+ '''Issue a request to a provider.'''
+ conn = (self._client_conn or
+ self._api_client.acquire_connection(True,
+ copy.copy(self._headers),
+ rid=self._rid()))
+ if conn is None:
+ error = Exception("No API connections available")
+ self._request_error = error
+ return error
+
+ url = self._url
+ LOG.debug(_("[%d] Issuing - request '%s'"),
+ self._rid(), self._request_str(conn, url))
+ issued_time = time.time()
+ is_conn_error = False
+ is_conn_service_unavail = False
+ try:
+ redirects = 0
+ while (redirects <= self._redirects):
+ # Update connection with user specified request timeout,
+ # the connect timeout is usually smaller so we only set
+ # the request timeout after a connection is established
+ if conn.sock is None:
+ conn.connect()
+ conn.sock.settimeout(self._http_timeout)
+ elif conn.sock.gettimeout() != self._http_timeout:
+ conn.sock.settimeout(self._http_timeout)
+
+ headers = copy.copy(self._headers)
+ cookie = self._api_client.auth_cookie(conn)
+ if cookie:
+ headers["Cookie"] = cookie
+
+ gen = self._api_client.nvp_config_gen
+ if gen:
+ headers["X-Nvp-Wait-For-Config-Generation"] = gen
+ LOG.debug(_("Setting %s request header: '%s'"),
+ 'X-Nvp-Wait-For-Config-Generation', gen)
+ try:
+ conn.request(self._method, url, self._body, headers)
+ except Exception as e:
+ LOG.warn(_("[%d] Exception issuing request: '%s'"),
+ self._rid(), e)
+ raise e
+
+ response = conn.getresponse()
+ response.body = response.read()
+ response.headers = response.getheaders()
+ LOG.debug(_("[%d] Completed request '%s': %s (%0.2f seconds)"),
+ self._rid(), self._request_str(conn, url),
+ response.status, time.time() - issued_time)
+
+ new_gen = response.getheader('X-Nvp-Config-Generation', None)
+ if new_gen:
+ LOG.debug(_("Reading '%s' response header: '%s'"),
+ 'X-Nvp-config-Generation', new_gen)
+ if (self._api_client.nvp_config_gen is None or
+ self._api_client.nvp_config_gen < int(new_gen)):
+ self._api_client.nvp_config_gen = int(new_gen)
+
+ if response.status == httplib.UNAUTHORIZED:
+
+ if cookie is None and self._url != "/ws.v1/login":
+ # The connection still has no valid cookie despite
+ # attemps to authenticate and the request has failed
+ # with unauthorized status code. If this isn't a
+ # a request to authenticate, we should abort the
+ # request since there is no point in retrying.
+ self._abort = True
+ else:
+ # If request is unauthorized, clear the session cookie
+ # for the current provider so that subsequent requests
+ # to the same provider triggers re-authentication.
+ self._api_client.set_auth_cookie(conn, None)
+
+ self._api_client.set_auth_cookie(conn, None)
+ elif response.status == httplib.SERVICE_UNAVAILABLE:
+ is_conn_service_unavail = True
+
+ if response.status not in [httplib.MOVED_PERMANENTLY,
+ httplib.TEMPORARY_REDIRECT]:
+ break
+ elif redirects >= self._redirects:
+ LOG.info(_("[%d] Maximum redirects exceeded, aborting "
+ "request"), self._rid())
+ break
+ redirects += 1
+
+ conn, url = self._redirect_params(conn, response.headers,
+ self._client_conn is None)
+ if url is None:
+ response.status = httplib.INTERNAL_SERVER_ERROR
+ break
+ LOG.info(_("[%d] Redirecting request to: '%s'"),
+ self._rid(), self._request_str(conn, url))
+
+ # If we receive any of these responses, then
+ # our server did not process our request and may be in an
+ # errored state. Raise an exception, which will cause the
+ # the conn to be released with is_conn_error == True
+ # which puts the conn on the back of the client's priority
+ # queue.
+ if response.status >= 500:
+ LOG.warn(_("[%d] Request '%s %s' received: %s"),
+ self._rid(), self._method, self._url,
+ response.status)
+ raise Exception('Server error return: %s' %
+ response.status)
+ return response
+ except Exception as e:
+ if isinstance(e, httplib.BadStatusLine):
+ msg = "Invalid server response"
+ else:
+ msg = unicode(e)
+ LOG.warn(_("[%d] Failed request '%s': '%s' (%0.2f seconds)"),
+ self._rid(), self._request_str(conn, url), msg,
+ time.time() - issued_time)
+ self._request_error = e
+ is_conn_error = True
+ return e
+ finally:
+ # Make sure we release the original connection provided by the
+ # acquire_connection() call above.
+ if self._client_conn is None:
+ self._api_client.release_connection(conn, is_conn_error,
+ is_conn_service_unavail,
+ rid=self._rid())
+
+ def _redirect_params(self, conn, headers, allow_release_conn=False):
+ """Process redirect response, create new connection if necessary.
+
+ Args:
+ conn: connection that returned the redirect response
+ headers: response headers of the redirect response
+ allow_release_conn: if redirecting to a different server,
+ release existing connection back to connection pool.
+
+ Returns: Return tuple(conn, url) where conn is a connection object
+ to the redirect target and url is the path of the API request
+ """
+
+ url = None
+ for name, value in headers:
+ if name.lower() == "location":
+ url = value
+ break
+ if not url:
+ LOG.warn(_("[%d] Received redirect status without location header"
+ " field"), self._rid())
+ return (conn, None)
+ # Accept location with the following format:
+ # 1. /path, redirect to same node
+ # 2. scheme://hostname:[port]/path where scheme is https or http
+ # Reject others
+ # 3. e.g. relative paths, unsupported scheme, unspecified host
+ result = urlparse.urlparse(url)
+ if not result.scheme and not result.hostname and result.path:
+ if result.path[0] == "/":
+ if result.query:
+ url = "%s?%s" % (result.path, result.query)
+ else:
+ url = result.path
+ return (conn, url) # case 1
+ else:
+ LOG.warn(_("[%d] Received invalid redirect location: '%s'"),
+ self._rid(), url)
+ return (conn, None) # case 3
+ elif result.scheme not in ["http", "https"] or not result.hostname:
+ LOG.warn(_("[%d] Received malformed redirect location: %s"),
+ self._rid(), url)
+ return (conn, None) # case 3
+ # case 2, redirect location includes a scheme
+ # so setup a new connection and authenticate
+ if allow_release_conn:
+ self._api_client.release_connection(conn)
+ conn_params = (result.hostname, result.port, result.scheme == "https")
+ conn = self._api_client.acquire_redirect_connection(conn_params, True,
+ self._headers)
+ if result.query:
+ url = "%s?%s" % (result.path, result.query)
+ else:
+ url = result.path
+ return (conn, url)
+
+ def _rid(self):
+ '''Return current request id.'''
+ return self._request_id
+
+ @property
def request_error(self):
- pass
+ '''Return any errors associated with this instance.'''
+ return self._request_error
+
+ def _request_str(self, conn, url):
+ '''Return string representation of connection.'''
+ return "%s %s/%s" % (self._method, _conn_str(conn), url)
-# Copyright 2009-2012 Nicira Networks, Inc.
+# Copyright 2012 Nicira, Inc.
# All Rights Reserved
#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
-import copy
import eventlet
import httplib
import json
import logging
-import request
-import time
import urllib
-import urlparse
-import client_eventlet
-from common import _conn_str
-from eventlet import timeout
+from quantum.plugins.nicira.nicira_nvp_plugin.api_client import request
eventlet.monkey_patch()
-
logging.basicConfig(level=logging.INFO)
-lg = logging.getLogger("nvp_api_request")
+LOG = logging.getLogger(__name__)
USER_AGENT = "NVP eventlet client/1.0"
-# Default parameters.
-DEFAULT_REQUEST_TIMEOUT = 30
-DEFAULT_HTTP_TIMEOUT = 10
-DEFAULT_RETRIES = 2
-DEFAULT_REDIRECTS = 2
-DEFAULT_API_REQUEST_POOL_SIZE = 1000
-DEFAULT_MAXIMUM_REQUEST_ID = 4294967295
-
-class NvpApiRequestEventlet:
+class NvpApiRequestEventlet(request.NvpApiRequest):
'''Eventlet-based ApiRequest class.
This class will form the basis for eventlet-based ApiRequest classes
(e.g. those used by the Quantum NVP Plugin).
'''
- # List of allowed status codes.
- ALLOWED_STATUS_CODES = [
- httplib.OK,
- httplib.CREATED,
- httplib.NO_CONTENT,
- httplib.MOVED_PERMANENTLY,
- httplib.TEMPORARY_REDIRECT,
- httplib.BAD_REQUEST,
- httplib.UNAUTHORIZED,
- httplib.FORBIDDEN,
- httplib.NOT_FOUND,
- httplib.CONFLICT,
- httplib.INTERNAL_SERVER_ERROR,
- httplib.SERVICE_UNAVAILABLE
- ]
-
# Maximum number of green threads present in the system at one time.
- API_REQUEST_POOL_SIZE = DEFAULT_API_REQUEST_POOL_SIZE
+ API_REQUEST_POOL_SIZE = request.DEFAULT_API_REQUEST_POOL_SIZE
# Pool of green threads. One green thread is allocated per incoming
# request. Incoming requests will block when the pool is empty.
# A unique id is assigned to each incoming request. When the current
# request id reaches MAXIMUM_REQUEST_ID it wraps around back to 0.
- MAXIMUM_REQUEST_ID = DEFAULT_MAXIMUM_REQUEST_ID
+ MAXIMUM_REQUEST_ID = request.DEFAULT_MAXIMUM_REQUEST_ID
# The request id for the next incoming request.
CURRENT_REQUEST_ID = 0
def __init__(self, nvp_api_client, url, method="GET", body=None,
headers=None,
- request_timeout=DEFAULT_REQUEST_TIMEOUT,
- retries=DEFAULT_RETRIES,
+ request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
+ retries=request.DEFAULT_RETRIES,
auto_login=True,
- redirects=DEFAULT_REDIRECTS,
- http_timeout=DEFAULT_HTTP_TIMEOUT):
+ redirects=request.DEFAULT_REDIRECTS,
+ http_timeout=request.DEFAULT_HTTP_TIMEOUT, client_conn=None):
'''Constructor.'''
self._api_client = nvp_api_client
self._url = url
self._auto_login = auto_login
self._redirects = redirects
self._http_timeout = http_timeout
+ self._client_conn = client_conn
+ self._abort = False
self._request_error = None
'''Spawn a new green thread with the supplied function and args.'''
return self.__class__._spawn(func, *args, **kwargs)
- def _rid(self):
- '''Return current request id.'''
- return self._request_id
-
@classmethod
def joinall(cls):
'''Wait for all outstanding requests to complete.'''
self._headers, self._request_timeout, self._retries,
self._auto_login, self._redirects, self._http_timeout)
- @property
- def request_error(self):
- '''Return any errors associated with this instance.'''
- return self._request_error
-
def _run(self):
'''Method executed within green thread.'''
if self._request_timeout:
# No timeout exception escapes the with block.
- with timeout.Timeout(self._request_timeout, False):
+ with eventlet.timeout.Timeout(self._request_timeout, False):
return self._handle_request()
- lg.info(_('[%d] Request timeout.'), self._rid())
+ LOG.info(_('[%d] Request timeout.'), self._rid())
self._request_error = Exception(_('Request timeout'))
return None
else:
return self._handle_request()
- def _request_str(self, conn, url):
- '''Return string representation of connection.'''
- return "%s %s/%s" % (self._method, _conn_str(conn), url)
-
- def _issue_request(self):
- '''Issue a request to a provider.'''
- conn = self._api_client.acquire_connection(rid=self._rid())
- if conn is None:
- error = Exception(_("No API connections available"))
- self._request_error = error
- return error
-
- # Preserve the acquired connection as conn may be over-written by
- # redirects below.
- acquired_conn = conn
-
- url = self._url
- lg.debug(_("[%(rid)d] Issuing - request '%(req)s'"),
- {'rid': self._rid(),
- 'req': self._request_str(conn, url)})
- issued_time = time.time()
- is_conn_error = False
- try:
- redirects = 0
- while (redirects <= self._redirects):
- # Update connection with user specified request timeout,
- # the connect timeout is usually smaller so we only set
- # the request timeout after a connection is established
- if conn.sock is None:
- conn.connect()
- conn.sock.settimeout(self._http_timeout)
- elif conn.sock.gettimeout() != self._http_timeout:
- conn.sock.settimeout(self._http_timeout)
-
- headers = copy.copy(self._headers)
- gen = self._api_client.nvp_config_gen
- if gen:
- headers["X-Nvp-Wait-For-Config-Generation"] = gen
- lg.debug(_("Setting %(header)s request header: %(gen)s"),
- {'header': 'X-Nvp-Wait-For-Config-Generation',
- 'gen': gen})
- try:
- conn.request(self._method, url, self._body, headers)
- except Exception as e:
- lg.warn(_('[%(rid)d] Exception issuing request: %(e)s'),
- {'rid': self._rid(), 'e': e})
- raise e
-
- response = conn.getresponse()
- response.body = response.read()
- response.headers = response.getheaders()
- lg.debug(_("[%(rid)d] Completed request '%(req)s': %(status)s "
- "(%(time)0.2f seconds)"),
- {'rid': self._rid(),
- 'req': self._request_str(conn, url),
- 'status': response.status,
- 'time': time.time() - issued_time})
-
- new_gen = response.getheader('X-Nvp-Config-Generation', None)
- if new_gen:
- lg.debug(_("Reading %(header)s response header: %(gen)s"),
- {'header': 'X-Nvp-config-Generation',
- 'gen': new_gen})
- if (self._api_client.nvp_config_gen is None or
- self._api_client.nvp_config_gen < int(new_gen)):
- self._api_client.nvp_config_gen = int(new_gen)
-
- if response.status not in [httplib.MOVED_PERMANENTLY,
- httplib.TEMPORARY_REDIRECT]:
- break
- elif redirects >= self._redirects:
- lg.info(_("[%d] Maximum redirects exceeded, aborting "
- "request"), self._rid())
- break
- redirects += 1
-
- # In the following call, conn is replaced by the connection
- # specified in the redirect response from the server.
- conn, url = self._redirect_params(conn, response.headers)
- if url is None:
- response.status = httplib.INTERNAL_SERVER_ERROR
- break
- lg.info(_("[%(rid)d] Redirecting request to: %(req)s"),
- {'rid': self._rid(),
- 'req': self._request_str(conn, url)})
-
- # FIX for #9415. If we receive any of these responses, then
- # our server did not process our request and may be in an
- # errored state. Raise an exception, which will cause the
- # the conn to be released with is_conn_error == True
- # which puts the conn on the back of the client's priority
- # queue.
- if response.status >= 500:
- lg.warn(_("[%(rid)d] Request '%(method)s %(url)s' "
- "received: %(status)s"),
- {'rid': self._rid(), 'method': self._method,
- 'url': self._url,
- 'status': response.status})
- raise Exception(_('Server error return: %s') %
- response.status)
- return response
- except Exception as e:
- if isinstance(e, httplib.BadStatusLine):
- msg = _("Invalid server response")
- else:
- msg = unicode(e)
- lg.warn(_("[%(rid)d] Failed request '%(req)s': %(msg)s "
- "(%(time)0.2f seconds)"),
- {'rid': self._rid(), 'req': self._request_str(conn, url),
- 'msg': msg,
- 'time': time.time() - issued_time})
- self._request_error = e
- is_conn_error = True
- return e
- finally:
- # Make sure we release the original connection provided by the
- # acquire_connection() call above.
- self._api_client.release_connection(acquired_conn, is_conn_error,
- rid=self._rid())
-
- def _redirect_params(self, conn, headers):
- '''Process redirect params from a server response.'''
- url = None
- for name, value in headers:
- if name.lower() == "location":
- url = value
- break
- if not url:
- lg.warn(_("[%d] Received redirect status without location header "
- "field"), self._rid())
- return (conn, None)
- # Accept location with the following format:
- # 1. /path, redirect to same node
- # 2. scheme://hostname:[port]/path where scheme is https or http
- # Reject others
- # 3. e.g. relative paths, unsupported scheme, unspecified host
- result = urlparse.urlparse(url)
- if not result.scheme and not result.hostname and result.path:
- if result.path[0] == "/":
- if result.query:
- url = "%s?%s" % (result.path, result.query)
- else:
- url = result.path
- return (conn, url) # case 1
- else:
- lg.warn(_("[%(rid)d] Received invalid redirect location: "
- "%(url)s"),
- {'rid': self._rid(), 'url': url})
- return (conn, None) # case 3
- elif result.scheme not in ["http", "https"] or not result.hostname:
- lg.warn(_("[%(rid)d] Received malformed redirect location: "
- "%(url)s"),
- {'rid': self._rid(), 'url': url})
- return (conn, None) # case 3
- # case 2, redirect location includes a scheme
- # so setup a new connection and authenticate
- use_https = result.scheme == "https"
- api_providers = [(result.hostname, result.port, use_https)]
- api_client = client_eventlet.NvpApiClientEventlet(
- api_providers, self._api_client.user, self._api_client.password,
- use_https=use_https)
- api_client.wait_for_login()
- if api_client.auth_cookie:
- self._headers["Cookie"] = api_client.auth_cookie
- else:
- self._headers["Cookie"] = ""
- conn = api_client.acquire_connection(rid=self._rid())
- if result.query:
- url = "%s?%s" % (result.path, result.query)
- else:
- url = result.path
- return (conn, url)
-
def _handle_request(self):
'''First level request handling.'''
attempt = 0
while response is None and attempt <= self._retries:
attempt += 1
- if self._auto_login and self._api_client.need_login:
- self._api_client.wait_for_login()
-
- if self._api_client.auth_cookie:
- self._headers["Cookie"] = self._api_client.auth_cookie
-
req = self.spawn(self._issue_request).wait()
# automatically raises any exceptions returned.
if isinstance(req, httplib.HTTPResponse):
- if (req.status == httplib.UNAUTHORIZED
+ if attempt <= self._retries and not self._abort:
+ if (req.status == httplib.UNAUTHORIZED
or req.status == httplib.FORBIDDEN):
- self._api_client.need_login = True
- if attempt <= self._retries:
continue
# else fall through to return the error code
- lg.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
- ": %(status)s"),
- {'rid': self._rid(), 'method': self._method,
- 'url': self._url, 'status': req.status})
+ LOG.debug(_("[%(rid)d] Completed request '%(method)s %(url)s'"
+ ": %(status)s"),
+ {'rid': self._rid(), 'method': self._method,
+ 'url': self._url, 'status': req.status})
self._request_error = None
response = req
else:
- lg.info(_('[%(rid)d] Error while handling request: %(req)s'),
- {'rid': self._rid(), 'req': req})
+ LOG.info(_('[%(rid)d] Error while handling request: %(req)s'),
+ {'rid': self._rid(), 'req': req})
self._request_error = req
response = None
-
return response
class NvpLoginRequestEventlet(NvpApiRequestEventlet):
'''Process a login request.'''
- def __init__(self, nvp_client, user, password):
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
+ def __init__(self, nvp_client, user, password, client_conn=None,
+ headers=None):
+ if headers is None:
+ headers = {}
+ headers.update({"Content-Type": "application/x-www-form-urlencoded"})
body = urllib.urlencode({"username": user, "password": password})
NvpApiRequestEventlet.__init__(
self, nvp_client, "/ws.v1/login", "POST", body, headers,
- auto_login=False)
+ auto_login=False, client_conn=client_conn)
def session_cookie(self):
if self.successful():
class NvpGetApiProvidersRequestEventlet(NvpApiRequestEventlet):
- '''Get a list of API providers.'''
+ '''Gej a list of API providers.'''
def __init__(self, nvp_client):
url = "/ws.v1/control-cluster/node?fields=roles"
ret.append(_provider_from_listen_addr(addr))
return ret
except Exception as e:
- lg.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
- {'rid': self._rid(), 'e': e})
+ LOG.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
+ {'rid': self._rid(), 'e': e})
# intentionally fall through
return None
def __init__(self, nvp_client, method, url, body, content_type,
auto_login=False,
- request_timeout=DEFAULT_REQUEST_TIMEOUT,
- http_timeout=DEFAULT_HTTP_TIMEOUT,
- retries=DEFAULT_RETRIES,
- redirects=DEFAULT_REDIRECTS):
+ request_timeout=request.DEFAULT_REQUEST_TIMEOUT,
+ http_timeout=request.DEFAULT_HTTP_TIMEOUT,
+ retries=request.DEFAULT_RETRIES,
+ redirects=request.DEFAULT_REDIRECTS):
headers = {"Content-Type": content_type}
-
NvpApiRequestEventlet.__init__(
self, nvp_client, url, method, body, headers,
request_timeout=request_timeout, retries=retries,
cfg.IntOpt('max_lp_per_bridged_ls', default=64),
cfg.IntOpt('max_lp_per_overlay_ls', default=256),
cfg.IntOpt('concurrent_connections', default=5),
- cfg.IntOpt('failover_time', default=240),
+ cfg.IntOpt('nvp_gen_timeout', default=-1),
cfg.StrOpt('default_cluster_name')
]
myconn.__str__.return_value = 'myconn string'
req = self.req
- req._request_timeout = REQUEST_TIMEOUT = 1
req._redirect_params = Mock()
req._redirect_params.return_value = (myconn, 'url')
req._request_str = Mock()
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value
- api_client.wait_for_login.return_value = None
- api_client.auth_cookie = 'mycookie'
- api_client.acquire_connection.return_value = True
+ self.req._api_client = api_client
myconn = Mock()
(conn, retval) = self.req._redirect_params(
myconn, [('location', 'https://host:1/path')])
self.assertTrue(retval is not None)
- self.assertTrue(api_client.wait_for_login.called)
- self.assertTrue(api_client.acquire_connection.called)
+ self.assertTrue(api_client.acquire_redirect_connection.called)
def test_redirect_params_setup_htttps_and_query(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value
- api_client.wait_for_login.return_value = None
- api_client.auth_cookie = 'mycookie'
- api_client.acquire_connection.return_value = True
+ self.req._api_client = api_client
myconn = Mock()
(conn, retval) = self.req._redirect_params(myconn, [
('location', 'https://host:1/path?q=1')])
self.assertTrue(retval is not None)
- self.assertTrue(api_client.wait_for_login.called)
- self.assertTrue(api_client.acquire_connection.called)
+ self.assertTrue(api_client.acquire_redirect_connection.called)
def test_redirect_params_setup_https_connection_no_cookie(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value
- api_client.wait_for_login.return_value = None
- api_client.auth_cookie = None
- api_client.acquire_connection.return_value = True
+ self.req._api_client = api_client
myconn = Mock()
(conn, retval) = self.req._redirect_params(myconn, [
('location', 'https://host:1/path')])
self.assertTrue(retval is not None)
- self.assertTrue(api_client.wait_for_login.called)
- self.assertTrue(api_client.acquire_connection.called)
+ self.assertTrue(api_client.acquire_redirect_connection.called)
def test_redirect_params_setup_https_and_query_no_cookie(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'
'client_eventlet.NvpApiClientEventlet') as mock:
api_client = mock.return_value
- api_client.wait_for_login.return_value = None
- api_client.auth_cookie = None
- api_client.acquire_connection.return_value = True
+ self.req._api_client = api_client
myconn = Mock()
(conn, retval) = self.req._redirect_params(
myconn, [('location', 'https://host:1/path?q=1')])
self.assertTrue(retval is not None)
- self.assertTrue(api_client.wait_for_login.called)
- self.assertTrue(api_client.acquire_connection.called)
+ self.assertTrue(api_client.acquire_redirect_connection.called)
def test_redirect_params_path_only_with_query(self):
with patch('quantum.plugins.nicira.nicira_nvp_plugin.api_client.'