1 # Copyright (c) 2013 OpenStack Foundation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 from eventlet import greenthread
17 from oslo_config import cfg
18 from oslo_db import api as oslo_db_api
19 from oslo_db import exception as os_db_exception
20 from oslo_log import helpers as log_helpers
21 from oslo_log import log
22 from oslo_serialization import jsonutils
23 from oslo_utils import excutils
24 from oslo_utils import importutils
25 from oslo_utils import uuidutils
26 from sqlalchemy import exc as sql_exc
27 from sqlalchemy.orm import exc as sa_exc
29 from neutron._i18n import _, _LE, _LI, _LW
30 from neutron.agent import securitygroups_rpc as sg_rpc
31 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
32 from neutron.api.rpc.handlers import dhcp_rpc
33 from neutron.api.rpc.handlers import dvr_rpc
34 from neutron.api.rpc.handlers import metadata_rpc
35 from neutron.api.rpc.handlers import resources_rpc
36 from neutron.api.rpc.handlers import securitygroups_rpc
37 from neutron.api.v2 import attributes
38 from neutron.callbacks import events
39 from neutron.callbacks import exceptions
40 from neutron.callbacks import registry
41 from neutron.callbacks import resources
42 from neutron.common import constants as const
43 from neutron.common import exceptions as exc
44 from neutron.common import ipv6_utils
45 from neutron.common import rpc as n_rpc
46 from neutron.common import topics
47 from neutron.common import utils
48 from neutron.db import address_scope_db
49 from neutron.db import agents_db
50 from neutron.db import agentschedulers_db
51 from neutron.db import allowedaddresspairs_db as addr_pair_db
52 from neutron.db import api as db_api
53 from neutron.db import db_base_plugin_v2
54 from neutron.db import dvr_mac_db
55 from neutron.db import external_net_db
56 from neutron.db import extradhcpopt_db
57 from neutron.db import models_v2
58 from neutron.db import netmtu_db
59 from neutron.db.quota import driver # noqa
60 from neutron.db import securitygroups_db
61 from neutron.db import securitygroups_rpc_base as sg_db_rpc
62 from neutron.db import vlantransparent_db
63 from neutron.extensions import allowedaddresspairs as addr_pair
64 from neutron.extensions import availability_zone as az_ext
65 from neutron.extensions import extra_dhcp_opt as edo_ext
66 from neutron.extensions import portbindings
67 from neutron.extensions import portsecurity as psec
68 from neutron.extensions import providernet as provider
69 from neutron.extensions import vlantransparent
70 from neutron import manager
71 from neutron.plugins.common import constants as service_constants
72 from neutron.plugins.ml2.common import exceptions as ml2_exc
73 from neutron.plugins.ml2 import config # noqa
74 from neutron.plugins.ml2 import db
75 from neutron.plugins.ml2 import driver_api as api
76 from neutron.plugins.ml2 import driver_context
77 from neutron.plugins.ml2.extensions import qos as qos_ext
78 from neutron.plugins.ml2 import managers
79 from neutron.plugins.ml2 import models
80 from neutron.plugins.ml2 import rpc
81 from neutron.quota import resource_registry
82 from neutron.services.qos import qos_consts
84 LOG = log.getLogger(__name__)
89 SERVICE_PLUGINS_REQUIRED_DRIVERS = {
90 'qos': [qos_ext.QOS_EXT_DRIVER_ALIAS]
94 class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
95 dvr_mac_db.DVRDbMixin,
96 external_net_db.External_net_db_mixin,
97 sg_db_rpc.SecurityGroupServerRpcMixin,
98 agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
99 addr_pair_db.AllowedAddressPairsMixin,
100 vlantransparent_db.Vlantransparent_db_mixin,
101 extradhcpopt_db.ExtraDhcpOptMixin,
102 netmtu_db.Netmtu_db_mixin,
103 address_scope_db.AddressScopeDbMixin):
105 """Implement the Neutron L2 abstractions using modules.
107 Ml2Plugin is a Neutron plugin based on separately extensible sets
108 of network types and mechanisms for connecting to networks of
109 those types. The network types and mechanisms are implemented as
110 drivers loaded via Python entry points. Networks can be made up of
111 multiple segments (not yet fully implemented).
114 # This attribute specifies whether the plugin supports or not
115 # bulk/pagination/sorting operations. Name mangling is used in
116 # order to ensure it is qualified by class
117 __native_bulk_support = True
118 __native_pagination_support = True
119 __native_sorting_support = True
121 # List of supported extensions
122 _supported_extension_aliases = ["provider", "external-net", "binding",
123 "quotas", "security-group", "agent",
124 "dhcp_agent_scheduler",
125 "multi-provider", "allowed-address-pairs",
126 "extra_dhcp_opt", "subnet_allocation",
127 "net-mtu", "vlan-transparent",
128 "address-scope", "dns-integration",
130 "network_availability_zone"]
133 def supported_extension_aliases(self):
134 if not hasattr(self, '_aliases'):
135 aliases = self._supported_extension_aliases[:]
136 aliases += self.extension_manager.extension_aliases()
137 sg_rpc.disable_security_group_extension_by_config(aliases)
138 vlantransparent.disable_extension_by_config(aliases)
139 self._aliases = aliases
142 @resource_registry.tracked_resources(
143 network=models_v2.Network,
145 subnet=models_v2.Subnet,
146 subnetpool=models_v2.SubnetPool,
147 security_group=securitygroups_db.SecurityGroup,
148 security_group_rule=securitygroups_db.SecurityGroupRule)
150 # First load drivers, then initialize DB, then initialize drivers
151 self.type_manager = managers.TypeManager()
152 self.extension_manager = managers.ExtensionManager()
153 self.mechanism_manager = managers.MechanismManager()
154 super(Ml2Plugin, self).__init__()
155 self.type_manager.initialize()
156 self.extension_manager.initialize()
157 self.mechanism_manager.initialize()
159 self._start_rpc_notifiers()
160 self.add_agent_status_check(self.agent_health_check)
161 self._verify_service_plugins_requirements()
162 LOG.info(_LI("Modular L2 Plugin initialization complete"))
164 def _setup_rpc(self):
165 """Initialize components to support agent communication."""
167 rpc.RpcCallbacks(self.notifier, self.type_manager),
168 securitygroups_rpc.SecurityGroupServerRpcCallback(),
169 dvr_rpc.DVRServerRpcCallback(),
170 dhcp_rpc.DhcpRpcCallback(),
171 agents_db.AgentExtRpcCallback(),
172 metadata_rpc.MetadataRpcCallback(),
173 resources_rpc.ResourcesPullRpcCallback()
176 def _setup_dhcp(self):
177 """Initialize components to support DHCP."""
178 self.network_scheduler = importutils.import_object(
179 cfg.CONF.network_scheduler_driver
181 self.start_periodic_dhcp_agent_status_check()
183 def _verify_service_plugins_requirements(self):
184 for service_plugin in cfg.CONF.service_plugins:
185 extension_drivers = SERVICE_PLUGINS_REQUIRED_DRIVERS.get(
188 for extension_driver in extension_drivers:
189 if extension_driver not in self.extension_manager.names():
190 raise ml2_exc.ExtensionDriverNotFound(
191 driver=extension_driver, service_plugin=service_plugin
195 def supported_qos_rule_types(self):
196 return self.mechanism_manager.supported_qos_rule_types
198 @log_helpers.log_method_call
199 def _start_rpc_notifiers(self):
200 """Initialize RPC notifiers for agents."""
201 self.notifier = rpc.AgentNotifierApi(topics.AGENT)
202 self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
203 dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
206 @log_helpers.log_method_call
207 def start_rpc_listeners(self):
208 """Start the RPC loop to let the plugin communicate with agents."""
210 self.topic = topics.PLUGIN
211 self.conn = n_rpc.create_connection()
212 self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
213 # process state reports despite dedicated rpc workers
214 self.conn.create_consumer(topics.REPORTS,
215 [agents_db.AgentExtRpcCallback()],
217 return self.conn.consume_in_threads()
219 def start_rpc_state_reports_listener(self):
220 self.conn_reports = n_rpc.create_connection(new=True)
221 self.conn_reports.create_consumer(topics.REPORTS,
222 [agents_db.AgentExtRpcCallback()],
224 return self.conn_reports.consume_in_threads()
226 def _filter_nets_provider(self, context, networks, filters):
228 for network in networks
229 if self.type_manager.network_matches_filters(network, filters)
232 def _check_mac_update_allowed(self, orig_port, port, binding):
233 unplugged_types = (portbindings.VIF_TYPE_BINDING_FAILED,
234 portbindings.VIF_TYPE_UNBOUND)
235 new_mac = port.get('mac_address')
236 mac_change = (new_mac is not None and
237 orig_port['mac_address'] != new_mac)
238 if (mac_change and binding.vif_type not in unplugged_types):
239 raise exc.PortBound(port_id=orig_port['id'],
240 vif_type=binding.vif_type,
241 old_mac=orig_port['mac_address'],
242 new_mac=port['mac_address'])
245 def _process_port_binding(self, mech_context, attrs):
246 session = mech_context._plugin_context.session
247 binding = mech_context._binding
248 port = mech_context.current
252 host = attributes.ATTR_NOT_SPECIFIED
253 if attrs and portbindings.HOST_ID in attrs:
254 host = attrs.get(portbindings.HOST_ID) or ''
256 original_host = binding.host
257 if (attributes.is_attr_set(host) and
258 original_host != host):
262 vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
263 if (attributes.is_attr_set(vnic_type) and
264 binding.vnic_type != vnic_type):
265 binding.vnic_type = vnic_type
268 # treat None as clear of profile.
270 if attrs and portbindings.PROFILE in attrs:
271 profile = attrs.get(portbindings.PROFILE) or {}
273 if profile not in (None, attributes.ATTR_NOT_SPECIFIED,
274 self._get_profile(binding)):
275 binding.profile = jsonutils.dumps(profile)
276 if len(binding.profile) > models.BINDING_PROFILE_LEN:
277 msg = _("binding:profile value too large")
278 raise exc.InvalidInput(error_message=msg)
281 # Unbind the port if needed.
283 binding.vif_type = portbindings.VIF_TYPE_UNBOUND
284 binding.vif_details = ''
285 db.clear_binding_levels(session, port_id, original_host)
286 mech_context._clear_binding_levels()
288 if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
289 binding.vif_type = portbindings.VIF_TYPE_UNBOUND
290 binding.vif_details = ''
291 db.clear_binding_levels(session, port_id, original_host)
292 mech_context._clear_binding_levels()
295 self._update_port_dict_binding(port, binding)
298 def _bind_port_if_needed(self, context, allow_notify=False,
300 # Binding limit does not need to be tunable because no
301 # more than a couple of attempts should ever be required in
303 for count in range(1, MAX_BIND_TRIES + 1):
305 # multiple attempts shouldn't happen very often so we log each
306 # attempt after the 1st.
307 greenthread.sleep(0) # yield
308 LOG.info(_LI("Attempt %(count)s to bind port %(port)s"),
309 {'count': count, 'port': context.current['id']})
310 context, need_notify, try_again = self._attempt_binding(
311 context, need_notify)
313 if allow_notify and need_notify:
314 self._notify_port_updated(context)
317 LOG.error(_LE("Failed to commit binding results for %(port)s "
318 "after %(max)s tries"),
319 {'port': context.current['id'], 'max': MAX_BIND_TRIES})
322 def _attempt_binding(self, context, need_notify):
323 # Since the mechanism driver bind_port() calls must be made
324 # outside a DB transaction locking the port state, it is
325 # possible (but unlikely) that the port's state could change
326 # concurrently while these calls are being made. If another
327 # thread or process succeeds in binding the port before this
328 # thread commits its results, the already committed results are
329 # used. If attributes such as binding:host_id,
330 # binding:profile, or binding:vnic_type are updated
331 # concurrently, the try_again flag is returned to indicate that
332 # the commit was unsuccessful.
333 plugin_context = context._plugin_context
334 port_id = context.current['id']
335 binding = context._binding
337 # First, determine whether it is necessary and possible to
339 if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
340 or not binding.host):
341 # We either don't need to bind the port or can't
342 return context, need_notify, try_again
344 # The port isn't already bound and the necessary
345 # information is available, so attempt to bind the port.
346 bind_context = self._bind_port(context)
347 # Now try to commit result of attempting to bind the port.
348 new_context, did_commit = self._commit_port_binding(
349 plugin_context, port_id, binding, bind_context)
351 # The port has been deleted concurrently, so just
352 # return the unbound result from the initial
353 # transaction that completed before the deletion.
354 LOG.debug("Port %s has been deleted concurrently",
357 return context, need_notify, try_again
358 # Need to notify if we succeed and our results were
360 if did_commit and (new_context._binding.vif_type !=
361 portbindings.VIF_TYPE_BINDING_FAILED):
363 return new_context, need_notify, try_again
365 return new_context, need_notify, try_again
367 def _bind_port(self, orig_context):
368 # Construct a new PortContext from the one from the previous
370 port = orig_context.current
371 orig_binding = orig_context._binding
372 new_binding = models.PortBinding(
373 host=orig_binding.host,
374 vnic_type=orig_binding.vnic_type,
375 profile=orig_binding.profile,
376 vif_type=portbindings.VIF_TYPE_UNBOUND,
379 self._update_port_dict_binding(port, new_binding)
380 new_context = driver_context.PortContext(
381 self, orig_context._plugin_context, port,
382 orig_context.network.current, new_binding, None)
384 # Attempt to bind the port and return the context with the
386 self.mechanism_manager.bind_port(new_context)
389 def _commit_port_binding(self, plugin_context, port_id, orig_binding,
391 session = plugin_context.session
392 new_binding = new_context._binding
394 # After we've attempted to bind the port, we begin a
395 # transaction, get the current port state, and decide whether
396 # to commit the binding results.
397 with session.begin(subtransactions=True):
398 # Get the current port state and build a new PortContext
399 # reflecting this state as original state for subsequent
400 # mechanism driver update_port_*commit() calls.
401 port_db, cur_binding = db.get_locked_port_and_binding(session,
404 # The port has been deleted concurrently.
406 oport = self._make_port_dict(port_db)
407 port = self._make_port_dict(port_db)
408 network = new_context.network.current
409 if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
410 # REVISIT(rkukura): The PortBinding instance from the
411 # ml2_port_bindings table, returned as cur_binding
412 # from db.get_locked_port_and_binding() above, is
413 # currently not used for DVR distributed ports, and is
414 # replaced here with the DVRPortBinding instance from
415 # the ml2_dvr_port_bindings table specific to the host
416 # on which the distributed port is being bound. It
417 # would be possible to optimize this code to avoid
418 # fetching the PortBinding instance in the DVR case,
419 # and even to avoid creating the unused entry in the
420 # ml2_port_bindings table. But the upcoming resolution
421 # for bug 1367391 will eliminate the
422 # ml2_dvr_port_bindings table, use the
423 # ml2_port_bindings table to store non-host-specific
424 # fields for both distributed and non-distributed
425 # ports, and introduce a new ml2_port_binding_hosts
426 # table for the fields that need to be host-specific
427 # in the distributed case. Since the PortBinding
428 # instance will then be needed, it does not make sense
429 # to optimize this code to avoid fetching it.
430 cur_binding = db.get_dvr_port_binding_by_host(
431 session, port_id, orig_binding.host)
432 cur_context = driver_context.PortContext(
433 self, plugin_context, port, network, cur_binding, None,
436 # Commit our binding results only if port has not been
437 # successfully bound concurrently by another thread or
438 # process and no binding inputs have been changed.
439 commit = ((cur_binding.vif_type in
440 [portbindings.VIF_TYPE_UNBOUND,
441 portbindings.VIF_TYPE_BINDING_FAILED]) and
442 orig_binding.host == cur_binding.host and
443 orig_binding.vnic_type == cur_binding.vnic_type and
444 orig_binding.profile == cur_binding.profile)
447 # Update the port's binding state with our binding
449 cur_binding.vif_type = new_binding.vif_type
450 cur_binding.vif_details = new_binding.vif_details
451 db.clear_binding_levels(session, port_id, cur_binding.host)
452 db.set_binding_levels(session, new_context._binding_levels)
453 cur_context._binding_levels = new_context._binding_levels
455 # Update PortContext's port dictionary to reflect the
456 # updated binding state.
457 self._update_port_dict_binding(port, cur_binding)
459 # Update the port status if requested by the bound driver.
460 if (new_context._binding_levels and
461 new_context._new_port_status):
462 port_db.status = new_context._new_port_status
463 port['status'] = new_context._new_port_status
465 # Call the mechanism driver precommit methods, commit
466 # the results, and call the postcommit methods.
467 self.mechanism_manager.update_port_precommit(cur_context)
469 self.mechanism_manager.update_port_postcommit(cur_context)
471 # Continue, using the port state as of the transaction that
472 # just finished, whether that transaction committed new
473 # results or discovered concurrent port state changes.
474 return (cur_context, commit)
476 def _update_port_dict_binding(self, port, binding):
477 port[portbindings.VNIC_TYPE] = binding.vnic_type
478 port[portbindings.PROFILE] = self._get_profile(binding)
479 if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
480 port[portbindings.HOST_ID] = ''
481 port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_DISTRIBUTED
482 port[portbindings.VIF_DETAILS] = {}
484 port[portbindings.HOST_ID] = binding.host
485 port[portbindings.VIF_TYPE] = binding.vif_type
486 port[portbindings.VIF_DETAILS] = self._get_vif_details(binding)
488 def _get_vif_details(self, binding):
489 if binding.vif_details:
491 return jsonutils.loads(binding.vif_details)
493 LOG.error(_LE("Serialized vif_details DB value '%(value)s' "
494 "for port %(port)s is invalid"),
495 {'value': binding.vif_details,
496 'port': binding.port_id})
499 def _get_profile(self, binding):
502 return jsonutils.loads(binding.profile)
504 LOG.error(_LE("Serialized profile DB value '%(value)s' for "
505 "port %(port)s is invalid"),
506 {'value': binding.profile,
507 'port': binding.port_id})
510 def _ml2_extend_port_dict_binding(self, port_res, port_db):
511 # None when called during unit tests for other plugins.
512 if port_db.port_binding:
513 self._update_port_dict_binding(port_res, port_db.port_binding)
515 db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
516 attributes.PORTS, ['_ml2_extend_port_dict_binding'])
518 # Register extend dict methods for network and port resources.
519 # Each mechanism driver that supports extend attribute for the resources
520 # can add those attribute to the result.
521 db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
522 attributes.NETWORKS, ['_ml2_md_extend_network_dict'])
523 db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
524 attributes.PORTS, ['_ml2_md_extend_port_dict'])
525 db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
526 attributes.SUBNETS, ['_ml2_md_extend_subnet_dict'])
528 def _ml2_md_extend_network_dict(self, result, netdb):
529 session = db_api.get_session()
530 with session.begin(subtransactions=True):
531 self.extension_manager.extend_network_dict(session, netdb, result)
533 def _ml2_md_extend_port_dict(self, result, portdb):
534 session = db_api.get_session()
535 with session.begin(subtransactions=True):
536 self.extension_manager.extend_port_dict(session, portdb, result)
538 def _ml2_md_extend_subnet_dict(self, result, subnetdb):
539 session = db_api.get_session()
540 with session.begin(subtransactions=True):
541 self.extension_manager.extend_subnet_dict(
542 session, subnetdb, result)
544 # Note - The following hook methods have "ml2" in their names so
545 # that they are not called twice during unit tests due to global
546 # registration of hooks in portbindings_db.py used by other
549 def _ml2_port_model_hook(self, context, original_model, query):
550 query = query.outerjoin(models.PortBinding,
551 (original_model.id ==
552 models.PortBinding.port_id))
555 def _ml2_port_result_filter_hook(self, query, filters):
556 values = filters and filters.get(portbindings.HOST_ID, [])
559 return query.filter(models.PortBinding.host.in_(values))
561 db_base_plugin_v2.NeutronDbPluginV2.register_model_query_hook(
564 '_ml2_port_model_hook',
566 '_ml2_port_result_filter_hook')
568 def _notify_port_updated(self, mech_context):
569 port = mech_context.current
570 segment = mech_context.bottom_bound_segment
572 # REVISIT(rkukura): This should notify agent to unplug port
573 network = mech_context.network.current
574 LOG.warning(_LW("In _notify_port_updated(), no bound segment for "
575 "port %(port_id)s on network %(network_id)s"),
576 {'port_id': port['id'],
577 'network_id': network['id']})
579 self.notifier.port_update(mech_context._plugin_context, port,
580 segment[api.NETWORK_TYPE],
581 segment[api.SEGMENTATION_ID],
582 segment[api.PHYSICAL_NETWORK])
584 def _delete_objects(self, context, resource, objects):
585 delete_op = getattr(self, 'delete_%s' % resource)
588 delete_op(context, obj['result']['id'])
590 LOG.exception(_LE("Could not find %s to delete."),
593 LOG.exception(_LE("Could not delete %(res)s %(id)s."),
595 'id': obj['result']['id']})
597 def _create_bulk_ml2(self, resource, context, request_items):
599 collection = "%ss" % resource
600 items = request_items[collection]
602 with context.session.begin(subtransactions=True):
603 obj_creator = getattr(self, '_create_%s_db' % resource)
605 attrs = item[resource]
606 result, mech_context = obj_creator(context, item)
607 objects.append({'mech_context': mech_context,
609 'attributes': attrs})
612 with excutils.save_and_reraise_exception():
613 LOG.exception(_LE("An exception occurred while creating "
614 "the %(resource)s:%(item)s"),
615 {'resource': resource, 'item': item})
618 postcommit_op = getattr(self.mechanism_manager,
619 'create_%s_postcommit' % resource)
621 postcommit_op(obj['mech_context'])
623 except ml2_exc.MechanismDriverError:
624 with excutils.save_and_reraise_exception():
625 resource_ids = [res['result']['id'] for res in objects]
626 LOG.exception(_LE("mechanism_manager.create_%(res)s"
627 "_postcommit failed for %(res)s: "
628 "'%(failed_id)s'. Deleting "
629 "%(res)ss %(resource_ids)s"),
631 'failed_id': obj['result']['id'],
632 'resource_ids': ', '.join(resource_ids)})
633 self._delete_objects(context, resource, objects)
635 def _create_network_db(self, context, network):
636 net_data = network[attributes.NETWORK]
637 tenant_id = net_data['tenant_id']
638 session = context.session
639 with session.begin(subtransactions=True):
640 self._ensure_default_security_group(context, tenant_id)
641 result = super(Ml2Plugin, self).create_network(context, network)
642 self.extension_manager.process_create_network(context, net_data,
644 self._process_l3_create(context, result, net_data)
645 net_data['id'] = result['id']
646 self.type_manager.create_network_segments(context, net_data,
648 self.type_manager.extend_network_dict_provider(context, result)
649 mech_context = driver_context.NetworkContext(self, context,
651 self.mechanism_manager.create_network_precommit(mech_context)
653 if net_data.get(api.MTU, 0) > 0:
654 res = super(Ml2Plugin, self).update_network(context,
655 result['id'], {'network': {api.MTU: net_data[api.MTU]}})
656 result[api.MTU] = res.get(api.MTU, 0)
658 if az_ext.AZ_HINTS in net_data:
659 self.validate_availability_zones(context, 'network',
660 net_data[az_ext.AZ_HINTS])
661 az_hints = az_ext.convert_az_list_to_string(
662 net_data[az_ext.AZ_HINTS])
663 res = super(Ml2Plugin, self).update_network(context,
664 result['id'], {'network': {az_ext.AZ_HINTS: az_hints}})
665 result[az_ext.AZ_HINTS] = res[az_ext.AZ_HINTS]
667 # Update the transparent vlan if configured
668 if utils.is_extension_supported(self, 'vlan-transparent'):
669 vlt = vlantransparent.get_vlan_transparent(net_data)
670 super(Ml2Plugin, self).update_network(context,
671 result['id'], {'network': {'vlan_transparent': vlt}})
672 result['vlan_transparent'] = vlt
674 return result, mech_context
676 def create_network(self, context, network):
677 result, mech_context = self._create_network_db(context, network)
679 self.mechanism_manager.create_network_postcommit(mech_context)
680 except ml2_exc.MechanismDriverError:
681 with excutils.save_and_reraise_exception():
682 LOG.error(_LE("mechanism_manager.create_network_postcommit "
683 "failed, deleting network '%s'"), result['id'])
684 self.delete_network(context, result['id'])
688 def create_network_bulk(self, context, networks):
689 objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
690 return [obj['result'] for obj in objects]
692 def update_network(self, context, id, network):
693 net_data = network[attributes.NETWORK]
694 provider._raise_if_updates_provider_attributes(net_data)
696 session = context.session
697 with session.begin(subtransactions=True):
698 original_network = super(Ml2Plugin, self).get_network(context, id)
699 updated_network = super(Ml2Plugin, self).update_network(context,
702 self.extension_manager.process_update_network(context, net_data,
704 self._process_l3_update(context, updated_network, net_data)
705 self.type_manager.extend_network_dict_provider(context,
708 # TODO(QoS): Move out to the extension framework somehow.
709 need_network_update_notify = (
710 qos_consts.QOS_POLICY_ID in net_data and
711 original_network[qos_consts.QOS_POLICY_ID] !=
712 updated_network[qos_consts.QOS_POLICY_ID])
714 mech_context = driver_context.NetworkContext(
715 self, context, updated_network,
716 original_network=original_network)
717 self.mechanism_manager.update_network_precommit(mech_context)
719 # TODO(apech) - handle errors raised by update_network, potentially
720 # by re-calling update_network with the previous attributes. For
721 # now the error is propogated to the caller, which is expected to
722 # either undo/retry the operation or delete the resource.
723 self.mechanism_manager.update_network_postcommit(mech_context)
724 if need_network_update_notify:
725 self.notifier.network_update(context, updated_network)
726 return updated_network
728 def get_network(self, context, id, fields=None):
729 session = context.session
730 with session.begin(subtransactions=True):
731 result = super(Ml2Plugin, self).get_network(context, id, None)
732 self.type_manager.extend_network_dict_provider(context, result)
734 return self._fields(result, fields)
736 def get_networks(self, context, filters=None, fields=None,
737 sorts=None, limit=None, marker=None, page_reverse=False):
738 session = context.session
739 with session.begin(subtransactions=True):
740 nets = super(Ml2Plugin,
741 self).get_networks(context, filters, None, sorts,
742 limit, marker, page_reverse)
743 self.type_manager.extend_networks_dict_provider(context, nets)
745 nets = self._filter_nets_provider(context, nets, filters)
747 return [self._fields(net, fields) for net in nets]
749 def _delete_ports(self, context, port_ids):
750 for port_id in port_ids:
752 self.delete_port(context, port_id)
753 except (exc.PortNotFound, sa_exc.ObjectDeletedError):
754 # concurrent port deletion can be performed by
755 # release_dhcp_port caused by concurrent subnet_delete
756 LOG.info(_LI("Port %s was deleted concurrently"), port_id)
758 with excutils.save_and_reraise_exception():
759 LOG.exception(_LE("Exception auto-deleting port %s"),
762 def _delete_subnets(self, context, subnet_ids):
763 for subnet_id in subnet_ids:
765 self.delete_subnet(context, subnet_id)
766 except (exc.SubnetNotFound, sa_exc.ObjectDeletedError):
767 LOG.info(_LI("Subnet %s was deleted concurrently"),
770 with excutils.save_and_reraise_exception():
771 LOG.exception(_LE("Exception auto-deleting subnet %s"),
774 def delete_network(self, context, id):
775 # REVISIT(rkukura) The super(Ml2Plugin, self).delete_network()
776 # function is not used because it auto-deletes ports and
777 # subnets from the DB without invoking the derived class's
778 # delete_port() or delete_subnet(), preventing mechanism
779 # drivers from being called. This approach should be revisited
780 # when the API layer is reworked during icehouse.
782 LOG.debug("Deleting network %s", id)
783 session = context.session
786 # REVISIT: Serialize this operation with a semaphore
787 # to prevent deadlock waiting to acquire a DB lock
788 # held by another thread in the same process, leading
789 # to 'lock wait timeout' errors.
791 # Process L3 first, since, depending on the L3 plugin, it may
792 # involve sending RPC notifications, and/or calling delete_port
794 # Additionally, a rollback may not be enough to undo the
795 # deletion of a floating IP with certain L3 backends.
796 self._process_l3_delete(context, id)
797 # Using query().with_lockmode isn't necessary. Foreign-key
798 # constraints prevent deletion if concurrent creation happens.
799 with session.begin(subtransactions=True):
800 # Get ports to auto-delete.
801 ports = (session.query(models_v2.Port).
802 enable_eagerloads(False).
803 filter_by(network_id=id).all())
804 LOG.debug("Ports to auto-delete: %s", ports)
805 only_auto_del = all(p.device_owner
806 in db_base_plugin_v2.
807 AUTO_DELETE_PORT_OWNERS
809 if not only_auto_del:
810 LOG.debug("Tenant-owned ports exist")
811 raise exc.NetworkInUse(net_id=id)
813 # Get subnets to auto-delete.
814 subnets = (session.query(models_v2.Subnet).
815 enable_eagerloads(False).
816 filter_by(network_id=id).all())
817 LOG.debug("Subnets to auto-delete: %s", subnets)
819 if not (ports or subnets):
820 network = self.get_network(context, id)
821 mech_context = driver_context.NetworkContext(self,
824 self.mechanism_manager.delete_network_precommit(
827 self.type_manager.release_network_segments(session, id)
828 record = self._get_network(context, id)
829 LOG.debug("Deleting network record %s", record)
830 session.delete(record)
832 # The segment records are deleted via cascade from the
833 # network record, so explicit removal is not necessary.
834 LOG.debug("Committing transaction")
837 port_ids = [port.id for port in ports]
838 subnet_ids = [subnet.id for subnet in subnets]
839 except os_db_exception.DBError as e:
840 with excutils.save_and_reraise_exception() as ctxt:
841 if isinstance(e.inner_exception, sql_exc.IntegrityError):
843 LOG.warning(_LW("A concurrent port creation has "
846 self._delete_ports(context, port_ids)
847 self._delete_subnets(context, subnet_ids)
850 self.mechanism_manager.delete_network_postcommit(mech_context)
851 except ml2_exc.MechanismDriverError:
852 # TODO(apech) - One or more mechanism driver failed to
853 # delete the network. Ideally we'd notify the caller of
854 # the fact that an error occurred.
855 LOG.error(_LE("mechanism_manager.delete_network_postcommit"
857 self.notifier.network_delete(context, id)
859 def _create_subnet_db(self, context, subnet):
860 session = context.session
861 with session.begin(subtransactions=True):
862 result = super(Ml2Plugin, self).create_subnet(context, subnet)
863 self.extension_manager.process_create_subnet(
864 context, subnet[attributes.SUBNET], result)
865 network = self.get_network(context, result['network_id'])
866 mech_context = driver_context.SubnetContext(self, context,
868 self.mechanism_manager.create_subnet_precommit(mech_context)
870 return result, mech_context
872 def create_subnet(self, context, subnet):
873 result, mech_context = self._create_subnet_db(context, subnet)
875 self.mechanism_manager.create_subnet_postcommit(mech_context)
876 except ml2_exc.MechanismDriverError:
877 with excutils.save_and_reraise_exception():
878 LOG.error(_LE("mechanism_manager.create_subnet_postcommit "
879 "failed, deleting subnet '%s'"), result['id'])
880 self.delete_subnet(context, result['id'])
883 def create_subnet_bulk(self, context, subnets):
884 objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets)
885 return [obj['result'] for obj in objects]
887 def update_subnet(self, context, id, subnet):
888 session = context.session
889 with session.begin(subtransactions=True):
890 original_subnet = super(Ml2Plugin, self).get_subnet(context, id)
891 updated_subnet = super(Ml2Plugin, self).update_subnet(
893 self.extension_manager.process_update_subnet(
894 context, subnet[attributes.SUBNET], updated_subnet)
895 network = self.get_network(context, updated_subnet['network_id'])
896 mech_context = driver_context.SubnetContext(
897 self, context, updated_subnet, network,
898 original_subnet=original_subnet)
899 self.mechanism_manager.update_subnet_precommit(mech_context)
901 # TODO(apech) - handle errors raised by update_subnet, potentially
902 # by re-calling update_subnet with the previous attributes. For
903 # now the error is propogated to the caller, which is expected to
904 # either undo/retry the operation or delete the resource.
905 self.mechanism_manager.update_subnet_postcommit(mech_context)
906 return updated_subnet
908 def delete_subnet(self, context, id):
909 # REVISIT(rkukura) The super(Ml2Plugin, self).delete_subnet()
910 # function is not used because it deallocates the subnet's addresses
911 # from ports in the DB without invoking the derived class's
912 # update_port(), preventing mechanism drivers from being called.
913 # This approach should be revisited when the API layer is reworked
916 LOG.debug("Deleting subnet %s", id)
917 session = context.session
919 with session.begin(subtransactions=True):
920 record = self._get_subnet(context, id)
921 subnet = self._make_subnet_dict(record, None, context=context)
922 qry_allocated = (session.query(models_v2.IPAllocation).
923 filter_by(subnet_id=id).
924 join(models_v2.Port))
925 is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
926 # Remove network owned ports, and delete IP allocations
927 # for IPv6 addresses which were automatically generated
929 if is_auto_addr_subnet:
930 self._subnet_check_ip_allocations_internal_router_ports(
934 qry_allocated.filter(models_v2.Port.device_owner.
935 in_(db_base_plugin_v2.AUTO_DELETE_PORT_OWNERS)))
936 allocated = qry_allocated.all()
937 # Delete all the IPAllocation that can be auto-deleted
941 LOG.debug("Ports to auto-deallocate: %s", allocated)
942 # Check if there are more IP allocations, unless
943 # is_auto_address_subnet is True. In that case the check is
944 # unnecessary. This additional check not only would be wasteful
945 # for this class of subnet, but is also error-prone since when
946 # the isolation level is set to READ COMMITTED allocations made
947 # concurrently will be returned by this query
948 if not is_auto_addr_subnet:
949 alloc = self._subnet_check_ip_allocations(context, id)
951 user_alloc = self._subnet_get_user_allocation(
954 LOG.info(_LI("Found port (%(port_id)s, %(ip)s) "
955 "having IP allocation on subnet "
956 "%(subnet)s, cannot delete"),
957 {'ip': user_alloc.ip_address,
958 'port_id': user_alloc.port_id,
960 raise exc.SubnetInUse(subnet_id=id)
962 # allocation found and it was DHCP port
963 # that appeared after autodelete ports were
964 # removed - need to restart whole operation
965 raise os_db_exception.RetryRequest(
966 exc.SubnetInUse(subnet_id=id))
968 db_base_plugin_v2._check_subnet_not_used(context, id)
970 # If allocated is None, then all the IPAllocation were
971 # correctly deleted during the previous pass.
973 network = self.get_network(context, subnet['network_id'])
974 mech_context = driver_context.SubnetContext(self, context,
977 self.mechanism_manager.delete_subnet_precommit(
980 LOG.debug("Deleting subnet record")
981 session.delete(record)
983 # The super(Ml2Plugin, self).delete_subnet() is not called,
984 # so need to manually call delete_subnet for pluggable ipam
985 self.ipam.delete_subnet(context, id)
987 LOG.debug("Committing transaction")
992 # calling update_port() for each allocation to remove the
993 # IP from the port and call the MechanismDrivers
994 data = {attributes.PORT:
995 {'fixed_ips': [{'subnet_id': ip.subnet_id,
996 'ip_address': ip.ip_address}
997 for ip in a.port.fixed_ips
998 if ip.subnet_id != id]}}
1000 self.update_port(context, a.port_id, data)
1001 except exc.PortNotFound:
1002 LOG.debug("Port %s deleted concurrently", a.port_id)
1004 with excutils.save_and_reraise_exception():
1005 LOG.exception(_LE("Exception deleting fixed_ip "
1006 "from port %s"), a.port_id)
1009 self.mechanism_manager.delete_subnet_postcommit(mech_context)
1010 except ml2_exc.MechanismDriverError:
1011 # TODO(apech) - One or more mechanism driver failed to
1012 # delete the subnet. Ideally we'd notify the caller of
1013 # the fact that an error occurred.
1014 LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed"))
1016 # TODO(yalei) - will be simplified after security group and address pair be
1017 # converted to ext driver too.
1018 def _portsec_ext_port_create_processing(self, context, port_data, port):
1019 attrs = port[attributes.PORT]
1020 port_security = ((port_data.get(psec.PORTSECURITY) is None) or
1021 port_data[psec.PORTSECURITY])
1023 # allowed address pair checks
1024 if self._check_update_has_allowed_address_pairs(port):
1025 if not port_security:
1026 raise addr_pair.AddressPairAndPortSecurityRequired()
1028 # remove ATTR_NOT_SPECIFIED
1029 attrs[addr_pair.ADDRESS_PAIRS] = []
1032 self._ensure_default_security_group_on_port(context, port)
1033 elif self._check_update_has_security_groups(port):
1034 raise psec.PortSecurityAndIPRequiredForSecurityGroups()
1036 def _create_port_db(self, context, port):
1037 attrs = port[attributes.PORT]
1038 if not attrs.get('status'):
1039 attrs['status'] = const.PORT_STATUS_DOWN
1041 session = context.session
1042 with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
1043 session.begin(subtransactions=True):
1044 dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
1045 result = super(Ml2Plugin, self).create_port(context, port)
1046 self.extension_manager.process_create_port(context, attrs, result)
1047 self._portsec_ext_port_create_processing(context, result, port)
1049 # sgids must be got after portsec checked with security group
1050 sgids = self._get_security_groups_on_port(context, port)
1051 self._process_port_create_security_group(context, result, sgids)
1052 network = self.get_network(context, result['network_id'])
1053 binding = db.add_port_binding(session, result['id'])
1054 mech_context = driver_context.PortContext(self, context, result,
1055 network, binding, None)
1056 self._process_port_binding(mech_context, attrs)
1058 result[addr_pair.ADDRESS_PAIRS] = (
1059 self._process_create_allowed_address_pairs(
1061 attrs.get(addr_pair.ADDRESS_PAIRS)))
1062 self._process_port_create_extra_dhcp_opts(context, result,
1064 self.mechanism_manager.create_port_precommit(mech_context)
1066 return result, mech_context
1068 def create_port(self, context, port):
1069 result, mech_context = self._create_port_db(context, port)
1070 # notify any plugin that is interested in port create events
1071 kwargs = {'context': context, 'port': result}
1072 registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
1075 self.mechanism_manager.create_port_postcommit(mech_context)
1076 except ml2_exc.MechanismDriverError:
1077 with excutils.save_and_reraise_exception():
1078 LOG.error(_LE("mechanism_manager.create_port_postcommit "
1079 "failed, deleting port '%s'"), result['id'])
1080 self.delete_port(context, result['id'])
1082 # REVISIT(rkukura): Is there any point in calling this before
1083 # a binding has been successfully established?
1084 self.notify_security_groups_member_updated(context, result)
1087 bound_context = self._bind_port_if_needed(mech_context)
1088 except ml2_exc.MechanismDriverError:
1089 with excutils.save_and_reraise_exception():
1090 LOG.error(_LE("_bind_port_if_needed "
1091 "failed, deleting port '%s'"), result['id'])
1092 self.delete_port(context, result['id'])
1094 return bound_context.current
1096 def create_port_bulk(self, context, ports):
1097 objects = self._create_bulk_ml2(attributes.PORT, context, ports)
1099 # REVISIT(rkukura): Is there any point in calling this before
1100 # a binding has been successfully established?
1101 results = [obj['result'] for obj in objects]
1102 self.notify_security_groups_member_updated_bulk(context, results)
1105 attrs = obj['attributes']
1106 if attrs and attrs.get(portbindings.HOST_ID):
1107 kwargs = {'context': context, 'port': obj['result']}
1109 resources.PORT, events.AFTER_CREATE, self, **kwargs)
1113 obj['bound_context'] = self._bind_port_if_needed(
1114 obj['mech_context'])
1115 return [obj['bound_context'].current for obj in objects]
1116 except ml2_exc.MechanismDriverError:
1117 with excutils.save_and_reraise_exception():
1118 resource_ids = [res['result']['id'] for res in objects]
1119 LOG.error(_LE("_bind_port_if_needed failed. "
1120 "Deleting all ports from create bulk '%s'"),
1122 self._delete_objects(context, attributes.PORT, objects)
1124 # TODO(yalei) - will be simplified after security group and address pair be
1125 # converted to ext driver too.
1126 def _portsec_ext_port_update_processing(self, updated_port, context, port,
1128 port_security = ((updated_port.get(psec.PORTSECURITY) is None) or
1129 updated_port[psec.PORTSECURITY])
1134 # check the address-pairs
1135 if self._check_update_has_allowed_address_pairs(port):
1136 # has address pairs in request
1137 raise addr_pair.AddressPairAndPortSecurityRequired()
1139 self._check_update_deletes_allowed_address_pairs(port)):
1140 # not a request for deleting the address-pairs
1141 updated_port[addr_pair.ADDRESS_PAIRS] = (
1142 self.get_allowed_address_pairs(context, id))
1144 # check if address pairs has been in db, if address pairs could
1145 # be put in extension driver, we can refine here.
1146 if updated_port[addr_pair.ADDRESS_PAIRS]:
1147 raise addr_pair.AddressPairAndPortSecurityRequired()
1149 # checks if security groups were updated adding/modifying
1150 # security groups, port security is set
1151 if self._check_update_has_security_groups(port):
1152 raise psec.PortSecurityAndIPRequiredForSecurityGroups()
1154 self._check_update_deletes_security_groups(port)):
1155 # Update did not have security groups passed in. Check
1156 # that port does not have any security groups already on it.
1157 filters = {'port_id': [id]}
1159 super(Ml2Plugin, self)._get_port_security_group_bindings(
1163 raise psec.PortSecurityPortHasSecurityGroup()
1165 def update_port(self, context, id, port):
1166 attrs = port[attributes.PORT]
1167 need_port_update_notify = False
1168 session = context.session
1169 bound_mech_contexts = []
1171 with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
1172 session.begin(subtransactions=True):
1173 port_db, binding = db.get_locked_port_and_binding(session, id)
1175 raise exc.PortNotFound(port_id=id)
1176 mac_address_updated = self._check_mac_update_allowed(
1177 port_db, attrs, binding)
1178 need_port_update_notify |= mac_address_updated
1179 original_port = self._make_port_dict(port_db)
1180 updated_port = super(Ml2Plugin, self).update_port(context, id,
1182 self.extension_manager.process_update_port(context, attrs,
1184 self._portsec_ext_port_update_processing(updated_port, context,
1187 if (psec.PORTSECURITY in attrs) and (
1188 original_port[psec.PORTSECURITY] !=
1189 updated_port[psec.PORTSECURITY]):
1190 need_port_update_notify = True
1191 # TODO(QoS): Move out to the extension framework somehow.
1192 # Follow https://review.openstack.org/#/c/169223 for a solution.
1193 if (qos_consts.QOS_POLICY_ID in attrs and
1194 original_port[qos_consts.QOS_POLICY_ID] !=
1195 updated_port[qos_consts.QOS_POLICY_ID]):
1196 need_port_update_notify = True
1198 if addr_pair.ADDRESS_PAIRS in attrs:
1199 need_port_update_notify |= (
1200 self.update_address_pairs_on_port(context, id, port,
1203 need_port_update_notify |= self.update_security_group_on_port(
1204 context, id, port, original_port, updated_port)
1205 network = self.get_network(context, original_port['network_id'])
1206 need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
1207 context, id, port, updated_port)
1208 levels = db.get_binding_levels(session, id, binding.host)
1209 mech_context = driver_context.PortContext(
1210 self, context, updated_port, network, binding, levels,
1211 original_port=original_port)
1212 need_port_update_notify |= self._process_port_binding(
1213 mech_context, attrs)
1214 # For DVR router interface ports we need to retrieve the
1215 # DVRPortbinding context instead of the normal port context.
1216 # The normal Portbinding context does not have the status
1217 # of the ports that are required by the l2pop to process the
1218 # postcommit events.
1220 # NOTE:Sometimes during the update_port call, the DVR router
1221 # interface port may not have the port binding, so we cannot
1222 # create a generic bindinglist that will address both the
1223 # DVR and non-DVR cases here.
1224 # TODO(Swami): This code need to be revisited.
1225 if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1226 dvr_binding_list = db.get_dvr_port_bindings(session, id)
1227 for dvr_binding in dvr_binding_list:
1228 levels = db.get_binding_levels(session, id,
1230 dvr_mech_context = driver_context.PortContext(
1231 self, context, updated_port, network,
1232 dvr_binding, levels, original_port=original_port)
1233 self.mechanism_manager.update_port_precommit(
1235 bound_mech_contexts.append(dvr_mech_context)
1237 self.mechanism_manager.update_port_precommit(mech_context)
1238 bound_mech_contexts.append(mech_context)
1240 # Notifications must be sent after the above transaction is complete
1243 'port': updated_port,
1244 'mac_address_updated': mac_address_updated,
1245 'original_port': original_port,
1247 registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
1249 # Note that DVR Interface ports will have bindings on
1250 # multiple hosts, and so will have multiple mech_contexts,
1251 # while other ports typically have just one.
1252 # Since bound_mech_contexts has both the DVR and non-DVR
1253 # contexts we can manage just with a single for loop.
1255 for mech_context in bound_mech_contexts:
1256 self.mechanism_manager.update_port_postcommit(
1258 except ml2_exc.MechanismDriverError:
1259 LOG.error(_LE("mechanism_manager.update_port_postcommit "
1260 "failed for port %s"), id)
1262 self.check_and_notify_security_group_member_changed(
1263 context, original_port, updated_port)
1264 need_port_update_notify |= self.is_security_group_member_updated(
1265 context, original_port, updated_port)
1267 if original_port['admin_state_up'] != updated_port['admin_state_up']:
1268 need_port_update_notify = True
1269 # NOTE: In the case of DVR ports, the port-binding is done after
1270 # router scheduling when sync_routers is called and so this call
1271 # below may not be required for DVR routed interfaces. But still
1272 # since we don't have the mech_context for the DVR router interfaces
1273 # at certain times, we just pass the port-context and return it, so
1274 # that we don't disturb other methods that are expecting a return
1276 bound_context = self._bind_port_if_needed(
1279 need_notify=need_port_update_notify)
1280 return bound_context.current
1282 def _process_dvr_port_binding(self, mech_context, context, attrs):
1283 session = mech_context._plugin_context.session
1284 binding = mech_context._binding
1285 port = mech_context.current
1286 port_id = port['id']
1288 if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
1289 binding.vif_details = ''
1290 binding.vif_type = portbindings.VIF_TYPE_UNBOUND
1292 db.clear_binding_levels(session, port_id, binding.host)
1295 self._update_port_dict_binding(port, binding)
1296 binding.host = attrs and attrs.get(portbindings.HOST_ID)
1297 binding.router_id = attrs and attrs.get('device_id')
1299 def update_dvr_port_binding(self, context, id, port):
1300 attrs = port[attributes.PORT]
1302 host = attrs and attrs.get(portbindings.HOST_ID)
1303 host_set = attributes.is_attr_set(host)
1306 LOG.error(_LE("No Host supplied to bind DVR Port %s"), id)
1309 session = context.session
1310 binding = db.get_dvr_port_binding_by_host(session, id, host)
1311 device_id = attrs and attrs.get('device_id')
1312 router_id = binding and binding.get('router_id')
1313 update_required = (not binding or
1314 binding.vif_type == portbindings.VIF_TYPE_BINDING_FAILED or
1315 router_id != device_id)
1317 with session.begin(subtransactions=True):
1319 orig_port = super(Ml2Plugin, self).get_port(context, id)
1320 except exc.PortNotFound:
1321 LOG.debug("DVR Port %s has been deleted concurrently", id)
1324 binding = db.ensure_dvr_port_binding(
1325 session, id, host, router_id=device_id)
1326 network = self.get_network(context, orig_port['network_id'])
1327 levels = db.get_binding_levels(session, id, host)
1328 mech_context = driver_context.PortContext(self,
1329 context, orig_port, network,
1330 binding, levels, original_port=orig_port)
1331 self._process_dvr_port_binding(mech_context, context, attrs)
1332 self._bind_port_if_needed(mech_context)
1334 def _pre_delete_port(self, context, port_id, port_check):
1335 """Do some preliminary operations before deleting the port."""
1336 LOG.debug("Deleting port %s", port_id)
1338 # notify interested parties of imminent port deletion;
1339 # a failure here prevents the operation from happening
1343 'port_check': port_check
1346 resources.PORT, events.BEFORE_DELETE, self, **kwargs)
1347 except exceptions.CallbackFailure as e:
1348 # NOTE(armax): preserve old check's behavior
1349 if len(e.errors) == 1:
1350 raise e.errors[0].error
1351 raise exc.ServicePortInUse(port_id=port_id, reason=e)
1353 def delete_port(self, context, id, l3_port_check=True):
1354 self._pre_delete_port(context, id, l3_port_check)
1355 # TODO(armax): get rid of the l3 dependency in the with block
1356 removed_routers = []
1358 l3plugin = manager.NeutronManager.get_service_plugins().get(
1359 service_constants.L3_ROUTER_NAT)
1360 is_dvr_enabled = utils.is_extension_supported(
1361 l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
1363 session = context.session
1364 with session.begin(subtransactions=True):
1365 port_db, binding = db.get_locked_port_and_binding(session, id)
1367 LOG.debug("The port '%s' was deleted", id)
1369 port = self._make_port_dict(port_db)
1371 network = self.get_network(context, port['network_id'])
1372 bound_mech_contexts = []
1373 device_owner = port['device_owner']
1374 if device_owner == const.DEVICE_OWNER_DVR_INTERFACE:
1375 bindings = db.get_dvr_port_bindings(context.session, id)
1376 for bind in bindings:
1377 levels = db.get_binding_levels(context.session, id,
1379 mech_context = driver_context.PortContext(
1380 self, context, port, network, bind, levels)
1381 self.mechanism_manager.delete_port_precommit(mech_context)
1382 bound_mech_contexts.append(mech_context)
1384 levels = db.get_binding_levels(context.session, id,
1386 mech_context = driver_context.PortContext(
1387 self, context, port, network, binding, levels)
1388 if is_dvr_enabled and utils.is_dvr_serviced(device_owner):
1389 removed_routers = l3plugin.dvr_deletens_if_no_port(
1391 self.mechanism_manager.delete_port_precommit(mech_context)
1392 bound_mech_contexts.append(mech_context)
1394 router_ids = l3plugin.disassociate_floatingips(
1395 context, id, do_notify=False)
1397 LOG.debug("Calling delete_port for %(port_id)s owned by %(owner)s",
1398 {"port_id": id, "owner": device_owner})
1399 super(Ml2Plugin, self).delete_port(context, id)
1401 self._post_delete_port(
1402 context, port, router_ids, removed_routers, bound_mech_contexts)
1404 def _post_delete_port(
1405 self, context, port, router_ids, removed_routers, bound_mech_contexts):
1409 'router_ids': router_ids,
1410 'removed_routers': removed_routers
1412 registry.notify(resources.PORT, events.AFTER_DELETE, self, **kwargs)
1414 # Note that DVR Interface ports will have bindings on
1415 # multiple hosts, and so will have multiple mech_contexts,
1416 # while other ports typically have just one.
1417 for mech_context in bound_mech_contexts:
1418 self.mechanism_manager.delete_port_postcommit(mech_context)
1419 except ml2_exc.MechanismDriverError:
1420 # TODO(apech) - One or more mechanism driver failed to
1421 # delete the port. Ideally we'd notify the caller of the
1422 # fact that an error occurred.
1423 LOG.error(_LE("mechanism_manager.delete_port_postcommit failed for"
1424 " port %s"), port['id'])
1425 self.notifier.port_delete(context, port['id'])
1426 self.notify_security_groups_member_updated(context, port)
1428 def get_bound_port_context(self, plugin_context, port_id, host=None,
1429 cached_networks=None):
1430 session = plugin_context.session
1431 with session.begin(subtransactions=True):
1433 port_db = (session.query(models_v2.Port).
1434 enable_eagerloads(False).
1435 filter(models_v2.Port.id.startswith(port_id)).
1437 except sa_exc.NoResultFound:
1438 LOG.debug("No ports have port_id starting with %s",
1441 except sa_exc.MultipleResultsFound:
1442 LOG.error(_LE("Multiple ports have port_id starting with %s"),
1445 port = self._make_port_dict(port_db)
1446 network = (cached_networks or {}).get(port['network_id'])
1449 network = self.get_network(plugin_context, port['network_id'])
1451 if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1452 binding = db.get_dvr_port_binding_by_host(
1453 session, port['id'], host)
1455 LOG.error(_LE("Binding info for DVR port %s not found"),
1458 levels = db.get_binding_levels(session, port_db.id, host)
1459 port_context = driver_context.PortContext(
1460 self, plugin_context, port, network, binding, levels)
1462 # since eager loads are disabled in port_db query
1463 # related attribute port_binding could disappear in
1464 # concurrent port deletion.
1465 # It's not an error condition.
1466 binding = port_db.port_binding
1468 LOG.info(_LI("Binding info for port %s was not found, "
1469 "it might have been deleted already."),
1472 levels = db.get_binding_levels(session, port_db.id,
1473 port_db.port_binding.host)
1474 port_context = driver_context.PortContext(
1475 self, plugin_context, port, network, binding, levels)
1477 return self._bind_port_if_needed(port_context)
1479 @oslo_db_api.wrap_db_retry(
1480 max_retries=db_api.MAX_RETRIES, retry_on_request=True,
1481 exception_checker=lambda e: isinstance(e, (sa_exc.StaleDataError,
1482 os_db_exception.DBDeadlock))
1484 def update_port_status(self, context, port_id, status, host=None,
1487 Returns port_id (non-truncated uuid) if the port exists.
1488 Otherwise returns None.
1489 network can be passed in to avoid another get_network call if
1490 one was already performed by the caller.
1493 session = context.session
1494 with session.begin(subtransactions=True):
1495 port = db.get_port(session, port_id)
1497 LOG.debug("Port %(port)s update to %(val)s by agent not found",
1498 {'port': port_id, 'val': status})
1500 if (port.status != status and
1501 port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE):
1502 original_port = self._make_port_dict(port)
1503 port.status = status
1504 updated_port = self._make_port_dict(port)
1505 network = network or self.get_network(
1506 context, original_port['network_id'])
1507 levels = db.get_binding_levels(session, port.id,
1508 port.port_binding.host)
1509 mech_context = driver_context.PortContext(
1510 self, context, updated_port, network, port.port_binding,
1511 levels, original_port=original_port)
1512 self.mechanism_manager.update_port_precommit(mech_context)
1514 elif port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1515 binding = db.get_dvr_port_binding_by_host(
1516 session, port['id'], host)
1519 binding['status'] = status
1520 binding.update(binding)
1524 port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
1525 with session.begin(subtransactions=True):
1526 port = db.get_port(session, port_id)
1528 LOG.warning(_LW("Port %s not found during update"),
1531 original_port = self._make_port_dict(port)
1532 network = network or self.get_network(
1533 context, original_port['network_id'])
1534 port.status = db.generate_dvr_port_status(session, port['id'])
1535 updated_port = self._make_port_dict(port)
1536 levels = db.get_binding_levels(session, port_id, host)
1537 mech_context = (driver_context.PortContext(
1538 self, context, updated_port, network,
1539 binding, levels, original_port=original_port))
1540 self.mechanism_manager.update_port_precommit(mech_context)
1543 self.mechanism_manager.update_port_postcommit(mech_context)
1545 if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1546 db.delete_dvr_port_binding_if_stale(session, binding)
1550 def port_bound_to_host(self, context, port_id, host):
1551 port = db.get_port(context.session, port_id)
1553 LOG.debug("No Port match for: %s", port_id)
1555 if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1556 bindings = db.get_dvr_port_bindings(context.session, port_id)
1560 LOG.debug("No binding found for DVR port %s", port['id'])
1563 port_host = db.get_port_binding_host(context.session, port_id)
1564 return (port_host == host)
1566 def get_ports_from_devices(self, context, devices):
1567 port_ids_to_devices = dict(
1568 (self._device_to_port_id(context, device), device)
1569 for device in devices)
1570 port_ids = list(port_ids_to_devices.keys())
1571 ports = db.get_ports_and_sgs(context, port_ids)
1573 # map back to original requested id
1574 port_id = next((port_id for port_id in port_ids
1575 if port['id'].startswith(port_id)), None)
1576 port['device'] = port_ids_to_devices.get(port_id)
1581 def _device_to_port_id(context, device):
1582 # REVISIT(rkukura): Consider calling into MechanismDrivers to
1583 # process device names, or having MechanismDrivers supply list
1584 # of device prefixes to strip.
1585 for prefix in const.INTERFACE_PREFIXES:
1586 if device.startswith(prefix):
1587 return device[len(prefix):]
1588 # REVISIT(irenab): Consider calling into bound MD to
1589 # handle the get_device_details RPC
1590 if not uuidutils.is_uuid_like(device):
1591 port = db.get_port_from_device_mac(context, device)
1596 def get_workers(self):
1597 return self.mechanism_manager.get_workers()