# It should be false when you use nova security group.
# enable_security_group = True
-[qos]
-# QoS agent driver
-# agent_driver = ovs
-
#-----------------------------------------------------------------------------
# Sample Configurations.
#-----------------------------------------------------------------------------
An agent extension extends the agent core functionality.
"""
- def initialize(self):
+ def initialize(self, connection, driver_type):
"""Perform agent core resource extension initialization.
+ :param connection: RPC connection that can be reused by the extension
+ to define its RPC endpoints
+ :param driver_type: a string that defines the agent type to the
+ extension. Can be used to choose the right backend
+ implementation.
+
Called after all extensions have been loaded.
No port handling will be called before this method.
"""
invoke_on_load=True, name_order=True)
LOG.info(_LI("Loaded agent extensions: %s"), self.names())
- def initialize(self, connection):
+ def initialize(self, connection, driver_type):
+ """Initialize enabled L2 agent extensions.
+
+ :param connection: RPC connection that can be reused by extensions to
+ define their RPC endpoints
+ :param driver_type: a string that defines the agent type to the
+ extension. Can be used by the extension to choose
+ the right backend implementation.
+ """
# Initialize each agent extension in the list.
for extension in self:
LOG.info(_LI("Initializing agent extension '%s'"), extension.name)
- extension.obj.initialize(connection)
+ extension.obj.initialize(connection, driver_type)
def handle_port(self, context, data):
"""Notify all agent extensions to handle port."""
import collections
from oslo_concurrency import lockutils
-from oslo_config import cfg
import six
from neutron.agent.l2 import agent_extension
@six.add_metaclass(abc.ABCMeta)
class QosAgentDriver(object):
- """Define stable abstract interface for QoS Agent Driver.
+ """Defines stable abstract interface for QoS Agent Driver.
QoS Agent driver defines the interface to be implemented by Agent
for applying QoS Rules on a port.
def initialize(self):
"""Perform QoS agent driver initialization.
"""
- pass
@abc.abstractmethod
def create(self, port, qos_policy):
"""
#TODO(QoS) we may want to provide default implementations of calling
#delete and then update
- pass
@abc.abstractmethod
def update(self, port, qos_policy):
:param port: port object.
:param qos_policy: the QoS policy to be applied on port.
"""
- pass
@abc.abstractmethod
def delete(self, port, qos_policy):
:param port: port object.
:param qos_policy: the QoS policy to be removed from port.
"""
- pass
class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
SUPPORTED_RESOURCES = [resources.QOS_POLICY]
- def initialize(self, connection):
+ def initialize(self, connection, driver_type):
"""Perform Agent Extension initialization.
"""
- super(QosAgentExtension, self).initialize()
-
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self.qos_driver = manager.NeutronManager.load_class_for_provider(
- 'neutron.qos.agent_drivers', cfg.CONF.qos.agent_driver)()
+ 'neutron.qos.agent_drivers', driver_type)()
self.qos_driver.initialize()
# we cannot use a dict of sets here because port dicts are not hashable
"timeout won't be changed"))
]
-qos_opts = [
- cfg.StrOpt('agent_driver', default='ovs', help=_('QoS agent driver.')),
-]
-
cfg.CONF.register_opts(ovs_opts, "OVS")
cfg.CONF.register_opts(agent_opts, "AGENT")
-cfg.CONF.register_opts(qos_opts, "qos")
config.register_agent_state_opts_helper(cfg.CONF)
OVS_RESTARTED = 0
OVS_NORMAL = 1
OVS_DEAD = 2
+
+EXTENSION_DRIVER_TYPE = 'ovs'
ext_manager.register_opts(self.conf)
self.ext_manager = (
ext_manager.AgentExtensionsManager(self.conf))
- self.ext_manager.initialize(connection)
+ self.ext_manager.initialize(
+ connection, constants.EXTENSION_DRIVER_TYPE)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
def test_initialize(self):
connection = object()
- self.manager.initialize(connection)
+ self.manager.initialize(connection, 'fake_driver_type')
ext = self._get_extension()
- ext.initialize.assert_called_once_with(connection)
+ ext.initialize.assert_called_once_with(connection, 'fake_driver_type')
def test_handle_port(self):
context = object()
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron import context
-from neutron.plugins.ml2.drivers.openvswitch.agent.common import config # noqa
+from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.tests import base
def setUp(self):
super(QosExtensionRpcTestCase, self).setUp()
- self.qos_ext.initialize(self.connection)
+ self.qos_ext.initialize(
+ self.connection, constants.EXTENSION_DRIVER_TYPE)
self.pull_mock = mock.patch.object(
self.qos_ext.resource_rpc, 'pull',
@mock.patch.object(registry, 'subscribe')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
- self.qos_ext.initialize(self.connection)
+ self.qos_ext.initialize(
+ self.connection, constants.EXTENSION_DRIVER_TYPE)
self.connection.create_consumer.assert_has_calls(
[mock.call(
resources_rpc.resource_type_versioned_topic(resource_type),