From: Miguel Angel Ajo Date: Fri, 21 Aug 2015 12:40:05 +0000 (+0200) Subject: Process update_network in the openvswitch agent X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=b292e197ff8fde50823717263ae0b13fc66c980b;p=openstack-build%2Fneutron-build.git Process update_network in the openvswitch agent This will allow ports with attributes related to the network to be updated as necessary. Initially QoS extension which is able to attach a network policy to the port. Another approach would be sending updates to every single port on a network, but that doesn't scale well for networks with lots of ports. Change-Id: Ie28297840b5702a920142af02dd17b10775d76ca Partially-Implements: blueprint ml2-qos Closes-Bug: 1486028 --- diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 2122fe339..a668bdda4 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import hashlib import signal import sys @@ -128,7 +129,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # 1.1 Support Security Group RPC # 1.2 Support DVR (Distributed Virtual Router) RPC # 1.3 Added param devices_to_update to security_groups_provider_updated - target = oslo_messaging.Target(version='1.3') + # 1.4 Added support for network_update + target = oslo_messaging.Target(version='1.4') def __init__(self, bridge_classes, integ_br, tun_br, local_ip, bridge_mappings, polling_interval, tunnel_types=None, @@ -226,6 +228,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.updated_ports = set() # Stores port delete notifications self.deleted_ports = set() + + self.network_ports = collections.defaultdict(set) # keeps association between ports and ofports to detect ofport change self.vifname_to_ofport_map = {} self.setup_rpc() @@ -361,7 +365,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, [constants.TUNNEL, topics.UPDATE], [constants.TUNNEL, topics.DELETE], [topics.SECURITY_GROUP, topics.UPDATE], - [topics.DVR, topics.UPDATE]] + [topics.DVR, topics.UPDATE], + [topics.NETWORK, topics.UPDATE]] if self.l2_pop: consumers.append([topics.L2POPULATION, topics.UPDATE, self.conf.host]) @@ -394,8 +399,27 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def port_delete(self, context, **kwargs): port_id = kwargs.get('port_id') self.deleted_ports.add(port_id) + self.updated_ports.discard(port_id) LOG.debug("port_delete message processed for port %s", port_id) + def network_update(self, context, **kwargs): + network_id = kwargs['network']['id'] + for port_id in self.network_ports[network_id]: + # notifications could arrive out of order, if the port is deleted + # we don't want to update it anymore + if port_id not in self.deleted_ports: + self.updated_ports.add(port_id) + LOG.debug("network_update message processed for network " + "%(network_id)s, with ports: %(ports)s", + {'network_id': network_id, + 'ports': self.network_ports[network_id]}) + + def _clean_network_ports(self, port_id): + for port_set in self.network_ports.values(): + if port_id in port_set: + port_set.remove(port_id) + break + def process_deleted_ports(self, port_info): # don't try to process removed ports as deleted ports since # they are already gone @@ -407,6 +431,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # longer have access to the network self.sg_agent.remove_devices_filter([port_id]) port = self.int_br.get_vif_port_by_id(port_id) + self._clean_network_ports(port_id) self.ext_manager.delete_port(self.context, {"vif_port": port, "port_id": port_id}) @@ -1297,7 +1322,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, has_sgs = 'security_groups' in details if not port_security or not has_sgs: security_disabled_devices.append(device) - + self._update_port_network(details['port_id'], + details['network_id']) self.ext_manager.handle_port(self.context, details) else: LOG.warn(_LW("Device %s not defined on plugin"), device) @@ -1305,6 +1331,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.port_dead(port) return skipped_devices, need_binding_devices, security_disabled_devices + def _update_port_network(self, port_id, network_id): + self._clean_network_ports(port_id) + self.network_ports[network_id].add(port_id) + def treat_ancillary_devices_added(self, devices): devices_details_list = ( self.plugin_rpc.get_devices_details_list_and_failed_devices( diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 5d9a21361..1d8213bc9 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -665,6 +665,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, self._process_l3_update(context, updated_network, net_data) self.type_manager.extend_network_dict_provider(context, updated_network) + + # TODO(QoS): Move out to the extension framework somehow. + need_network_update_notify = ( + qos_consts.QOS_POLICY_ID in net_data and + original_network[qos_consts.QOS_POLICY_ID] != + updated_network[qos_consts.QOS_POLICY_ID]) + mech_context = driver_context.NetworkContext( self, context, updated_network, original_network=original_network) @@ -675,6 +682,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # now the error is propogated to the caller, which is expected to # either undo/retry the operation or delete the resource. self.mechanism_manager.update_network_postcommit(mech_context) + if need_network_update_notify: + self.notifier.network_update(context, updated_network) return updated_network def get_network(self, context, id, fields=None): diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 8ae7997a2..b402c58c1 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -279,7 +279,7 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, 1.0 - Initial version. 1.1 - Added get_active_networks_info, create_dhcp_port, update_dhcp_port, and removed get_dhcp_port methods. - + 1.4 - Added network_update """ def __init__(self, topic): @@ -293,6 +293,9 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, self.topic_port_delete = topics.get_topic_name(topic, topics.PORT, topics.DELETE) + self.topic_network_update = topics.get_topic_name(topic, + topics.NETWORK, + topics.UPDATE) target = oslo_messaging.Target(topic=topic, version='1.0') self.client = n_rpc.get_client(target) @@ -314,3 +317,8 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, cctxt = self.client.prepare(topic=self.topic_port_delete, fanout=True) cctxt.cast(context, 'port_delete', port_id=port_id) + + def network_update(self, context, network): + cctxt = self.client.prepare(topic=self.topic_network_update, + fanout=True, version='1.4') + cctxt.cast(context, 'network_update', network=network) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 07c02f362..569befe60 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -43,6 +43,13 @@ FAKE_MAC = '00:11:22:33:44:55' FAKE_IP1 = '10.0.0.1' FAKE_IP2 = '10.0.0.2' +TEST_PORT_ID1 = 'port-id-1' +TEST_PORT_ID2 = 'port-id-2' +TEST_PORT_ID3 = 'port-id-3' + +TEST_NETWORK_ID1 = 'net-id-1' +TEST_NETWORK_ID2 = 'net-id-2' + class FakeVif(object): ofport = 99 @@ -629,15 +636,67 @@ class TestOvsNeutronAgent(object): self.agent.agent_state, True) def test_port_update(self): - port = {"id": "123", - "network_id": "124", + port = {"id": TEST_PORT_ID1, + "network_id": TEST_NETWORK_ID1, "admin_state_up": False} self.agent.port_update("unused_context", port=port, network_type="vlan", segmentation_id="1", physical_network="physnet") - self.assertEqual(set(['123']), self.agent.updated_ports) + self.assertEqual(set([TEST_PORT_ID1]), self.agent.updated_ports) + + def test_port_delete_after_update(self): + """Make sure a port is not marked for delete and update.""" + port = {'id': TEST_PORT_ID1} + + self.agent.port_update(context=None, port=port) + self.agent.port_delete(context=None, port_id=port['id']) + self.assertEqual(set(), self.agent.updated_ports) + self.assertEqual(set([port['id']]), self.agent.deleted_ports) + + def test_process_deleted_ports_cleans_network_ports(self): + self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1) + self.agent.port_delete(context=None, port_id=TEST_PORT_ID1) + self.agent.sg_agent = mock.Mock() + self.agent.int_br = mock.Mock() + self.agent.process_deleted_ports(port_info={}) + self.assertEqual(set(), self.agent.network_ports[TEST_NETWORK_ID1]) + + def test_network_update(self): + """Network update marks port for update. """ + network = {'id': TEST_NETWORK_ID1} + port = {'id': TEST_PORT_ID1, 'network_id': network['id']} + + self.agent._update_port_network(port['id'], port['network_id']) + self.agent.network_update(context=None, network=network) + self.assertEqual(set([port['id']]), self.agent.updated_ports) + + def test_network_update_outoforder(self): + """Network update arrives later than port_delete. + + But the main agent loop still didn't process the ports, + so we ensure the port is not marked for update. + """ + network = {'id': TEST_NETWORK_ID1} + port = {'id': TEST_PORT_ID1, 'network_id': network['id']} + + self.agent._update_port_network(port['id'], port['network_id']) + self.agent.port_delete(context=None, port_id=port['id']) + self.agent.network_update(context=None, network=network) + self.assertEqual(set(), self.agent.updated_ports) + + def test_update_port_network(self): + """Ensure ports are associated and moved across networks correctly.""" + self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1) + self.agent._update_port_network(TEST_PORT_ID2, TEST_NETWORK_ID1) + self.agent._update_port_network(TEST_PORT_ID3, TEST_NETWORK_ID2) + self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID2) + + self.assertEqual(set([TEST_PORT_ID2]), + self.agent.network_ports[TEST_NETWORK_ID1]) + self.assertEqual(set([TEST_PORT_ID1, TEST_PORT_ID3]), + self.agent.network_ports[TEST_NETWORK_ID2]) def test_port_delete(self): vif = FakeVif()