]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add extension_manager and support for extensions in linuxbridge agent
authorSławek Kapłoński <slawek@kaplonski.pl>
Thu, 26 Nov 2015 22:31:11 +0000 (23:31 +0100)
committerIhar Hrachyshka <ihrachys@redhat.com>
Tue, 5 Jan 2016 12:38:11 +0000 (12:38 +0000)
There is extensions mechanism for l2 agents already but it was
implemented only for openvswitch l2 agent. This patch adds support for
such extensions also for linuxbridge agent.

This patch also adds support for network_update events received by the
agent via RPC. It is required because sometimes when a network is
updated (for example with a QoS policy is attached to it) all ports that
belong to the network should also be updated.

Change-Id: Ie81c818d0eb817b044a6df1cbddc5864f118fe3f
Partial-bug: 1468803

neutron/plugins/ml2/drivers/linuxbridge/agent/common/constants.py
neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py
releasenotes/notes/linuxbridge-agent-extensions-66bdf9feee25ef99.yaml [new file with mode: 0644]

index aa970af2e474582b4f2a115d44a34266d1efb437..45c791868a860b13280716e61971f31da89bb3cd 100644 (file)
@@ -19,3 +19,5 @@ LOCAL_VLAN_ID = -2
 VXLAN_NONE = 'not_supported'
 VXLAN_MCAST = 'multicast_flooding'
 VXLAN_UCAST = 'unicast_flooding'
+
+EXTENSION_DRIVER_TYPE = 'linuxbridge'
index 242f16c4ff73cec089b549ddb57914c52af8b3e5..7cfae35cf367e30dca34370dc2646beb39bbdf5e 100644 (file)
@@ -19,6 +19,7 @@
 # Based on the structure of the OpenVSwitch agent in the
 # Neutron OpenVSwitch Plugin.
 
+import collections
 import sys
 import time
 
@@ -35,6 +36,7 @@ from oslo_utils import excutils
 from six import moves
 
 from neutron._i18n import _LE, _LI, _LW
+from neutron.agent.l2.extensions import manager as ext_manager
 from neutron.agent.linux import bridge_lib
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
@@ -670,7 +672,8 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     # history
     #   1.1 Support Security Group RPC
     #   1.3 Added param devices_to_update to security_groups_provider_updated
-    target = oslo_messaging.Target(version='1.3')
+    #   1.4 Added support for network_update
+    target = oslo_messaging.Target(version='1.4')
 
     def __init__(self, context, agent, sg_agent):
         super(LinuxBridgeRpcCallbacks, self).__init__()
@@ -708,6 +711,15 @@ class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.agent.updated_devices.add(tap_name)
         LOG.debug("port_update RPC received for port: %s", port_id)
 
+    def network_update(self, context, **kwargs):
+        network_id = kwargs['network']['id']
+        LOG.debug("network_update message processed for network "
+                  "%(network_id)s, with ports: %(ports)s",
+                  {'network_id': network_id,
+                   'ports': self.agent.network_ports[network_id]})
+        for port_data in self.agent.network_ports[network_id]:
+            self.agent.updated_devices.add(port_data['device'])
+
     def fdb_add(self, context, fdb_entries):
         LOG.debug("fdb_add received")
         for network_id, values in fdb_entries.items():
@@ -810,8 +822,24 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
     def start(self):
         self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
         self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings)
-        configurations = {'bridge_mappings': self.bridge_mappings,
-                          'interface_mappings': self.interface_mappings}
+
+        # stores received port_updates and port_deletes for
+        # processing by the main loop
+        self.updated_devices = set()
+
+        # stores all configured ports on agent
+        self.network_ports = collections.defaultdict(list)
+        # flag to do a sync after revival
+        self.fullsync = False
+        self.context = context.get_admin_context_without_session()
+        self.setup_rpc(self.interface_mappings.values())
+        self.init_extension_manager(self.connection)
+
+        configurations = {
+            'bridge_mappings': self.bridge_mappings,
+            'interface_mappings': self.interface_mappings,
+            'extensions': self.ext_manager.names()
+        }
         if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
             configurations['tunneling_ip'] = self.br_mgr.local_ip
             configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
@@ -824,16 +852,11 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
             'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
             'start_flag': True}
 
-        # stores received port_updates for processing by the main loop
-        self.updated_devices = set()
-        # flag to do a sync after revival
-        self.fullsync = False
-        self.context = context.get_admin_context_without_session()
-        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
-        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
-        self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
-                self.sg_plugin_rpc, defer_refresh_firewall=True)
-        self.setup_rpc(self.interface_mappings.values())
+        report_interval = cfg.CONF.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.FixedIntervalLoopingCall(
+                self._report_state)
+            heartbeat.start(interval=report_interval)
         self.daemon_loop()
 
     def stop(self, graceful=True):
