]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Process update_network in the openvswitch agent
authorMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 21 Aug 2015 12:40:05 +0000 (14:40 +0200)
committerMiguel Angel Ajo <mangelajo@redhat.com>
Thu, 27 Aug 2015 13:36:20 +0000 (15:36 +0200)
This will allow ports with attributes related to the network to be
updated as necessary. Initially QoS extension which is able to
attach a network policy to the port.

Another approach would be sending updates to every single port
on a network, but that doesn't scale well for networks with lots
of ports.

Change-Id: Ie28297840b5702a920142af02dd17b10775d76ca
Partially-Implements: blueprint ml2-qos
Closes-Bug: 1486028

neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
neutron/plugins/ml2/plugin.py
neutron/plugins/ml2/rpc.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py

index 2122fe339a2bcaa641dae1bac29382d8de9c35b7..a668bdda47dec69d4a92c690ca68bf4ee383a2f3 100644 (file)
@@ -13,6 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import collections
 import hashlib
 import signal
 import sys
@@ -128,7 +129,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     #   1.1 Support Security Group RPC
     #   1.2 Support DVR (Distributed Virtual Router) RPC
     #   1.3 Added param devices_to_update to security_groups_provider_updated
-    target = oslo_messaging.Target(version='1.3')
+    #   1.4 Added support for network_update
+    target = oslo_messaging.Target(version='1.4')
 
     def __init__(self, bridge_classes, integ_br, tun_br, local_ip,
                  bridge_mappings, polling_interval, tunnel_types=None,
@@ -226,6 +228,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.updated_ports = set()
         # Stores port delete notifications
         self.deleted_ports = set()
+
+        self.network_ports = collections.defaultdict(set)
         # keeps association between ports and ofports to detect ofport change
         self.vifname_to_ofport_map = {}
         self.setup_rpc()
@@ -361,7 +365,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                      [constants.TUNNEL, topics.UPDATE],
                      [constants.TUNNEL, topics.DELETE],
                      [topics.SECURITY_GROUP, topics.UPDATE],
-                     [topics.DVR, topics.UPDATE]]
+                     [topics.DVR, topics.UPDATE],
+                     [topics.NETWORK, topics.UPDATE]]
         if self.l2_pop:
             consumers.append([topics.L2POPULATION,
                               topics.UPDATE, self.conf.host])
@@ -394,8 +399,27 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     def port_delete(self, context, **kwargs):
         port_id = kwargs.get('port_id')
         self.deleted_ports.add(port_id)
+        self.updated_ports.discard(port_id)
         LOG.debug("port_delete message processed for port %s", port_id)
 
+    def network_update(self, context, **kwargs):
+        network_id = kwargs['network']['id']
+        for port_id in self.network_ports[network_id]:
+            # notifications could arrive out of order, if the port is deleted
+            # we don't want to update it anymore
+            if port_id not in self.deleted_ports:
+                self.updated_ports.add(port_id)
+        LOG.debug("network_update message processed for network "
+                  "%(network_id)s, with ports: %(ports)s",
+                  {'network_id': network_id,
+                   'ports': self.network_ports[network_id]})
+
+    def _clean_network_ports(self, port_id):
+        for port_set in self.network_ports.values():
+            if port_id in port_set:
+                port_set.remove(port_id)
+                break
+
     def process_deleted_ports(self, port_info):
         # don't try to process removed ports as deleted ports since
         # they are already gone
@@ -407,6 +431,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             # longer have access to the network
             self.sg_agent.remove_devices_filter([port_id])
             port = self.int_br.get_vif_port_by_id(port_id)
+            self._clean_network_ports(port_id)
             self.ext_manager.delete_port(self.context,
                                          {"vif_port": port,
                                           "port_id": port_id})
@@ -1297,7 +1322,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 has_sgs = 'security_groups' in details
                 if not port_security or not has_sgs:
                     security_disabled_devices.append(device)
-
+                self._update_port_network(details['port_id'],
+                                          details['network_id'])
                 self.ext_manager.handle_port(self.context, details)
             else:
                 LOG.warn(_LW("Device %s not defined on plugin"), device)
@@ -1305,6 +1331,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     self.port_dead(port)
         return skipped_devices, need_binding_devices, security_disabled_devices
 
