Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / plugins / ml2 / drivers / linuxbridge / agent / linuxbridge_neutron_agent.py
1 #!/usr/bin/env python
2 # Copyright 2012 Cisco Systems, Inc.
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16 #
17 #
18 # Performs per host Linux Bridge configuration for Neutron.
19 # Based on the structure of the OpenVSwitch agent in the
20 # Neutron OpenVSwitch Plugin.
21
22 import collections
23 import sys
24 import time
25
26 import netaddr
27 from oslo_config import cfg
28 from oslo_log import log as logging
29 import oslo_messaging
30 from oslo_service import loopingcall
31 from oslo_service import service
32 from oslo_utils import excutils
33 from six import moves
34
35 from neutron._i18n import _LE, _LI, _LW
36 from neutron.agent.l2.extensions import manager as ext_manager
37 from neutron.agent.linux import bridge_lib
38 from neutron.agent.linux import ip_lib
39 from neutron.agent.linux import utils
40 from neutron.agent import rpc as agent_rpc
41 from neutron.agent import securitygroups_rpc as sg_rpc
42 from neutron.common import config as common_config
43 from neutron.common import constants
44 from neutron.common import exceptions
45 from neutron.common import topics
46 from neutron.common import utils as n_utils
47 from neutron import context
48 from neutron.plugins.common import constants as p_const
49 from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
50     import l2population_rpc as l2pop_rpc
51 from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect
52 from neutron.plugins.ml2.drivers.linuxbridge.agent.common import config  # noqa
53 from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
54     import constants as lconst
55
56
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Prefix of the bridge names built by LinuxBridgeManager.get_bridge_name().
BRIDGE_NAME_PREFIX = "brq"
# Prefix of the device names built by
# LinuxBridgeManager.get_vxlan_device_name().
VXLAN_INTERFACE_PREFIX = "vxlan-"
61
62
class NetworkSegment(object):
    """Plain record describing the segment a network is mapped to.

    Holds the network type, the physical network name and the segmentation
    id exactly as they were passed in.
    """

    def __init__(self, network_type, physical_network, segmentation_id):
        self.segmentation_id = segmentation_id
        self.physical_network = physical_network
        self.network_type = network_type
68
69
70 class LinuxBridgeManager(object):
    def __init__(self, bridge_mappings, interface_mappings):
        """Store the mappings, validate them and initialise VXLAN state.

        :param bridge_mappings: mapping of physical network name to the
            name of a bridge device (iterated with ``.items()``).
        :param interface_mappings: mapping of physical network name to the
            name of a physical interface (iterated with ``.items()``).

        The validate_* helpers and get_local_ip_device() call
        ``sys.exit(1)`` when a configured device is missing, so
        constructing the manager can terminate the process.
        """
        self.bridge_mappings = bridge_mappings
        self.interface_mappings = interface_mappings
        # Both validators exit the process on the first missing device.
        self.validate_interface_mappings()
        self.validate_bridge_mappings()
        self.ip = ip_lib.IPWrapper()
        # VXLAN related parameters:
        self.local_ip = cfg.CONF.VXLAN.local_ip
        self.vxlan_mode = lconst.VXLAN_NONE
        if cfg.CONF.VXLAN.enable_vxlan:
            # local_int is only set when VXLAN is enabled; ensure_vxlan()
            # reads it when creating vxlan devices.
            device = self.get_local_ip_device(self.local_ip)
            self.validate_vxlan_group_with_local_ip()
            self.local_int = device.name
            # Sets vxlan_mode to the detected mode, or raises when neither
            # unicast nor multicast mode is usable.
            self.check_vxlan_support()
        # Store network mapping to segments
        self.network_map = {}