@@ -871,6 +894,12 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
                 LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                               "Agent terminated!"))
                 exit(1)
+
+        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
+        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
+        self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
+            self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
+
         self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
         LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
 
@@ -883,17 +912,21 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
         # Define the listening consumers for the agent
         consumers = [[topics.PORT, topics.UPDATE],
                      [topics.NETWORK, topics.DELETE],
+                     [topics.NETWORK, topics.UPDATE],
                      [topics.SECURITY_GROUP, topics.UPDATE]]
+
         if cfg.CONF.VXLAN.l2_population:
             consumers.append([topics.L2POPULATION, topics.UPDATE])
         self.connection = agent_rpc.create_consumers(self.endpoints,
                                                      self.topic,
                                                      consumers)
-        report_interval = cfg.CONF.AGENT.report_interval
-        if report_interval:
-            heartbeat = loopingcall.FixedIntervalLoopingCall(
-                self._report_state)
-            heartbeat.start(interval=report_interval)
+
+    def init_extension_manager(self, connection):
+        ext_manager.register_opts(cfg.CONF)
+        self.ext_manager = (
+            ext_manager.AgentExtensionsManager(cfg.CONF))
+        self.ext_manager.initialize(
+            connection, lconst.EXTENSION_DRIVER_TYPE)
 
     def setup_linux_bridge(self, bridge_mappings, interface_mappings):
         self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings)
@@ -907,6 +940,22 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
         else:
             ip_lib.IPDevice(tap_name).link.set_down()
 
+    def _clean_network_ports(self, device):
+        for netid, ports_list in self.network_ports.items():
+            for port_data in ports_list:
+                if device == port_data['device']:
+                    ports_list.remove(port_data)
+                    if ports_list == []:
+                        self.network_ports.pop(netid)
+                    return port_data['port_id']
+
+    def _update_network_ports(self, network_id, port_id, device):
+        self._clean_network_ports(device)
+        self.network_ports[network_id].append({
+            "port_id": port_id,
+            "device": device
+        })
+
     def process_network_devices(self, device_info):
         resync_a = False
         resync_b = False
@@ -1006,6 +1055,10 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
                                                            device,
                                                            self.agent_id,
                                                            cfg.CONF.host)
+                self._update_network_ports(device_details['network_id'],
+                                           device_details['port_id'],
+                                           device_details['device'])
+                self.ext_manager.handle_port(self.context, device_details)
             else:
                 LOG.info(_LI("Device %s not defined on plugin"), device)
         return False
@@ -1029,6 +1082,10 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
                 LOG.info(_LI("Port %s updated."), device)
             else:
                 LOG.debug("Device %s not defined on plugin", device)
+            port_id = self._clean_network_ports(device)
+            self.ext_manager.delete_port(self.context,
+                                         {'device': device,
+                                          'port_id': port_id})
         if self.prevent_arp_spoofing:
             arp_protect.delete_arp_spoofing_protection(devices)
         return resync
index db7e36267a72232eb3f786f430d9fc278792e5af..84532f1a26099a4d31ccaf07c49cf5b7f72c6d28 100644 (file)
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import collections
 import sys
 
 import mock
@@ -30,12 +31,18 @@ from neutron.plugins.ml2.drivers.linuxbridge.agent \
 from neutron.tests import base
 
 LOCAL_IP = '192.168.0.33'
+PORT_1 = 'abcdef01-12ddssdfds-fdsfsd'
 DEVICE_1 = 'tapabcdef01-12'
+NETWORK_ID = '57653b20-ed5b-4ed0-a31d-06f84e3fd909'
 BRIDGE_MAPPING_VALUE = 'br-eth2'
 BRIDGE_MAPPINGS = {'physnet0': BRIDGE_MAPPING_VALUE}
 INTERFACE_MAPPINGS = {'physnet1': 'eth1'}
 FAKE_DEFAULT_DEV = mock.Mock()
 FAKE_DEFAULT_DEV.name = 'eth1'
+PORT_DATA = {
+    "port_id": PORT_1,
+    "device": DEVICE_1
+}
 
 
 class FakeIpLinkCommand(object):
@@ -128,10 +135,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
         agent = self.agent
         agent._ensure_port_admin_state = mock.Mock()
         devices = [DEVICE_1]
