"""Implentation of Brocade Neutron Plugin."""
from oslo_config import cfg
-from oslo_context import context as oslo_context
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import importutils
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
+from neutron import context as n_context
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import api as db
physical_interface)
self.base_binding_dict = self._get_base_binding_dict()
portbindings_base.register_port_dict_function()
- self.ctxt = oslo_context.get_admin_context()
- self.ctxt.session = db.get_session()
+ self.ctxt = n_context.get_admin_context()
self._vlan_bitmap = vbm.VlanBitmap(self.ctxt)
self._setup_rpc()
self.network_scheduler = importutils.import_object(
# RPC support
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
- self.rpc_context = oslo_context.RequestContext('neutron', 'neutron',
- is_admin=False)
+ self.rpc_context = n_context.ContextBase('neutron', 'neutron',
+ is_admin=False)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [BridgeRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
Test vlans alloc/dealloc.
"""
-from oslo_context import context as oslo_context
-
-from neutron.db import api as db
+from neutron import context as n_context
from neutron.plugins.brocade import vlanbm as vlan_bitmap
from neutron.tests.unit import testlib_api
def setUp(self):
super(TestVlanBitmap, self).setUp()
- self.context = oslo_context.get_admin_context()
- self.context.session = db.get_session()
+ self.context = n_context.get_admin_context()
def test_vlan(self):
"""test vlan allocation/de-alloc."""