87
88     def validate_interface_mappings(self):
89         for physnet, interface in self.interface_mappings.items():
90             if not ip_lib.device_exists(interface):
91                 LOG.error(_LE("Interface %(intf)s for physical network %(net)s"
92                               " does not exist. Agent terminated!"),
93                           {'intf': interface, 'net': physnet})
94                 sys.exit(1)
95
96     def validate_bridge_mappings(self):
97         for physnet, bridge in self.bridge_mappings.items():
98             if not ip_lib.device_exists(bridge):
99                 LOG.error(_LE("Bridge %(brq)s for physical network %(net)s"
100                               " does not exist. Agent terminated!"),
101                           {'brq': bridge, 'net': physnet})
102                 sys.exit(1)
103
104     def validate_vxlan_group_with_local_ip(self):
105         if not cfg.CONF.VXLAN.vxlan_group:
106             return
107         try:
108             ip_addr = netaddr.IPAddress(self.local_ip)
109             # Ensure the configured group address/range is valid and multicast
110             group_net = netaddr.IPNetwork(cfg.CONF.VXLAN.vxlan_group)
111             if not group_net.is_multicast():
112                 raise ValueError()
113             if not ip_addr.version == group_net.version:
114                 raise ValueError()
115         except (netaddr.core.AddrFormatError, ValueError):
116             LOG.error(_LE("Invalid VXLAN Group: %(group)s, must be an address "
117                           "or network (in CIDR notation) in a multicast "
118                           "range of the same address family as local_ip: "
119                           "%(ip)s"),
120                       {'group': cfg.CONF.VXLAN.vxlan_group,
121                        'ip': self.local_ip})
122             sys.exit(1)
123
124     def get_local_ip_device(self, local_ip):
125         """Return the device with local_ip on the host."""
126         device = self.ip.get_device_by_ip(local_ip)
127         if not device:
128             LOG.error(_LE("Tunneling cannot be enabled without the local_ip "
129                           "bound to an interface on the host. Please "
130                           "configure local_ip %s on the host interface to "
131                           "be used for tunneling and restart the agent."),
132                       local_ip)
133             sys.exit(1)
134         return device
135
136     def get_existing_bridge_name(self, physical_network):
137         if not physical_network:
138             return None
139         return self.bridge_mappings.get(physical_network)
140
141     def get_bridge_name(self, network_id):
142         if not network_id:
143             LOG.warning(_LW("Invalid Network ID, will lead to incorrect "
144                             "bridge name"))
145         bridge_name = BRIDGE_NAME_PREFIX + network_id[0:11]
146         return bridge_name
147
148     def get_subinterface_name(self, physical_interface, vlan_id):
149         if not vlan_id:
150             LOG.warning(_LW("Invalid VLAN ID, will lead to incorrect "
151                             "subinterface name"))
152         subinterface_name = '%s.%s' % (physical_interface, vlan_id)
153         return subinterface_name
154
155     def get_tap_device_name(self, interface_id):
156         if not interface_id:
157             LOG.warning(_LW("Invalid Interface ID, will lead to incorrect "
158                             "tap device name"))
159         tap_device_name = constants.TAP_DEVICE_PREFIX + interface_id[0:11]
160         return tap_device_name
161
162     def get_vxlan_device_name(self, segmentation_id):
163         if 0 <= int(segmentation_id) <= p_const.MAX_VXLAN_VNI:
164             return VXLAN_INTERFACE_PREFIX + str(segmentation_id)
165         else:
166             LOG.warning(_LW("Invalid Segmentation ID: %s, will lead to "
167                             "incorrect vxlan device name"), segmentation_id)
168
169     def get_vxlan_group(self, segmentation_id):
170         net = netaddr.IPNetwork(cfg.CONF.VXLAN.vxlan_group)
171         # Map the segmentation ID to (one of) the group address(es)
172         return str(net.network +
173                    (int(segmentation_id) & int(net.hostmask)))
174
175     def get_deletable_bridges(self):
176         bridge_list = bridge_lib.get_bridge_names()
177         bridges = {b for b in bridge_list if b.startswith(BRIDGE_NAME_PREFIX)}
178         bridges.difference_update(self.bridge_mappings.values())
179         return bridges
180
181     def get_tap_devices_count(self, bridge_name):
182         if_list = bridge_lib.BridgeDevice(bridge_name).get_interfaces()
183         return len([interface for interface in if_list if
184                     interface.startswith(constants.TAP_DEVICE_PREFIX)])
185
186     def ensure_vlan_bridge(self, network_id, phy_bridge_name,
187                            physical_interface, vlan_id):
188         """Create a vlan and bridge unless they already exist."""
189         interface = self.ensure_vlan(physical_interface, vlan_id)
190         if phy_bridge_name:
191             return self.ensure_bridge(phy_bridge_name)
192         else:
193             bridge_name = self.get_bridge_name(network_id)
194             ips, gateway = self.get_interface_details(interface)
195             if self.ensure_bridge(bridge_name, interface, ips, gateway):
196                 return interface
197
198     def ensure_vxlan_bridge(self, network_id, segmentation_id):
199         """Create a vxlan and bridge unless they already exist."""
200         interface = self.ensure_vxlan(segmentation_id)
201         if not interface:
202             LOG.error(_LE("Failed creating vxlan interface for "
203                           "%(segmentation_id)s"),
204                       {segmentation_id: segmentation_id})
205             return
206         bridge_name = self.get_bridge_name(network_id)
207         self.ensure_bridge(bridge_name, interface)
208         return interface
209
210     def get_interface_details(self, interface):
211         device = self.ip.device(interface)
212         ips = device.addr.list(scope='global')
213
214         # Update default gateway if necessary
215         gateway = device.route.get_gateway(scope='global')
216         return ips, gateway
217
218     def ensure_flat_bridge(self, network_id, phy_bridge_name,
219                            physical_interface):
220         """Create a non-vlan bridge unless it already exists."""
221         if phy_bridge_name:
222             return self.ensure_bridge(phy_bridge_name)
223         else:
224             bridge_name = self.get_bridge_name(network_id)
225             ips, gateway = self.get_interface_details(physical_interface)
226             if self.ensure_bridge(bridge_name, physical_interface, ips,
227                                   gateway):
228                 return physical_interface
229
230     def ensure_local_bridge(self, network_id, phy_bridge_name):
231         """Create a local bridge unless it already exists."""
232         if phy_bridge_name:
233             bridge_name = phy_bridge_name
234         else:
235             bridge_name = self.get_bridge_name(network_id)
236         return self.ensure_bridge(bridge_name)
237
238     def ensure_vlan(self, physical_interface, vlan_id):
239         """Create a vlan unless it already exists."""
240         interface = self.get_subinterface_name(physical_interface, vlan_id)
241         if not ip_lib.device_exists(interface):
242             LOG.debug("Creating subinterface %(interface)s for "
243                       "VLAN %(vlan_id)s on interface "
244                       "%(physical_interface)s",
245                       {'interface': interface, 'vlan_id': vlan_id,
246                        'physical_interface': physical_interface})
247             if utils.execute(['ip', 'link', 'add', 'link',
248                               physical_interface,
249                               'name', interface, 'type', 'vlan', 'id',
250                               vlan_id], run_as_root=True):
251                 return
252             if utils.execute(['ip', 'link', 'set',
253                               interface, 'up'], run_as_root=True):
254                 return
255             LOG.debug("Done creating subinterface %s", interface)
256         return interface
257
    def ensure_vxlan(self, segmentation_id):
        """Create a vxlan unless it already exists.

        :param segmentation_id: VNI of the vxlan device.
        :returns: the vxlan interface name (whatever
            get_vxlan_device_name() produced), or None when creation
            failed because the VNI is in use by another interface.
        :raises RuntimeError: re-raised from add_vxlan() when the failure
            is not explained by the VNI being in use.
        """
        interface = self.get_vxlan_device_name(segmentation_id)
        if not ip_lib.device_exists(interface):
            LOG.debug("Creating vxlan interface %(interface)s for "
                      "VNI %(segmentation_id)s",
                      {'interface': interface,
                       'segmentation_id': segmentation_id})
            # Keyword arguments for add_vxlan(); optional entries are only
            # added when the matching mode/config option is set.
            args = {'dev': self.local_int}
            if self.vxlan_mode == lconst.VXLAN_MCAST:
                args['group'] = self.get_vxlan_group(segmentation_id)
            if cfg.CONF.VXLAN.ttl:
                args['ttl'] = cfg.CONF.VXLAN.ttl
            if cfg.CONF.VXLAN.tos:
                args['tos'] = cfg.CONF.VXLAN.tos
            if cfg.CONF.VXLAN.l2_population:
                args['proxy'] = True
            try:
                int_vxlan = self.ip.add_vxlan(interface, segmentation_id,
                                              **args)
            except RuntimeError:
                # The exception is re-raised on leaving the context manager
                # unless ctxt.reraise is cleared below.
                with excutils.save_and_reraise_exception() as ctxt:
                    # perform this check after an attempt rather than before
                    # to avoid excessive lookups and a possible race condition.
                    if ip_lib.vxlan_in_use(segmentation_id):
                        ctxt.reraise = False
                        LOG.error(_LE("Unable to create VXLAN interface for "
                                      "VNI %s because it is in use by another "
                                      "interface."), segmentation_id)
                        return None
            int_vxlan.link.set_up()
            LOG.debug("Done creating vxlan interface %s", interface)
        return interface