+        agent.network_ports[NETWORK_ID].append(PORT_DATA)
         with mock.patch.object(agent.plugin_rpc,
                                "update_device_down") as fn_udd,\
                 mock.patch.object(agent.sg_agent,
-                                  "remove_devices_filter") as fn_rdf:
+                                  "remove_devices_filter") as fn_rdf,\
+                mock.patch.object(agent.ext_manager,
+                                  "delete_port") as ext_mgr_delete_port:
             fn_udd.return_value = {'device': DEVICE_1,
                                    'exists': True}
             with mock.patch.object(linuxbridge_neutron_agent.LOG,
@@ -141,14 +151,21 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
                 self.assertFalse(resync)
                 self.assertTrue(fn_udd.called)
                 self.assertTrue(fn_rdf.called)
+                self.assertTrue(ext_mgr_delete_port.called)
+                self.assertTrue(
+                    PORT_DATA not in agent.network_ports[NETWORK_ID]
+                )
 
     def test_treat_devices_removed_with_not_existed_device(self):
         agent = self.agent
         devices = [DEVICE_1]
+        agent.network_ports[NETWORK_ID].append(PORT_DATA)
         with mock.patch.object(agent.plugin_rpc,
                                "update_device_down") as fn_udd,\
                 mock.patch.object(agent.sg_agent,
-                                  "remove_devices_filter") as fn_rdf:
+                                  "remove_devices_filter") as fn_rdf,\
+                mock.patch.object(agent.ext_manager,
+                                  "delete_port") as ext_mgr_delete_port:
             fn_udd.return_value = {'device': DEVICE_1,
                                    'exists': False}
             with mock.patch.object(linuxbridge_neutron_agent.LOG,
@@ -158,19 +175,30 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
                 self.assertFalse(resync)
                 self.assertTrue(fn_udd.called)
                 self.assertTrue(fn_rdf.called)
+                self.assertTrue(ext_mgr_delete_port.called)
+                self.assertTrue(
+                    PORT_DATA not in agent.network_ports[NETWORK_ID]
+                )
 
     def test_treat_devices_removed_failed(self):
         agent = self.agent
         devices = [DEVICE_1]
+        agent.network_ports[NETWORK_ID].append(PORT_DATA)
         with mock.patch.object(agent.plugin_rpc,
                                "update_device_down") as fn_udd,\
                 mock.patch.object(agent.sg_agent,
-                                  "remove_devices_filter") as fn_rdf:
+                                  "remove_devices_filter") as fn_rdf,\
+                mock.patch.object(agent.ext_manager,
+                                  "delete_port") as ext_mgr_delete_port:
             fn_udd.side_effect = Exception()
             resync = agent.treat_devices_removed(devices)
             self.assertTrue(resync)
             self.assertTrue(fn_udd.called)
             self.assertTrue(fn_rdf.called)
+            self.assertTrue(ext_mgr_delete_port.called)
+            self.assertTrue(
+                PORT_DATA not in agent.network_ports[NETWORK_ID]
+            )
 
     def _test_scan_devices(self, previous, updated,
                            fake_current, expected, sync):
@@ -273,6 +301,27 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
         self._test_scan_devices(previous, updated, fake_current, expected,
                                 sync=False)
 
+    def test_scan_devices_updated_deleted_concurrently(self):
+        previous = {
+            'current': set([1, 2]),
+            'updated': set(),
+            'added': set(),
+            'removed': set()
+        }
+        # Device 2 disappeared.
+        fake_current = set([1])
+        # Device 2 got an concurrent update via network_update
+        updated = set([2])
+        expected = {
+            'current': set([1]),
+            'updated': set(),
+            'added': set(),
+            'removed': set([2])
+        }
+        self._test_scan_devices(
+            previous, updated, fake_current, expected, sync=False
+        )
+
     def test_scan_devices_updated_on_sync(self):
         previous = {'current': set([1, 2]),
                     'updated': set([1]),
@@ -318,6 +367,11 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
                         'segmentation_id': 100,
                         'physical_network': 'physnet1',
                         'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
+        mock_port_data = {
+            'port_id': mock_details['port_id'],
+            'device': mock_details['device']
+        }
+        agent.ext_manager = mock.Mock()
         agent.plugin_rpc = mock.Mock()
         agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
         agent.br_mgr = mock.Mock()
@@ -331,6 +385,10 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
                                       100, 'port123',
                                       constants.DEVICE_OWNER_NETWORK_PREFIX)
         self.assertTrue(agent.plugin_rpc.update_device_up.called)
+        self.assertTrue(agent.ext_manager.handle_port.called)
+        self.assertTrue(
+            mock_port_data in agent.network_ports[mock_details['network_id']]
+        )
 
     def test_set_rpc_timeout(self):
         self.agent.stop()
@@ -370,6 +428,63 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
     def test_ensure_port_admin_state_down(self):
         self._test_ensure_port_admin_state(False)
 
