from quantum.agent import rpc as agent_rpc
from quantum.common import exceptions
from quantum.common import topics
+from quantum import context
from quantum.openstack.common import cfg
-from quantum.openstack.common import context
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
self.cache = NetworkCache()
self.dhcp_driver_cls = importutils.import_class(conf.dhcp_driver)
- ctx = context.RequestContext('quantum', 'quantum', is_admin=True)
+ ctx = context.get_admin_context_without_session()
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
--- /dev/null
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import dispatcher
+from quantum import context
+
+
+LOG = logging.getLogger(__name__)
+
+
+class PluginRpcDispatcher(dispatcher.RpcDispatcher):
+ """This class is used to convert RPC common context into
+ Quantum Context."""
+
+ def __init__(self, callbacks):
+ super(PluginRpcDispatcher, self).__init__(callbacks)
+
+ def dispatch(self, rpc_ctxt, version, method, **kwargs):
+ rpc_ctxt_dict = rpc_ctxt.to_dict()
+ user_id = rpc_ctxt_dict.pop('user_id', None)
+ if not user_id:
+ user_id = rpc_ctxt_dict.pop('user', None)
+ tenant_id = rpc_ctxt_dict.pop('tenant_id', None)
+ if not tenant_id:
+ tenant_id = rpc_ctxt_dict.pop('project_id', None)
+ quantum_ctxt = context.Context(user_id, tenant_id, **rpc_ctxt_dict)
+ return super(PluginRpcDispatcher, self).dispatch(
+ quantum_ctxt, version, method, **kwargs)
LOG = logging.getLogger(__name__)
-class Context(common_context.RequestContext):
+class ContextBase(common_context.RequestContext):
"""Security context and request information.
Represents the user taking a given action within the system.
if kwargs:
LOG.warn(_('Arguments dropped when creating '
'context: %s') % str(kwargs))
- super(Context, self).__init__(user=user_id, tenant=tenant_id,
- is_admin=is_admin)
- self.user_id = user_id
- self.tenant_id = tenant_id
+ super(ContextBase, self).__init__(user=user_id, tenant=tenant_id,
+ is_admin=is_admin)
self.roles = roles or []
if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles]
self.timestamp = timestamp
self._session = None
+ @property
+ def project_id(self):
+ return self.tenant
+
+ @property
+ def tenant_id(self):
+ return self.tenant
+
+ @tenant_id.setter
+ def tenant_id(self, tenant_id):
+ self.tenant = tenant_id
+
+ @property
+ def user_id(self):
+ return self.user
+
+ @user_id.setter
+ def user_id(self, user_id):
+ self.user = user_id
+
def _get_read_deleted(self):
return self._read_deleted
read_deleted = property(_get_read_deleted, _set_read_deleted,
_del_read_deleted)
- @property
- def session(self):
- if self._session is None:
- self._session = db_api.get_session()
- return self._session
-
def to_dict(self):
return {'user_id': self.user_id,
'tenant_id': self.tenant_id,
+ 'project_id': self.project_id,
'is_admin': self.is_admin,
'read_deleted': self.read_deleted,
'roles': self.roles,
return context
+class Context(ContextBase):
+ @property
+ def session(self):
+ if self._session is None:
+ self._session = db_api.get_session()
+ return self._session
+
+
def get_admin_context(read_deleted="no"):
return Context(user_id=None,
tenant_id=None,
is_admin=True,
read_deleted=read_deleted)
+
+
+def get_admin_context_without_session(read_deleted="no"):
+ return ContextBase(user_id=None,
+ tenant_id=None,
+ is_admin=True,
+ read_deleted=read_deleted)
from sqlalchemy.orm import exc
from quantum.api.v2 import attributes
-from quantum import context as quantum_context
from quantum import manager
-from quantum.openstack.common import context
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-def augment_context(context):
- """Augments RPC with additional attributes, so that plugin calls work."""
- return quantum_context.Context(context.user, None, is_admin=True,
- roles=['admin'])
-
-
class DhcpRpcCallbackMixin(object):
"""A mix-in that enable DHCP agent support in plugin implementations."""
host = kwargs.get('host')
LOG.debug('Network list requested from %s', host)
plugin = manager.QuantumManager.get_plugin()
- context = augment_context(context)
filters = dict(admin_state_up=[True])
return [net['id'] for net in
def get_network_info(self, context, **kwargs):
"""Retrieve and return a extended information about a network."""
network_id = kwargs.get('network_id')
- context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
network = plugin.get_network(context, network_id)
host = kwargs.get('host')
network_id = kwargs.get('network_id')
device_id = kwargs.get('device_id')
- # There could be more than one dhcp server per network, so create
- # a device id that combines host and network ids
+ # There could be more than one dhcp server per network, so create
+ # a device id that combines host and network ids
LOG.debug('Port %s for %s requested from %s', device_id, network_id,
host)
- context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
retval = None
LOG.debug('DHCP port deletion for %s d request from %s', network_id,
host)
- context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
filters = dict(network_id=[network_id], device_id=[device_id])
ports = plugin.get_ports(context, filters=filters)
LOG.debug('DHCP port remove fixed_ip for %s d request from %s',
subnet_id,
host)
-
- context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
filters = dict(network_id=[network_id], device_id=[device_id])
ports = plugin.get_ports(context, filters=filters)
LOG.debug('Updating lease expiration for %s on network %s from %s.',
ip_address, network_id, host)
-
- context = augment_context(context)
plugin = manager.QuantumManager.get_plugin()
plugin.update_fixed_ip_lease_expiration(context, network_id,
from quantum.agent.linux import utils
from quantum.agent import rpc as agent_rpc
from quantum.common import config as logging_config
-from quantum.common import constants
from quantum.common import topics
from quantum.common import utils as q_utils
+from quantum import context
from quantum.openstack.common import cfg
-from quantum.openstack.common import context
from quantum.openstack.common import log as logging
-from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.linuxbridge.common import config
from quantum.plugins.linuxbridge.common import constants as lconst
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
# RPC network init
- self.context = context.RequestContext('quantum', 'quantum',
- is_admin=False)
+ self.context = context.get_admin_context_without_session()
# Handle updates from service
self.callbacks = LinuxBridgeRpcCallbacks(self.context,
self.linux_br)
from quantum.api.v2 import attributes
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
+from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.db import api as db_api
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
-from quantum.db import models_v2
from quantum.extensions import providernet as provider
from quantum.openstack.common import cfg
-from quantum.openstack.common import context
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
-from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.common import constants
from quantum.plugins.linuxbridge.db import l2network_db_v2 as db
# Device names start with "tap"
TAP_PREFIX_LEN = 3
- def __init__(self, rpc_context):
- self.rpc_context = rpc_context
-
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return dispatcher.RpcDispatcher([self])
+ return q_rpc.PluginRpcDispatcher([self])
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details"""
def _setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
- self.rpc_context = context.RequestContext('quantum', 'quantum',
- is_admin=False)
self.conn = rpc.create_connection(new=True)
- self.callbacks = LinuxBridgeRpcCallbacks(self.rpc_context)
+ self.callbacks = LinuxBridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
binding.vlan_id, self.network_vlan_ranges)
# the network_binding record is deleted via cascade from
# the network record, so explicit removal is not necessary
- self.notifier.network_delete(self.rpc_context, id)
+ self.notifier.network_delete(context, id)
def get_network(self, context, id, fields=None):
net = super(LinuxBridgePluginV2, self).get_network(context, id, None)
if original_port['admin_state_up'] != port['admin_state_up']:
binding = db.get_network_binding(context.session,
port['network_id'])
- self.notifier.port_update(self.rpc_context, port,
+ self.notifier.port_update(context, port,
binding.physical_network,
binding.vlan_id)
return port
from quantum.agent.linux import utils
from quantum.agent import rpc as agent_rpc
from quantum.common import config as logging_config
-from quantum.common import constants as q_const
from quantum.common import topics
from quantum.common import utils as q_utils
+from quantum import context
from quantum.openstack.common import cfg
-from quantum.openstack.common import context
from quantum.openstack.common import log as logging
-from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch.common import constants
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
# RPC network init
- self.context = context.RequestContext('quantum', 'quantum',
- is_admin=False)
+ self.context = context.get_admin_context_without_session()
# Handle updates from service
self.dispatcher = self.create_rpc_dispatcher()
# Define the listening consumers for the agent
# @author: Aaron Rosen, Nicira Networks, Inc.
# @author: Bob Kukura, Red Hat, Inc.
-import os
import sys
from quantum.api.v2 import attributes
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
+from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
from quantum.extensions import providernet as provider
from quantum.openstack.common import cfg
-from quantum.openstack.common import context
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
-from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.openvswitch.common import config
from quantum.plugins.openvswitch.common import constants
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
- def __init__(self, rpc_context, notifier):
- self.rpc_context = rpc_context
+ def __init__(self, notifier):
self.notifier = notifier
def create_rpc_dispatcher(self):
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return dispatcher.RpcDispatcher([self])
+ return q_rpc.PluginRpcDispatcher([self])
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details"""
entry = dict()
entry['tunnels'] = tunnels
# Notify all other listening agents
- self.notifier.tunnel_update(self.rpc_context, tunnel.ip_address,
+ self.notifier.tunnel_update(rpc_context, tunnel.ip_address,
tunnel.id)
# Return the list of tunnels IP's to the agent
return entry
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
- self.rpc_context = context.RequestContext('quantum', 'quantum',
- is_admin=False)
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
- self.callbacks = OVSRpcCallbacks(self.rpc_context, self.notifier)
+ self.callbacks = OVSRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
self.network_vlan_ranges)
# the network_binding record is deleted via cascade from
# the network record, so explicit removal is not necessary
- self.notifier.network_delete(self.rpc_context, id)
+ self.notifier.network_delete(context, id)
def get_network(self, context, id, fields=None):
net = super(OVSQuantumPluginV2, self).get_network(context, id, None)
if original_port['admin_state_up'] != port['admin_state_up']:
binding = ovs_db_v2.get_network_binding(None,
port['network_id'])
- self.notifier.port_update(self.rpc_context, port,
+ self.notifier.port_update(context, port,
binding.network_type,
binding.segmentation_id,
binding.physical_network)
from quantum.db import dhcp_rpc_base
-class TestDhcpAugmentContext(unittest.TestCase):
- def test_augment_context(self):
- context = mock.Mock()
- context.user = 'quantum'
- context.tenant = None
- context.is_admin = True
-
- new_context = dhcp_rpc_base.augment_context(context)
-
- self.assertEqual(new_context.user_id, context.user)
- self.assertEqual(new_context.roles, ['admin'])
-
-
class TestDhcpRpcCallackMixin(unittest.TestCase):
- def setUp(self):
- self.context_p = mock.patch('quantum.db.dhcp_rpc_base.augment_context')
- self.context_p.start()
+ def setUp(self):
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
get_plugin = self.plugin_p.start()
self.plugin = mock.Mock()
def tearDown(self):
self.log_p.stop()
self.plugin_p.stop()
- self.context_p.stop()
def test_get_active_networks(self):
plugin_retval = [dict(id='a'), dict(id='b')]
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Nicira, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import unittest2 as unittest
+
+import mock
+
+from quantum import context
+
+
+class TestQuantumContext(unittest.TestCase):
+
+ def setUp(self):
+ db_api = 'quantum.db.api.get_session'
+ self._db_api_session_patcher = mock.patch(db_api)
+ self.db_api_session = self._db_api_session_patcher.start()
+
+ def tearDown(self):
+ self._db_api_session_patcher.stop()
+
+ def testQuantumContextCreate(self):
+ cxt = context.Context('user_id', 'tenant_id')
+ self.assertEquals('user_id', cxt.user_id)
+ self.assertEquals('tenant_id', cxt.project_id)
+
+ def testQuantumContextToDict(self):
+ cxt = context.Context('user_id', 'tenant_id')
+ cxt_dict = cxt.to_dict()
+ self.assertEquals('user_id', cxt_dict['user_id'])
+ self.assertEquals('tenant_id', cxt_dict['project_id'])
+
+ def testQuantumContextAdminToDict(self):
+ self.db_api_session.return_value = 'fakesession'
+ cxt = context.get_admin_context()
+ cxt_dict = cxt.to_dict()
+ self.assertIsNone(cxt_dict['user_id'])
+ self.assertIsNone(cxt_dict['tenant_id'])
+ self.assertIsNotNone(cxt.session)
+ self.assertFalse('session' in cxt_dict)
+
+ def testQuantumContextAdminWithoutSessionToDict(self):
+ cxt = context.get_admin_context_without_session()
+ cxt_dict = cxt.to_dict()
+ self.assertIsNone(cxt_dict['user_id'])
+ self.assertIsNone(cxt_dict['tenant_id'])
+ try:
+ session = cxt.session
+ except Exception:
+ pass
+ else:
+ self.assertFalse(True, 'without_session admin context'
+ 'should has no session property!')