291
292     def update_interface_ip_details(self, destination, source, ips,
293                                     gateway):
294         if ips or gateway:
295             dst_device = self.ip.device(destination)
296             src_device = self.ip.device(source)
297
298         # Append IP's to bridge if necessary
299         if ips:
300             for ip in ips:
301                 dst_device.addr.add(cidr=ip['cidr'])
302
303         if gateway:
304             # Ensure that the gateway can be updated by changing the metric
305             metric = 100
306             if 'metric' in gateway:
307                 metric = gateway['metric'] - 1
308             dst_device.route.add_gateway(gateway=gateway['gateway'],
309                                          metric=metric)
310             src_device.route.delete_gateway(gateway=gateway['gateway'])
311
312         # Remove IP's from interface
313         if ips:
314             for ip in ips:
315                 src_device.addr.delete(cidr=ip['cidr'])
316
317     def _bridge_exists_and_ensure_up(self, bridge_name):
318         """Check if the bridge exists and make sure it is up."""
319         br = ip_lib.IPDevice(bridge_name)
320         br.set_log_fail_as_error(False)
321         try:
322             # If the device doesn't exist this will throw a RuntimeError
323             br.link.set_up()
324         except RuntimeError:
325             return False
326         return True
327
    def ensure_bridge(self, bridge_name, interface=None, ips=None,
                      gateway=None):
        """Create a bridge unless it already exists.

        :param bridge_name: name of the bridge to create or reuse.
        :param interface: optional interface to enslave to the bridge.
        :param ips: addresses handed to update_interface_ip_details() so
            they are moved from ``interface`` to the bridge.
        :param gateway: gateway handed to update_interface_ip_details().
        :returns: ``bridge_name`` on success; None when a bridge setup
            step reports a truthy result or when adding the interface
            raises.
        """
        # _bridge_exists_and_ensure_up instead of device_exists is used here
        # because there are cases where the bridge exists but it's not UP,
        # for example:
        # 1) A greenthread was executing this function and had not yet executed
        # "ip link set bridge_name up" before eventlet switched to this
        # thread running the same function
        # 2) The Nova VIF driver was running concurrently and had just created
        #    the bridge, but had not yet put it UP
        if not self._bridge_exists_and_ensure_up(bridge_name):
            LOG.debug("Starting bridge %(bridge_name)s for subinterface "
                      "%(interface)s",
                      {'bridge_name': bridge_name, 'interface': interface})
            bridge_device = bridge_lib.BridgeDevice.addbr(bridge_name)
            # NOTE(review): a truthy result from each setup call is treated
            # as failure and aborts with None - confirm against bridge_lib.
            if bridge_device.setfd(0):
                return
            if bridge_device.disable_stp():
                return
            if bridge_device.disable_ipv6():
                return
            if bridge_device.link.set_up():
                return
            LOG.debug("Done starting bridge %(bridge_name)s for "
                      "subinterface %(interface)s",
                      {'bridge_name': bridge_name, 'interface': interface})
        else:
            bridge_device = bridge_lib.BridgeDevice(bridge_name)

        # Without an interface to enslave there is nothing more to do.
        if not interface:
            return bridge_name

        # Update IP info if necessary
        self.update_interface_ip_details(bridge_name, interface, ips, gateway)

        # Check if the interface is part of the bridge
        if not bridge_device.owns_interface(interface):
            try:
                # Check if the interface is not enslaved in another bridge
                bridge = bridge_lib.BridgeDevice.get_interface_bridge(
                    interface)
                if bridge:
                    bridge.delif(interface)

                bridge_device.addif(interface)
            except Exception as e:
                # Any failure while moving the interface is logged and
                # reported to the caller as None.
                LOG.error(_LE("Unable to add %(interface)s to %(bridge_name)s"
                              "! Exception: %(e)s"),
                          {'interface': interface, 'bridge_name': bridge_name,
                           'e': e})
                return
        return bridge_name
381
382     def ensure_physical_in_bridge(self, network_id,
383                                   network_type,
384                                   physical_network,
385                                   segmentation_id):
386         if network_type == p_const.TYPE_VXLAN:
387             if self.vxlan_mode == lconst.VXLAN_NONE:
388                 LOG.error(_LE("Unable to add vxlan interface for network %s"),
389                           network_id)
390                 return
391             return self.ensure_vxlan_bridge(network_id, segmentation_id)
392
393         # NOTE(nick-ma-z): Obtain mappings of physical bridge and interfaces
394         physical_bridge = self.get_existing_bridge_name(physical_network)
395         physical_interface = self.interface_mappings.get(physical_network)
396         if not physical_bridge and not physical_interface:
397             LOG.error(_LE("No bridge or interface mappings"
398                           " for physical network %s"),
399                       physical_network)
400             return
401         if network_type == p_const.TYPE_FLAT:
402             return self.ensure_flat_bridge(network_id, physical_bridge,
403                                            physical_interface)
404         elif network_type == p_const.TYPE_VLAN:
405             return self.ensure_vlan_bridge(network_id, physical_bridge,
406                                            physical_interface,
407                                            segmentation_id)
408         else:
409             LOG.error(_LE("Unknown network_type %(network_type)s for network "
410                           "%(network_id)s."), {network_type: network_type,
411                                              network_id: network_id})
412
    def add_tap_interface(self, network_id, network_type, physical_network,
                          segmentation_id, tap_device_name, device_owner):
        """Add tap interface.

        If a VIF has been plugged into a network, this function will
        add the corresponding tap device to the relevant bridge.

        :returns: False when the tap device does not exist, when the
            physical/vxlan side of the bridge could not be ensured, or
            when addif() reports a truthy result; True otherwise.
        """
        if not ip_lib.device_exists(tap_device_name):
            LOG.debug("Tap device: %s does not exist on "
                      "this host, skipped", tap_device_name)
            return False

        # Prefer a bridge from bridge_mappings; otherwise use the name
        # derived from the network id.
        bridge_name = self.get_existing_bridge_name(physical_network)
        if not bridge_name:
            bridge_name = self.get_bridge_name(network_id)

        if network_type == p_const.TYPE_LOCAL:
            self.ensure_local_bridge(network_id, bridge_name)
        else:
            phy_dev_name = self.ensure_physical_in_bridge(network_id,
                                                          network_type,
                                                          physical_network,
                                                          segmentation_id)
            if not phy_dev_name:
                return False
            self.ensure_tap_mtu(tap_device_name, phy_dev_name)
        # Avoid messing with plugging devices into a bridge that the agent
        # does not own
        if device_owner.startswith(constants.DEVICE_OWNER_PREFIXES):
            # Check if device needs to be added to bridge
            if not bridge_lib.BridgeDevice.get_interface_bridge(
                tap_device_name):
                data = {'tap_device_name': tap_device_name,
                        'bridge_name': bridge_name}
                LOG.debug("Adding device %(tap_device_name)s to bridge "
                          "%(bridge_name)s", data)
                # NOTE(review): a truthy addif() result is treated as
                # failure - confirm against bridge_lib.
                if bridge_lib.BridgeDevice(bridge_name).addif(tap_device_name):
                    return False
        else:
            data = {'tap_device_name': tap_device_name,
                    'device_owner': device_owner,
                    'bridge_name': bridge_name}
            LOG.debug("Skip adding device %(tap_device_name)s to "
                      "%(bridge_name)s. It is owned by %(device_owner)s and "
                      "thus added elsewhere.", data)
        return True