+    def test_update_network_ports(self):
+        port_1_data = PORT_DATA
+        NETWORK_2_ID = 'fake_second_network'
+        port_2_data = {
+            'port_id': 'fake_port_2',
+            'device': 'fake_port_2_device_name'
+        }
+        self.agent.network_ports[NETWORK_ID].append(
+            port_1_data
+        )
+        self.agent.network_ports[NETWORK_ID].append(
+            port_2_data
+        )
+        #check update port:
+        self.agent._update_network_ports(
+            NETWORK_2_ID, port_2_data['port_id'], port_2_data['device']
+        )
+        self.assertTrue(
+            port_2_data not in self.agent.network_ports[NETWORK_ID]
+        )
+        self.assertTrue(
+            port_2_data in self.agent.network_ports[NETWORK_2_ID]
+        )
+
+    def test_clean_network_ports(self):
+        port_1_data = PORT_DATA
+        port_2_data = {
+            'port_id': 'fake_port_2',
+            'device': 'fake_port_2_device_name'
+        }
+        self.agent.network_ports[NETWORK_ID].append(
+            port_1_data
+        )
+        self.agent.network_ports[NETWORK_ID].append(
+            port_2_data
+        )
+        #check removing port from network when other ports are still there:
+        cleaned_port_id = self.agent._clean_network_ports(DEVICE_1)
+        self.assertTrue(
+            NETWORK_ID in self.agent.network_ports.keys()
+        )
+        self.assertTrue(
+            port_1_data not in self.agent.network_ports[NETWORK_ID]
+        )
+        self.assertTrue(
+            port_2_data in self.agent.network_ports[NETWORK_ID]
+        )
+        self.assertEqual(cleaned_port_id, PORT_1)
+        #and now remove last port from network:
+        cleaned_port_id = self.agent._clean_network_ports(
+            port_2_data['device']
+        )
+        self.assertTrue(
+            NETWORK_ID not in self.agent.network_ports.keys()
+        )
+        self.assertEqual(cleaned_port_id, port_2_data['port_id'])
+
 
 class TestLinuxBridgeManager(base.BaseTestCase):
     def setUp(self):
@@ -1048,6 +1163,8 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
                 segment.network_type = 'vxlan'
                 segment.segmentation_id = 1
                 self.br_mgr.network_map['net_id'] = segment
+                self.updated_devices = set()
+                self.network_ports = collections.defaultdict(list)
 
         self.lb_rpc = linuxbridge_neutron_agent.LinuxBridgeRpcCallbacks(
             object(),
@@ -1059,17 +1176,30 @@ class TestLinuxBridgeRpcCallbacks(base.BaseTestCase):
         mock_net = mock.Mock()
         mock_net.physical_network = None
 
-        self.lb_rpc.agent.br_mgr.network_map = {'123': mock_net}
+        self.lb_rpc.agent.br_mgr.network_map = {NETWORK_ID: mock_net}
 
         with mock.patch.object(self.lb_rpc.agent.br_mgr,
                                "get_bridge_name") as get_br_fn,\
                 mock.patch.object(self.lb_rpc.agent.br_mgr,
                                   "delete_bridge") as del_fn:
             get_br_fn.return_value = "br0"
-            self.lb_rpc.network_delete("anycontext", network_id="123")
-            get_br_fn.assert_called_with("123")
+            self.lb_rpc.network_delete("anycontext", network_id=NETWORK_ID)
+            get_br_fn.assert_called_with(NETWORK_ID)
             del_fn.assert_called_with("br0")
 
+    def test_port_update(self):
+        port = {'id': PORT_1}
+        self.lb_rpc.port_update(context=None, port=port)
+        self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices)
+
+    def test_network_update(self):
+        updated_network = {'id': NETWORK_ID}
+        self.lb_rpc.agent.network_ports = {
+            NETWORK_ID: [PORT_DATA]
+        }
+        self.lb_rpc.network_update(context=None, network=updated_network)
+        self.assertEqual(set([DEVICE_1]), self.lb_rpc.agent.updated_devices)
+
     def test_network_delete_with_existed_brq(self):
         mock_net = mock.Mock()
         mock_net.physical_network = 'physnet0'
diff --git a/releasenotes/notes/linuxbridge-agent-extensions-66bdf9feee25ef99.yaml b/releasenotes/notes/linuxbridge-agent-extensions-66bdf9feee25ef99.yaml
new file mode 100644 (file)
index 0000000..a839c91
--- /dev/null
@@ -0,0 +1,8 @@
+---
+prelude: >
+    The Linuxbridge agent now supports l2 agent extensions.
+features:
+  - The Linuxbridge agent can now be extended by 3rd parties using a pluggable
+    mechanism.
+fixes:
+  - partially closes bug 1468803