An agent extension extends the agent core functionality.
"""
- def initialize(self, resource_rpc):
+ def initialize(self):
"""Perform agent core resource extension initialization.
Called after all extensions have been loaded.
No abstract methods defined below will be
called prior to this method being called.
- :param resource_rpc - the agent side rpc for getting
- resource by type and id
"""
- self.resource_rpc = resource_rpc
+ pass
def handle_network(self, context, data):
"""handle agent extension for network.
class AgentExtensionsManager(stevedore.named.NamedExtensionManager):
"""Manage agent extensions."""
- def __init__(self, agent_extensions):
+ def __init__(self):
# Ordered list of agent extensions, defining
# the order in which the agent extensions are called.
+ #TODO(QoS): get extensions from config
+ agent_extensions = ('qos', )
+
LOG.info(_LI("Configured agent extensions names: %s"),
agent_extensions)
{'name': extension.name, 'method': method_name}
)
- def initialize(self, resource_rpc):
+ def initialize(self):
# Initialize each agent extension in the list.
for extension in self:
LOG.info(_LI("Initializing agent extension '%s'"), extension.name)
- extension.obj.initialize(resource_rpc)
+ extension.obj.initialize()
def handle_network(self, context, data):
"""Notify all agent extensions to handle network."""
from neutron.agent.l2 import agent_extension
from neutron.api.rpc.callbacks import resources
+from neutron.api.rpc.handlers import resources_rpc
from neutron import manager
class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
- def initialize(self, resource_rpc):
+ def initialize(self):
"""Perform Agent Extension initialization.
- :param resource_rpc: the agent side rpc for getting
- resource by type and id
"""
- super(QosAgentExtension, self).initialize(resource_rpc)
+ super(QosAgentExtension, self).initialize()
+ self.resource_rpc = resources_rpc.ResourcesServerRpcApi()
self.qos_driver = manager.NeutronManager.load_class_for_provider(
'neutron.qos.agent_drivers', cfg.CONF.qos.agent_driver)()
self.qos_driver.initialize()
+++ /dev/null
-# Copyright (c) 2015 Mellanox Technologies, Ltd
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-
-import six
-
-from neutron.agent.l2 import agent_extensions_manager
-
-
-#TODO(QoS): add unit tests to L2 Agent
-@six.add_metaclass(abc.ABCMeta)
-class L2Agent(object):
- """Define stable abstract interface for L2 Agent
-
- This class initialize the agent extension manager and
- provides API for calling the extensions manager process
- extensions methods.
- """
- def __init__(self, polling_interval):
- self.polling_interval = polling_interval
- self.agent_extensions_mgr = None
- self.resource_rpc = None
-
- def initialize(self):
- #TODO(QoS): get extensions from server ????
- agent_extensions = ('qos', )
- self.agent_extensions_mgr = (
- agent_extensions_manager.AgentExtensionsManager(
- agent_extensions))
- self.agent_extensions_mgr.initialize(self.resource_rpc)
-
- def process_network_extensions(self, context, network):
- self.agent_extensions_mgr.handle_network(
- context, network)
-
- def process_subnet_extensions(self, context, subnet):
- self.agent_extensions_mgr.handle_subnet(
- context, subnet)
-
- def process_port_extensions(self, context, port):
- self.agent_extensions_mgr.handle_port(
- context, port)
from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.common import utils
+from neutron.agent.l2 import agent_extensions_manager
from neutron.agent.linux import ip_lib
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
self.setup_rpc()
+ self.init_agent_extensions_mgr()
self.bridge_mappings = bridge_mappings
self.setup_physical_bridges(self.bridge_mappings)
self.local_vlan_map = {}
consumers,
start_listening=False)
+ def init_agent_extensions_mgr(self):
+ self.agent_extensions_mgr = (
+ agent_extensions_manager.AgentExtensionsManager())
+ self.agent_extensions_mgr.initialize()
+
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
if vif_id in vlan_mapping.vif_ports:
if need_binding:
details['vif_port'] = port
need_binding_devices.append(details)
+ self.agent_extensions_mgr.handle_port(self.context, details)
else:
LOG.warn(_LW("Device %s not defined on plugin"), device)
if (port and port.ofport != -1):
return_value=lambda: mock.Mock(spec=qos_agent.QosAgentDriver)
).start()
+ self.qos_agent.initialize()
self._create_fake_resource_rpc()
- self.qos_agent.initialize(self.resource_rpc_mock)
def _create_fake_resource_rpc(self):
self.get_info_mock = mock.Mock(return_value=TEST_GET_INFO_RULES)
- self.resource_rpc_mock = mock.Mock()
- self.resource_rpc_mock.get_info = self.get_info_mock
+ self.qos_agent.resource_rpc.get_info = self.get_info_mock
def _create_test_port_dict(self):
return {'port_id': uuidutils.generate_uuid(),
def test_handle_known_port_change_policy_id(self):
port = self._create_test_port_dict()
self.qos_agent.handle_port(self.context, port)
- self.resource_rpc_mock.get_info.reset_mock()
+ self.qos_agent.resource_rpc.get_info.reset_mock()
port['qos_policy_id'] = uuidutils.generate_uuid()
self.qos_agent.handle_port(self.context, port)
self.get_info_mock.assert_called_once_with(
return_value=None):
self.assertFalse(get_dev_fn.called)
- def test_treat_devices_added_updated_updates_known_port(self):
+ #TODO(QoS) that this mock should go away once we don't hardcode
+ #qos extension.
+ @mock.patch('neutron.api.rpc.handlers.resources_rpc.'
+ 'ResourcesServerRpcApi.get_info', return_value=[])
+ def test_treat_devices_added_updated_updates_known_port(
+ self, *args):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
self.assertTrue(self._mock_treat_devices_added_updated(