459
460     def ensure_tap_mtu(self, tap_dev_name, phy_dev_name):
461         """Ensure the MTU on the tap is the same as the physical device."""
462         phy_dev_mtu = ip_lib.IPDevice(phy_dev_name).link.mtu
463         ip_lib.IPDevice(tap_dev_name).link.set_mtu(phy_dev_mtu)
464
465     def add_interface(self, network_id, network_type, physical_network,
466                       segmentation_id, port_id, device_owner):
467         self.network_map[network_id] = NetworkSegment(network_type,
468                                                       physical_network,
469                                                       segmentation_id)
470         tap_device_name = self.get_tap_device_name(port_id)
471         return self.add_tap_interface(network_id, network_type,
472                                       physical_network, segmentation_id,
473                                       tap_device_name, device_owner)
474
    def delete_bridge(self, bridge_name):
        """Detach every interface from a bridge and delete the bridge.

        vxlan interfaces are deleted. For other interfaces, addresses found
        on the bridge are moved back to the interface; an interface with no
        such addresses is deleted unless it is a mapped physical interface.
        Only a debug message is logged when the bridge does not exist.
        """
        bridge_device = bridge_lib.BridgeDevice(bridge_name)
        if bridge_device.exists():
            physical_interfaces = set(self.interface_mappings.values())
            interfaces_on_bridge = bridge_device.get_interfaces()
            for interface in interfaces_on_bridge:
                self.remove_interface(bridge_name, interface)

                if interface.startswith(VXLAN_INTERFACE_PREFIX):
                    self.delete_interface(interface)
                else:
                    # Match the vlan/flat interface in the bridge.
                    # If the bridge has an IP, it mean that this IP was moved
                    # from the current interface, which also mean that this
                    # interface was not created by the agent.
                    ips, gateway = self.get_interface_details(bridge_name)
                    if ips:
                        # Destination is the interface, source the bridge.
                        self.update_interface_ip_details(interface,
                                                         bridge_name,
                                                         ips, gateway)
                    elif interface not in physical_interfaces:
                        self.delete_interface(interface)

            LOG.debug("Deleting bridge %s", bridge_name)
            # NOTE(review): a truthy result from set_down()/delbr() stops
            # the deletion early - confirm against bridge_lib.
            if bridge_device.link.set_down():
                return
            if bridge_device.delbr():
                return
            LOG.debug("Done deleting bridge %s", bridge_name)

        else:
            LOG.debug("Cannot delete bridge %s; it does not exist",
                      bridge_name)
508
509     def remove_interface(self, bridge_name, interface_name):
510         bridge_device = bridge_lib.BridgeDevice(bridge_name)
511         if bridge_device.exists():
512             if not bridge_lib.is_bridged_interface(interface_name):
513                 return True
514             LOG.debug("Removing device %(interface_name)s from bridge "
515                       "%(bridge_name)s",
516                       {'interface_name': interface_name,
517                        'bridge_name': bridge_name})
518             if bridge_device.delif(interface_name):
519                 return False
520             LOG.debug("Done removing device %(interface_name)s from bridge "
521                       "%(bridge_name)s",
522                       {'interface_name': interface_name,
523                        'bridge_name': bridge_name})
524             return True
525         else:
526             LOG.debug("Cannot remove device %(interface_name)s bridge "
527                       "%(bridge_name)s does not exist",
528                       {'interface_name': interface_name,
529                        'bridge_name': bridge_name})
530             return False
531
532     def delete_interface(self, interface):
533         device = self.ip.device(interface)
534         if device.exists():
535             LOG.debug("Deleting interface %s",
536                       interface)
537             device.link.set_down()
538             device.link.delete()
539             LOG.debug("Done deleting interface %s", interface)
540
541     def get_tap_devices(self):
542         devices = set()
543         for device in bridge_lib.get_bridge_names():
544             if device.startswith(constants.TAP_DEVICE_PREFIX):
545                 devices.add(device)
546         return devices
547
    def vxlan_ucast_supported(self):
        """Probe whether VXLAN unicast mode can be used.

        Requires the l2_population option and ``bridge fdb append``
        support. A temporary vxlan interface is created on a free
        segmentation id, an fdb append is attempted on it, and the
        interface is deleted again.

        :returns: True when the fdb append succeeds, False otherwise.
        """
        if not cfg.CONF.VXLAN.l2_population:
            return False
        if not ip_lib.iproute_arg_supported(
                ['bridge', 'fdb'], 'append'):
            LOG.warning(_LW('Option "%(option)s" must be supported by command '
                            '"%(command)s" to enable %(mode)s mode'),
                        {'option': 'append',
                         'command': 'bridge fdb',
                         'mode': 'VXLAN UCAST'})
            return False

        test_iface = None
        # Pick the first segmentation id that has neither an existing
        # device name nor a VNI in use; the else branch runs only when the
        # loop finishes without a break, i.e. no id was free.
        for seg_id in moves.range(1, p_const.MAX_VXLAN_VNI + 1):
            if (ip_lib.device_exists(self.get_vxlan_device_name(seg_id))
                    or ip_lib.vxlan_in_use(seg_id)):
                continue
            test_iface = self.ensure_vxlan(seg_id)
            break
        else:
            LOG.error(_LE('No valid Segmentation ID to perform UCAST test.'))
            return False

        try:
            utils.execute(
                cmd=['bridge', 'fdb', 'append', constants.FLOODING_ENTRY[0],
                     'dev', test_iface, 'dst', '1.1.1.1'],
                run_as_root=True, log_fail_as_error=False)
            return True
        except RuntimeError:
            return False
        finally:
            # The probe interface is removed whatever the outcome.
            self.delete_interface(test_iface)
