# License for the specific language governing permissions and limitations
# under the License.
+import collections
import hashlib
import signal
import sys
# 1.1 Support Security Group RPC
# 1.2 Support DVR (Distributed Virtual Router) RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
- target = oslo_messaging.Target(version='1.3')
+ # 1.4 Added support for network_update
+ target = oslo_messaging.Target(version='1.4')
def __init__(self, bridge_classes, integ_br, tun_br, local_ip,
bridge_mappings, polling_interval, tunnel_types=None,
self.updated_ports = set()
# Stores port delete notifications
self.deleted_ports = set()
+
+ self.network_ports = collections.defaultdict(set)
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
self.setup_rpc()
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
- [topics.DVR, topics.UPDATE]]
+ [topics.DVR, topics.UPDATE],
+ [topics.NETWORK, topics.UPDATE]]
if self.l2_pop:
consumers.append([topics.L2POPULATION,
topics.UPDATE, self.conf.host])
def port_delete(self, context, **kwargs):
port_id = kwargs.get('port_id')
self.deleted_ports.add(port_id)
+ self.updated_ports.discard(port_id)
LOG.debug("port_delete message processed for port %s", port_id)
+ def network_update(self, context, **kwargs):
+ network_id = kwargs['network']['id']
+ for port_id in self.network_ports[network_id]:
+ # notifications could arrive out of order, if the port is deleted
+ # we don't want to update it anymore
+ if port_id not in self.deleted_ports:
+ self.updated_ports.add(port_id)
+ LOG.debug("network_update message processed for network "
+ "%(network_id)s, with ports: %(ports)s",
+ {'network_id': network_id,
+ 'ports': self.network_ports[network_id]})
+
+ def _clean_network_ports(self, port_id):
+ for port_set in self.network_ports.values():
+ if port_id in port_set:
+ port_set.remove(port_id)
+ break
+
def process_deleted_ports(self, port_info):
# don't try to process removed ports as deleted ports since
# they are already gone
# longer have access to the network
self.sg_agent.remove_devices_filter([port_id])
port = self.int_br.get_vif_port_by_id(port_id)
+ self._clean_network_ports(port_id)
self.ext_manager.delete_port(self.context,
{"vif_port": port,
"port_id": port_id})
has_sgs = 'security_groups' in details
if not port_security or not has_sgs:
security_disabled_devices.append(device)
-
+ self._update_port_network(details['port_id'],
+ details['network_id'])
self.ext_manager.handle_port(self.context, details)
else:
LOG.warn(_LW("Device %s not defined on plugin"), device)
self.port_dead(port)
return skipped_devices, need_binding_devices, security_disabled_devices
+ def _update_port_network(self, port_id, network_id):
+ self._clean_network_ports(port_id)
+ self.network_ports[network_id].add(port_id)
+
def treat_ancillary_devices_added(self, devices):
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self._process_l3_update(context, updated_network, net_data)
self.type_manager.extend_network_dict_provider(context,
updated_network)
+
+ # TODO(QoS): Move out to the extension framework somehow.
+ need_network_update_notify = (
+ qos_consts.QOS_POLICY_ID in net_data and
+ original_network[qos_consts.QOS_POLICY_ID] !=
+ updated_network[qos_consts.QOS_POLICY_ID])
+
mech_context = driver_context.NetworkContext(
self, context, updated_network,
original_network=original_network)
# now the error is propogated to the caller, which is expected to
# either undo/retry the operation or delete the resource.
self.mechanism_manager.update_network_postcommit(mech_context)
+ if need_network_update_notify:
+ self.notifier.network_update(context, updated_network)
return updated_network
def get_network(self, context, id, fields=None):
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
update_dhcp_port, and removed get_dhcp_port methods.
-
+ 1.4 - Added network_update
"""
def __init__(self, topic):
self.topic_port_delete = topics.get_topic_name(topic,
topics.PORT,
topics.DELETE)
+ self.topic_network_update = topics.get_topic_name(topic,
+ topics.NETWORK,
+ topics.UPDATE)
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)
+
+ def network_update(self, context, network):
+ cctxt = self.client.prepare(topic=self.topic_network_update,
+ fanout=True, version='1.4')
+ cctxt.cast(context, 'network_update', network=network)
FAKE_IP1 = '10.0.0.1'
FAKE_IP2 = '10.0.0.2'
+TEST_PORT_ID1 = 'port-id-1'
+TEST_PORT_ID2 = 'port-id-2'
+TEST_PORT_ID3 = 'port-id-3'
+
+TEST_NETWORK_ID1 = 'net-id-1'
+TEST_NETWORK_ID2 = 'net-id-2'
+
class FakeVif(object):
ofport = 99
self.agent.agent_state, True)
def test_port_update(self):
- port = {"id": "123",
- "network_id": "124",
+ port = {"id": TEST_PORT_ID1,
+ "network_id": TEST_NETWORK_ID1,
"admin_state_up": False}
self.agent.port_update("unused_context",
port=port,
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
- self.assertEqual(set(['123']), self.agent.updated_ports)
+ self.assertEqual(set([TEST_PORT_ID1]), self.agent.updated_ports)
+
+ def test_port_delete_after_update(self):
+ """Make sure a port is not marked for delete and update."""
+ port = {'id': TEST_PORT_ID1}
+
+ self.agent.port_update(context=None, port=port)
+ self.agent.port_delete(context=None, port_id=port['id'])
+ self.assertEqual(set(), self.agent.updated_ports)
+ self.assertEqual(set([port['id']]), self.agent.deleted_ports)
+
+ def test_process_deleted_ports_cleans_network_ports(self):
+ self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
+ self.agent.port_delete(context=None, port_id=TEST_PORT_ID1)
+ self.agent.sg_agent = mock.Mock()
+ self.agent.int_br = mock.Mock()
+ self.agent.process_deleted_ports(port_info={})
+ self.assertEqual(set(), self.agent.network_ports[TEST_NETWORK_ID1])
+
+ def test_network_update(self):
+ """Network update marks port for update. """
+ network = {'id': TEST_NETWORK_ID1}
+ port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
+
+ self.agent._update_port_network(port['id'], port['network_id'])
+ self.agent.network_update(context=None, network=network)
+ self.assertEqual(set([port['id']]), self.agent.updated_ports)
+
+ def test_network_update_outoforder(self):
+ """Network update arrives later than port_delete.
+
+ But the main agent loop still didn't process the ports,
+ so we ensure the port is not marked for update.
+ """
+ network = {'id': TEST_NETWORK_ID1}
+ port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
+
+ self.agent._update_port_network(port['id'], port['network_id'])
+ self.agent.port_delete(context=None, port_id=port['id'])
+ self.agent.network_update(context=None, network=network)
+ self.assertEqual(set(), self.agent.updated_ports)
+
+ def test_update_port_network(self):
+ """Ensure ports are associated and moved across networks correctly."""
+ self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
+ self.agent._update_port_network(TEST_PORT_ID2, TEST_NETWORK_ID1)
+ self.agent._update_port_network(TEST_PORT_ID3, TEST_NETWORK_ID2)
+ self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID2)
+
+ self.assertEqual(set([TEST_PORT_ID2]),
+ self.agent.network_ports[TEST_NETWORK_ID1])
+ self.assertEqual(set([TEST_PORT_ID1, TEST_PORT_ID3]),
+ self.agent.network_ports[TEST_NETWORK_ID2])
def test_port_delete(self):
vif = FakeVif()