2 # Copyright 2012 Cisco Systems, Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
18 # Performs per host Linux Bridge configuration for Neutron.
19 # Based on the structure of the OpenVSwitch agent in the
20 # Neutron OpenVSwitch Plugin.
27 from oslo_config import cfg
28 from oslo_log import log as logging
30 from oslo_service import loopingcall
31 from oslo_service import service
32 from oslo_utils import excutils
35 from neutron._i18n import _LE, _LI, _LW
36 from neutron.agent.l2.extensions import manager as ext_manager
37 from neutron.agent.linux import bridge_lib
38 from neutron.agent.linux import ip_lib
39 from neutron.agent.linux import utils
40 from neutron.agent import rpc as agent_rpc
41 from neutron.agent import securitygroups_rpc as sg_rpc
42 from neutron.common import config as common_config
43 from neutron.common import constants
44 from neutron.common import exceptions
45 from neutron.common import topics
46 from neutron.common import utils as n_utils
47 from neutron import context
48 from neutron.plugins.common import constants as p_const
49 from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
50 import l2population_rpc as l2pop_rpc
51 from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect
52 from neutron.plugins.ml2.drivers.linuxbridge.agent.common import config # noqa
53 from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
54 import constants as lconst
57 LOG = logging.getLogger(__name__)
59 BRIDGE_NAME_PREFIX = "brq"
60 VXLAN_INTERFACE_PREFIX = "vxlan-"
63 class NetworkSegment(object):
64 def __init__(self, network_type, physical_network, segmentation_id):
65 self.network_type = network_type
66 self.physical_network = physical_network
67 self.segmentation_id = segmentation_id
70 class LinuxBridgeManager(object):
71 def __init__(self, bridge_mappings, interface_mappings):
72 self.bridge_mappings = bridge_mappings
73 self.interface_mappings = interface_mappings
74 self.validate_interface_mappings()
75 self.validate_bridge_mappings()
76 self.ip = ip_lib.IPWrapper()
77 # VXLAN related parameters:
78 self.local_ip = cfg.CONF.VXLAN.local_ip
79 self.vxlan_mode = lconst.VXLAN_NONE
80 if cfg.CONF.VXLAN.enable_vxlan:
81 device = self.get_local_ip_device(self.local_ip)
82 self.validate_vxlan_group_with_local_ip()
83 self.local_int = device.name
84 self.check_vxlan_support()
85 # Store network mapping to segments
88 def validate_interface_mappings(self):
89 for physnet, interface in self.interface_mappings.items():
90 if not ip_lib.device_exists(interface):
91 LOG.error(_LE("Interface %(intf)s for physical network %(net)s"
92 " does not exist. Agent terminated!"),
93 {'intf': interface, 'net': physnet})
96 def validate_bridge_mappings(self):
97 for physnet, bridge in self.bridge_mappings.items():
98 if not ip_lib.device_exists(bridge):
99 LOG.error(_LE("Bridge %(brq)s for physical network %(net)s"
100 " does not exist. Agent terminated!"),
101 {'brq': bridge, 'net': physnet})
104 def validate_vxlan_group_with_local_ip(self):
105 if not cfg.CONF.VXLAN.vxlan_group:
108 ip_addr = netaddr.IPAddress(self.local_ip)
109 # Ensure the configured group address/range is valid and multicast
110 group_net = netaddr.IPNetwork(cfg.CONF.VXLAN.vxlan_group)
111 if not group_net.is_multicast():
113 if not ip_addr.version == group_net.version:
115 except (netaddr.core.AddrFormatError, ValueError):
116 LOG.error(_LE("Invalid VXLAN Group: %(group)s, must be an address "
117 "or network (in CIDR notation) in a multicast "
118 "range of the same address family as local_ip: "
120 {'group': cfg.CONF.VXLAN.vxlan_group,
121 'ip': self.local_ip})
124 def get_local_ip_device(self, local_ip):
125 """Return the device with local_ip on the host."""
126 device = self.ip.get_device_by_ip(local_ip)
128 LOG.error(_LE("Tunneling cannot be enabled without the local_ip "
129 "bound to an interface on the host. Please "
130 "configure local_ip %s on the host interface to "
131 "be used for tunneling and restart the agent."),
136 def get_existing_bridge_name(self, physical_network):
137 if not physical_network:
139 return self.bridge_mappings.get(physical_network)
141 def get_bridge_name(self, network_id):
143 LOG.warning(_LW("Invalid Network ID, will lead to incorrect "
145 bridge_name = BRIDGE_NAME_PREFIX + network_id[0:11]
148 def get_subinterface_name(self, physical_interface, vlan_id):
150 LOG.warning(_LW("Invalid VLAN ID, will lead to incorrect "
151 "subinterface name"))
152 subinterface_name = '%s.%s' % (physical_interface, vlan_id)
153 return subinterface_name
155 def get_tap_device_name(self, interface_id):
157 LOG.warning(_LW("Invalid Interface ID, will lead to incorrect "
159 tap_device_name = constants.TAP_DEVICE_PREFIX + interface_id[0:11]
160 return tap_device_name
162 def get_vxlan_device_name(self, segmentation_id):
163 if 0 <= int(segmentation_id) <= p_const.MAX_VXLAN_VNI:
164 return VXLAN_INTERFACE_PREFIX + str(segmentation_id)
166 LOG.warning(_LW("Invalid Segmentation ID: %s, will lead to "
167 "incorrect vxlan device name"), segmentation_id)
169 def get_vxlan_group(self, segmentation_id):
170 net = netaddr.IPNetwork(cfg.CONF.VXLAN.vxlan_group)
171 # Map the segmentation ID to (one of) the group address(es)
172 return str(net.network +
173 (int(segmentation_id) & int(net.hostmask)))
175 def get_deletable_bridges(self):
176 bridge_list = bridge_lib.get_bridge_names()
177 bridges = {b for b in bridge_list if b.startswith(BRIDGE_NAME_PREFIX)}
178 bridges.difference_update(self.bridge_mappings.values())
181 def get_tap_devices_count(self, bridge_name):
182 if_list = bridge_lib.BridgeDevice(bridge_name).get_interfaces()
183 return len([interface for interface in if_list if
184 interface.startswith(constants.TAP_DEVICE_PREFIX)])
186 def ensure_vlan_bridge(self, network_id, phy_bridge_name,
187 physical_interface, vlan_id):
188 """Create a vlan and bridge unless they already exist."""
189 interface = self.ensure_vlan(physical_interface, vlan_id)
191 return self.ensure_bridge(phy_bridge_name)
193 bridge_name = self.get_bridge_name(network_id)
194 ips, gateway = self.get_interface_details(interface)
195 if self.ensure_bridge(bridge_name, interface, ips, gateway):
198 def ensure_vxlan_bridge(self, network_id, segmentation_id):
199 """Create a vxlan and bridge unless they already exist."""
200 interface = self.ensure_vxlan(segmentation_id)
202 LOG.error(_LE("Failed creating vxlan interface for "
203 "%(segmentation_id)s"),
204 {segmentation_id: segmentation_id})
206 bridge_name = self.get_bridge_name(network_id)
207 self.ensure_bridge(bridge_name, interface)
210 def get_interface_details(self, interface):
211 device = self.ip.device(interface)
212 ips = device.addr.list(scope='global')
214 # Update default gateway if necessary
215 gateway = device.route.get_gateway(scope='global')
218 def ensure_flat_bridge(self, network_id, phy_bridge_name,
220 """Create a non-vlan bridge unless it already exists."""
222 return self.ensure_bridge(phy_bridge_name)
224 bridge_name = self.get_bridge_name(network_id)
225 ips, gateway = self.get_interface_details(physical_interface)
226 if self.ensure_bridge(bridge_name, physical_interface, ips,
228 return physical_interface
230 def ensure_local_bridge(self, network_id, phy_bridge_name):
231 """Create a local bridge unless it already exists."""
233 bridge_name = phy_bridge_name
235 bridge_name = self.get_bridge_name(network_id)
236 return self.ensure_bridge(bridge_name)
238 def ensure_vlan(self, physical_interface, vlan_id):
239 """Create a vlan unless it already exists."""
240 interface = self.get_subinterface_name(physical_interface, vlan_id)
241 if not ip_lib.device_exists(interface):
242 LOG.debug("Creating subinterface %(interface)s for "
243 "VLAN %(vlan_id)s on interface "
244 "%(physical_interface)s",
245 {'interface': interface, 'vlan_id': vlan_id,
246 'physical_interface': physical_interface})
247 if utils.execute(['ip', 'link', 'add', 'link',
249 'name', interface, 'type', 'vlan', 'id',
250 vlan_id], run_as_root=True):
252 if utils.execute(['ip', 'link', 'set',
253 interface, 'up'], run_as_root=True):
255 LOG.debug("Done creating subinterface %s", interface)
258 def ensure_vxlan(self, segmentation_id):
259 """Create a vxlan unless it already exists."""
260 interface = self.get_vxlan_device_name(segmentation_id)
261 if not ip_lib.device_exists(interface):
262 LOG.debug("Creating vxlan interface %(interface)s for "
263 "VNI %(segmentation_id)s",
264 {'interface': interface,
265 'segmentation_id': segmentation_id})
266 args = {'dev': self.local_int}
267 if self.vxlan_mode == lconst.VXLAN_MCAST:
268 args['group'] = self.get_vxlan_group(segmentation_id)
269 if cfg.CONF.VXLAN.ttl:
270 args['ttl'] = cfg.CONF.VXLAN.ttl
271 if cfg.CONF.VXLAN.tos:
272 args['tos'] = cfg.CONF.VXLAN.tos
273 if cfg.CONF.VXLAN.l2_population:
276 int_vxlan = self.ip.add_vxlan(interface, segmentation_id,
279 with excutils.save_and_reraise_exception() as ctxt:
280 # perform this check after an attempt rather than before
281 # to avoid excessive lookups and a possible race condition.
282 if ip_lib.vxlan_in_use(segmentation_id):
284 LOG.error(_LE("Unable to create VXLAN interface for "
285 "VNI %s because it is in use by another "
286 "interface."), segmentation_id)
288 int_vxlan.link.set_up()
289 LOG.debug("Done creating vxlan interface %s", interface)
292 def update_interface_ip_details(self, destination, source, ips,
295 dst_device = self.ip.device(destination)
296 src_device = self.ip.device(source)
298 # Append IP's to bridge if necessary
301 dst_device.addr.add(cidr=ip['cidr'])
304 # Ensure that the gateway can be updated by changing the metric
306 if 'metric' in gateway:
307 metric = gateway['metric'] - 1
308 dst_device.route.add_gateway(gateway=gateway['gateway'],
310 src_device.route.delete_gateway(gateway=gateway['gateway'])
312 # Remove IP's from interface
315 src_device.addr.delete(cidr=ip['cidr'])
317 def _bridge_exists_and_ensure_up(self, bridge_name):
318 """Check if the bridge exists and make sure it is up."""
319 br = ip_lib.IPDevice(bridge_name)
320 br.set_log_fail_as_error(False)
322 # If the device doesn't exist this will throw a RuntimeError
328 def ensure_bridge(self, bridge_name, interface=None, ips=None,
330 """Create a bridge unless it already exists."""
331 # _bridge_exists_and_ensure_up instead of device_exists is used here
332 # because there are cases where the bridge exists but it's not UP,
334 # 1) A greenthread was executing this function and had not yet executed
335 # "ip link set bridge_name up" before eventlet switched to this
336 # thread running the same function
337 # 2) The Nova VIF driver was running concurrently and had just created
338 # the bridge, but had not yet put it UP
339 if not self._bridge_exists_and_ensure_up(bridge_name):
340 LOG.debug("Starting bridge %(bridge_name)s for subinterface "
342 {'bridge_name': bridge_name, 'interface': interface})
343 bridge_device = bridge_lib.BridgeDevice.addbr(bridge_name)
344 if bridge_device.setfd(0):
346 if bridge_device.disable_stp():
348 if bridge_device.disable_ipv6():
350 if bridge_device.link.set_up():
352 LOG.debug("Done starting bridge %(bridge_name)s for "
353 "subinterface %(interface)s",
354 {'bridge_name': bridge_name, 'interface': interface})
356 bridge_device = bridge_lib.BridgeDevice(bridge_name)
361 # Update IP info if necessary
362 self.update_interface_ip_details(bridge_name, interface, ips, gateway)
364 # Check if the interface is part of the bridge
365 if not bridge_device.owns_interface(interface):
367 # Check if the interface is not enslaved in another bridge
368 bridge = bridge_lib.BridgeDevice.get_interface_bridge(
371 bridge.delif(interface)
373 bridge_device.addif(interface)
374 except Exception as e:
375 LOG.error(_LE("Unable to add %(interface)s to %(bridge_name)s"
376 "! Exception: %(e)s"),
377 {'interface': interface, 'bridge_name': bridge_name,
382 def ensure_physical_in_bridge(self, network_id,
386 if network_type == p_const.TYPE_VXLAN:
387 if self.vxlan_mode == lconst.VXLAN_NONE:
388 LOG.error(_LE("Unable to add vxlan interface for network %s"),
391 return self.ensure_vxlan_bridge(network_id, segmentation_id)
393 # NOTE(nick-ma-z): Obtain mappings of physical bridge and interfaces
394 physical_bridge = self.get_existing_bridge_name(physical_network)
395 physical_interface = self.interface_mappings.get(physical_network)
396 if not physical_bridge and not physical_interface:
397 LOG.error(_LE("No bridge or interface mappings"
398 " for physical network %s"),
401 if network_type == p_const.TYPE_FLAT:
402 return self.ensure_flat_bridge(network_id, physical_bridge,
404 elif network_type == p_const.TYPE_VLAN:
405 return self.ensure_vlan_bridge(network_id, physical_bridge,
409 LOG.error(_LE("Unknown network_type %(network_type)s for network "
410 "%(network_id)s."), {network_type: network_type,
411 network_id: network_id})
413 def add_tap_interface(self, network_id, network_type, physical_network,
414 segmentation_id, tap_device_name, device_owner):
415 """Add tap interface.
417 If a VIF has been plugged into a network, this function will
418 add the corresponding tap device to the relevant bridge.
420 if not ip_lib.device_exists(tap_device_name):
421 LOG.debug("Tap device: %s does not exist on "
422 "this host, skipped", tap_device_name)
425 bridge_name = self.get_existing_bridge_name(physical_network)
427 bridge_name = self.get_bridge_name(network_id)
429 if network_type == p_const.TYPE_LOCAL:
430 self.ensure_local_bridge(network_id, bridge_name)
432 phy_dev_name = self.ensure_physical_in_bridge(network_id,
438 self.ensure_tap_mtu(tap_device_name, phy_dev_name)
439 # Avoid messing with plugging devices into a bridge that the agent
441 if device_owner.startswith(constants.DEVICE_OWNER_PREFIXES):
442 # Check if device needs to be added to bridge
443 if not bridge_lib.BridgeDevice.get_interface_bridge(
445 data = {'tap_device_name': tap_device_name,
446 'bridge_name': bridge_name}
447 LOG.debug("Adding device %(tap_device_name)s to bridge "
448 "%(bridge_name)s", data)
449 if bridge_lib.BridgeDevice(bridge_name).addif(tap_device_name):
452 data = {'tap_device_name': tap_device_name,
453 'device_owner': device_owner,
454 'bridge_name': bridge_name}
455 LOG.debug("Skip adding device %(tap_device_name)s to "
456 "%(bridge_name)s. It is owned by %(device_owner)s and "
457 "thus added elsewhere.", data)
460 def ensure_tap_mtu(self, tap_dev_name, phy_dev_name):
461 """Ensure the MTU on the tap is the same as the physical device."""
462 phy_dev_mtu = ip_lib.IPDevice(phy_dev_name).link.mtu
463 ip_lib.IPDevice(tap_dev_name).link.set_mtu(phy_dev_mtu)
465 def add_interface(self, network_id, network_type, physical_network,
466 segmentation_id, port_id, device_owner):
467 self.network_map[network_id] = NetworkSegment(network_type,
470 tap_device_name = self.get_tap_device_name(port_id)
471 return self.add_tap_interface(network_id, network_type,
472 physical_network, segmentation_id,
473 tap_device_name, device_owner)
475 def delete_bridge(self, bridge_name):
476 bridge_device = bridge_lib.BridgeDevice(bridge_name)
477 if bridge_device.exists():
478 physical_interfaces = set(self.interface_mappings.values())
479 interfaces_on_bridge = bridge_device.get_interfaces()
480 for interface in interfaces_on_bridge:
481 self.remove_interface(bridge_name, interface)
483 if interface.startswith(VXLAN_INTERFACE_PREFIX):
484 self.delete_interface(interface)
486 # Match the vlan/flat interface in the bridge.
487 # If the bridge has an IP, it mean that this IP was moved
488 # from the current interface, which also mean that this
489 # interface was not created by the agent.
490 ips, gateway = self.get_interface_details(bridge_name)
492 self.update_interface_ip_details(interface,
495 elif interface not in physical_interfaces:
496 self.delete_interface(interface)
498 LOG.debug("Deleting bridge %s", bridge_name)
499 if bridge_device.link.set_down():
501 if bridge_device.delbr():
503 LOG.debug("Done deleting bridge %s", bridge_name)
506 LOG.debug("Cannot delete bridge %s; it does not exist",
509 def remove_interface(self, bridge_name, interface_name):
510 bridge_device = bridge_lib.BridgeDevice(bridge_name)
511 if bridge_device.exists():
512 if not bridge_lib.is_bridged_interface(interface_name):
514 LOG.debug("Removing device %(interface_name)s from bridge "
516 {'interface_name': interface_name,
517 'bridge_name': bridge_name})
518 if bridge_device.delif(interface_name):
520 LOG.debug("Done removing device %(interface_name)s from bridge "
522 {'interface_name': interface_name,
523 'bridge_name': bridge_name})
526 LOG.debug("Cannot remove device %(interface_name)s bridge "
527 "%(bridge_name)s does not exist",
528 {'interface_name': interface_name,
529 'bridge_name': bridge_name})
532 def delete_interface(self, interface):
533 device = self.ip.device(interface)
535 LOG.debug("Deleting interface %s",
537 device.link.set_down()
539 LOG.debug("Done deleting interface %s", interface)
541 def get_tap_devices(self):
543 for device in bridge_lib.get_bridge_names():
544 if device.startswith(constants.TAP_DEVICE_PREFIX):
548 def vxlan_ucast_supported(self):
549 if not cfg.CONF.VXLAN.l2_population:
551 if not ip_lib.iproute_arg_supported(
552 ['bridge', 'fdb'], 'append'):
553 LOG.warning(_LW('Option "%(option)s" must be supported by command '
554 '"%(command)s" to enable %(mode)s mode'),
556 'command': 'bridge fdb',
557 'mode': 'VXLAN UCAST'})
561 for seg_id in moves.range(1, p_const.MAX_VXLAN_VNI + 1):
562 if (ip_lib.device_exists(self.get_vxlan_device_name(seg_id))
563 or ip_lib.vxlan_in_use(seg_id)):
565 test_iface = self.ensure_vxlan(seg_id)
568 LOG.error(_LE('No valid Segmentation ID to perform UCAST test.'))
573 cmd=['bridge', 'fdb', 'append', constants.FLOODING_ENTRY[0],
574 'dev', test_iface, 'dst', '1.1.1.1'],
575 run_as_root=True, log_fail_as_error=False)
580 self.delete_interface(test_iface)
582 def vxlan_mcast_supported(self):
583 if not cfg.CONF.VXLAN.vxlan_group:
584 LOG.warning(_LW('VXLAN muticast group(s) must be provided in '
585 'vxlan_group option to enable VXLAN MCAST mode'))
587 if not ip_lib.iproute_arg_supported(
588 ['ip', 'link', 'add', 'type', 'vxlan'],
590 LOG.warning(_LW('Option "%(option)s" must be supported by command '
591 '"%(command)s" to enable %(mode)s mode'),
593 'command': 'ip link add type vxlan',
594 'mode': 'VXLAN MCAST'})
599 def check_vxlan_support(self):
600 self.vxlan_mode = lconst.VXLAN_NONE
602 if self.vxlan_ucast_supported():
603 self.vxlan_mode = lconst.VXLAN_UCAST
604 elif self.vxlan_mcast_supported():
605 self.vxlan_mode = lconst.VXLAN_MCAST
607 raise exceptions.VxlanNetworkUnsupported()
608 LOG.debug('Using %s VXLAN mode', self.vxlan_mode)
610 def fdb_ip_entry_exists(self, mac, ip, interface):
611 entries = utils.execute(['ip', 'neigh', 'show', 'to', ip,
614 return mac in entries
616 def fdb_bridge_entry_exists(self, mac, interface, agent_ip=None):
617 entries = utils.execute(['bridge', 'fdb', 'show', 'dev', interface],
620 return mac in entries
622 return (agent_ip in entries and mac in entries)
624 def add_fdb_ip_entry(self, mac, ip, interface):
625 ip_lib.IPDevice(interface).neigh.add(ip, mac)
627 def remove_fdb_ip_entry(self, mac, ip, interface):
628 ip_lib.IPDevice(interface).neigh.delete(ip, mac)
630 def add_fdb_bridge_entry(self, mac, agent_ip, interface, operation="add"):
631 utils.execute(['bridge', 'fdb', operation, mac, 'dev', interface,
634 check_exit_code=False)
636 def remove_fdb_bridge_entry(self, mac, agent_ip, interface):
637 utils.execute(['bridge', 'fdb', 'del', mac, 'dev', interface,
640 check_exit_code=False)
642 def add_fdb_entries(self, agent_ip, ports, interface):
643 for mac, ip in ports:
644 if mac != constants.FLOODING_ENTRY[0]:
645 self.add_fdb_ip_entry(mac, ip, interface)
646 self.add_fdb_bridge_entry(mac, agent_ip, interface,
648 elif self.vxlan_mode == lconst.VXLAN_UCAST:
649 if self.fdb_bridge_entry_exists(mac, interface):
650 self.add_fdb_bridge_entry(mac, agent_ip, interface,
653 self.add_fdb_bridge_entry(mac, agent_ip, interface)
655 def remove_fdb_entries(self, agent_ip, ports, interface):
656 for mac, ip in ports:
657 if mac != constants.FLOODING_ENTRY[0]:
658 self.remove_fdb_ip_entry(mac, ip, interface)
659 self.remove_fdb_bridge_entry(mac, agent_ip, interface)
660 elif self.vxlan_mode == lconst.VXLAN_UCAST:
661 self.remove_fdb_bridge_entry(mac, agent_ip, interface)
664 class LinuxBridgeRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
665 l2pop_rpc.L2populationRpcCallBackMixin):
667 # Set RPC API version to 1.0 by default.
669 # 1.1 Support Security Group RPC
670 # 1.3 Added param devices_to_update to security_groups_provider_updated
671 # 1.4 Added support for network_update
672 target = oslo_messaging.Target(version='1.4')
674 def __init__(self, context, agent, sg_agent):
675 super(LinuxBridgeRpcCallbacks, self).__init__()
676 self.context = context
678 self.sg_agent = sg_agent
680 def network_delete(self, context, **kwargs):
681 LOG.debug("network_delete received")
682 network_id = kwargs.get('network_id')
684 # NOTE(nick-ma-z): Don't remove pre-existing user-defined bridges
685 if network_id in self.agent.br_mgr.network_map:
686 phynet = self.agent.br_mgr.network_map[network_id].physical_network
687 if phynet and phynet in self.agent.br_mgr.bridge_mappings:
688 LOG.info(_LI("Physical network %s is defined in "
689 "bridge_mappings and cannot be deleted."),
693 LOG.error(_LE("Network %s is not available."), network_id)
696 bridge_name = self.agent.br_mgr.get_bridge_name(network_id)
697 LOG.debug("Delete %s", bridge_name)
698 self.agent.br_mgr.delete_bridge(bridge_name)
700 def port_update(self, context, **kwargs):
701 port_id = kwargs['port']['id']
702 tap_name = self.agent.br_mgr.get_tap_device_name(port_id)
703 # Put the tap name in the updated_devices set.
704 # Do not store port details, as if they're used for processing
705 # notifications there is no guarantee the notifications are
706 # processed in the same order as the relevant API requests.
707 self.agent.updated_devices.add(tap_name)
708 LOG.debug("port_update RPC received for port: %s", port_id)
710 def network_update(self, context, **kwargs):
711 network_id = kwargs['network']['id']
712 LOG.debug("network_update message processed for network "
713 "%(network_id)s, with ports: %(ports)s",
714 {'network_id': network_id,
715 'ports': self.agent.network_ports[network_id]})
716 for port_data in self.agent.network_ports[network_id]:
717 self.agent.updated_devices.add(port_data['device'])
719 def fdb_add(self, context, fdb_entries):
720 LOG.debug("fdb_add received")
721 for network_id, values in fdb_entries.items():
722 segment = self.agent.br_mgr.network_map.get(network_id)
726 if segment.network_type != p_const.TYPE_VXLAN:
729 interface = self.agent.br_mgr.get_vxlan_device_name(
730 segment.segmentation_id)
732 agent_ports = values.get('ports')
733 for agent_ip, ports in agent_ports.items():
734 if agent_ip == self.agent.br_mgr.local_ip:
737 self.agent.br_mgr.add_fdb_entries(agent_ip,
741 def fdb_remove(self, context, fdb_entries):
742 LOG.debug("fdb_remove received")
743 for network_id, values in fdb_entries.items():
744 segment = self.agent.br_mgr.network_map.get(network_id)
748 if segment.network_type != p_const.TYPE_VXLAN:
751 interface = self.agent.br_mgr.get_vxlan_device_name(
752 segment.segmentation_id)
754 agent_ports = values.get('ports')
755 for agent_ip, ports in agent_ports.items():
756 if agent_ip == self.agent.br_mgr.local_ip:
759 self.agent.br_mgr.remove_fdb_entries(agent_ip,
763 def _fdb_chg_ip(self, context, fdb_entries):
764 LOG.debug("update chg_ip received")
765 for network_id, agent_ports in fdb_entries.items():
766 segment = self.agent.br_mgr.network_map.get(network_id)
770 if segment.network_type != p_const.TYPE_VXLAN:
773 interface = self.agent.br_mgr.get_vxlan_device_name(
774 segment.segmentation_id)
776 for agent_ip, state in agent_ports.items():
777 if agent_ip == self.agent.br_mgr.local_ip:
780 after = state.get('after', [])
781 for mac, ip in after:
782 self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface)
784 before = state.get('before', [])
785 for mac, ip in before:
786 self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface)
788 def fdb_update(self, context, fdb_entries):
789 LOG.debug("fdb_update received")
790 for action, values in fdb_entries.items():
791 method = '_fdb_' + action
792 if not hasattr(self, method):
793 raise NotImplementedError()
795 getattr(self, method)(context, values)
798 class LinuxBridgeNeutronAgentRPC(service.Service):
800 def __init__(self, bridge_mappings, interface_mappings, polling_interval,
801 quitting_rpc_timeout):
804 :param bridge_mappings: dict mapping physical_networks to
806 :param interface_mappings: dict mapping physical_networks to
808 :param polling_interval: interval (secs) to poll DB.
809 :param quitting_rpc_timeout: timeout in seconds for rpc calls after
812 super(LinuxBridgeNeutronAgentRPC, self).__init__()
813 self.interface_mappings = interface_mappings
814 self.bridge_mappings = bridge_mappings
815 self.polling_interval = polling_interval
816 self.quitting_rpc_timeout = quitting_rpc_timeout
819 self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing
820 self.setup_linux_bridge(self.bridge_mappings, self.interface_mappings)
822 # stores received port_updates and port_deletes for
823 # processing by the main loop
824 self.updated_devices = set()
826 # stores all configured ports on agent
827 self.network_ports = collections.defaultdict(list)
828 # flag to do a sync after revival
829 self.fullsync = False
830 self.context = context.get_admin_context_without_session()
831 self.setup_rpc(self.interface_mappings.values())
832 self.init_extension_manager(self.connection)
835 'bridge_mappings': self.bridge_mappings,
836 'interface_mappings': self.interface_mappings,
837 'extensions': self.ext_manager.names()
839 if self.br_mgr.vxlan_mode != lconst.VXLAN_NONE:
840 configurations['tunneling_ip'] = self.br_mgr.local_ip
841 configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
842 configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
844 'binary': 'neutron-linuxbridge-agent',
845 'host': cfg.CONF.host,
846 'topic': constants.L2_AGENT_TOPIC,
847 'configurations': configurations,
848 'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
851 report_interval = cfg.CONF.AGENT.report_interval
853 heartbeat = loopingcall.FixedIntervalLoopingCall(
855 heartbeat.start(interval=report_interval)
858 def stop(self, graceful=True):
859 LOG.info(_LI("Stopping linuxbridge agent."))
860 if graceful and self.quitting_rpc_timeout:
861 self.set_rpc_timeout(self.quitting_rpc_timeout)
862 super(LinuxBridgeNeutronAgentRPC, self).stop(graceful)
865 common_config.setup_logging()
867 def _report_state(self):
869 devices = len(self.br_mgr.get_tap_devices())
870 self.agent_state.get('configurations')['devices'] = devices
871 agent_status = self.state_rpc.report_state(self.context,
874 if agent_status == constants.AGENT_REVIVED:
875 LOG.info(_LI('Agent has just been revived. '
876 'Doing a full sync.'))
878 self.agent_state.pop('start_flag', None)
880 LOG.exception(_LE("Failed reporting state!"))
882 def setup_rpc(self, physical_interfaces):
883 if physical_interfaces:
884 mac = utils.get_interface_mac(physical_interfaces[0])
886 devices = ip_lib.IPWrapper().get_devices(True)
888 mac = utils.get_interface_mac(devices[0].name)
890 LOG.error(_LE("Unable to obtain MAC address for unique ID. "
891 "Agent terminated!"))
894 self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
895 self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
896 self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
897 self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)
899 self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
900 LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
902 self.topic = topics.AGENT
903 self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
905 # Handle updates from service
906 self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self,
908 # Define the listening consumers for the agent
909 consumers = [[topics.PORT, topics.UPDATE],
910 [topics.NETWORK, topics.DELETE],
911 [topics.NETWORK, topics.UPDATE],
912 [topics.SECURITY_GROUP, topics.UPDATE]]
914 if cfg.CONF.VXLAN.l2_population:
915 consumers.append([topics.L2POPULATION, topics.UPDATE])
916 self.connection = agent_rpc.create_consumers(self.endpoints,
920 def init_extension_manager(self, connection):
921 ext_manager.register_opts(cfg.CONF)
923 ext_manager.AgentExtensionsManager(cfg.CONF))
924 self.ext_manager.initialize(
925 connection, lconst.EXTENSION_DRIVER_TYPE)
927 def setup_linux_bridge(self, bridge_mappings, interface_mappings):
928 self.br_mgr = LinuxBridgeManager(bridge_mappings, interface_mappings)
930 def _ensure_port_admin_state(self, port_id, admin_state_up):
931 LOG.debug("Setting admin_state_up to %s for port %s",
932 admin_state_up, port_id)
933 tap_name = self.br_mgr.get_tap_device_name(port_id)
935 ip_lib.IPDevice(tap_name).link.set_up()
937 ip_lib.IPDevice(tap_name).link.set_down()
939 def _clean_network_ports(self, device):
940 for netid, ports_list in self.network_ports.items():
941 for port_data in ports_list:
942 if device == port_data['device']:
943 ports_list.remove(port_data)
945 self.network_ports.pop(netid)
946 return port_data['port_id']
948 def _update_network_ports(self, network_id, port_id, device):
949 self._clean_network_ports(device)
950 self.network_ports[network_id].append({
955 def process_network_devices(self, device_info):
959 self.sg_agent.setup_port_filters(device_info.get('added'),
960 device_info.get('updated'))
961 # Updated devices are processed the same as new ones, as their
962 # admin_state_up may have changed. The set union prevents duplicating
963 # work when a device is new and updated in the same polling iteration.
964 devices_added_updated = (set(device_info.get('added'))
965 | set(device_info.get('updated')))
966 if devices_added_updated:
967 resync_a = self.treat_devices_added_updated(devices_added_updated)
969 if device_info.get('removed'):
970 resync_b = self.treat_devices_removed(device_info['removed'])
971 # If one of the above operations fails => resync with plugin
972 return (resync_a | resync_b)
974 def treat_devices_added_updated(self, devices):
976 devices_details_list = self.plugin_rpc.get_devices_details_list(
977 self.context, devices, self.agent_id)
979 LOG.exception(_LE("Unable to get port details for %s"), devices)
983 for device_details in devices_details_list:
984 device = device_details['device']
985 LOG.debug("Port %s added", device)
987 if 'port_id' in device_details:
988 LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
989 {'device': device, 'details': device_details})
990 if self.prevent_arp_spoofing:
991 port = self.br_mgr.get_tap_device_name(
992 device_details['port_id'])
993 arp_protect.setup_arp_spoofing_protection(port,
995 # create the networking for the port
996 network_type = device_details.get('network_type')
997 segmentation_id = device_details.get('segmentation_id')
998 tap_in_bridge = self.br_mgr.add_interface(
999 device_details['network_id'], network_type,
1000 device_details['physical_network'], segmentation_id,
1001 device_details['port_id'], device_details['device_owner'])
1002 # REVISIT(scheuran): Changed the way how ports admin_state_up
1005 # Old lb implementation:
1006 # - admin_state_up: ensure that tap is plugged into bridge
1007 # - admin_state_down: remove tap from bridge
1008 # New lb implementation:
1009 # - admin_state_up: set tap device state to up
1010 # - admin_state_down: set tap device stae to down
1012 # However both approaches could result in races with
1013 # nova/libvirt and therefore to an invalid system state in the
1014 # scenario, where an instance is booted with a port configured
1015 # with admin_state_up = False:
1017 # Libvirt does the following actions in exactly
1018 # this order (see libvirt virnetdevtap.c)
1019 # 1) Create the tap device, set its MAC and MTU
1020 # 2) Plug the tap into the bridge
1021 # 3) Set the tap online
1023 # Old lb implementation:
1024 # A race could occur, if the lb agent removes the tap device
1025 # right after step 1). Then libvirt will add it to the bridge
1027 # New lb implementation:
1028 # The race could occur if the lb-agent sets the taps device
1029 # state to down right after step 2). In step 3) libvirt
1030 # might set it to up again.
1032 # This is not an issue if an instance is booted with a port
1033 # configured with admin_state_up = True. Libvirt would just
1034 # set the tap device up again.
1036 # This refactoring is recommended for the following reasons:
1037 # 1) An existing race with libvirt caused by the behavior of
1038 # the old implementation. See Bug #1312016
1039 # 2) The new code is much more readable
1040 self._ensure_port_admin_state(device_details['port_id'],
1041 device_details['admin_state_up'])
1042 # update plugin about port status if admin_state is up
1043 if device_details['admin_state_up']:
1045 self.plugin_rpc.update_device_up(self.context,
1050 self.plugin_rpc.update_device_down(self.context,
1054 self._update_network_ports(device_details['network_id'],
1055 device_details['port_id'],
1056 device_details['device'])
1057 self.ext_manager.handle_port(self.context, device_details)
1059 LOG.info(_LI("Device %s not defined on plugin"), device)
1062 def treat_devices_removed(self, devices):
1064 self.sg_agent.remove_devices_filter(devices)
1065 for device in devices:
1066 LOG.info(_LI("Attachment %s removed"), device)
1069 details = self.plugin_rpc.update_device_down(self.context,
1074 LOG.exception(_LE("Error occurred while removing port %s"),
1077 if details and details['exists']:
1078 LOG.info(_LI("Port %s updated."), device)
1080 LOG.debug("Device %s not defined on plugin", device)
1081 port_id = self._clean_network_ports(device)
1082 self.ext_manager.delete_port(self.context,
1084 'port_id': port_id})
1085 if self.prevent_arp_spoofing:
1086 arp_protect.delete_arp_spoofing_protection(devices)
1089 def scan_devices(self, previous, sync):
1092 # Save and reinitialize the set variable that the port_update RPC uses.
1093 # This should be thread-safe as the greenthread should not yield
1094 # between these two statements.
1095 updated_devices = self.updated_devices
1096 self.updated_devices = set()
1098 current_devices = self.br_mgr.get_tap_devices()
1099 device_info['current'] = current_devices
1101 if previous is None:
1102 # This is the first iteration of daemon_loop().
1103 previous = {'added': set(),
1107 # clear any orphaned ARP spoofing rules (e.g. interface was
1109 if self.prevent_arp_spoofing:
1110 arp_protect.delete_unreferenced_arp_protection(current_devices)
1113 # This is the first iteration, or the previous one had a problem.
1114 # Re-add all existing devices.
1115 device_info['added'] = current_devices
1117 # Retry cleaning devices that may not have been cleaned properly.
1118 # And clean any that disappeared since the previous iteration.
1119 device_info['removed'] = (previous['removed'] | previous['current']
1122 # Retry updating devices that may not have been updated properly.
1123 # And any that were updated since the previous iteration.
1124 # Only update devices that currently exist.
1125 device_info['updated'] = (previous['updated'] | updated_devices
1128 device_info['added'] = current_devices - previous['current']
1129 device_info['removed'] = previous['current'] - current_devices
1130 device_info['updated'] = updated_devices & current_devices
1134 def _device_info_has_changes(self, device_info):
1135 return (device_info.get('added')
1136 or device_info.get('updated')
1137 or device_info.get('removed'))
1139 def daemon_loop(self):
1140 LOG.info(_LI("LinuxBridge Agent RPC Daemon Started!"))
1149 self.fullsync = False
1152 LOG.info(_LI("Agent out of sync with plugin!"))
1154 device_info = self.scan_devices(previous=device_info, sync=sync)
1157 if (self._device_info_has_changes(device_info)
1158 or self.sg_agent.firewall_refresh_needed()):
1159 LOG.debug("Agent loop found changes! %s", device_info)
1161 sync = self.process_network_devices(device_info)
1163 LOG.exception(_LE("Error in agent loop. Devices info: %s"),
1167 # sleep till end of polling interval
1168 elapsed = (time.time() - start)
1169 if (elapsed < self.polling_interval):
1170 time.sleep(self.polling_interval - elapsed)
1172 LOG.debug("Loop iteration exceeded interval "
1173 "(%(polling_interval)s vs. %(elapsed)s)!",
1174 {'polling_interval': self.polling_interval,
1175 'elapsed': elapsed})
1177 def set_rpc_timeout(self, timeout):
1178 for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
1180 rpc_api.client.timeout = timeout
1184 common_config.init(sys.argv[1:])
1186 common_config.setup_logging()
1188 interface_mappings = n_utils.parse_mappings(
1189 cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
1190 except ValueError as e:
1191 LOG.error(_LE("Parsing physical_interface_mappings failed: %s. "
1192 "Agent terminated!"), e)
1194 LOG.info(_LI("Interface mappings: %s"), interface_mappings)
1197 bridge_mappings = n_utils.parse_mappings(
1198 cfg.CONF.LINUX_BRIDGE.bridge_mappings)
1199 except ValueError as e:
1200 LOG.error(_LE("Parsing bridge_mappings failed: %s. "
1201 "Agent terminated!"), e)
1203 LOG.info(_LI("Bridge mappings: %s"), bridge_mappings)
1205 polling_interval = cfg.CONF.AGENT.polling_interval
1206 quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout
1207 agent = LinuxBridgeNeutronAgentRPC(bridge_mappings,
1210 quitting_rpc_timeout)
1211 LOG.info(_LI("Agent initialized successfully, now running... "))
1212 launcher = service.launch(cfg.CONF, agent)