581
582     def vxlan_mcast_supported(self):
583         if not cfg.CONF.VXLAN.vxlan_group:
584             LOG.warning(_LW('VXLAN muticast group(s) must be provided in '
585                             'vxlan_group option to enable VXLAN MCAST mode'))
586             return False
587         if not ip_lib.iproute_arg_supported(
588                 ['ip', 'link', 'add', 'type', 'vxlan'],
589                 'proxy'):
590             LOG.warning(_LW('Option "%(option)s" must be supported by command '
591                             '"%(command)s" to enable %(mode)s mode'),
592                         {'option': 'proxy',
593                          'command': 'ip link add type vxlan',
594                          'mode': 'VXLAN MCAST'})
595
596             return False
597         return True
598
599     def check_vxlan_support(self):
600         self.vxlan_mode = lconst.VXLAN_NONE
601
602         if self.vxlan_ucast_supported():
603             self.vxlan_mode = lconst.VXLAN_UCAST
604         elif self.vxlan_mcast_supported():
605             self.vxlan_mode = lconst.VXLAN_MCAST
606         else:
607             raise exceptions.VxlanNetworkUnsupported()
608         LOG.debug('Using %s VXLAN mode', self.vxlan_mode)
609
610     def fdb_ip_entry_exists(self, mac, ip, interface):
611         entries = utils.execute(['ip', 'neigh', 'show', 'to', ip,
612                                  'dev', interface],
613                                 run_as_root=True)
614         return mac in entries
615
616     def fdb_bridge_entry_exists(self, mac, interface, agent_ip=None):
617         entries = utils.execute(['bridge', 'fdb', 'show', 'dev', interface],
618                                 run_as_root=True)
619         if not agent_ip:
620             return mac in entries
621
622         return (agent_ip in entries and mac in entries)
623
624     def add_fdb_ip_entry(self, mac, ip, interface):
625         ip_lib.IPDevice(interface).neigh.add(ip, mac)
626
627     def remove_fdb_ip_entry(self, mac, ip, interface):
628         ip_lib.IPDevice(interface).neigh.delete(ip, mac)
629
630     def add_fdb_bridge_entry(self, mac, agent_ip, interface, operation="add"):
631         utils.execute(['bridge', 'fdb', operation, mac, 'dev', interface,
632                        'dst', agent_ip],
633                       run_as_root=True,
634                       check_exit_code=False)
635
636     def remove_fdb_bridge_entry(self, mac, agent_ip, interface):
637         utils.execute(['bridge', 'fdb', 'del', mac, 'dev', interface,
638                        'dst', agent_ip],
639                       run_as_root=True,
640                       check_exit_code=False)
641
642     def add_fdb_entries(self, agent_ip, ports, interface):
643         for mac, ip in ports:
644             if mac != constants.FLOODING_ENTRY[0]:
645                 self.add_fdb_ip_entry(mac, ip, interface)
646                 self.add_fdb_bridge_entry(mac, agent_ip, interface,
647                                           operation="replace")
648             elif self.vxlan_mode == lconst.VXLAN_UCAST:
649                 if self.fdb_bridge_entry_exists(mac, interface):
650                     self.add_fdb_bridge_entry(mac, agent_ip, interface,
651                                               "append")
652                 else:
653                     self.add_fdb_bridge_entry(mac, agent_ip, interface)
654
655     def remove_fdb_entries(self, agent_ip, ports, interface):
656         for mac, ip in ports:
657             if mac != constants.FLOODING_ENTRY[0]:
658                 self.remove_fdb_ip_entry(mac, ip, interface)
659                 self.remove_fdb_bridge_entry(mac, agent_ip, interface)
660             elif self.vxlan_mode == lconst.VXLAN_UCAST:
661                 self.remove_fdb_bridge_entry(mac, agent_ip, interface)
662
663
class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                              l2pop_rpc.L2populationRpcCallBackMixin):
    """RPC callbacks invoked on the linuxbridge agent.

    The callbacks act on the owning agent: they queue devices for the next
    polling iteration or delegate to its bridge manager (``agent.br_mgr``).
    """

    # Set RPC API version to 1.0 by default.
    # history
    #   1.1 Support Security Group RPC
    #   1.3 Added param devices_to_update to security_groups_provider_updated
    #   1.4 Added support for network_update
    target = oslo_messaging.Target(version='1.4')

    def __init__(self, context, agent, sg_agent):
        """Store the objects the callbacks operate on.

        :param context: context kept on the endpoint as ``self.context``.
        :param agent: agent exposing ``br_mgr``, ``updated_devices`` and
               ``network_ports``.
        :param sg_agent: security group agent, kept as ``self.sg_agent``.
        """
        super(LinuxBridgeRpcCallbacks, self).__init__()
        self.context = context
        self.agent = agent
        self.sg_agent = sg_agent

    def network_delete(self, context, **kwargs):
        """Delete the bridge that belongs to a deleted network.

        Nothing is deleted when the network is unknown to the bridge manager
        or when its physical network is listed in ``bridge_mappings``.
        """
        LOG.debug("network_delete received")
        network_id = kwargs.get('network_id')
        br_mgr = self.agent.br_mgr

        if network_id not in br_mgr.network_map:
            LOG.error(_LE("Network %s is not available."), network_id)
            return

        # NOTE(nick-ma-z): Don't remove pre-existing user-defined bridges
        phynet = br_mgr.network_map[network_id].physical_network
        if phynet and phynet in br_mgr.bridge_mappings:
            # The message is about the physical network, so report its name
            # rather than the network id.
            LOG.info(_LI("Physical network %s is defined in "
                         "bridge_mappings and cannot be deleted."),
                     phynet)
            return

        bridge_name = br_mgr.get_bridge_name(network_id)
        LOG.debug("Delete %s", bridge_name)
        br_mgr.delete_bridge(bridge_name)

    def port_update(self, context, **kwargs):
        """Queue the tap device of an updated port for the next iteration."""
        port_id = kwargs['port']['id']
        tap_name = self.agent.br_mgr.get_tap_device_name(port_id)
        # Put the tap name in the updated_devices set.
        # Do not store port details, as if they're used for processing
        # notifications there is no guarantee the notifications are
        # processed in the same order as the relevant API requests.
        self.agent.updated_devices.add(tap_name)
        LOG.debug("port_update RPC received for port: %s", port_id)

    def network_update(self, context, **kwargs):
        """Queue every known device of an updated network for processing."""
        network_id = kwargs['network']['id']
        # The agent creates network_ports as a defaultdict(list); indexing
        # it would insert an empty entry for every network without local
        # ports, so look the network up without creating it.
        ports = self.agent.network_ports.get(network_id, [])
        LOG.debug("network_update message processed for network "
                  "%(network_id)s, with ports: %(ports)s",
                  {'network_id': network_id,
                   'ports': ports})
        for port_data in ports:
            self.agent.updated_devices.add(port_data['device'])

    def _get_vxlan_segment(self, network_id):
        """Return the network's segment if it is a known VXLAN one.

        :returns: the segment from the bridge manager's ``network_map``, or
                  None when the network is unknown or not of type VXLAN.
        """
        segment = self.agent.br_mgr.network_map.get(network_id)
        if segment and segment.network_type == p_const.TYPE_VXLAN:
            return segment
        return None

    def fdb_add(self, context, fdb_entries):
        """Install the fdb entries of remote agents on VXLAN networks.

        :param fdb_entries: dict mapping network ids to a dict whose 'ports'
               item maps agent IPs to lists of ``(mac, ip)`` pairs.
        """
        LOG.debug("fdb_add received")
        br_mgr = self.agent.br_mgr
        for network_id, values in fdb_entries.items():
            segment = self._get_vxlan_segment(network_id)
            if segment is None:
                # Skip only this network; the remaining ones in the message
                # must still be processed.
                continue

            interface = br_mgr.get_vxlan_device_name(segment.segmentation_id)

            agent_ports = values.get('ports')
            for agent_ip, ports in agent_ports.items():
                # Entries of the local agent need no forwarding entries.
                if agent_ip == br_mgr.local_ip:
                    continue

                br_mgr.add_fdb_entries(agent_ip, ports, interface)

    def fdb_remove(self, context, fdb_entries):
        """Remove the fdb entries of remote agents on VXLAN networks.

        :param fdb_entries: dict mapping network ids to a dict whose 'ports'
               item maps agent IPs to lists of ``(mac, ip)`` pairs.
        """
        LOG.debug("fdb_remove received")
        br_mgr = self.agent.br_mgr
        for network_id, values in fdb_entries.items():
            segment = self._get_vxlan_segment(network_id)
            if segment is None:
                # Skip only this network; the remaining ones in the message
                # must still be processed.
                continue

            interface = br_mgr.get_vxlan_device_name(segment.segmentation_id)

            agent_ports = values.get('ports')
            for agent_ip, ports in agent_ports.items():
                # Entries of the local agent need no forwarding entries.
                if agent_ip == br_mgr.local_ip:
                    continue

                br_mgr.remove_fdb_entries(agent_ip, ports, interface)

    def _fdb_chg_ip(self, context, fdb_entries):
        """Apply IP changes of remote ports on VXLAN networks.

        :param fdb_entries: dict mapping network ids to a dict that maps
               agent IPs to a state dict with optional 'after' and 'before'
               lists of ``(mac, ip)`` pairs. 'after' pairs are added first,
               then 'before' pairs are removed.
        """
        LOG.debug("update chg_ip received")
        br_mgr = self.agent.br_mgr
        for network_id, agent_ports in fdb_entries.items():
            segment = self._get_vxlan_segment(network_id)
            if segment is None:
                # Skip only this network; the remaining ones in the message
                # must still be processed.
                continue

            interface = br_mgr.get_vxlan_device_name(segment.segmentation_id)

            for agent_ip, state in agent_ports.items():
                if agent_ip == br_mgr.local_ip:
                    continue

                for mac, ip in state.get('after', []):
                    br_mgr.add_fdb_ip_entry(mac, ip, interface)

                for mac, ip in state.get('before', []):
                    br_mgr.remove_fdb_ip_entry(mac, ip, interface)

    def fdb_update(self, context, fdb_entries):
        """Dispatch each fdb update action to its ``_fdb_<action>`` method.

        :raises NotImplementedError: when no method exists for an action.
        """
        LOG.debug("fdb_update received")
        for action, values in fdb_entries.items():
            method = '_fdb_' + action
            if not hasattr(self, method):
                raise NotImplementedError()

            getattr(self, method)(context, values)
