Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / plugins / ml2 / drivers / openvswitch / agent / ovs_neutron_agent.py
1 # Copyright 2011 VMware, Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 import collections
17 import signal
18 import sys
19 import time
20 import uuid
21
22 import functools
23 import netaddr
24 from oslo_config import cfg
25 from oslo_log import log as logging
26 import oslo_messaging
27 from oslo_service import loopingcall
28 from oslo_service import systemd
29 import six
30 from six import moves
31
32 from neutron._i18n import _, _LE, _LI, _LW
33 from neutron.agent.common import ovs_lib
34 from neutron.agent.common import polling
35 from neutron.agent.common import utils
36 from neutron.agent.l2.extensions import manager as ext_manager
37 from neutron.agent.linux import ip_lib
38 from neutron.agent.linux import polling as linux_polling
39 from neutron.agent import rpc as agent_rpc
40 from neutron.agent import securitygroups_rpc as sg_rpc
41 from neutron.api.rpc.handlers import dvr_rpc
42 from neutron.common import config
43 from neutron.common import constants as n_const
44 from neutron.common import exceptions
45 from neutron.common import ipv6_utils as ipv6
46 from neutron.common import topics
47 from neutron.common import utils as n_utils
48 from neutron import context
49 from neutron.plugins.common import constants as p_const
50 from neutron.plugins.common import utils as p_utils
51 from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
52 from neutron.plugins.ml2.drivers.openvswitch.agent.common \
53     import constants
54 from neutron.plugins.ml2.drivers.openvswitch.agent \
55     import ovs_dvr_neutron_agent
56
57
58 LOG = logging.getLogger(__name__)
59 cfg.CONF.import_group('AGENT', 'neutron.plugins.ml2.drivers.openvswitch.'
60                       'agent.common.config')
61 cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
62                       'common.config')
63
64 # A placeholder for dead vlans.
65 DEAD_VLAN_TAG = p_const.MAX_VLAN_TAG + 1
66 UINT64_BITMASK = (1 << 64) - 1
67
68
69 class _mac_mydialect(netaddr.mac_unix):
70     word_fmt = '%.2x'
71
72
73 class DeviceListRetrievalError(exceptions.NeutronException):
74     message = _("Unable to retrieve port details for devices: %(devices)s ")
75
76
77 class LocalVLANMapping(object):
78
79     def __init__(self, vlan, network_type, physical_network, segmentation_id,
80                  vif_ports=None):
81         if vif_ports is None:
82             vif_ports = {}
83         self.vlan = vlan
84         self.network_type = network_type
85         self.physical_network = physical_network
86         self.segmentation_id = segmentation_id
87         self.vif_ports = vif_ports
88         # set of tunnel ports on which packets should be flooded
89         self.tun_ofports = set()
90
91     def __str__(self):
92         return ("lv-id = %s type = %s phys-net = %s phys-id = %s" %
93                 (self.vlan, self.network_type, self.physical_network,
94                  self.segmentation_id))
95
96
97 class OVSPluginApi(agent_rpc.PluginApi):
98     pass
99
100
101 def has_zero_prefixlen_address(ip_addresses):
102     return any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in ip_addresses)
103
104
105 class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
106                       l2population_rpc.L2populationRpcCallBackTunnelMixin,
107                       dvr_rpc.DVRAgentRpcCallbackMixin):
108     '''Implements OVS-based tunneling, VLANs and flat networks.
109
110     Two local bridges are created: an integration bridge (defaults to
111     'br-int') and a tunneling bridge (defaults to 'br-tun'). An
112     additional bridge is created for each physical network interface
113     used for VLANs and/or flat networks.
114
115     All VM VIFs are plugged into the integration bridge. VM VIFs on a
116     given virtual network share a common "local" VLAN (i.e. not
117     propagated externally). The VLAN id of this local VLAN is mapped
118     to the physical networking details realizing that virtual network.
119
120     For virtual networks realized as GRE tunnels, a Logical Switch
121     (LS) identifier is used to differentiate tenant traffic on
122     inter-HV tunnels. A mesh of tunnels is created to other
123     Hypervisors in the cloud. These tunnels originate and terminate on
124     the tunneling bridge of each hypervisor. Port patching is done to
125     connect local VLANs on the integration bridge to inter-hypervisor
126     tunnels on the tunnel bridge.
127
128     For each virtual network realized as a VLAN or flat network, a
129     veth or a pair of patch ports is used to connect the local VLAN on
130     the integration bridge with the physical network bridge, with flow
131     rules adding, modifying, or stripping VLAN tags as necessary.
132     '''
133
134     # history
135     #   1.0 Initial version
136     #   1.1 Support Security Group RPC
137     #   1.2 Support DVR (Distributed Virtual Router) RPC
138     #   1.3 Added param devices_to_update to security_groups_provider_updated
139     #   1.4 Added support for network_update
140     target = oslo_messaging.Target(version='1.4')
141
142     def __init__(self, bridge_classes, conf=None):
143         '''Constructor.
144
145         :param bridge_classes: a dict for bridge classes.
146         :param conf: an instance of ConfigOpts
147         '''
148         super(OVSNeutronAgent, self).__init__()
149         self.conf = conf or cfg.CONF
150         self.ovs = ovs_lib.BaseOVS()
151         agent_conf = self.conf.AGENT
152         ovs_conf = self.conf.OVS
153
154         self.fullsync = False
155         # init bridge classes with configured datapath type.
156         self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
157             functools.partial(bridge_classes[b],
158                               datapath_type=ovs_conf.datapath_type)
159             for b in ('br_int', 'br_phys', 'br_tun'))
160
161         self.use_veth_interconnection = ovs_conf.use_veth_interconnection
162         self.veth_mtu = agent_conf.veth_mtu
163         self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
164                                                      p_const.MAX_VLAN_TAG))
165         self.tunnel_types = agent_conf.tunnel_types or []
166         self.l2_pop = agent_conf.l2_population
167         # TODO(ethuleau): Change ARP responder so it's not dependent on the
168         #                 ML2 l2 population mechanism driver.
169         self.enable_distributed_routing = agent_conf.enable_distributed_routing
170         self.arp_responder_enabled = agent_conf.arp_responder and self.l2_pop
171         self.prevent_arp_spoofing = agent_conf.prevent_arp_spoofing
172
173         host = self.conf.host
174         self.agent_id = 'ovs-agent-%s' % host
175
176         if self.tunnel_types:
177             self.enable_tunneling = True
178         else:
179             self.enable_tunneling = False
180
181         # Validate agent configurations
182         self._check_agent_configurations()
183
184         # Keep track of int_br's device count for use by _report_state()
185         self.int_br_device_count = 0
186
187         self.agent_uuid_stamp = uuid.uuid4().int & UINT64_BITMASK
188
189         self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
190         self.setup_integration_br()
191         # Stores port update notifications for processing in main rpc loop
192         self.updated_ports = set()
193         # Stores port delete notifications
194         self.deleted_ports = set()
195
196         self.network_ports = collections.defaultdict(set)
197         # keeps association between ports and ofports to detect ofport change
198         self.vifname_to_ofport_map = {}
199         self.setup_rpc()
200         self.init_extension_manager(self.connection)
201         self.bridge_mappings = self._parse_bridge_mappings(
202             ovs_conf.bridge_mappings)
203         self.setup_physical_bridges(self.bridge_mappings)
204         self.local_vlan_map = {}
205
206         self.tun_br_ofports = {p_const.TYPE_GENEVE: {},
207                                p_const.TYPE_GRE: {},
208                                p_const.TYPE_VXLAN: {}}
209
210         self.polling_interval = agent_conf.polling_interval
211         self.minimize_polling = agent_conf.minimize_polling
212         self.ovsdb_monitor_respawn_interval = (
213             agent_conf.ovsdb_monitor_respawn_interval or
214             constants.DEFAULT_OVSDBMON_RESPAWN)
215         self.local_ip = ovs_conf.local_ip
216         self.tunnel_count = 0
217         self.vxlan_udp_port = agent_conf.vxlan_udp_port
218         self.dont_fragment = agent_conf.dont_fragment
219         self.tunnel_csum = agent_conf.tunnel_csum
220         self.tun_br = None
221         self.patch_int_ofport = constants.OFPORT_INVALID
222         self.patch_tun_ofport = constants.OFPORT_INVALID
223         if self.enable_tunneling:
224             # The patch_int_ofport and patch_tun_ofport are updated
225             # here inside the call to setup_tunnel_br()
226             self.setup_tunnel_br(ovs_conf.tunnel_bridge)
227
228         self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
229             self.context,
230             self.dvr_plugin_rpc,
231             self.int_br,
232             self.tun_br,
233             self.bridge_mappings,
234             self.phys_brs,
235             self.int_ofports,
236             self.phys_ofports,
237             self.patch_int_ofport,
238             self.patch_tun_ofport,
239             host,
240             self.enable_tunneling,
241             self.enable_distributed_routing)
242
243         self.agent_state = {
244             'binary': 'neutron-openvswitch-agent',
245             'host': host,
246             'topic': n_const.L2_AGENT_TOPIC,
247             'configurations': {'bridge_mappings': self.bridge_mappings,
248                                'tunnel_types': self.tunnel_types,
249                                'tunneling_ip': self.local_ip,
250                                'l2_population': self.l2_pop,
251                                'arp_responder_enabled':
252                                self.arp_responder_enabled,
253                                'enable_distributed_routing':
254                                self.enable_distributed_routing,
255                                'log_agent_heartbeats':
256                                agent_conf.log_agent_heartbeats,
257                                'extensions': self.ext_manager.names(),
258                                'datapath_type': ovs_conf.datapath_type,
259                                'ovs_capabilities': self.ovs.capabilities,
260                                'vhostuser_socket_dir':
261                                ovs_conf.vhostuser_socket_dir},
262             'agent_type': agent_conf.agent_type,
263             'start_flag': True}
264
265         report_interval = agent_conf.report_interval
266         if report_interval:
267             heartbeat = loopingcall.FixedIntervalLoopingCall(
268                 self._report_state)
269             heartbeat.start(interval=report_interval)
270
271         if self.enable_tunneling:
272             self.setup_tunnel_br_flows()
273
274         self.dvr_agent.setup_dvr_flows()
275
276         # Collect additional bridges to monitor
277         self.ancillary_brs = self.setup_ancillary_bridges(
278             ovs_conf.integration_bridge, ovs_conf.tunnel_bridge)
279
280         # In order to keep existed device's local vlan unchanged,
281         # restore local vlan mapping at start
282         self._restore_local_vlan_map()
283
284         # Security group agent support
285         self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
286                 self.sg_plugin_rpc, self.local_vlan_map,
287                 defer_refresh_firewall=True)
288
289         # Initialize iteration counter
290         self.iter_num = 0
291         self.run_daemon_loop = True
292
293         self.catch_sigterm = False
294         self.catch_sighup = False
295
296         # The initialization is complete; we can start receiving messages
297         self.connection.consume_in_threads()
298
299         self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout
300
301     def _parse_bridge_mappings(self, bridge_mappings):
302         try:
303             return n_utils.parse_mappings(bridge_mappings)
304         except ValueError as e:
305             raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)
306
307     def _report_state(self):
308         # How many devices are likely used by a VM
309         self.agent_state.get('configurations')['devices'] = (
310             self.int_br_device_count)
311         self.agent_state.get('configurations')['in_distributed_mode'] = (
312             self.dvr_agent.in_distributed_mode())
313
314         try:
315             agent_status = self.state_rpc.report_state(self.context,
316                                                        self.agent_state,
317                                                        True)
318             if agent_status == n_const.AGENT_REVIVED:
319                 LOG.info(_LI('Agent has just been revived. '
320                              'Doing a full sync.'))
321                 self.fullsync = True
322
323             if self.agent_state.pop('start_flag', None):
324                 # On initial start, we notify systemd after initialization
325                 # is complete.
326                 systemd.notify_once()
327         except Exception:
328             LOG.exception(_LE("Failed reporting state!"))
329
330     def _restore_local_vlan_map(self):
331         self._local_vlan_hints = {}
332         cur_ports = self.int_br.get_vif_ports()
333         port_names = [p.port_name for p in cur_ports]
334         port_info = self.int_br.get_ports_attributes(
335             "Port", columns=["name", "other_config", "tag"], ports=port_names)
336         by_name = {x['name']: x for x in port_info}
337         for port in cur_ports:
338             # if a port was deleted between get_vif_ports and
339             # get_ports_attributes, we
340             # will get a KeyError
341             try:
342                 local_vlan_map = by_name[port.port_name]['other_config']
343                 local_vlan = by_name[port.port_name]['tag']
344             except KeyError:
345                 continue
346             if not local_vlan:
347                 continue
348             net_uuid = local_vlan_map.get('net_uuid')
349             if (net_uuid and net_uuid not in self._local_vlan_hints
350                 and local_vlan != DEAD_VLAN_TAG):
351                 self.available_local_vlans.remove(local_vlan)
352                 self._local_vlan_hints[local_vlan_map['net_uuid']] = \
353                     local_vlan
354
355     def _dispose_local_vlan_hints(self):
356         self.available_local_vlans.update(self._local_vlan_hints.values())
357         self._local_vlan_hints = {}
358
359     def setup_rpc(self):
360         self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
361         self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
362         self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
363         self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
364
365         # RPC network init
366         self.context = context.get_admin_context_without_session()
367         # Define the listening consumers for the agent
368         consumers = [[topics.PORT, topics.UPDATE],
369                      [topics.PORT, topics.DELETE],
370                      [constants.TUNNEL, topics.UPDATE],
371                      [constants.TUNNEL, topics.DELETE],
372                      [topics.SECURITY_GROUP, topics.UPDATE],
373                      [topics.DVR, topics.UPDATE],
374                      [topics.NETWORK, topics.UPDATE]]
375         if self.l2_pop:
376             consumers.append([topics.L2POPULATION, topics.UPDATE])
377         self.connection = agent_rpc.create_consumers([self],
378                                                      topics.AGENT,
379                                                      consumers,
380                                                      start_listening=False)
381
382     def init_extension_manager(self, connection):
383         ext_manager.register_opts(self.conf)
384         self.ext_manager = (
385             ext_manager.AgentExtensionsManager(self.conf))
386         self.ext_manager.initialize(
387             connection, constants.EXTENSION_DRIVER_TYPE)
388
389     def get_net_uuid(self, vif_id):
390         for network_id, vlan_mapping in six.iteritems(self.local_vlan_map):
391             if vif_id in vlan_mapping.vif_ports:
392                 return network_id
393
394     def port_update(self, context, **kwargs):
395         port = kwargs.get('port')
396         # Put the port identifier in the updated_ports set.
397         # Even if full port details might be provided to this call,
398         # they are not used since there is no guarantee the notifications
399         # are processed in the same order as the relevant API requests
400         self.updated_ports.add(port['id'])
401         LOG.debug("port_update message processed for port %s", port['id'])
402
403     def port_delete(self, context, **kwargs):
404         port_id = kwargs.get('port_id')
405         self.deleted_ports.add(port_id)
406         self.updated_ports.discard(port_id)
407         LOG.debug("port_delete message processed for port %s", port_id)
408
409     def network_update(self, context, **kwargs):
410         network_id = kwargs['network']['id']
411         for port_id in self.network_ports[network_id]:
412             # notifications could arrive out of order, if the port is deleted
413             # we don't want to update it anymore
414             if port_id not in self.deleted_ports:
415                 self.updated_ports.add(port_id)
416         LOG.debug("network_update message processed for network "
417                   "%(network_id)s, with ports: %(ports)s",
418                   {'network_id': network_id,
419                    'ports': self.network_ports[network_id]})
420
421     def _clean_network_ports(self, port_id):
422         for port_set in self.network_ports.values():
423             if port_id in port_set:
424                 port_set.remove(port_id)
425                 break
426
427     def process_deleted_ports(self, port_info):
428         # don't try to process removed ports as deleted ports since
429         # they are already gone
430         if 'removed' in port_info:
431             self.deleted_ports -= port_info['removed']
432         deleted_ports = list(self.deleted_ports)
433         while self.deleted_ports:
434             port_id = self.deleted_ports.pop()
435             port = self.int_br.get_vif_port_by_id(port_id)
436             self._clean_network_ports(port_id)
437             self.ext_manager.delete_port(self.context,
438                                          {"vif_port": port,
439                                           "port_id": port_id})
440             # move to dead VLAN so deleted ports no
441             # longer have access to the network
442             if port:
443                 # don't log errors since there is a chance someone will be
444                 # removing the port from the bridge at the same time
445                 self.port_dead(port, log_errors=False)
446             self.port_unbound(port_id)
447         # Flush firewall rules after ports are put on dead VLAN to be
448         # more secure
449         self.sg_agent.remove_devices_filter(deleted_ports)
450
451     def tunnel_update(self, context, **kwargs):
452         LOG.debug("tunnel_update received")
453         if not self.enable_tunneling:
454             return
455         tunnel_ip = kwargs.get('tunnel_ip')
456         tunnel_ip_hex = self.get_ip_in_hex(tunnel_ip)
457         if not tunnel_ip_hex:
458             return
459         tunnel_type = kwargs.get('tunnel_type')
460         if not tunnel_type:
461             LOG.error(_LE("No tunnel_type specified, cannot create tunnels"))
462             return
463         if tunnel_type not in self.tunnel_types:
464             LOG.error(_LE("tunnel_type %s not supported by agent"),
465                       tunnel_type)
466             return
467         if tunnel_ip == self.local_ip:
468             return
469         tun_name = '%s-%s' % (tunnel_type, tunnel_ip_hex)
470         if not self.l2_pop:
471             self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
472                                     tunnel_type)
473
474     def tunnel_delete(self, context, **kwargs):
475         LOG.debug("tunnel_delete received")
476         if not self.enable_tunneling:
477             return
478         tunnel_ip = kwargs.get('tunnel_ip')
479         if not tunnel_ip:
480             LOG.error(_LE("No tunnel_ip specified, cannot delete tunnels"))
481             return
482         tunnel_type = kwargs.get('tunnel_type')
483         if not tunnel_type:
484             LOG.error(_LE("No tunnel_type specified, cannot delete tunnels"))
485             return
486         if tunnel_type not in self.tunnel_types:
487             LOG.error(_LE("tunnel_type %s not supported by agent"),
488                       tunnel_type)
489             return
490         ofport = self.tun_br_ofports[tunnel_type].get(tunnel_ip)
491         self.cleanup_tunnel_port(self.tun_br, ofport, tunnel_type)
492
493     def _tunnel_port_lookup(self, network_type, remote_ip):
494         return self.tun_br_ofports[network_type].get(remote_ip)
495
496     def fdb_add(self, context, fdb_entries):
497         LOG.debug("fdb_add received")
498         for lvm, agent_ports in self.get_agent_ports(fdb_entries,
499                                                      self.local_vlan_map):
500             agent_ports.pop(self.local_ip, None)
501             if len(agent_ports):
502                 if not self.enable_distributed_routing:
503                     with self.tun_br.deferred() as deferred_br:
504                         self.fdb_add_tun(context, deferred_br, lvm,
505                                          agent_ports, self._tunnel_port_lookup)
506                 else:
507                     self.fdb_add_tun(context, self.tun_br, lvm,
508                                      agent_ports, self._tunnel_port_lookup)
509
510     def fdb_remove(self, context, fdb_entries):
511         LOG.debug("fdb_remove received")
512         for lvm, agent_ports in self.get_agent_ports(fdb_entries,
513                                                      self.local_vlan_map):
514             agent_ports.pop(self.local_ip, None)
515             if len(agent_ports):
516                 if not self.enable_distributed_routing:
517                     with self.tun_br.deferred() as deferred_br:
518                         self.fdb_remove_tun(context, deferred_br, lvm,
519                                             agent_ports,
520                                             self._tunnel_port_lookup)
521                 else:
522                     self.fdb_remove_tun(context, self.tun_br, lvm,
523                                         agent_ports, self._tunnel_port_lookup)
524
525     def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
526         if port_info == n_const.FLOODING_ENTRY:
527             lvm.tun_ofports.add(ofport)
528             br.install_flood_to_tun(lvm.vlan, lvm.segmentation_id,
529                                     lvm.tun_ofports)
530         else:
531             self.setup_entry_for_arp_reply(br, 'add', lvm.vlan,
532                                            port_info.mac_address,
533                                            port_info.ip_address)
534             br.install_unicast_to_tun(lvm.vlan,
535                                       lvm.segmentation_id,
536                                       ofport,
537                                       port_info.mac_address)
538
539     def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
540         if port_info == n_const.FLOODING_ENTRY:
541             if ofport not in lvm.tun_ofports:
542                 LOG.debug("attempt to remove a non-existent port %s", ofport)
543                 return
544             lvm.tun_ofports.remove(ofport)
545             if len(lvm.tun_ofports) > 0:
546                 br.install_flood_to_tun(lvm.vlan, lvm.segmentation_id,
547                                         lvm.tun_ofports)
548             else:
549                 # This local vlan doesn't require any more tunneling
550                 br.delete_flood_to_tun(lvm.vlan)
551         else:
552             self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan,
553                                            port_info.mac_address,
554                                            port_info.ip_address)
555             br.delete_unicast_to_tun(lvm.vlan, port_info.mac_address)
556
557     def _fdb_chg_ip(self, context, fdb_entries):
558         LOG.debug("update chg_ip received")
559         with self.tun_br.deferred() as deferred_br:
560             self.fdb_chg_ip_tun(context, deferred_br, fdb_entries,
561                                 self.local_ip, self.local_vlan_map)
562
563     def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
564                                   ip_address):
565         '''Set the ARP respond entry.
566
567         When the l2 population mechanism driver and OVS supports to edit ARP
568         fields, a table (ARP_RESPONDER) to resolve ARP locally is added to the
569         tunnel bridge.
570         '''
571         if not self.arp_responder_enabled:
572             return
573
574         ip = netaddr.IPAddress(ip_address)
575         if ip.version == 6:
576             return
577
578         ip = str(ip)
579         mac = str(netaddr.EUI(mac_address, dialect=_mac_mydialect))
580
581         if action == 'add':
582             br.install_arp_responder(local_vid, ip, mac)
583         elif action == 'remove':
584             br.delete_arp_responder(local_vid, ip)
585         else:
586             LOG.warning(_LW('Action %s not supported'), action)
587
588     def _local_vlan_for_flat(self, lvid, physical_network):
589         phys_br = self.phys_brs[physical_network]
590         phys_port = self.phys_ofports[physical_network]
591         int_br = self.int_br
592         int_port = self.int_ofports[physical_network]
593         phys_br.provision_local_vlan(port=phys_port, lvid=lvid,
594                                      segmentation_id=None,
595                                      distributed=False)
596         int_br.provision_local_vlan(port=int_port, lvid=lvid,
597                                     segmentation_id=None)
598
599     def _local_vlan_for_vlan(self, lvid, physical_network, segmentation_id):
600         distributed = self.enable_distributed_routing
601         phys_br = self.phys_brs[physical_network]
602         phys_port = self.phys_ofports[physical_network]
603         int_br = self.int_br
604         int_port = self.int_ofports[physical_network]
605         phys_br.provision_local_vlan(port=phys_port, lvid=lvid,
606                                      segmentation_id=segmentation_id,
607                                      distributed=distributed)
608         int_br.provision_local_vlan(port=int_port, lvid=lvid,
609                                     segmentation_id=segmentation_id)
610
611     def provision_local_vlan(self, net_uuid, network_type, physical_network,
612                              segmentation_id):
613         '''Provisions a local VLAN.
614
615         :param net_uuid: the uuid of the network associated with this vlan.
616         :param network_type: the network type ('gre', 'vxlan', 'vlan', 'flat',
617                                                'local', 'geneve')
618         :param physical_network: the physical network for 'vlan' or 'flat'
619         :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
620         '''
621
622         # On a restart or crash of OVS, the network associated with this VLAN
623         # will already be assigned, so check for that here before assigning a
624         # new one.
625         lvm = self.local_vlan_map.get(net_uuid)
626         if lvm:
627             lvid = lvm.vlan
628         else:
629             lvid = self._local_vlan_hints.pop(net_uuid, None)
630             if lvid is None:
631                 if not self.available_local_vlans:
632                     LOG.error(_LE("No local VLAN available for net-id=%s"),
633                               net_uuid)
634                     return
635                 lvid = self.available_local_vlans.pop()
636             self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
637                                                              network_type,
638                                                              physical_network,
639                                                              segmentation_id)
640
641         LOG.info(_LI("Assigning %(vlan_id)s as local vlan for "
642                      "net-id=%(net_uuid)s"),
643                  {'vlan_id': lvid, 'net_uuid': net_uuid})
644
645         if network_type in constants.TUNNEL_NETWORK_TYPES:
646             if self.enable_tunneling:
647                 # outbound broadcast/multicast
648                 ofports = list(self.tun_br_ofports[network_type].values())
649                 if ofports:
650                     self.tun_br.install_flood_to_tun(lvid,
651                                                      segmentation_id,
652                                                      ofports)
653                 # inbound from tunnels: set lvid in the right table
654                 # and resubmit to Table LEARN_FROM_TUN for mac learning
655                 if self.enable_distributed_routing:
656                     self.dvr_agent.process_tunneled_network(
657                         network_type, lvid, segmentation_id)
658                 else:
659                     self.tun_br.provision_local_vlan(
660                         network_type=network_type, lvid=lvid,
661                         segmentation_id=segmentation_id)
662             else:
663                 LOG.error(_LE("Cannot provision %(network_type)s network for "
664                               "net-id=%(net_uuid)s - tunneling disabled"),
665                           {'network_type': network_type,
666                            'net_uuid': net_uuid})
667         elif network_type == p_const.TYPE_FLAT:
668             if physical_network in self.phys_brs:
669                 self._local_vlan_for_flat(lvid, physical_network)
670             else:
671                 LOG.error(_LE("Cannot provision flat network for "
672                               "net-id=%(net_uuid)s - no bridge for "
673                               "physical_network %(physical_network)s"),
674                           {'net_uuid': net_uuid,
675                            'physical_network': physical_network})
676         elif network_type == p_const.TYPE_VLAN:
677             if physical_network in self.phys_brs:
678                 self._local_vlan_for_vlan(lvid, physical_network,
679                                           segmentation_id)
680             else:
681                 LOG.error(_LE("Cannot provision VLAN network for "
682                               "net-id=%(net_uuid)s - no bridge for "
683                               "physical_network %(physical_network)s"),
684                           {'net_uuid': net_uuid,
685                            'physical_network': physical_network})
686         elif network_type == p_const.TYPE_LOCAL:
687             # no flows needed for local networks
688             pass
689         else:
690             LOG.error(_LE("Cannot provision unknown network type "
691                           "%(network_type)s for net-id=%(net_uuid)s"),
692                       {'network_type': network_type,
693                        'net_uuid': net_uuid})
694
695     def reclaim_local_vlan(self, net_uuid):
696         '''Reclaim a local VLAN.
697
698         :param net_uuid: the network uuid associated with this vlan.
699         '''
700         lvm = self.local_vlan_map.pop(net_uuid, None)
701         if lvm is None:
702             LOG.debug("Network %s not used on agent.", net_uuid)
703             return
704
705         LOG.info(_LI("Reclaiming vlan = %(vlan_id)s from "
706                      "net-id = %(net_uuid)s"),
707                  {'vlan_id': lvm.vlan, 'net_uuid': net_uuid})
708
709         if lvm.network_type in constants.TUNNEL_NETWORK_TYPES:
710             if self.enable_tunneling:
711                 self.tun_br.reclaim_local_vlan(
712                     network_type=lvm.network_type,
713                     segmentation_id=lvm.segmentation_id)
714                 self.tun_br.delete_flood_to_tun(lvm.vlan)
715                 self.tun_br.delete_unicast_to_tun(lvm.vlan, None)
716                 self.tun_br.delete_arp_responder(lvm.vlan, None)
717                 if self.l2_pop:
718                     # Try to remove tunnel ports if not used by other networks
719                     for ofport in lvm.tun_ofports:
720                         self.cleanup_tunnel_port(self.tun_br, ofport,
721                                                  lvm.network_type)
722         elif lvm.network_type == p_const.TYPE_FLAT:
723             if lvm.physical_network in self.phys_brs:
724                 # outbound
725                 br = self.phys_brs[lvm.physical_network]
726                 br.reclaim_local_vlan(
727                     port=self.phys_ofports[lvm.physical_network],
728                     lvid=lvm.vlan)
729                 # inbound
730                 br = self.int_br
731                 br.reclaim_local_vlan(
732                     port=self.int_ofports[lvm.physical_network],
733                     segmentation_id=None)
734         elif lvm.network_type == p_const.TYPE_VLAN:
735             if lvm.physical_network in self.phys_brs:
736                 # outbound
737                 br = self.phys_brs[lvm.physical_network]
738                 br.reclaim_local_vlan(
739                     port=self.phys_ofports[lvm.physical_network],
740                     lvid=lvm.vlan)
741                 # inbound
742                 br = self.int_br
743                 br.reclaim_local_vlan(
744                     port=self.int_ofports[lvm.physical_network],
745                     segmentation_id=lvm.segmentation_id)
746         elif lvm.network_type == p_const.TYPE_LOCAL:
747             # no flows needed for local networks
748             pass
749         else:
750             LOG.error(_LE("Cannot reclaim unknown network type "
751                           "%(network_type)s for net-id=%(net_uuid)s"),
752                       {'network_type': lvm.network_type,
753                        'net_uuid': net_uuid})
754
755         self.available_local_vlans.add(lvm.vlan)
756
757     def port_bound(self, port, net_uuid,
758                    network_type, physical_network,
759                    segmentation_id, fixed_ips, device_owner,
760                    ovs_restarted):
761         '''Bind port to net_uuid/lsw_id and install flow for inbound traffic
762         to vm.
763
764         :param port: an ovs_lib.VifPort object.
765         :param net_uuid: the net_uuid this port is to be associated with.
766         :param network_type: the network type ('gre', 'vlan', 'flat', 'local')
767         :param physical_network: the physical network for 'vlan' or 'flat'
768         :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
769         :param fixed_ips: the ip addresses assigned to this port
770         :param device_owner: the string indicative of owner of this port
771         :param ovs_restarted: indicates if this is called for an OVS restart.
772         '''
773         if net_uuid not in self.local_vlan_map or ovs_restarted:
774             self.provision_local_vlan(net_uuid, network_type,
775                                       physical_network, segmentation_id)
776         lvm = self.local_vlan_map[net_uuid]
777         lvm.vif_ports[port.vif_id] = port
778
779         self.dvr_agent.bind_port_to_dvr(port, lvm,
780                                         fixed_ips,
781                                         device_owner)
782         port_other_config = self.int_br.db_get_val("Port", port.port_name,
783                                                    "other_config")
784         if port_other_config is None:
785             if port.vif_id in self.deleted_ports:
786                 LOG.debug("Port %s deleted concurrently", port.vif_id)
787             elif port.vif_id in self.updated_ports:
788                 LOG.error(_LE("Expected port %s not found"), port.vif_id)
789             else:
790                 LOG.debug("Unable to get config for port %s", port.vif_id)
791             return False
792
793         vlan_mapping = {'net_uuid': net_uuid,
794                         'network_type': network_type,
795                         'physical_network': physical_network}
796         if segmentation_id is not None:
797             vlan_mapping['segmentation_id'] = segmentation_id
798         port_other_config.update(vlan_mapping)
799         self.int_br.set_db_attribute("Port", port.port_name, "other_config",
800                                      port_other_config)
801         return True
802
803     def _bind_devices(self, need_binding_ports):
804         devices_up = []
805         devices_down = []
806         port_names = [p['vif_port'].port_name for p in need_binding_ports]
807         port_info = self.int_br.get_ports_attributes(
808             "Port", columns=["name", "tag"], ports=port_names, if_exists=True)
809         tags_by_name = {x['name']: x['tag'] for x in port_info}
810         for port_detail in need_binding_ports:
811             lvm = self.local_vlan_map.get(port_detail['network_id'])
812             if not lvm:
813                 # network for port was deleted. skip this port since it
814                 # will need to be handled as a DEAD port in the next scan
815                 continue
816             port = port_detail['vif_port']
817             device = port_detail['device']
818             # Do not bind a port if it's already bound
819             cur_tag = tags_by_name.get(port.port_name)
820             if cur_tag is None:
821                 LOG.debug("Port %s was deleted concurrently, skipping it",
822                           port.port_name)
823                 continue
824             if cur_tag != lvm.vlan:
825                 self.int_br.delete_flows(in_port=port.ofport)
826             if self.prevent_arp_spoofing:
827                 self.setup_arp_spoofing_protection(self.int_br,
828                                                    port, port_detail)
829             if cur_tag != lvm.vlan:
830                 self.int_br.set_db_attribute(
831                     "Port", port.port_name, "tag", lvm.vlan)
832
833             # update plugin about port status
834             # FIXME(salv-orlando): Failures while updating device status
835             # must be handled appropriately. Otherwise this might prevent
836             # neutron server from sending network-vif-* events to the nova
837             # API server, thus possibly preventing instance spawn.
838             if port_detail.get('admin_state_up'):
839                 LOG.debug("Setting status for %s to UP", device)
840                 devices_up.append(device)
841             else:
842                 LOG.debug("Setting status for %s to DOWN", device)
843                 devices_down.append(device)
844         failed_devices = []
845         if devices_up or devices_down:
846             devices_set = self.plugin_rpc.update_device_list(
847                 self.context, devices_up, devices_down, self.agent_id,
848                 self.conf.host)
849             failed_devices = (devices_set.get('failed_devices_up') +
850                 devices_set.get('failed_devices_down'))
851         if failed_devices:
852             LOG.error(_LE("Configuration for devices %s failed!"),
853                       failed_devices)
854             #TODO(rossella_s) handle better the resync in next patches,
855             # this is just to preserve the current behavior
856             raise DeviceListRetrievalError(devices=failed_devices)
857         LOG.info(_LI("Configuration for devices up %(up)s and devices "
858                      "down %(down)s completed."),
859                  {'up': devices_up, 'down': devices_down})
860
861     @staticmethod
862     def setup_arp_spoofing_protection(bridge, vif, port_details):
863         if not port_details.get('port_security_enabled', True):
864             LOG.info(_LI("Skipping ARP spoofing rules for port '%s' because "
865                          "it has port security disabled"), vif.port_name)
866             bridge.delete_arp_spoofing_protection(port=vif.ofport)
867             return
868         if port_details['device_owner'].startswith(
869             n_const.DEVICE_OWNER_NETWORK_PREFIX):
870             LOG.debug("Skipping ARP spoofing rules for network owned port "
871                       "'%s'.", vif.port_name)
872             bridge.delete_arp_spoofing_protection(port=vif.ofport)
873             return
874         # clear any previous flows related to this port in our ARP table
875         bridge.delete_arp_spoofing_allow_rules(port=vif.ofport)
876         # collect all of the addresses and cidrs that belong to the port
877         addresses = {f['ip_address'] for f in port_details['fixed_ips']}
878         mac_addresses = {vif.vif_mac}
879         if port_details.get('allowed_address_pairs'):
880             addresses |= {p['ip_address']
881                           for p in port_details['allowed_address_pairs']}
882             mac_addresses |= {p['mac_address']
883                               for p in port_details['allowed_address_pairs']
884                               if p.get('mac_address')}
885
886         ipv6_addresses = {ip for ip in addresses
887                           if netaddr.IPNetwork(ip).version == 6}
888         # Allow neighbor advertisements for LLA address.
889         ipv6_addresses |= {str(ipv6.get_ipv6_addr_by_EUI64(
890                                n_const.IPV6_LLA_PREFIX, mac))
891                            for mac in mac_addresses}
892         if not has_zero_prefixlen_address(ipv6_addresses):
893             # Install protection only when prefix is not zero because a /0
894             # prefix allows any address anyway and the nd_target can only
895             # match on /1 or more.
896             bridge.install_icmpv6_na_spoofing_protection(port=vif.ofport,
897                 ip_addresses=ipv6_addresses)
898
899         ipv4_addresses = {ip for ip in addresses
900                           if netaddr.IPNetwork(ip).version == 4}
901         if not has_zero_prefixlen_address(ipv4_addresses):
902             # Install protection only when prefix is not zero because a /0
903             # prefix allows any address anyway and the ARP_SPA can only
904             # match on /1 or more.
905             bridge.install_arp_spoofing_protection(port=vif.ofport,
906                                                    ip_addresses=ipv4_addresses)
907         else:
908             bridge.delete_arp_spoofing_protection(port=vif.ofport)
909
910     def port_unbound(self, vif_id, net_uuid=None):
911         '''Unbind port.
912
913         Removes corresponding local vlan mapping object if this is its last
914         VIF.
915
916         :param vif_id: the id of the vif
917         :param net_uuid: the net_uuid this port is associated with.
918         '''
919         if net_uuid is None:
920             net_uuid = self.get_net_uuid(vif_id)
921
922         if not self.local_vlan_map.get(net_uuid):
923             LOG.info(_LI('port_unbound(): net_uuid %s not in local_vlan_map'),
924                      net_uuid)
925             return
926
927         lvm = self.local_vlan_map[net_uuid]
928
929         if vif_id in lvm.vif_ports:
930             vif_port = lvm.vif_ports[vif_id]
931             self.dvr_agent.unbind_port_from_dvr(vif_port, lvm)
932         lvm.vif_ports.pop(vif_id, None)
933
934         if not lvm.vif_ports:
935             self.reclaim_local_vlan(net_uuid)
936
937     def port_dead(self, port, log_errors=True):
938         '''Once a port has no binding, put it on the "dead vlan".
939
940         :param port: an ovs_lib.VifPort object.
941         '''
942         # Don't kill a port if it's already dead
943         cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag",
944                                          log_errors=log_errors)
945         if cur_tag and cur_tag != DEAD_VLAN_TAG:
946             self.int_br.set_db_attribute("Port", port.port_name, "tag",
947                                          DEAD_VLAN_TAG, log_errors=log_errors)
948             self.int_br.drop_port(in_port=port.ofport)
949
950     def setup_integration_br(self):
951         '''Setup the integration bridge.
952
953         '''
954         self.int_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
955         # Ensure the integration bridge is created.
956         # ovs_lib.OVSBridge.create() will run
957         #   ovs-vsctl -- --may-exist add-br BRIDGE_NAME
958         # which does nothing if bridge already exists.
959         self.int_br.create()
960         self.int_br.set_secure_mode()
961         self.int_br.setup_controllers(self.conf)
962
963         self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
964         if self.conf.AGENT.drop_flows_on_start:
965             self.int_br.delete_flows()
966         self.int_br.setup_default_table()
967
968     def setup_ancillary_bridges(self, integ_br, tun_br):
969         '''Setup ancillary bridges - for example br-ex.'''
970         ovs = ovs_lib.BaseOVS()
971         ovs_bridges = set(ovs.get_bridges())
972         # Remove all known bridges
973         ovs_bridges.remove(integ_br)
974         if self.enable_tunneling:
975             ovs_bridges.remove(tun_br)
976         br_names = [self.phys_brs[physical_network].br_name for
977                     physical_network in self.phys_brs]
978         ovs_bridges.difference_update(br_names)
979         # Filter list of bridges to those that have external
980         # bridge-id's configured
981         br_names = []
982         for bridge in ovs_bridges:
983             bridge_id = ovs.get_bridge_external_bridge_id(bridge)
984             if bridge_id != bridge:
985                 br_names.append(bridge)
986         ovs_bridges.difference_update(br_names)
987         ancillary_bridges = []
988         for bridge in ovs_bridges:
989             br = ovs_lib.OVSBridge(bridge)
990             LOG.info(_LI('Adding %s to list of bridges.'), bridge)
991             ancillary_bridges.append(br)
992         return ancillary_bridges
993
994     def setup_tunnel_br(self, tun_br_name=None):
995         '''(re)initialize the tunnel bridge.
996
997         Creates tunnel bridge, and links it to the integration bridge
998         using a patch port.
999
1000         :param tun_br_name: the name of the tunnel bridge.
1001         '''
1002         if not self.tun_br:
1003             self.tun_br = self.br_tun_cls(tun_br_name)
1004         self.tun_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
1005
1006         # tun_br.create() won't recreate bridge if it exists, but will handle
1007         # cases where something like datapath_type has changed
1008         self.tun_br.create(secure_mode=True)
1009         self.tun_br.setup_controllers(self.conf)
1010         if (not self.int_br.port_exists(self.conf.OVS.int_peer_patch_port) or
1011                 self.patch_tun_ofport == ovs_lib.INVALID_OFPORT):
1012             self.patch_tun_ofport = self.int_br.add_patch_port(
1013                 self.conf.OVS.int_peer_patch_port,
1014                 self.conf.OVS.tun_peer_patch_port)
1015         if (not self.tun_br.port_exists(self.conf.OVS.tun_peer_patch_port) or
1016                 self.patch_int_ofport == ovs_lib.INVALID_OFPORT):
1017             self.patch_int_ofport = self.tun_br.add_patch_port(
1018                 self.conf.OVS.tun_peer_patch_port,
1019                 self.conf.OVS.int_peer_patch_port)
1020         if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
1021                                       self.patch_int_ofport):
1022             LOG.error(_LE("Failed to create OVS patch port. Cannot have "
1023                           "tunneling enabled on this agent, since this "
1024                           "version of OVS does not support tunnels or patch "
1025                           "ports. Agent terminated!"))
1026             exit(1)
1027         if self.conf.AGENT.drop_flows_on_start:
1028             self.tun_br.delete_flows()
1029
1030     def setup_tunnel_br_flows(self):
1031         '''Setup the tunnel bridge.
1032
1033         Add all flows to the tunnel bridge.
1034         '''
1035         self.tun_br.setup_default_table(self.patch_int_ofport,
1036                                         self.arp_responder_enabled)
1037
1038     def setup_physical_bridges(self, bridge_mappings):
1039         '''Setup the physical network bridges.
1040
1041         Creates physical network bridges and links them to the
1042         integration bridge using veths or patch ports.
1043
1044         :param bridge_mappings: map physical network names to bridge names.
1045         '''
1046         self.phys_brs = {}
1047         self.int_ofports = {}
1048         self.phys_ofports = {}
1049         ip_wrapper = ip_lib.IPWrapper()
1050         ovs = ovs_lib.BaseOVS()
1051         ovs_bridges = ovs.get_bridges()
1052         for physical_network, bridge in six.iteritems(bridge_mappings):
1053             LOG.info(_LI("Mapping physical network %(physical_network)s to "
1054                          "bridge %(bridge)s"),
1055                      {'physical_network': physical_network,
1056                       'bridge': bridge})
1057             # setup physical bridge
1058             if bridge not in ovs_bridges:
1059                 LOG.error(_LE("Bridge %(bridge)s for physical network "
1060                               "%(physical_network)s does not exist. Agent "
1061                               "terminated!"),
1062                           {'physical_network': physical_network,
1063                            'bridge': bridge})
1064                 sys.exit(1)
1065             br = self.br_phys_cls(bridge)
1066             # The bridge already exists, so create won't recreate it, but will
1067             # handle things like changing the datapath_type
1068             br.create()
1069             br.setup_controllers(self.conf)
1070             br.setup_default_table()
1071             self.phys_brs[physical_network] = br
1072
1073             # interconnect physical and integration bridges using veth/patches
1074             int_if_name = p_utils.get_interface_name(
1075                 bridge, prefix=constants.PEER_INTEGRATION_PREFIX)
1076             phys_if_name = p_utils.get_interface_name(
1077                 bridge, prefix=constants.PEER_PHYSICAL_PREFIX)
1078             # Interface type of port for physical and integration bridges must
1079             # be same, so check only one of them.
1080             int_type = self.int_br.db_get_val("Interface", int_if_name, "type")
1081             if self.use_veth_interconnection:
1082                 # Drop ports if the interface types doesn't match the
1083                 # configuration value.
1084                 if int_type == 'patch':
1085                     self.int_br.delete_port(int_if_name)
1086                     br.delete_port(phys_if_name)
1087                 device = ip_lib.IPDevice(int_if_name)
1088                 if device.exists():
1089                     device.link.delete()
1090                     # Give udev a chance to process its rules here, to avoid
1091                     # race conditions between commands launched by udev rules
1092                     # and the subsequent call to ip_wrapper.add_veth
1093                     utils.execute(['udevadm', 'settle', '--timeout=10'])
1094                 int_veth, phys_veth = ip_wrapper.add_veth(int_if_name,
1095                                                           phys_if_name)
1096                 int_ofport = self.int_br.add_port(int_veth)
1097                 phys_ofport = br.add_port(phys_veth)
1098             else:
1099                 # Drop ports if the interface type doesn't match the
1100                 # configuration value
1101                 if int_type == 'veth':
1102                     self.int_br.delete_port(int_if_name)
1103                     br.delete_port(phys_if_name)
1104                 # Create patch ports without associating them in order to block
1105                 # untranslated traffic before association
1106                 int_ofport = self.int_br.add_patch_port(
1107                     int_if_name, constants.NONEXISTENT_PEER)
1108                 phys_ofport = br.add_patch_port(
1109                     phys_if_name, constants.NONEXISTENT_PEER)
1110
1111             self.int_ofports[physical_network] = int_ofport
1112             self.phys_ofports[physical_network] = phys_ofport
1113
1114             # block all untranslated traffic between bridges
1115             self.int_br.drop_port(in_port=int_ofport)
1116             br.drop_port(in_port=phys_ofport)
1117
1118             if self.use_veth_interconnection:
1119                 # enable veth to pass traffic
1120                 int_veth.link.set_up()
1121                 phys_veth.link.set_up()
1122                 if self.veth_mtu:
1123                     # set up mtu size for veth interfaces
1124                     int_veth.link.set_mtu(self.veth_mtu)
1125                     phys_veth.link.set_mtu(self.veth_mtu)
1126             else:
1127                 # associate patch ports to pass traffic
1128                 self.int_br.set_db_attribute('Interface', int_if_name,
1129                                              'options:peer', phys_if_name)
1130                 br.set_db_attribute('Interface', phys_if_name,
1131                                     'options:peer', int_if_name)
1132
1133     def update_stale_ofport_rules(self):
1134         # right now the ARP spoofing rules are the only thing that utilizes
1135         # ofport-based rules, so make arp_spoofing protection a conditional
1136         # until something else uses ofport
1137         if not self.prevent_arp_spoofing:
1138             return []
1139         previous = self.vifname_to_ofport_map
1140         current = self.int_br.get_vif_port_to_ofport_map()
1141
1142         # if any ofport numbers have changed, re-process the devices as
1143         # added ports so any rules based on ofport numbers are updated.
1144         moved_ports = self._get_ofport_moves(current, previous)
1145
1146         # delete any stale rules based on removed ofports
1147         ofports_deleted = set(previous.values()) - set(current.values())
1148         for ofport in ofports_deleted:
1149             self.int_br.delete_arp_spoofing_protection(port=ofport)
1150
1151         # store map for next iteration
1152         self.vifname_to_ofport_map = current
1153         return moved_ports
1154
1155     @staticmethod
1156     def _get_ofport_moves(current, previous):
1157         """Returns a list of moved ports.
1158
1159         Takes two port->ofport maps and returns a list ports that moved to a
1160         different ofport. Deleted ports are not included.
1161         """
1162         port_moves = []
1163         for name, ofport in previous.items():
1164             if name not in current:
1165                 continue
1166             current_ofport = current[name]
1167             if ofport != current_ofport:
1168                 port_moves.append(name)
1169         return port_moves
1170
1171     def _get_port_info(self, registered_ports, cur_ports,
1172                        readd_registered_ports):
1173         port_info = {'current': cur_ports}
1174         # FIXME(salv-orlando): It's not really necessary to return early
1175         # if nothing has changed.
1176         if not readd_registered_ports and cur_ports == registered_ports:
1177             return port_info
1178
1179         if readd_registered_ports:
1180             port_info['added'] = cur_ports
1181         else:
1182             port_info['added'] = cur_ports - registered_ports
1183         # Update port_info with ports not found on the integration bridge
1184         port_info['removed'] = registered_ports - cur_ports
1185         return port_info
1186
1187     def process_ports_events(self, events, registered_ports, ancillary_ports,
1188                              old_ports_not_ready, updated_ports=None):
1189         port_info = {}
1190         port_info['added'] = set()
1191         port_info['removed'] = set()
1192         port_info['current'] = registered_ports
1193
1194         ancillary_port_info = {}
1195         ancillary_port_info['added'] = set()
1196         ancillary_port_info['removed'] = set()
1197         ancillary_port_info['current'] = ancillary_ports
1198         ports_not_ready_yet = set()
1199
1200         # if a port was added and then removed or viceversa since the agent
1201         # can't know the order of the operations, check the status of the port
1202         # to determine if the port was added or deleted
1203         ports_removed_and_added = [
1204             p for p in events['added'] if p in events['removed']]
1205         for p in ports_removed_and_added:
1206             if ovs_lib.BaseOVS().port_exists(p['name']):
1207                 events['removed'].remove(p)
1208             else:
1209                 events['added'].remove(p)
1210
1211         #TODO(rossella_s): scanning the ancillary bridge won't be needed
1212         # anymore when https://review.openstack.org/#/c/203381 since the bridge
1213         # id stored in external_ids will be used to identify the bridge the
1214         # port belongs to
1215         cur_ancillary_ports = set()
1216         for bridge in self.ancillary_brs:
1217             cur_ancillary_ports |= bridge.get_vif_port_set()
1218         cur_ancillary_ports |= ancillary_port_info['current']
1219
1220         def _process_port(port, ports, ancillary_ports):
1221             # check 'iface-id' is set otherwise is not a port
1222             # the agent should care about
1223             if 'attached-mac' in port.get('external_ids', []):
1224                 iface_id = self.int_br.portid_from_external_ids(
1225                     port['external_ids'])
1226                 if iface_id:
1227                     if port['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
1228                         LOG.debug("Port %s not ready yet on the bridge",
1229                                   iface_id)
1230                         ports_not_ready_yet.add(port['name'])
1231                         return
1232                     # check if port belongs to ancillary bridge
1233                     if iface_id in cur_ancillary_ports:
1234                         ancillary_ports.add(iface_id)
1235                     else:
1236                         ports.add(iface_id)
1237         if old_ports_not_ready:
1238             old_ports_not_ready_attrs = self.int_br.get_ports_attributes(
1239                 'Interface', columns=['name', 'external_ids', 'ofport'],
1240                 ports=old_ports_not_ready, if_exists=True)
1241             now_ready_ports = set(
1242                 [p['name'] for p in old_ports_not_ready_attrs])
1243             LOG.debug("Ports %s are now ready", now_ready_ports)
1244             old_ports_not_ready_yet = old_ports_not_ready - now_ready_ports
1245             removed_ports = set([p['name'] for p in events['removed']])
1246             old_ports_not_ready_yet -= removed_ports
1247             LOG.debug("Ports %s were not ready at last iteration and are not "
1248                       "ready yet", old_ports_not_ready_yet)
1249             ports_not_ready_yet |= old_ports_not_ready_yet
1250             events['added'].extend(old_ports_not_ready_attrs)
1251
1252         for port in events['added']:
1253             _process_port(port, port_info['added'],
1254                           ancillary_port_info['added'])
1255         for port in events['removed']:
1256             _process_port(port, port_info['removed'],
1257                           ancillary_port_info['removed'])
1258
1259         if updated_ports is None:
1260             updated_ports = set()
1261         updated_ports.update(self.check_changed_vlans())
1262
1263         # Disregard devices that were never noticed by the agent
1264         port_info['removed'] &= port_info['current']
1265         port_info['current'] |= port_info['added']
1266         port_info['current'] -= port_info['removed']
1267
1268         ancillary_port_info['removed'] &= ancillary_port_info['current']
1269         ancillary_port_info['current'] |= ancillary_port_info['added']
1270         ancillary_port_info['current'] -= ancillary_port_info['removed']
1271
1272         if updated_ports:
1273             # Some updated ports might have been removed in the
1274             # meanwhile, and therefore should not be processed.
1275             # In this case the updated port won't be found among
1276             # current ports.
1277             updated_ports &= port_info['current']
1278             port_info['updated'] = updated_ports
1279         return port_info, ancillary_port_info, ports_not_ready_yet
1280
1281     def scan_ports(self, registered_ports, sync, updated_ports=None):
1282         cur_ports = self.int_br.get_vif_port_set()
1283         self.int_br_device_count = len(cur_ports)
1284         port_info = self._get_port_info(registered_ports, cur_ports, sync)
1285         if updated_ports is None:
1286             updated_ports = set()
1287         updated_ports.update(self.check_changed_vlans())
1288         if updated_ports:
1289             # Some updated ports might have been removed in the
1290             # meanwhile, and therefore should not be processed.
1291             # In this case the updated port won't be found among
1292             # current ports.
1293             updated_ports &= cur_ports
1294             if updated_ports:
1295                 port_info['updated'] = updated_ports
1296         return port_info
1297
1298     def scan_ancillary_ports(self, registered_ports, sync):
1299         cur_ports = set()
1300         for bridge in self.ancillary_brs:
1301             cur_ports |= bridge.get_vif_port_set()
1302         return self._get_port_info(registered_ports, cur_ports, sync)
1303
1304     def check_changed_vlans(self):
1305         """Return ports which have lost their vlan tag.
1306
1307         The returned value is a set of port ids of the ports concerned by a
1308         vlan tag loss.
1309         """
1310         port_tags = self.int_br.get_port_tag_dict()
1311         changed_ports = set()
1312         for lvm in self.local_vlan_map.values():
1313             for port in lvm.vif_ports.values():
1314                 if (
1315                     port.port_name in port_tags
1316                     and port_tags[port.port_name] != lvm.vlan
1317                 ):
1318                     LOG.info(
1319                         _LI("Port '%(port_name)s' has lost "
1320                             "its vlan tag '%(vlan_tag)d'!"),
1321                         {'port_name': port.port_name,
1322                          'vlan_tag': lvm.vlan}
1323                     )
1324                     changed_ports.add(port.vif_id)
1325         return changed_ports
1326
1327     def treat_vif_port(self, vif_port, port_id, network_id, network_type,
1328                        physical_network, segmentation_id, admin_state_up,
1329                        fixed_ips, device_owner, ovs_restarted):
1330         # When this function is called for a port, the port should have
1331         # an OVS ofport configured, as only these ports were considered
1332         # for being treated. If that does not happen, it is a potential
1333         # error condition of which operators should be aware
1334         port_needs_binding = True
1335         if not vif_port.ofport:
1336             LOG.warn(_LW("VIF port: %s has no ofport configured, "
1337                          "and might not be able to transmit"), vif_port.vif_id)
1338         if vif_port:
1339             if admin_state_up:
1340                 port_needs_binding = self.port_bound(
1341                     vif_port, network_id, network_type,
1342                     physical_network, segmentation_id,
1343                     fixed_ips, device_owner, ovs_restarted)
1344             else:
1345                 LOG.info(_LI("VIF port: %s admin state up disabled, "
1346                              "putting on the dead VLAN"), vif_port.vif_id)
1347
1348                 self.port_dead(vif_port)
1349                 port_needs_binding = False
1350         else:
1351             LOG.debug("No VIF port for port %s defined on agent.", port_id)
1352         return port_needs_binding
1353
1354     def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
1355         ofport = br.add_tunnel_port(port_name,
1356                                     remote_ip,
1357                                     self.local_ip,
1358                                     tunnel_type,
1359                                     self.vxlan_udp_port,
1360                                     self.dont_fragment,
1361                                     self.tunnel_csum)
1362         if ofport == ovs_lib.INVALID_OFPORT:
1363             LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
1364                       {'type': tunnel_type, 'ip': remote_ip})
1365             return 0
1366
1367         self.tun_br_ofports[tunnel_type][remote_ip] = ofport
1368         # Add flow in default table to resubmit to the right
1369         # tunneling table (lvid will be set in the latter)
1370         br.setup_tunnel_port(tunnel_type, ofport)
1371
1372         ofports = self.tun_br_ofports[tunnel_type].values()
1373         if ofports and not self.l2_pop:
1374             # Update flooding flows to include the new tunnel
1375             for vlan_mapping in list(self.local_vlan_map.values()):
1376                 if vlan_mapping.network_type == tunnel_type:
1377                     br.install_flood_to_tun(vlan_mapping.vlan,
1378                                             vlan_mapping.segmentation_id,
1379                                             ofports)
1380         return ofport
1381
1382     def setup_tunnel_port(self, br, remote_ip, network_type):
1383         remote_ip_hex = self.get_ip_in_hex(remote_ip)
1384         if not remote_ip_hex:
1385             return 0
1386         port_name = '%s-%s' % (network_type, remote_ip_hex)
1387         ofport = self._setup_tunnel_port(br,
1388                                          port_name,
1389                                          remote_ip,
1390                                          network_type)
1391         return ofport
1392
1393     def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
1394         # Check if this tunnel port is still used
1395         for lvm in self.local_vlan_map.values():
1396             if tun_ofport in lvm.tun_ofports:
1397                 break
1398         # If not, remove it
1399         else:
1400             items = list(self.tun_br_ofports[tunnel_type].items())
1401             for remote_ip, ofport in items:
1402                 if ofport == tun_ofport:
1403                     port_name = '%s-%s' % (tunnel_type,
1404                                            self.get_ip_in_hex(remote_ip))
1405                     br.delete_port(port_name)
1406                     br.cleanup_tunnel_port(ofport)
1407                     self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
1408
1409     def treat_devices_added_or_updated(self, devices, ovs_restarted):
1410         skipped_devices = []
1411         need_binding_devices = []
1412         security_disabled_devices = []
1413         devices_details_list = (
1414             self.plugin_rpc.get_devices_details_list_and_failed_devices(
1415                 self.context,
1416                 devices,
1417                 self.agent_id,
1418                 self.conf.host))
1419         if devices_details_list.get('failed_devices'):
1420             #TODO(rossella_s) handle better the resync in next patches,
1421             # this is just to preserve the current behavior
1422             raise DeviceListRetrievalError(devices=devices)
1423
1424         devices = devices_details_list.get('devices')
1425         vif_by_id = self.int_br.get_vifs_by_ids(
1426             [vif['device'] for vif in devices])
1427         for details in devices:
1428             device = details['device']
1429             LOG.debug("Processing port: %s", device)
1430             port = vif_by_id.get(device)
1431             if not port:
1432                 # The port disappeared and cannot be processed
1433                 LOG.info(_LI("Port %s was not found on the integration bridge "
1434                              "and will therefore not be processed"), device)
1435                 skipped_devices.append(device)
1436                 continue
1437
1438             if 'port_id' in details:
1439                 LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
1440                          {'device': device, 'details': details})
1441                 details['vif_port'] = port
1442                 need_binding = self.treat_vif_port(port, details['port_id'],
1443                                                    details['network_id'],
1444                                                    details['network_type'],
1445                                                    details['physical_network'],
1446                                                    details['segmentation_id'],
1447                                                    details['admin_state_up'],
1448                                                    details['fixed_ips'],
1449                                                    details['device_owner'],
1450                                                    ovs_restarted)
1451                 if need_binding:
1452                     need_binding_devices.append(details)
1453
1454                 port_security = details['port_security_enabled']
1455                 has_sgs = 'security_groups' in details
1456                 if not port_security or not has_sgs:
1457                     security_disabled_devices.append(device)
1458                 self._update_port_network(details['port_id'],
1459                                           details['network_id'])
1460                 self.ext_manager.handle_port(self.context, details)
1461             else:
1462                 LOG.warn(_LW("Device %s not defined on plugin"), device)
1463                 if (port and port.ofport != -1):
1464                     self.port_dead(port)
1465         return skipped_devices, need_binding_devices, security_disabled_devices
1466
1467     def _update_port_network(self, port_id, network_id):
1468         self._clean_network_ports(port_id)
1469         self.network_ports[network_id].add(port_id)
1470
1471     def treat_ancillary_devices_added(self, devices):
1472         devices_details_list = (
1473             self.plugin_rpc.get_devices_details_list_and_failed_devices(
1474                 self.context,
1475                 devices,
1476                 self.agent_id,
1477                 self.conf.host))
1478         if devices_details_list.get('failed_devices'):
1479             #TODO(rossella_s) handle better the resync in next patches,
1480             # this is just to preserve the current behavior
1481             raise DeviceListRetrievalError(devices=devices)
1482         devices_added = [
1483             d['device'] for d in devices_details_list.get('devices')]
1484         LOG.info(_LI("Ancillary Ports %s added"), devices_added)
1485
1486         # update plugin about port status
1487         devices_set_up = (
1488             self.plugin_rpc.update_device_list(self.context,
1489                                                devices_added,
1490                                                [],
1491                                                self.agent_id,
1492                                                self.conf.host))
1493         if devices_set_up.get('failed_devices_up'):
1494             #TODO(rossella_s) handle better the resync in next patches,
1495             # this is just to preserve the current behavior
1496             raise DeviceListRetrievalError()
1497
1498     def treat_devices_removed(self, devices):
1499         resync = False
1500         self.sg_agent.remove_devices_filter(devices)
1501         LOG.info(_LI("Ports %s removed"), devices)
1502         devices_down = self.plugin_rpc.update_device_list(self.context,
1503                                                           [],
1504                                                           devices,
1505                                                           self.agent_id,
1506                                                           self.conf.host)
1507         failed_devices = devices_down.get('failed_devices_down')
1508         if failed_devices:
1509             LOG.debug("Port removal failed for %(devices)s ", failed_devices)
1510             resync = True
1511         for device in devices:
1512             self.port_unbound(device)
1513         return resync
1514
1515     def treat_ancillary_devices_removed(self, devices):
1516         resync = False
1517         LOG.info(_LI("Ancillary ports %s removed"), devices)
1518         devices_down = self.plugin_rpc.update_device_list(self.context,
1519                                                           [],
1520                                                           devices,
1521                                                           self.agent_id,
1522                                                           self.conf.host)
1523         failed_devices = devices_down.get('failed_devices_down')
1524         if failed_devices:
1525             LOG.debug("Port removal failed for %(devices)s ", failed_devices)
1526             resync = True
1527         for detail in devices_down.get('devices_down'):
1528             if detail['exists']:
1529                 LOG.info(_LI("Port %s updated."), detail['device'])
1530                 # Nothing to do regarding local networking
1531             else:
1532                 LOG.debug("Device %s not defined on plugin", detail['device'])
1533         return resync
1534
1535     def process_network_ports(self, port_info, ovs_restarted):
1536         resync_a = False
1537         resync_b = False
1538         # TODO(salv-orlando): consider a solution for ensuring notifications
1539         # are processed exactly in the same order in which they were
1540         # received. This is tricky because there are two notification
1541         # sources: the neutron server, and the ovs db monitor process
1542         # If there is an exception while processing security groups ports
1543         # will not be wired anyway, and a resync will be triggered
1544         # VIF wiring needs to be performed always for 'new' devices.
1545         # For updated ports, re-wiring is not needed in most cases, but needs
1546         # to be performed anyway when the admin state of a device is changed.
1547         # A device might be both in the 'added' and 'updated'
1548         # list at the same time; avoid processing it twice.
1549         devices_added_updated = (port_info.get('added', set()) |
1550                                  port_info.get('updated', set()))
1551         need_binding_devices = []
1552         security_disabled_ports = []
1553         if devices_added_updated:
1554             start = time.time()
1555             try:
1556                 (skipped_devices, need_binding_devices,
1557                     security_disabled_ports) = (
1558                     self.treat_devices_added_or_updated(
1559                         devices_added_updated, ovs_restarted))
1560                 LOG.debug("process_network_ports - iteration:%(iter_num)d - "
1561                           "treat_devices_added_or_updated completed. "
1562                           "Skipped %(num_skipped)d devices of "
1563                           "%(num_current)d devices currently available. "
1564                           "Time elapsed: %(elapsed).3f",
1565                           {'iter_num': self.iter_num,
1566                            'num_skipped': len(skipped_devices),
1567                            'num_current': len(port_info['current']),
1568                            'elapsed': time.time() - start})
1569                 # Update the list of current ports storing only those which
1570                 # have been actually processed.
1571                 port_info['current'] = (port_info['current'] -
1572                                         set(skipped_devices))
1573             except DeviceListRetrievalError:
1574                 # Need to resync as there was an error with server
1575                 # communication.
1576                 LOG.exception(_LE("process_network_ports - iteration:%d - "
1577                                   "failure while retrieving port details "
1578                                   "from server"), self.iter_num)
1579                 resync_a = True
1580
1581         # TODO(salv-orlando): Optimize avoiding applying filters
1582         # unnecessarily, (eg: when there are no IP address changes)
1583         added_ports = port_info.get('added', set())
1584         if security_disabled_ports:
1585             added_ports -= set(security_disabled_ports)
1586         self.sg_agent.setup_port_filters(added_ports,
1587                                          port_info.get('updated', set()))
1588         self._bind_devices(need_binding_devices)
1589
1590         if 'removed' in port_info and port_info['removed']:
1591             start = time.time()
1592             resync_b = self.treat_devices_removed(port_info['removed'])
1593             LOG.debug("process_network_ports - iteration:%(iter_num)d - "
1594                       "treat_devices_removed completed in %(elapsed).3f",
1595                       {'iter_num': self.iter_num,
1596                        'elapsed': time.time() - start})
1597         # If one of the above operations fails => resync with plugin
1598         return (resync_a | resync_b)
1599
1600     def process_ancillary_network_ports(self, port_info):
1601         resync_a = False
1602         resync_b = False
1603         if 'added' in port_info and port_info['added']:
1604             start = time.time()
1605             try:
1606                 self.treat_ancillary_devices_added(port_info['added'])
1607                 LOG.debug("process_ancillary_network_ports - iteration: "
1608                           "%(iter_num)d - treat_ancillary_devices_added "
1609                           "completed in %(elapsed).3f",
1610                           {'iter_num': self.iter_num,
1611                            'elapsed': time.time() - start})
1612             except DeviceListRetrievalError:
1613                 # Need to resync as there was an error with server
1614                 # communication.
1615                 LOG.exception(_LE("process_ancillary_network_ports - "
1616                                   "iteration:%d - failure while retrieving "
1617                                   "port details from server"), self.iter_num)
1618                 resync_a = True
1619         if 'removed' in port_info and port_info['removed']:
1620             start = time.time()
1621             resync_b = self.treat_ancillary_devices_removed(
1622                 port_info['removed'])
1623             LOG.debug("process_ancillary_network_ports - iteration: "
1624                       "%(iter_num)d - treat_ancillary_devices_removed "
1625                       "completed in %(elapsed).3f",
1626                       {'iter_num': self.iter_num,
1627                        'elapsed': time.time() - start})
1628
1629         # If one of the above operations fails => resync with plugin
1630         return (resync_a | resync_b)
1631
1632     def get_ip_in_hex(self, ip_address):
1633         try:
1634             return '%08x' % netaddr.IPAddress(ip_address, version=4)
1635         except Exception:
1636             LOG.warn(_LW("Invalid remote IP: %s"), ip_address)
1637             return
1638
1639     def tunnel_sync(self):
1640         try:
1641             for tunnel_type in self.tunnel_types:
1642                 details = self.plugin_rpc.tunnel_sync(self.context,
1643                                                       self.local_ip,
1644                                                       tunnel_type,
1645                                                       self.conf.host)
1646                 if not self.l2_pop:
1647                     tunnels = details['tunnels']
1648                     for tunnel in tunnels:
1649                         if self.local_ip != tunnel['ip_address']:
1650                             remote_ip = tunnel['ip_address']
1651                             remote_ip_hex = self.get_ip_in_hex(remote_ip)
1652                             if not remote_ip_hex:
1653                                 continue
1654                             tun_name = '%s-%s' % (tunnel_type, remote_ip_hex)
1655                             self._setup_tunnel_port(self.tun_br,
1656                                                     tun_name,
1657                                                     tunnel['ip_address'],
1658                                                     tunnel_type)
1659         except Exception as e:
1660             LOG.debug("Unable to sync tunnel IP %(local_ip)s: %(e)s",
1661                       {'local_ip': self.local_ip, 'e': e})
1662             return True
1663         return False
1664
1665     def _agent_has_updates(self, polling_manager):
1666         return (polling_manager.is_polling_required or
1667                 self.updated_ports or
1668                 self.deleted_ports or
1669                 self.sg_agent.firewall_refresh_needed())
1670
1671     def _port_info_has_changes(self, port_info):
1672         return (port_info.get('added') or
1673                 port_info.get('removed') or
1674                 port_info.get('updated'))
1675
1676     def check_ovs_status(self):
1677         # Check for the canary flow
1678         status = self.int_br.check_canary_table()
1679         if status == constants.OVS_RESTARTED:
1680             LOG.warn(_LW("OVS is restarted. OVSNeutronAgent will reset "
1681                          "bridges and recover ports."))
1682         elif status == constants.OVS_DEAD:
1683             LOG.warn(_LW("OVS is dead. OVSNeutronAgent will keep running "
1684                          "and checking OVS status periodically."))
1685         return status
1686
1687     def loop_count_and_wait(self, start_time, port_stats):
1688         # sleep till end of polling interval
1689         elapsed = time.time() - start_time
1690         LOG.debug("Agent rpc_loop - iteration:%(iter_num)d "
1691                   "completed. Processed ports statistics: "
1692                   "%(port_stats)s. Elapsed:%(elapsed).3f",
1693                   {'iter_num': self.iter_num,
1694                    'port_stats': port_stats,
1695                    'elapsed': elapsed})
1696         if elapsed < self.polling_interval:
1697             time.sleep(self.polling_interval - elapsed)
1698         else:
1699             LOG.debug("Loop iteration exceeded interval "
1700                       "(%(polling_interval)s vs. %(elapsed)s)!",
1701                       {'polling_interval': self.polling_interval,
1702                        'elapsed': elapsed})
1703         self.iter_num = self.iter_num + 1
1704
1705     def get_port_stats(self, port_info, ancillary_port_info):
1706         port_stats = {
1707             'regular': {
1708                 'added': len(port_info.get('added', [])),
1709                 'updated': len(port_info.get('updated', [])),
1710                 'removed': len(port_info.get('removed', []))}}
1711         if self.ancillary_brs:
1712             port_stats['ancillary'] = {
1713                 'added': len(ancillary_port_info.get('added', [])),
1714                 'removed': len(ancillary_port_info.get('removed', []))}
1715         return port_stats
1716
1717     def cleanup_stale_flows(self):
1718         bridges = [self.int_br]
1719         if self.enable_tunneling:
1720             bridges.append(self.tun_br)
1721         for bridge in bridges:
1722             LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
1723             bridge.cleanup_flows()
1724
1725     def process_port_info(self, start, polling_manager, sync, ovs_restarted,
1726                        ports, ancillary_ports, updated_ports_copy,
1727                        consecutive_resyncs, ports_not_ready_yet):
1728         # There are polling managers that don't have get_events, e.g.
1729         # AlwaysPoll used by windows implementations
1730         # REVISIT (rossella_s) This needs to be reworked to hide implementation
1731         # details regarding polling in BasePollingManager subclasses
1732         if sync or not (hasattr(polling_manager, 'get_events')):
1733             if sync:
1734                 LOG.info(_LI("Agent out of sync with plugin!"))
1735                 consecutive_resyncs = consecutive_resyncs + 1
1736                 if (consecutive_resyncs >=
1737                         constants.MAX_DEVICE_RETRIES):
1738                     LOG.warn(_LW(
1739                         "Clearing cache of registered ports,"
1740                         " retries to resync were > %s"),
1741                              constants.MAX_DEVICE_RETRIES)
1742                     ports.clear()
1743                     ancillary_ports.clear()
1744                     consecutive_resyncs = 0
1745             else:
1746                 consecutive_resyncs = 0
1747
1748             # NOTE(rossella_s) don't empty the queue of events
1749             # calling polling_manager.get_events() since
1750             # the agent might miss some event (for example a port
1751             # deletion)
1752             reg_ports = (set() if ovs_restarted else ports)
1753             port_info = self.scan_ports(reg_ports, sync,
1754                                         updated_ports_copy)
1755             # Treat ancillary devices if they exist
1756             if self.ancillary_brs:
1757                 ancillary_port_info = self.scan_ancillary_ports(
1758                     ancillary_ports, sync)
1759                 LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
1760                           " - ancillary port info retrieved. "
1761                           "Elapsed:%(elapsed).3f",
1762                           {'iter_num': self.iter_num,
1763                            'elapsed': time.time() - start})
1764             else:
1765                 ancillary_port_info = {}
1766
1767         else:
1768             consecutive_resyncs = 0
1769             events = polling_manager.get_events()
1770             port_info, ancillary_port_info, ports_not_ready_yet = (
1771                 self.process_ports_events(events, ports, ancillary_ports,
1772                                           ports_not_ready_yet,
1773                                           updated_ports_copy))
1774         return (port_info, ancillary_port_info, consecutive_resyncs,
1775                 ports_not_ready_yet)
1776
1777     def rpc_loop(self, polling_manager=None):
1778         if not polling_manager:
1779             polling_manager = polling.get_polling_manager(
1780                 minimize_polling=False)
1781
1782         sync = False
1783         ports = set()
1784         updated_ports_copy = set()
1785         ancillary_ports = set()
1786         tunnel_sync = True
1787         ovs_restarted = False
1788         consecutive_resyncs = 0
1789         need_clean_stale_flow = True
1790         ports_not_ready_yet = set()
1791         while self._check_and_handle_signal():
1792             if self.fullsync:
1793                 LOG.info(_LI("rpc_loop doing a full sync."))
1794                 sync = True
1795                 self.fullsync = False
1796             port_info = {}
1797             ancillary_port_info = {}
1798             start = time.time()
1799             LOG.debug("Agent rpc_loop - iteration:%d started",
1800                       self.iter_num)
1801             ovs_status = self.check_ovs_status()
1802             if ovs_status == constants.OVS_RESTARTED:
1803                 self.setup_integration_br()
1804                 self.setup_physical_bridges(self.bridge_mappings)
1805                 if self.enable_tunneling:
1806                     self.setup_tunnel_br()
1807                     self.setup_tunnel_br_flows()
1808                     tunnel_sync = True
1809                 if self.enable_distributed_routing:
1810                     self.dvr_agent.reset_ovs_parameters(self.int_br,
1811                                                  self.tun_br,
1812                                                  self.patch_int_ofport,
1813                                                  self.patch_tun_ofport)
1814                     self.dvr_agent.reset_dvr_parameters()
1815                     self.dvr_agent.setup_dvr_flows()
1816                 # restart the polling manager so that it will signal as added
1817                 # all the current ports
1818                 # REVISIT (rossella_s) Define a method "reset" in
1819                 # BasePollingManager that will be implemented by AlwaysPoll as
1820                 # no action and by InterfacePollingMinimizer as start/stop
1821                 if isinstance(
1822                     polling_manager, linux_polling.InterfacePollingMinimizer):
1823                     polling_manager.stop()
1824                     polling_manager.start()
1825             elif ovs_status == constants.OVS_DEAD:
1826                 # Agent doesn't apply any operations when ovs is dead, to
1827                 # prevent unexpected failure or crash. Sleep and continue
1828                 # loop in which ovs status will be checked periodically.
1829                 port_stats = self.get_port_stats({}, {})
1830                 self.loop_count_and_wait(start, port_stats)
1831                 continue
1832             # Notify the plugin of tunnel IP
1833             if self.enable_tunneling and tunnel_sync:
1834                 LOG.info(_LI("Agent tunnel out of sync with plugin!"))
1835                 try:
1836                     tunnel_sync = self.tunnel_sync()
1837                 except Exception:
1838                     LOG.exception(_LE("Error while synchronizing tunnels"))
1839                     tunnel_sync = True
1840             ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
1841             if (self._agent_has_updates(polling_manager) or sync
1842                     or ports_not_ready_yet):
1843                 try:
1844                     LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
1845                               "starting polling. Elapsed:%(elapsed).3f",
1846                               {'iter_num': self.iter_num,
1847                                'elapsed': time.time() - start})
1848                     # Save updated ports dict to perform rollback in
1849                     # case resync would be needed, and then clear
1850                     # self.updated_ports. As the greenthread should not yield
1851                     # between these two statements, this will be thread-safe
1852                     updated_ports_copy = self.updated_ports
1853                     self.updated_ports = set()
1854                     (port_info, ancillary_port_info, consecutive_resyncs,
1855                      ports_not_ready_yet) = (self.process_port_info(
1856                             start, polling_manager, sync, ovs_restarted,
1857                             ports, ancillary_ports, updated_ports_copy,
1858                             consecutive_resyncs, ports_not_ready_yet)
1859                     )
1860                     sync = False
1861                     self.process_deleted_ports(port_info)
1862                     ofport_changed_ports = self.update_stale_ofport_rules()
1863                     if ofport_changed_ports:
1864                         port_info.setdefault('updated', set()).update(
1865                             ofport_changed_ports)
1866                     LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
1867                               "port information retrieved. "
1868                               "Elapsed:%(elapsed).3f",
1869                               {'iter_num': self.iter_num,
1870                                'elapsed': time.time() - start})
1871                     # Secure and wire/unwire VIFs and update their status
1872                     # on Neutron server
1873                     if (self._port_info_has_changes(port_info) or
1874                         self.sg_agent.firewall_refresh_needed() or
1875                         ovs_restarted):
1876                         LOG.debug("Starting to process devices in:%s",
1877                                   port_info)
1878                         # If treat devices fails - must resync with plugin
1879                         sync = self.process_network_ports(port_info,
1880                                                           ovs_restarted)
1881                         if not sync and need_clean_stale_flow:
1882                             self.cleanup_stale_flows()
1883                             need_clean_stale_flow = False
1884                         LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
1885                                   "ports processed. Elapsed:%(elapsed).3f",
1886                                   {'iter_num': self.iter_num,
1887                                    'elapsed': time.time() - start})
1888
1889                     ports = port_info['current']
1890
1891                     if self.ancillary_brs:
1892                         sync |= self.process_ancillary_network_ports(
1893                             ancillary_port_info)
1894                         LOG.debug("Agent rpc_loop - iteration: "
1895                                   "%(iter_num)d - ancillary ports "
1896                                   "processed. Elapsed:%(elapsed).3f",
1897                                   {'iter_num': self.iter_num,
1898                                    'elapsed': time.time() - start})
1899                         ancillary_ports = ancillary_port_info['current']
1900
1901                     polling_manager.polling_completed()
1902                     # Keep this flag in the last line of "try" block,
1903                     # so we can sure that no other Exception occurred.
1904                     if not sync:
1905                         ovs_restarted = False
1906                         self._dispose_local_vlan_hints()
1907                 except Exception:
1908                     LOG.exception(_LE("Error while processing VIF ports"))
1909                     # Put the ports back in self.updated_port
1910                     self.updated_ports |= updated_ports_copy
1911                     sync = True
1912             port_stats = self.get_port_stats(port_info, ancillary_port_info)
1913             self.loop_count_and_wait(start, port_stats)
1914
1915     def daemon_loop(self):
1916         # Start everything.
1917         LOG.info(_LI("Agent initialized successfully, now running... "))
1918         signal.signal(signal.SIGTERM, self._handle_sigterm)
1919         if hasattr(signal, 'SIGHUP'):
1920             signal.signal(signal.SIGHUP, self._handle_sighup)
1921         with polling.get_polling_manager(
1922             self.minimize_polling,
1923             self.ovsdb_monitor_respawn_interval) as pm:
1924
1925             self.rpc_loop(polling_manager=pm)
1926
1927     def _handle_sigterm(self, signum, frame):
1928         self.catch_sigterm = True
1929         if self.quitting_rpc_timeout:
1930             self.set_rpc_timeout(self.quitting_rpc_timeout)
1931
1932     def _handle_sighup(self, signum, frame):
1933         self.catch_sighup = True
1934
1935     def _check_and_handle_signal(self):
1936         if self.catch_sigterm:
1937             LOG.info(_LI("Agent caught SIGTERM, quitting daemon loop."))
1938             self.run_daemon_loop = False
1939             self.catch_sigterm = False
1940         if self.catch_sighup:
1941             LOG.info(_LI("Agent caught SIGHUP, resetting."))
1942             self.conf.reload_config_files()
1943             config.setup_logging()
1944             LOG.debug('Full set of CONF:')
1945             self.conf.log_opt_values(LOG, logging.DEBUG)
1946             self.catch_sighup = False
1947         return self.run_daemon_loop
1948
1949     def set_rpc_timeout(self, timeout):
1950         for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
1951                         self.dvr_plugin_rpc, self.state_rpc):
1952             rpc_api.client.timeout = timeout
1953
1954     def _check_agent_configurations(self):
1955         if (self.enable_distributed_routing and self.enable_tunneling
1956             and not self.l2_pop):
1957
1958             raise ValueError(_("DVR deployments for VXLAN/GRE/Geneve "
1959                                "underlays require L2-pop to be enabled, "
1960                                "in both the Agent and Server side."))
1961
1962
1963 def validate_local_ip(local_ip):
1964     """Verify if the ip exists on the agent's host."""
1965     if not ip_lib.IPWrapper().get_device_by_ip(local_ip):
1966         LOG.error(_LE("Tunneling can't be enabled with invalid local_ip '%s'."
1967                       " IP couldn't be found on this host's interfaces."),
1968                   local_ip)
1969         raise SystemExit(1)
1970
1971
1972 def validate_tunnel_config(tunnel_types, local_ip):
1973     """Verify local ip and tunnel config if tunneling is enabled."""
1974     if not tunnel_types:
1975         return
1976
1977     validate_local_ip(local_ip)
1978     for tun in tunnel_types:
1979         if tun not in constants.TUNNEL_NETWORK_TYPES:
1980             LOG.error(_LE('Invalid tunnel type specified: %s'), tun)
1981             raise SystemExit(1)
1982
1983
1984 def prepare_xen_compute():
1985     is_xen_compute_host = 'rootwrap-xen-dom0' in cfg.CONF.AGENT.root_helper
1986     if is_xen_compute_host:
1987         # Force ip_lib to always use the root helper to ensure that ip
1988         # commands target xen dom0 rather than domU.
1989         cfg.CONF.register_opts(ip_lib.OPTS)
1990         cfg.CONF.set_default('ip_lib_force_root', True)
1991
1992
1993 def main(bridge_classes):
1994     prepare_xen_compute()
1995     validate_tunnel_config(cfg.CONF.AGENT.tunnel_types, cfg.CONF.OVS.local_ip)
1996
1997     try:
1998         agent = OVSNeutronAgent(bridge_classes, cfg.CONF)
1999     except (RuntimeError, ValueError) as e:
2000         LOG.error(_LE("%s Agent terminated!"), e)
2001         sys.exit(1)
2002     agent.daemon_loop()