+    def _update_port_network(self, port_id, network_id):
+        self._clean_network_ports(port_id)
+        self.network_ports[network_id].add(port_id)
+
     def treat_ancillary_devices_added(self, devices):
         devices_details_list = (
             self.plugin_rpc.get_devices_details_list_and_failed_devices(
index 5d9a21361963c7911d274c578ba30c01798cd9ce..1d8213bc97853a1975ed82c84c57fb9edd0ec5ba 100644 (file)
@@ -665,6 +665,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             self._process_l3_update(context, updated_network, net_data)
             self.type_manager.extend_network_dict_provider(context,
                                                            updated_network)
+
+            # TODO(QoS): Move out to the extension framework somehow.
+            need_network_update_notify = (
+                qos_consts.QOS_POLICY_ID in net_data and
+                original_network[qos_consts.QOS_POLICY_ID] !=
+                updated_network[qos_consts.QOS_POLICY_ID])
+
             mech_context = driver_context.NetworkContext(
                 self, context, updated_network,
                 original_network=original_network)
@@ -675,6 +682,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         # now the error is propogated to the caller, which is expected to
         # either undo/retry the operation or delete the resource.
         self.mechanism_manager.update_network_postcommit(mech_context)
+        if need_network_update_notify:
+            self.notifier.network_update(context, updated_network)
         return updated_network
 
     def get_network(self, context, id, fields=None):
index 8ae7997a2187fcecafde73cacf97abfc1ac564a5..b402c58c1843e529c3d4aa629924975e6f35df5c 100644 (file)
@@ -279,7 +279,7 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
         1.0 - Initial version.
         1.1 - Added get_active_networks_info, create_dhcp_port,
               update_dhcp_port, and removed get_dhcp_port methods.
-
+        1.4 - Added network_update
     """
 
     def __init__(self, topic):
@@ -293,6 +293,9 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
         self.topic_port_delete = topics.get_topic_name(topic,
                                                        topics.PORT,
                                                        topics.DELETE)
+        self.topic_network_update = topics.get_topic_name(topic,
+                                                          topics.NETWORK,
+                                                          topics.UPDATE)
 
         target = oslo_messaging.Target(topic=topic, version='1.0')
         self.client = n_rpc.get_client(target)
@@ -314,3 +317,8 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
         cctxt = self.client.prepare(topic=self.topic_port_delete,
                                     fanout=True)
         cctxt.cast(context, 'port_delete', port_id=port_id)
+
+    def network_update(self, context, network):
+        cctxt = self.client.prepare(topic=self.topic_network_update,
+                                    fanout=True, version='1.4')
+        cctxt.cast(context, 'network_update', network=network)
index 07c02f36251a08de2ffca8092d0777e3da255d0a..569befe60fb56734efcddf7b0521f4562ad6ed93 100644 (file)
@@ -43,6 +43,13 @@ FAKE_MAC = '00:11:22:33:44:55'
 FAKE_IP1 = '10.0.0.1'
 FAKE_IP2 = '10.0.0.2'
 
+TEST_PORT_ID1 = 'port-id-1'
+TEST_PORT_ID2 = 'port-id-2'
+TEST_PORT_ID3 = 'port-id-3'
+
+TEST_NETWORK_ID1 = 'net-id-1'
+TEST_NETWORK_ID2 = 'net-id-2'
+
 
 class FakeVif(object):
     ofport = 99
@@ -629,15 +636,67 @@ class TestOvsNeutronAgent(object):
                                          self.agent.agent_state, True)
 
     def test_port_update(self):
-        port = {"id": "123",
-                "network_id": "124",
+        port = {"id": TEST_PORT_ID1,
+                "network_id": TEST_NETWORK_ID1,
                 "admin_state_up": False}
         self.agent.port_update("unused_context",
                                port=port,
                                network_type="vlan",
                                segmentation_id="1",
                                physical_network="physnet")
-        self.assertEqual(set(['123']), self.agent.updated_ports)
+        self.assertEqual(set([TEST_PORT_ID1]), self.agent.updated_ports)
+
+    def test_port_delete_after_update(self):
+        """Make sure a port is not marked for delete and update."""
+        port = {'id': TEST_PORT_ID1}
+
+        self.agent.port_update(context=None, port=port)
+        self.agent.port_delete(context=None, port_id=port['id'])
+        self.assertEqual(set(), self.agent.updated_ports)
+        self.assertEqual(set([port['id']]), self.agent.deleted_ports)
+
+    def test_process_deleted_ports_cleans_network_ports(self):
+        self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
+        self.agent.port_delete(context=None, port_id=TEST_PORT_ID1)
+        self.agent.sg_agent = mock.Mock()
+        self.agent.int_br = mock.Mock()
+        self.agent.process_deleted_ports(port_info={})
+        self.assertEqual(set(), self.agent.network_ports[TEST_NETWORK_ID1])
+
+    def test_network_update(self):
+        """Network update marks port for update. """
+        network = {'id': TEST_NETWORK_ID1}
+        port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
+
+        self.agent._update_port_network(port['id'], port['network_id'])
+        self.agent.network_update(context=None, network=network)
+        self.assertEqual(set([port['id']]), self.agent.updated_ports)
+
+    def test_network_update_outoforder(self):
+        """Network update arrives later than port_delete.
+
+        But the main agent loop still didn't process the ports,
+        so we ensure the port is not marked for update.
+        """
+        network = {'id': TEST_NETWORK_ID1}
+        port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
+
+        self.agent._update_port_network(port['id'], port['network_id'])
+        self.agent.port_delete(context=None, port_id=port['id'])
+        self.agent.network_update(context=None, network=network)
+        self.assertEqual(set(), self.agent.updated_ports)
+
+    def test_update_port_network(self):
+        """Ensure ports are associated and moved across networks correctly."""
+        self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
+        self.agent._update_port_network(TEST_PORT_ID2, TEST_NETWORK_ID1)
+        self.agent._update_port_network(TEST_PORT_ID3, TEST_NETWORK_ID2)
+        self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID2)
+
+        self.assertEqual(set([TEST_PORT_ID2]),
+                         self.agent.network_ports[TEST_NETWORK_ID1])
+        self.assertEqual(set([TEST_PORT_ID1, TEST_PORT_ID3]),
+                         self.agent.network_ports[TEST_NETWORK_ID2])
 
     def test_port_delete(self):
         vif = FakeVif()