796
797
class LinuxBridgeNeutronAgentRPC(service.Service):
    """Linux bridge L2 agent service.

    start() builds the bridge manager, the RPC plumbing and the state report
    heartbeat, then runs daemon_loop(), which periodically scans the tap
    devices and reconciles added, updated and removed ones with the plugin.
    """

    def __init__(self, bridge_mappings, interface_mappings, polling_interval,
                 quitting_rpc_timeout):
        """Constructor.

        :param bridge_mappings: dict mapping physical_networks to
               physical_bridges.
        :param interface_mappings: dict mapping physical_networks to
               physical_interfaces.
        :param polling_interval: interval (secs) to poll DB.
        :param quitting_rpc_timeout: timeout in seconds for rpc calls after
               stop is called.
        """
        super(LinuxBridgeNeutronAgentRPC, self).__init__()
        self.interface_mappings = interface_mappings
        self.bridge_mappings = bridge_mappings
        self.polling_interval = polling_interval
        self.quitting_rpc_timeout = quitting_rpc_timeout

    def start(self):
        """Set up the agent and run the polling loop.

        The call ends in daemon_loop(), which loops forever, so this method
        does not return normally.
        """
        self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
        self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings)

        # stores received port_updates and port_deletes for
        # processing by the main loop
        self.updated_devices = set()

        # stores all configured ports on agent
        self.network_ports = collections.defaultdict(list)
        # flag to do a sync after revival
        self.fullsync = False
        self.context = context.get_admin_context_without_session()
        # dict.values() is a view on Python 3 and cannot be indexed, but
        # setup_rpc() needs the first interface: hand it a real list.
        self.setup_rpc(list(self.interface_mappings.values()))
        self.init_extension_manager(self.connection)

        configurations = {
            'bridge_mappings': self.bridge_mappings,
            'interface_mappings': self.interface_mappings,
            'extensions': self.ext_manager.names()
        }
        if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
            configurations['tunneling_ip'] = self.br_mgr.local_ip
            configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
            configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
        self.agent_state = {
            'binary': 'neutron-linuxbridge-agent',
            'host': cfg.CONF.host,
            'topic': constants.L2_AGENT_TOPIC,
            'configurations': configurations,
            'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
            'start_flag': True}

        # A falsy report_interval disables the periodic state report.
        report_interval = cfg.CONF.AGENT.report_interval
        if report_interval:
            heartbeat = loopingcall.FixedIntervalLoopingCall(
                self._report_state)
            heartbeat.start(interval=report_interval)
        self.daemon_loop()

    def stop(self, graceful=True):
        """Stop the service.

        On a graceful stop with a configured quitting_rpc_timeout, the RPC
        clients are switched to that timeout before the service is stopped.
        """
        LOG.info(_LI("Stopping linuxbridge agent."))
        if graceful and self.quitting_rpc_timeout:
            self.set_rpc_timeout(self.quitting_rpc_timeout)
        super(LinuxBridgeNeutronAgentRPC, self).stop(graceful)

    def reset(self):
        """Re-run the logging setup."""
        common_config.setup_logging()

    def _report_state(self):
        """Report the agent state and request a full sync when revived.

        Exceptions are logged and not propagated to the caller.
        """
        try:
            devices = len(self.br_mgr.get_tap_devices())
            self.agent_state.get('configurations')['devices'] = devices
            agent_status = self.state_rpc.report_state(self.context,
                                                       self.agent_state,
                                                       True)
            if agent_status == constants.AGENT_REVIVED:
                LOG.info(_LI('Agent has just been revived. '
                             'Doing a full sync.'))
                self.fullsync = True
            # start_flag is only sent until the first successful report.
            self.agent_state.pop('start_flag', None)
        except Exception:
            LOG.exception(_LE("Failed reporting state!"))

    def setup_rpc(self, physical_interfaces):
        """Create the RPC clients, derive the agent id and start consuming.

        :param physical_interfaces: iterable of physical interface names.
               The MAC of the first one is used to build the agent id. When
               it is empty the first device returned by
               ip_lib.IPWrapper().get_devices(True) is used instead, and the
               process exits with status 1 if there is none.
        """
        # Accept any iterable (e.g. a dict view on Python 3, which cannot be
        # indexed) as well as None.
        physical_interfaces = list(physical_interfaces or [])
        if physical_interfaces:
            mac = utils.get_interface_mac(physical_interfaces[0])
        else:
            devices = ip_lib.IPWrapper().get_devices(True)
            if devices:
                mac = utils.get_interface_mac(devices[0].name)
            else:
                LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                              "Agent terminated!"))
                # The bare exit() builtin is only installed by the site
                # module; sys.exit() is always available.
                sys.exit(1)

        self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
        self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
        self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
            self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)

        self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
        LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

        self.topic = topics.AGENT
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
        # RPC network init
        # Handle updates from service
        self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
                                                  self.sg_agent)]
        # Define the listening consumers for the agent
        consumers = [[topics.PORT, topics.UPDATE],
                     [topics.NETWORK, topics.DELETE],
                     [topics.NETWORK, topics.UPDATE],
                     [topics.SECURITY_GROUP, topics.UPDATE]]

        if cfg.CONF.VXLAN.l2_population:
            consumers.append([topics.L2POPULATION, topics.UPDATE])
        self.connection = agent_rpc.create_consumers(self.endpoints,
                                                     self.topic,
                                                     consumers)

    def init_extension_manager(self, connection):
        """Register, create and initialize the agent extensions manager."""
        ext_manager.register_opts(cfg.CONF)
        self.ext_manager = (
            ext_manager.AgentExtensionsManager(cfg.CONF))
        self.ext_manager.initialize(
            connection, lconst.EXTENSION_DRIVER_TYPE)

    def setup_linux_bridge(self, bridge_mappings, interface_mappings):
        """Create the bridge manager used by the agent (``self.br_mgr``)."""
        self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings)

    def _ensure_port_admin_state(self, port_id, admin_state_up):
        """Set the port's tap device up or down to match admin_state_up."""
        LOG.debug("Setting admin_state_up to %s for port %s",
                  admin_state_up, port_id)
        tap_name = self.br_mgr.get_tap_device_name(port_id)
        if admin_state_up:
            ip_lib.IPDevice(tap_name).link.set_up()
        else:
            ip_lib.IPDevice(tap_name).link.set_down()

    def _clean_network_ports(self, device):
        """Forget ``device`` in network_ports.

        :returns: the port_id stored for the device, or None when the device
                  is not tracked.
        """
        for netid, ports_list in self.network_ports.items():
            for port_data in ports_list:
                if device == port_data['device']:
                    # Mutating while iterating is safe here because the
                    # method returns right after the removal.
                    ports_list.remove(port_data)
                    if ports_list == []:
                        self.network_ports.pop(netid)
                    return port_data['port_id']

    def _update_network_ports(self, network_id, port_id, device):
        """Record ``device`` under ``network_id``, dropping any older entry."""
        self._clean_network_ports(device)
        self.network_ports[network_id].append({
            "port_id": port_id,
            "device": device
        })

    def process_network_devices(self, device_info):
        """Process the devices found by scan_devices().

        :param device_info: dict with 'added', 'updated' and 'removed' sets.
        :returns: True when a resync with the plugin is needed.
        """
        resync_a = False
        resync_b = False

        self.sg_agent.setup_port_filters(device_info.get('added'),
                                         device_info.get('updated'))
        # Updated devices are processed the same as new ones, as their
        # admin_state_up may have changed. The set union prevents duplicating
        # work when a device is new and updated in the same polling iteration.
        devices_added_updated = (set(device_info.get('added'))
                                 | set(device_info.get('updated')))
        if devices_added_updated:
            resync_a = self.treat_devices_added_updated(devices_added_updated)

        if device_info.get('removed'):
            resync_b = self.treat_devices_removed(device_info['removed'])
        # If one of the above operations fails => resync with plugin
        return (resync_a | resync_b)

    def treat_devices_added_updated(self, devices):
        """Wire up added/updated devices and report their status.

        :param devices: device names to fetch details for.
        :returns: True when the port details could not be fetched and a
                  resync is needed, False otherwise.
        """
        try:
            devices_details_list = self.plugin_rpc.get_devices_details_list(
                self.context, devices, self.agent_id)
        except Exception:
            LOG.exception(_LE("Unable to get port details for %s"), devices)
            # resync is needed
            return True

        for device_details in devices_details_list:
            device = device_details['device']
            LOG.debug("Port %s added", device)

            if 'port_id' in device_details:
                LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
                         {'device': device, 'details': device_details})
                if self.prevent_arp_spoofing:
                    port = self.br_mgr.get_tap_device_name(
                        device_details['port_id'])
                    arp_protect.setup_arp_spoofing_protection(port,
                                                              device_details)
                # create the networking for the port
                network_type = device_details.get('network_type')
                segmentation_id = device_details.get('segmentation_id')
                tap_in_bridge = self.br_mgr.add_interface(
                    device_details['network_id'], network_type,
                    device_details['physical_network'], segmentation_id,
                    device_details['port_id'], device_details['device_owner'])
                # REVISIT(scheuran): Changed the way how ports admin_state_up
                # is implemented.
                #
                # Old lb implementation:
                # - admin_state_up: ensure that tap is plugged into bridge
                # - admin_state_down: remove tap from bridge
                # New lb implementation:
                # - admin_state_up: set tap device state to up
                # - admin_state_down: set tap device state to down
                #
                # However both approaches could result in races with
                # nova/libvirt and therefore to an invalid system state in the
                # scenario, where an instance is booted with a port configured
                # with admin_state_up = False:
                #
                # Libvirt does the following actions in exactly
                # this order (see libvirt virnetdevtap.c)
                #     1) Create the tap device, set its MAC and MTU
                #     2) Plug the tap into the bridge
                #     3) Set the tap online
                #
                # Old lb implementation:
                #   A race could occur, if the lb agent removes the tap device
                #   right after step 1). Then libvirt will add it to the bridge
                #   again in step 2).
                # New lb implementation:
                #   The race could occur if the lb-agent sets the taps device
                #   state to down right after step 2). In step 3) libvirt
                #   might set it to up again.
                #
                # This is not an issue if an instance is booted with a port
                # configured with admin_state_up = True. Libvirt would just
                # set the tap device up again.
                #
                # This refactoring is recommended for the following reasons:
                # 1) An existing race with libvirt caused by the behavior of
                #    the old implementation. See Bug #1312016
                # 2) The new code is much more readable
                self._ensure_port_admin_state(device_details['port_id'],
                                              device_details['admin_state_up'])
                # update plugin about port status if admin_state is up
                if device_details['admin_state_up']:
                    if tap_in_bridge:
                        self.plugin_rpc.update_device_up(self.context,
                                                         device,
                                                         self.agent_id,
                                                         cfg.CONF.host)
                    else:
                        self.plugin_rpc.update_device_down(self.context,
                                                           device,
                                                           self.agent_id,
                                                           cfg.CONF.host)
                self._update_network_ports(device_details['network_id'],
                                           device_details['port_id'],
                                           device_details['device'])
                self.ext_manager.handle_port(self.context, device_details)
            else:
                LOG.info(_LI("Device %s not defined on plugin"), device)
        return False

    def treat_devices_removed(self, devices):
        """Report removed devices as down and clean up their state.

        :param devices: names of the devices that disappeared.
        :returns: True when reporting a device failed and a resync is
                  needed, False otherwise.
        """
        resync = False
        self.sg_agent.remove_devices_filter(devices)
        for device in devices:
            LOG.info(_LI("Attachment %s removed"), device)
            details = None
            try:
                details = self.plugin_rpc.update_device_down(self.context,
                                                             device,
                                                             self.agent_id,
                                                             cfg.CONF.host)
            except Exception:
                LOG.exception(_LE("Error occurred while removing port %s"),
                              device)
                resync = True
            if details and details['exists']:
                LOG.info(_LI("Port %s updated."), device)
            else:
                LOG.debug("Device %s not defined on plugin", device)
            # The local bookkeeping is cleaned even when the RPC failed.
            port_id = self._clean_network_ports(device)
            self.ext_manager.delete_port(self.context,
                                         {'device': device,
                                          'port_id': port_id})
        if self.prevent_arp_spoofing:
            arp_protect.delete_arp_spoofing_protection(devices)
        return resync

    def scan_devices(self, previous, sync):
        """Compute which tap devices were added, updated or removed.

        :param previous: device_info returned by the previous call, or None
               on the first iteration.
        :param sync: True to re-add every current device and retry the
               removals and updates of the previous iteration.
        :returns: dict with 'current', 'added', 'updated' and 'removed' sets.
        """
        device_info = {}

        # Save and reinitialize the set variable that the port_update RPC uses.
        # This should be thread-safe as the greenthread should not yield
        # between these two statements.
        updated_devices = self.updated_devices
        self.updated_devices = set()

        current_devices = self.br_mgr.get_tap_devices()
        device_info['current'] = current_devices

        if previous is None:
            # This is the first iteration of daemon_loop().
            previous = {'added': set(),
                        'current': set(),
                        'updated': set(),
                        'removed': set()}
            # clear any orphaned ARP spoofing rules (e.g. interface was
            # manually deleted)
            if self.prevent_arp_spoofing:
                arp_protect.delete_unreferenced_arp_protection(current_devices)

        if sync:
            # This is the first iteration, or the previous one had a problem.
            # Re-add all existing devices.
            device_info['added'] = current_devices

            # Retry cleaning devices that may not have been cleaned properly.
            # And clean any that disappeared since the previous iteration.
            device_info['removed'] = (previous['removed']
                                      | (previous['current']
                                         - current_devices))

            # Retry updating devices that may not have been updated properly.
            # And any that were updated since the previous iteration.
            # Only update devices that currently exist.
            # NOTE: '&' binds tighter than '|', so the union has to be
            # parenthesised for the "currently exist" filter to cover the
            # devices carried over from the previous iteration too.
            device_info['updated'] = ((previous['updated'] | updated_devices)
                                      & current_devices)
        else:
            device_info['added'] = current_devices - previous['current']
            device_info['removed'] = previous['current'] - current_devices
            device_info['updated'] = updated_devices & current_devices

        return device_info

    def _device_info_has_changes(self, device_info):
        """Return a truthy value when any device set is non-empty."""
        return (device_info.get('added')
                or device_info.get('updated')
                or device_info.get('removed'))

    def daemon_loop(self):
        """Poll the devices forever, once per polling interval.

        A failed iteration (or a revival flagged by _report_state) triggers a
        full sync on the next one.
        """
        LOG.info(_LI("LinuxBridge Agent RPC Daemon Started!"))
        device_info = None
        sync = True

        while True:
            start = time.time()

            if self.fullsync:
                sync = True
                self.fullsync = False

            if sync:
                LOG.info(_LI("Agent out of sync with plugin!"))

            device_info = self.scan_devices(previous=device_info, sync=sync)
            sync = False

            if (self._device_info_has_changes(device_info)
                or self.sg_agent.firewall_refresh_needed()):
                LOG.debug("Agent loop found changes! %s", device_info)
                try:
                    sync = self.process_network_devices(device_info)
                except Exception:
                    LOG.exception(_LE("Error in agent loop. Devices info: %s"),
                                  device_info)
                    sync = True

            # sleep till end of polling interval
            elapsed = (time.time() - start)
            if (elapsed < self.polling_interval):
                time.sleep(self.polling_interval - elapsed)
            else:
                LOG.debug("Loop iteration exceeded interval "
                          "(%(polling_interval)s vs. %(elapsed)s)!",
                          {'polling_interval': self.polling_interval,
                           'elapsed': elapsed})

    def set_rpc_timeout(self, timeout):
        """Apply ``timeout`` to the plugin, security group and state clients."""
        for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
                        self.state_rpc):
            rpc_api.client.timeout = timeout
1181
1182
def main():
    """Entry point: parse the configuration, build the agent and run it.

    Exits with status 1 when either mapping option cannot be parsed.
    """
    common_config.init(sys.argv[1:])
    common_config.setup_logging()

    try:
        interface_mappings = n_utils.parse_mappings(
            cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
    except ValueError as err:
        LOG.error(_LE("Parsing physical_interface_mappings failed: %s. "
                      "Agent terminated!"), err)
        sys.exit(1)
    else:
        LOG.info(_LI("Interface mappings: %s"), interface_mappings)

    try:
        bridge_mappings = n_utils.parse_mappings(
            cfg.CONF.LINUX_BRIDGE.bridge_mappings)
    except ValueError as err:
        LOG.error(_LE("Parsing bridge_mappings failed: %s. "
                      "Agent terminated!"), err)
        sys.exit(1)
    else:
        LOG.info(_LI("Bridge mappings: %s"), bridge_mappings)

    agent = LinuxBridgeNeutronAgentRPC(
        bridge_mappings=bridge_mappings,
        interface_mappings=interface_mappings,
        polling_interval=cfg.CONF.AGENT.polling_interval,
        quitting_rpc_timeout=cfg.CONF.AGENT.quitting_rpc_timeout)
    LOG.info(_LI("Agent initialized successfully, now running... "))

    # Hand the agent to the service launcher and block until it finishes.
    launcher = service.launch(cfg.CONF, agent)
    launcher.wait()
1212     launcher = service.launch(cfg.CONF, agent)
1213     launcher.wait()