# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from eventlet import greenthread
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as os_db_exception
from oslo_log import helpers as log_helpers
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import uuidutils
from sqlalchemy import exc as sql_exc
from sqlalchemy.orm import exc as sa_exc

from neutron._i18n import _, _LE, _LI, _LW
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import exceptions
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.common import ipv6_utils
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.db import address_scope_db
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import dvr_mac_db
from neutron.db import external_net_db
from neutron.db import extradhcpopt_db
from neutron.db import models_v2
from neutron.db import netmtu_db
from neutron.db.quota import driver  # noqa
from neutron.db import securitygroups_db
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.db import vlantransparent_db
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import availability_zone as az_ext
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as provider
from neutron.extensions import vlantransparent
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import config  # noqa
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.extensions import qos as qos_ext
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import rpc
from neutron.quota import resource_registry
from neutron.services.qos import qos_consts

LOG = log.getLogger(__name__)

MAX_BIND_TRIES = 10


SERVICE_PLUGINS_REQUIRED_DRIVERS = {
    'qos': [qos_ext.QOS_EXT_DRIVER_ALIAS]
}
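# For example, enabling the 'qos' service plugin requires the 'qos' ML2
# extension driver to be listed in the [ml2] extension_drivers option;
# otherwise _verify_service_plugins_requirements() below raises
# ExtensionDriverNotFound at plugin startup.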


class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                dvr_mac_db.DVRDbMixin,
                external_net_db.External_net_db_mixin,
                sg_db_rpc.SecurityGroupServerRpcMixin,
                agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
                addr_pair_db.AllowedAddressPairsMixin,
                vlantransparent_db.Vlantransparent_db_mixin,
                extradhcpopt_db.ExtraDhcpOptMixin,
                netmtu_db.Netmtu_db_mixin,
                address_scope_db.AddressScopeDbMixin):

    """Implement the Neutron L2 abstractions using modules.

    Ml2Plugin is a Neutron plugin based on separately extensible sets
    of network types and mechanisms for connecting to networks of
    those types. The network types and mechanisms are implemented as
    drivers loaded via Python entry points. Networks can be made up of
    multiple segments (not yet fully implemented).
    """
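
    # For illustration only: which drivers get loaded is deployment specific
    # and comes from configuration. A typical ml2_conf.ini might contain
    # something like:
    #
    #     [ml2]
    #     type_drivers = flat,vlan,vxlan
    #     tenant_network_types = vxlan
    #     mechanism_drivers = openvswitch,l2population
    #
    # where each name maps to a driver registered under a neutron.ml2.*
    # entry point namespace.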

    # These attributes specify whether or not the plugin supports bulk,
    # pagination and sorting operations. Name mangling is used in order
    # to ensure each attribute is qualified by class.
    __native_bulk_support = True
    __native_pagination_support = True
    __native_sorting_support = True

    # List of supported extensions
    _supported_extension_aliases = ["provider", "external-net", "binding",
                                    "quotas", "security-group", "agent",
                                    "dhcp_agent_scheduler",
                                    "multi-provider", "allowed-address-pairs",
                                    "extra_dhcp_opt", "subnet_allocation",
                                    "net-mtu", "vlan-transparent",
                                    "address-scope", "dns-integration",
                                    "availability_zone",
                                    "network_availability_zone"]

    @property
    def supported_extension_aliases(self):
        if not hasattr(self, '_aliases'):
            aliases = self._supported_extension_aliases[:]
            aliases += self.extension_manager.extension_aliases()
            sg_rpc.disable_security_group_extension_by_config(aliases)
            vlantransparent.disable_extension_by_config(aliases)
            self._aliases = aliases
        return self._aliases

    @resource_registry.tracked_resources(
        network=models_v2.Network,
        port=models_v2.Port,
        subnet=models_v2.Subnet,
        subnetpool=models_v2.SubnetPool,
        security_group=securitygroups_db.SecurityGroup,
        security_group_rule=securitygroups_db.SecurityGroupRule)
    def __init__(self):
        # First load drivers, then initialize DB, then initialize drivers
        self.type_manager = managers.TypeManager()
        self.extension_manager = managers.ExtensionManager()
        self.mechanism_manager = managers.MechanismManager()
        super(Ml2Plugin, self).__init__()
        self.type_manager.initialize()
        self.extension_manager.initialize()
        self.mechanism_manager.initialize()
        self._setup_dhcp()
        self._start_rpc_notifiers()
        self.add_agent_status_check(self.agent_health_check)
        self._verify_service_plugins_requirements()
        LOG.info(_LI("Modular L2 Plugin initialization complete"))

    def _setup_rpc(self):
        """Initialize components to support agent communication."""
        self.endpoints = [
            rpc.RpcCallbacks(self.notifier, self.type_manager),
            securitygroups_rpc.SecurityGroupServerRpcCallback(),
            dvr_rpc.DVRServerRpcCallback(),
            dhcp_rpc.DhcpRpcCallback(),
            agents_db.AgentExtRpcCallback(),
            metadata_rpc.MetadataRpcCallback(),
            resources_rpc.ResourcesPullRpcCallback()
        ]

    def _setup_dhcp(self):
        """Initialize components to support DHCP."""
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver
        )
        self.start_periodic_dhcp_agent_status_check()

    def _verify_service_plugins_requirements(self):
        for service_plugin in cfg.CONF.service_plugins:
            extension_drivers = SERVICE_PLUGINS_REQUIRED_DRIVERS.get(
                service_plugin, []
            )
            for extension_driver in extension_drivers:
                if extension_driver not in self.extension_manager.names():
                    raise ml2_exc.ExtensionDriverNotFound(
                        driver=extension_driver, service_plugin=service_plugin
                    )

    @property
    def supported_qos_rule_types(self):
        return self.mechanism_manager.supported_qos_rule_types

    @log_helpers.log_method_call
    def _start_rpc_notifiers(self):
        """Initialize RPC notifiers for agents."""
        self.notifier = rpc.AgentNotifierApi(topics.AGENT)
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
        )

    @log_helpers.log_method_call
    def start_rpc_listeners(self):
        """Start the RPC loop to let the plugin communicate with agents."""
        self._setup_rpc()
        self.topic = topics.PLUGIN
        self.conn = n_rpc.create_connection()
        self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
        # process state reports despite dedicated rpc workers
        self.conn.create_consumer(topics.REPORTS,
                                  [agents_db.AgentExtRpcCallback()],
                                  fanout=False)
        return self.conn.consume_in_threads()

    def start_rpc_state_reports_listener(self):
        self.conn_reports = n_rpc.create_connection(new=True)
        self.conn_reports.create_consumer(topics.REPORTS,
                                          [agents_db.AgentExtRpcCallback()],
                                          fanout=False)
        return self.conn_reports.consume_in_threads()

    def _filter_nets_provider(self, context, networks, filters):
        return [network
                for network in networks
                if self.type_manager.network_matches_filters(network, filters)
                ]

    def _check_mac_update_allowed(self, orig_port, port, binding):
        unplugged_types = (portbindings.VIF_TYPE_BINDING_FAILED,
                           portbindings.VIF_TYPE_UNBOUND)
        new_mac = port.get('mac_address')
        mac_change = (new_mac is not None and
                      orig_port['mac_address'] != new_mac)
        if (mac_change and binding.vif_type not in unplugged_types):
            raise exc.PortBound(port_id=orig_port['id'],
                                vif_type=binding.vif_type,
                                old_mac=orig_port['mac_address'],
                                new_mac=port['mac_address'])
        return mac_change

    def _process_port_binding(self, mech_context, attrs):
        session = mech_context._plugin_context.session
        binding = mech_context._binding
        port = mech_context.current
        port_id = port['id']
        changes = False

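        # For illustration only, "attrs" is the (possibly partial) port dict
        # from the API request and may carry binding attributes such as:
        #     {'binding:host_id': 'compute-1',
        #      'binding:vnic_type': 'normal',
        #      'binding:profile': {}}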
        host = attributes.ATTR_NOT_SPECIFIED
        if attrs and portbindings.HOST_ID in attrs:
            host = attrs.get(portbindings.HOST_ID) or ''

        original_host = binding.host
        if (attributes.is_attr_set(host) and
            original_host != host):
            binding.host = host
            changes = True

        vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
        if (attributes.is_attr_set(vnic_type) and
            binding.vnic_type != vnic_type):
            binding.vnic_type = vnic_type
            changes = True

        # treat None as clear of profile.
        profile = None
        if attrs and portbindings.PROFILE in attrs:
            profile = attrs.get(portbindings.PROFILE) or {}

        if profile not in (None, attributes.ATTR_NOT_SPECIFIED,
                           self._get_profile(binding)):
            binding.profile = jsonutils.dumps(profile)
            if len(binding.profile) > models.BINDING_PROFILE_LEN:
                msg = _("binding:profile value too large")
                raise exc.InvalidInput(error_message=msg)
            changes = True

        # Unbind the port if needed.
        if changes:
            binding.vif_type = portbindings.VIF_TYPE_UNBOUND
            binding.vif_details = ''
            db.clear_binding_levels(session, port_id, original_host)
            mech_context._clear_binding_levels()

        if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
            binding.vif_type = portbindings.VIF_TYPE_UNBOUND
            binding.vif_details = ''
            db.clear_binding_levels(session, port_id, original_host)
            mech_context._clear_binding_levels()
            binding.host = ''

        self._update_port_dict_binding(port, binding)
        return changes

    def _bind_port_if_needed(self, context, allow_notify=False,
                             need_notify=False):
        # Binding limit does not need to be tunable because no
        # more than a couple of attempts should ever be required in
        # normal operation.
        for count in range(1, MAX_BIND_TRIES + 1):
            if count > 1:
                # multiple attempts shouldn't happen very often so we log each
                # attempt after the 1st.
                greenthread.sleep(0)  # yield
                LOG.info(_LI("Attempt %(count)s to bind port %(port)s"),
                         {'count': count, 'port': context.current['id']})
            context, need_notify, try_again = self._attempt_binding(
                context, need_notify)
            if not try_again:
                if allow_notify and need_notify:
                    self._notify_port_updated(context)
                return context

        LOG.error(_LE("Failed to commit binding results for %(port)s "
                      "after %(max)s tries"),
                  {'port': context.current['id'], 'max': MAX_BIND_TRIES})
        return context

    def _attempt_binding(self, context, need_notify):
        # Since the mechanism driver bind_port() calls must be made
        # outside a DB transaction locking the port state, it is
        # possible (but unlikely) that the port's state could change
        # concurrently while these calls are being made. If another
        # thread or process succeeds in binding the port before this
        # thread commits its results, the already committed results are
        # used. If attributes such as binding:host_id,
        # binding:profile, or binding:vnic_type are updated
        # concurrently, the try_again flag is returned to indicate that
        # the commit was unsuccessful.
        plugin_context = context._plugin_context
        port_id = context.current['id']
        binding = context._binding
        try_again = False
        # First, determine whether it is necessary and possible to
        # bind the port.
        if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
                or not binding.host):
            # We either don't need to bind the port or can't
            return context, need_notify, try_again

        # The port isn't already bound and the necessary
        # information is available, so attempt to bind the port.
        bind_context = self._bind_port(context)
        # Now try to commit result of attempting to bind the port.
        new_context, did_commit = self._commit_port_binding(
            plugin_context, port_id, binding, bind_context)
        if not new_context:
            # The port has been deleted concurrently, so just
            # return the unbound result from the initial
            # transaction that completed before the deletion.
            LOG.debug("Port %s has been deleted concurrently",
                      port_id)
            need_notify = False
            return context, need_notify, try_again
        # Need to notify if we succeed and our results were
        # committed.
        if did_commit and (new_context._binding.vif_type !=
                           portbindings.VIF_TYPE_BINDING_FAILED):
            need_notify = True
            return new_context, need_notify, try_again
        try_again = True
        return new_context, need_notify, try_again

    def _bind_port(self, orig_context):
        # Construct a new PortContext from the one from the previous
        # transaction.
        port = orig_context.current
        orig_binding = orig_context._binding
        new_binding = models.PortBinding(
            host=orig_binding.host,
            vnic_type=orig_binding.vnic_type,
            profile=orig_binding.profile,
            vif_type=portbindings.VIF_TYPE_UNBOUND,
            vif_details=''
        )
        self._update_port_dict_binding(port, new_binding)
        new_context = driver_context.PortContext(
            self, orig_context._plugin_context, port,
            orig_context.network.current, new_binding, None)

        # Attempt to bind the port and return the context with the
        # result.
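        # What a mechanism driver does inside bind_port() is driver
        # specific. As a rough, illustrative sketch only (not the API
        # contract), a driver typically walks context.segments_to_bind
        # and records a result via context.set_binding(), e.g.:
        #
        #     def bind_port(self, context):
        #         for segment in context.segments_to_bind:
        #             if self._can_bind(segment):   # hypothetical helper
        #                 context.set_binding(segment[api.ID],
        #                                     self.vif_type,
        #                                     self.vif_details)
        #                 return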
        self.mechanism_manager.bind_port(new_context)
        return new_context

    def _commit_port_binding(self, plugin_context, port_id, orig_binding,
                             new_context):
        session = plugin_context.session
        new_binding = new_context._binding

        # After we've attempted to bind the port, we begin a
        # transaction, get the current port state, and decide whether
        # to commit the binding results.
        with session.begin(subtransactions=True):
            # Get the current port state and build a new PortContext
            # reflecting this state as original state for subsequent
            # mechanism driver update_port_*commit() calls.
            port_db, cur_binding = db.get_locked_port_and_binding(session,
                                                                  port_id)
            if not port_db:
                # The port has been deleted concurrently.
                return (None, False)
            oport = self._make_port_dict(port_db)
            port = self._make_port_dict(port_db)
            network = new_context.network.current
            if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                # REVISIT(rkukura): The PortBinding instance from the
                # ml2_port_bindings table, returned as cur_binding
                # from db.get_locked_port_and_binding() above, is
                # currently not used for DVR distributed ports, and is
                # replaced here with the DVRPortBinding instance from
                # the ml2_dvr_port_bindings table specific to the host
                # on which the distributed port is being bound. It
                # would be possible to optimize this code to avoid
                # fetching the PortBinding instance in the DVR case,
                # and even to avoid creating the unused entry in the
                # ml2_port_bindings table. But the upcoming resolution
                # for bug 1367391 will eliminate the
                # ml2_dvr_port_bindings table, use the
                # ml2_port_bindings table to store non-host-specific
                # fields for both distributed and non-distributed
                # ports, and introduce a new ml2_port_binding_hosts
                # table for the fields that need to be host-specific
                # in the distributed case. Since the PortBinding
                # instance will then be needed, it does not make sense
                # to optimize this code to avoid fetching it.
                cur_binding = db.get_dvr_port_binding_by_host(
                    session, port_id, orig_binding.host)
            cur_context = driver_context.PortContext(
                self, plugin_context, port, network, cur_binding, None,
                original_port=oport)

            # Commit our binding results only if port has not been
            # successfully bound concurrently by another thread or
            # process and no binding inputs have been changed.
            commit = ((cur_binding.vif_type in
                       [portbindings.VIF_TYPE_UNBOUND,
                        portbindings.VIF_TYPE_BINDING_FAILED]) and
                      orig_binding.host == cur_binding.host and
                      orig_binding.vnic_type == cur_binding.vnic_type and
                      orig_binding.profile == cur_binding.profile)

            if commit:
                # Update the port's binding state with our binding
                # results.
                cur_binding.vif_type = new_binding.vif_type
                cur_binding.vif_details = new_binding.vif_details
                db.clear_binding_levels(session, port_id, cur_binding.host)
                db.set_binding_levels(session, new_context._binding_levels)
                cur_context._binding_levels = new_context._binding_levels

                # Update PortContext's port dictionary to reflect the
                # updated binding state.
                self._update_port_dict_binding(port, cur_binding)

                # Update the port status if requested by the bound driver.
                if (new_context._binding_levels and
                    new_context._new_port_status):
                    port_db.status = new_context._new_port_status
                    port['status'] = new_context._new_port_status

                # Call the mechanism driver precommit methods, commit
                # the results, and call the postcommit methods.
                self.mechanism_manager.update_port_precommit(cur_context)
        if commit:
            self.mechanism_manager.update_port_postcommit(cur_context)

        # Continue, using the port state as of the transaction that
        # just finished, whether that transaction committed new
        # results or discovered concurrent port state changes.
        return (cur_context, commit)

    def _update_port_dict_binding(self, port, binding):
        port[portbindings.VNIC_TYPE] = binding.vnic_type
        port[portbindings.PROFILE] = self._get_profile(binding)
        if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
            port[portbindings.HOST_ID] = ''
            port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_DISTRIBUTED
            port[portbindings.VIF_DETAILS] = {}
        else:
            port[portbindings.HOST_ID] = binding.host
            port[portbindings.VIF_TYPE] = binding.vif_type
            port[portbindings.VIF_DETAILS] = self._get_vif_details(binding)

    def _get_vif_details(self, binding):
        if binding.vif_details:
            try:
                return jsonutils.loads(binding.vif_details)
            except Exception:
                LOG.error(_LE("Serialized vif_details DB value '%(value)s' "
                              "for port %(port)s is invalid"),
                          {'value': binding.vif_details,
                           'port': binding.port_id})
        return {}

    def _get_profile(self, binding):
        if binding.profile:
            try:
                return jsonutils.loads(binding.profile)
            except Exception:
                LOG.error(_LE("Serialized profile DB value '%(value)s' for "
                              "port %(port)s is invalid"),
                          {'value': binding.profile,
                           'port': binding.port_id})
        return {}
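
    # For illustration only: binding.profile is stored as a JSON string.
    # For an SR-IOV port, for example, the decoded profile might look
    # something like (exact contents vary by driver and deployment):
    #
    #     {"pci_slot": "0000:03:10.1",
    #      "physical_network": "physnet2",
    #      "pci_vendor_info": "8086:10ed"}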

    def _ml2_extend_port_dict_binding(self, port_res, port_db):
        # None when called during unit tests for other plugins.
        if port_db.port_binding:
            self._update_port_dict_binding(port_res, port_db.port_binding)

    db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
        attributes.PORTS, ['_ml2_extend_port_dict_binding'])

    # Register extend dict methods for network and port resources.
    # Each mechanism driver that supports extended attributes for these
    # resources can add those attributes to the result.
    db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
               attributes.NETWORKS, ['_ml2_md_extend_network_dict'])
    db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
               attributes.PORTS, ['_ml2_md_extend_port_dict'])
    db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
               attributes.SUBNETS, ['_ml2_md_extend_subnet_dict'])

    def _ml2_md_extend_network_dict(self, result, netdb):
        session = db_api.get_session()
        with session.begin(subtransactions=True):
            self.extension_manager.extend_network_dict(session, netdb, result)

    def _ml2_md_extend_port_dict(self, result, portdb):
        session = db_api.get_session()
        with session.begin(subtransactions=True):
            self.extension_manager.extend_port_dict(session, portdb, result)

    def _ml2_md_extend_subnet_dict(self, result, subnetdb):
        session = db_api.get_session()
        with session.begin(subtransactions=True):
            self.extension_manager.extend_subnet_dict(
                session, subnetdb, result)

    # Note - The following hook methods have "ml2" in their names so
    # that they are not called twice during unit tests due to global
    # registration of hooks in portbindings_db.py used by other
    # plugins.

    def _ml2_port_model_hook(self, context, original_model, query):
        query = query.outerjoin(models.PortBinding,
                                (original_model.id ==
                                 models.PortBinding.port_id))
        return query

    def _ml2_port_result_filter_hook(self, query, filters):
        values = filters and filters.get(portbindings.HOST_ID, [])
        if not values:
            return query
        return query.filter(models.PortBinding.host.in_(values))

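    # Taken together, these hooks allow port API queries to be filtered by
    # binding host, e.g. (illustrative request)
    # GET /v2.0/ports?binding:host_id=compute-1 returns only ports whose
    # binding is on that host.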
    db_base_plugin_v2.NeutronDbPluginV2.register_model_query_hook(
        models_v2.Port,
        "ml2_port_bindings",
        '_ml2_port_model_hook',
        None,
        '_ml2_port_result_filter_hook')

    def _notify_port_updated(self, mech_context):
        port = mech_context.current
        segment = mech_context.bottom_bound_segment
        if not segment:
            # REVISIT(rkukura): This should notify agent to unplug port
            network = mech_context.network.current
            LOG.warning(_LW("In _notify_port_updated(), no bound segment for "
                            "port %(port_id)s on network %(network_id)s"),
                        {'port_id': port['id'],
                         'network_id': network['id']})
            return
        self.notifier.port_update(mech_context._plugin_context, port,
                                  segment[api.NETWORK_TYPE],
                                  segment[api.SEGMENTATION_ID],
                                  segment[api.PHYSICAL_NETWORK])

    def _delete_objects(self, context, resource, objects):
        delete_op = getattr(self, 'delete_%s' % resource)
        for obj in objects:
            try:
                delete_op(context, obj['result']['id'])
            except KeyError:
                LOG.exception(_LE("Could not find %s to delete."),
                              resource)
            except Exception:
                LOG.exception(_LE("Could not delete %(res)s %(id)s."),
                              {'res': resource,
                               'id': obj['result']['id']})

    def _create_bulk_ml2(self, resource, context, request_items):
        objects = []
        collection = "%ss" % resource
        items = request_items[collection]
        try:
            with context.session.begin(subtransactions=True):
                obj_creator = getattr(self, '_create_%s_db' % resource)
                for item in items:
                    attrs = item[resource]
                    result, mech_context = obj_creator(context, item)
                    objects.append({'mech_context': mech_context,
                                    'result': result,
                                    'attributes': attrs})

        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.exception(_LE("An exception occurred while creating "
                                  "the %(resource)s:%(item)s"),
                              {'resource': resource, 'item': item})

        try:
            postcommit_op = getattr(self.mechanism_manager,
                                    'create_%s_postcommit' % resource)
            for obj in objects:
                postcommit_op(obj['mech_context'])
            return objects
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                resource_ids = [res['result']['id'] for res in objects]
                LOG.exception(_LE("mechanism_manager.create_%(res)s"
                                  "_postcommit failed for %(res)s: "
                                  "'%(failed_id)s'. Deleting "
                                  "%(res)ss %(resource_ids)s"),
                              {'res': resource,
                               'failed_id': obj['result']['id'],
                               'resource_ids': ', '.join(resource_ids)})
                self._delete_objects(context, resource, objects)

    def _create_network_db(self, context, network):
        net_data = network[attributes.NETWORK]
        tenant_id = net_data['tenant_id']
        session = context.session
        with session.begin(subtransactions=True):
            self._ensure_default_security_group(context, tenant_id)
            result = super(Ml2Plugin, self).create_network(context, network)
            self.extension_manager.process_create_network(context, net_data,
                                                          result)
            self._process_l3_create(context, result, net_data)
            net_data['id'] = result['id']
            self.type_manager.create_network_segments(context, net_data,
                                                      tenant_id)
            self.type_manager.extend_network_dict_provider(context, result)
            mech_context = driver_context.NetworkContext(self, context,
                                                         result)
            self.mechanism_manager.create_network_precommit(mech_context)

            if net_data.get(api.MTU, 0) > 0:
                res = super(Ml2Plugin, self).update_network(context,
                    result['id'], {'network': {api.MTU: net_data[api.MTU]}})
                result[api.MTU] = res.get(api.MTU, 0)

            if az_ext.AZ_HINTS in net_data:
                self.validate_availability_zones(context, 'network',
                                                 net_data[az_ext.AZ_HINTS])
                az_hints = az_ext.convert_az_list_to_string(
                                                net_data[az_ext.AZ_HINTS])
                res = super(Ml2Plugin, self).update_network(context,
                    result['id'], {'network': {az_ext.AZ_HINTS: az_hints}})
                result[az_ext.AZ_HINTS] = res[az_ext.AZ_HINTS]

            # Update the transparent vlan if configured
            if utils.is_extension_supported(self, 'vlan-transparent'):
                vlt = vlantransparent.get_vlan_transparent(net_data)
                super(Ml2Plugin, self).update_network(context,
                    result['id'], {'network': {'vlan_transparent': vlt}})
                result['vlan_transparent'] = vlt

        return result, mech_context

    def create_network(self, context, network):
        result, mech_context = self._create_network_db(context, network)
        try:
            self.mechanism_manager.create_network_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("mechanism_manager.create_network_postcommit "
                              "failed, deleting network '%s'"), result['id'])
                self.delete_network(context, result['id'])

        return result

    def create_network_bulk(self, context, networks):
        objects = self._create_bulk_ml2(attributes.NETWORK, context, networks)
        return [obj['result'] for obj in objects]

    def update_network(self, context, id, network):
        net_data = network[attributes.NETWORK]
        provider._raise_if_updates_provider_attributes(net_data)

        session = context.session
        with session.begin(subtransactions=True):
            original_network = super(Ml2Plugin, self).get_network(context, id)
            updated_network = super(Ml2Plugin, self).update_network(context,
                                                                    id,
                                                                    network)
            self.extension_manager.process_update_network(context, net_data,
                                                          updated_network)
            self._process_l3_update(context, updated_network, net_data)
            self.type_manager.extend_network_dict_provider(context,
                                                           updated_network)

            # TODO(QoS): Move out to the extension framework somehow.
            need_network_update_notify = (
                qos_consts.QOS_POLICY_ID in net_data and
                original_network[qos_consts.QOS_POLICY_ID] !=
                updated_network[qos_consts.QOS_POLICY_ID])

            mech_context = driver_context.NetworkContext(
                self, context, updated_network,
                original_network=original_network)
            self.mechanism_manager.update_network_precommit(mech_context)

        # TODO(apech) - handle errors raised by update_network, potentially
        # by re-calling update_network with the previous attributes. For
        # now the error is propagated to the caller, which is expected to
        # either undo/retry the operation or delete the resource.
        self.mechanism_manager.update_network_postcommit(mech_context)
        if need_network_update_notify:
            self.notifier.network_update(context, updated_network)
        return updated_network

    def get_network(self, context, id, fields=None):
        session = context.session
        with session.begin(subtransactions=True):
            result = super(Ml2Plugin, self).get_network(context, id, None)
            self.type_manager.extend_network_dict_provider(context, result)

        return self._fields(result, fields)

    def get_networks(self, context, filters=None, fields=None,
                     sorts=None, limit=None, marker=None, page_reverse=False):
        session = context.session
        with session.begin(subtransactions=True):
            nets = super(Ml2Plugin,
                         self).get_networks(context, filters, None, sorts,
                                            limit, marker, page_reverse)
            self.type_manager.extend_networks_dict_provider(context, nets)

            nets = self._filter_nets_provider(context, nets, filters)

        return [self._fields(net, fields) for net in nets]

    def _delete_ports(self, context, port_ids):
        for port_id in port_ids:
            try:
                self.delete_port(context, port_id)
            except (exc.PortNotFound, sa_exc.ObjectDeletedError):
                # concurrent port deletion can be performed by
                # release_dhcp_port caused by concurrent subnet_delete
                LOG.info(_LI("Port %s was deleted concurrently"), port_id)
            except Exception:
                with excutils.save_and_reraise_exception():
                    LOG.exception(_LE("Exception auto-deleting port %s"),
                                  port_id)

    def _delete_subnets(self, context, subnet_ids):
        for subnet_id in subnet_ids:
            try:
                self.delete_subnet(context, subnet_id)
            except (exc.SubnetNotFound, sa_exc.ObjectDeletedError):
                LOG.info(_LI("Subnet %s was deleted concurrently"),
                         subnet_id)
            except Exception:
                with excutils.save_and_reraise_exception():
                    LOG.exception(_LE("Exception auto-deleting subnet %s"),
                                  subnet_id)

    def delete_network(self, context, id):
        # REVISIT(rkukura) The super(Ml2Plugin, self).delete_network()
        # function is not used because it auto-deletes ports and
        # subnets from the DB without invoking the derived class's
        # delete_port() or delete_subnet(), preventing mechanism
        # drivers from being called. This approach should be revisited
        # when the API layer is reworked during icehouse.

        LOG.debug("Deleting network %s", id)
        session = context.session
        while True:
            try:
                # REVISIT: Serialize this operation with a semaphore
                # to prevent deadlock waiting to acquire a DB lock
                # held by another thread in the same process, leading
                # to 'lock wait timeout' errors.
                #
                # Process L3 first, since, depending on the L3 plugin, it may
                # involve sending RPC notifications, and/or calling delete_port
                # on this plugin.
                # Additionally, a rollback may not be enough to undo the
                # deletion of a floating IP with certain L3 backends.
                self._process_l3_delete(context, id)
                # Using query().with_lockmode isn't necessary. Foreign-key
                # constraints prevent deletion if concurrent creation happens.
                with session.begin(subtransactions=True):
                    # Get ports to auto-delete.
                    ports = (session.query(models_v2.Port).
                             enable_eagerloads(False).
                             filter_by(network_id=id).all())
                    LOG.debug("Ports to auto-delete: %s", ports)
                    only_auto_del = all(p.device_owner
                                        in db_base_plugin_v2.
                                        AUTO_DELETE_PORT_OWNERS
                                        for p in ports)
                    if not only_auto_del:
                        LOG.debug("Tenant-owned ports exist")
                        raise exc.NetworkInUse(net_id=id)

                    # Get subnets to auto-delete.
                    subnets = (session.query(models_v2.Subnet).
                               enable_eagerloads(False).
                               filter_by(network_id=id).all())
                    LOG.debug("Subnets to auto-delete: %s", subnets)

                    if not (ports or subnets):
                        network = self.get_network(context, id)
                        mech_context = driver_context.NetworkContext(self,
                                                                     context,
                                                                     network)
                        self.mechanism_manager.delete_network_precommit(
                            mech_context)

                        self.type_manager.release_network_segments(session, id)
                        record = self._get_network(context, id)
                        LOG.debug("Deleting network record %s", record)
                        session.delete(record)

                        # The segment records are deleted via cascade from the
                        # network record, so explicit removal is not necessary.
                        LOG.debug("Committing transaction")
                        break

                    port_ids = [port.id for port in ports]
                    subnet_ids = [subnet.id for subnet in subnets]
            except os_db_exception.DBError as e:
                with excutils.save_and_reraise_exception() as ctxt:
                    if isinstance(e.inner_exception, sql_exc.IntegrityError):
                        ctxt.reraise = False
                        LOG.warning(_LW("A concurrent port creation has "
                                        "occurred"))
                        continue
            self._delete_ports(context, port_ids)
            self._delete_subnets(context, subnet_ids)

        try:
            self.mechanism_manager.delete_network_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            # TODO(apech) - One or more mechanism driver failed to
            # delete the network.  Ideally we'd notify the caller of
            # the fact that an error occurred.
            LOG.error(_LE("mechanism_manager.delete_network_postcommit"
                          " failed"))
        self.notifier.network_delete(context, id)

    def _create_subnet_db(self, context, subnet):
        session = context.session
        with session.begin(subtransactions=True):
            result = super(Ml2Plugin, self).create_subnet(context, subnet)
            self.extension_manager.process_create_subnet(
                context, subnet[attributes.SUBNET], result)
            network = self.get_network(context, result['network_id'])
            mech_context = driver_context.SubnetContext(self, context,
                                                        result, network)
            self.mechanism_manager.create_subnet_precommit(mech_context)

        return result, mech_context

    def create_subnet(self, context, subnet):
        result, mech_context = self._create_subnet_db(context, subnet)
        try:
            self.mechanism_manager.create_subnet_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            with excutils.save_and_reraise_exception():
                LOG.error(_LE("mechanism_manager.create_subnet_postcommit "
                              "failed, deleting subnet '%s'"), result['id'])
                self.delete_subnet(context, result['id'])
        return result

    def create_subnet_bulk(self, context, subnets):
        objects = self._create_bulk_ml2(attributes.SUBNET, context, subnets)
        return [obj['result'] for obj in objects]

    def update_subnet(self, context, id, subnet):
        session = context.session
        with session.begin(subtransactions=True):
            original_subnet = super(Ml2Plugin, self).get_subnet(context, id)
            updated_subnet = super(Ml2Plugin, self).update_subnet(
                context, id, subnet)
            self.extension_manager.process_update_subnet(
                context, subnet[attributes.SUBNET], updated_subnet)
            network = self.get_network(context, updated_subnet['network_id'])
            mech_context = driver_context.SubnetContext(
                self, context, updated_subnet, network,
                original_subnet=original_subnet)
            self.mechanism_manager.update_subnet_precommit(mech_context)

        # TODO(apech) - handle errors raised by update_subnet, potentially
        # by re-calling update_subnet with the previous attributes. For
        # now the error is propagated to the caller, which is expected to
        # either undo/retry the operation or delete the resource.
        self.mechanism_manager.update_subnet_postcommit(mech_context)
        return updated_subnet

    def delete_subnet(self, context, id):
        # REVISIT(rkukura) The super(Ml2Plugin, self).delete_subnet()
        # function is not used because it deallocates the subnet's addresses
        # from ports in the DB without invoking the derived class's
        # update_port(), preventing mechanism drivers from being called.
        # This approach should be revisited when the API layer is reworked
        # during icehouse.

        LOG.debug("Deleting subnet %s", id)
        session = context.session
        while True:
            with session.begin(subtransactions=True):
                record = self._get_subnet(context, id)
                subnet = self._make_subnet_dict(record, None, context=context)
                qry_allocated = (session.query(models_v2.IPAllocation).
                                 filter_by(subnet_id=id).
                                 join(models_v2.Port))
                is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
                # Remove network owned ports, and delete IP allocations
                # for IPv6 addresses which were automatically generated
                # via SLAAC
                if is_auto_addr_subnet:
                    self._subnet_check_ip_allocations_internal_router_ports(
                            context, id)
                else:
                    qry_allocated = (
                        qry_allocated.filter(models_v2.Port.device_owner.
                        in_(db_base_plugin_v2.AUTO_DELETE_PORT_OWNERS)))
                allocated = qry_allocated.all()
                # Delete all the IPAllocation that can be auto-deleted
                if allocated:
                    for x in allocated:
                        session.delete(x)
                LOG.debug("Ports to auto-deallocate: %s", allocated)
                # Check if there are more IP allocations, unless
                # is_auto_address_subnet is True. In that case the check is
                # unnecessary. This additional check would not only be
                # wasteful for this class of subnet, but is also error-prone
                # since, when the isolation level is set to READ COMMITTED,
                # allocations made concurrently will be returned by this query
                if not is_auto_addr_subnet:
                    alloc = self._subnet_check_ip_allocations(context, id)
                    if alloc:
                        user_alloc = self._subnet_get_user_allocation(
                            context, id)
                        if user_alloc:
                            LOG.info(_LI("Found port (%(port_id)s, %(ip)s) "
                                         "having IP allocation on subnet "
                                         "%(subnet)s, cannot delete"),
                                     {'ip': user_alloc.ip_address,
                                      'port_id': user_alloc.port_id,
                                      'subnet': id})
                            raise exc.SubnetInUse(subnet_id=id)
                        else:
                            # an allocation was found on a DHCP port that
                            # appeared after the auto-delete ports were
                            # removed - the whole operation must be restarted
                            raise os_db_exception.RetryRequest(
                                exc.SubnetInUse(subnet_id=id))

                db_base_plugin_v2._check_subnet_not_used(context, id)

                # If allocated is empty, then all the IPAllocations were
                # correctly deleted during the previous pass.
                if not allocated:
                    network = self.get_network(context, subnet['network_id'])
                    mech_context = driver_context.SubnetContext(self, context,
                                                                subnet,
                                                                network)
                    self.mechanism_manager.delete_subnet_precommit(
                        mech_context)

                    LOG.debug("Deleting subnet record")
                    session.delete(record)

                    # The super(Ml2Plugin, self).delete_subnet() is not called,
                    # so need to manually call delete_subnet for pluggable ipam
                    self.ipam.delete_subnet(context, id)

                    LOG.debug("Committing transaction")
                    break

            for a in allocated:
                if a.port:
                    # calling update_port() for each allocation to remove the
                    # IP from the port and call the MechanismDrivers
                    data = {attributes.PORT:
                            {'fixed_ips': [{'subnet_id': ip.subnet_id,
                                            'ip_address': ip.ip_address}
                                           for ip in a.port.fixed_ips
                                           if ip.subnet_id != id]}}
                    try:
                        self.update_port(context, a.port_id, data)
                    except exc.PortNotFound:
                        LOG.debug("Port %s deleted concurrently", a.port_id)
                    except Exception:
                        with excutils.save_and_reraise_exception():
                            LOG.exception(_LE("Exception deleting fixed_ip "
                                              "from port %s"), a.port_id)

        try:
            self.mechanism_manager.delete_subnet_postcommit(mech_context)
        except ml2_exc.MechanismDriverError:
            # TODO(apech) - One or more mechanism driver failed to
            # delete the subnet.  Ideally we'd notify the caller of
            # the fact that an error occurred.
            LOG.error(_LE("mechanism_manager.delete_subnet_postcommit failed"))

    # TODO(yalei) - will be simplified after security group and address pair
    # are converted to extension drivers too.
    def _portsec_ext_port_create_processing(self, context, port_data, port):
        attrs = port[attributes.PORT]
        port_security = ((port_data.get(psec.PORTSECURITY) is None) or
                         port_data[psec.PORTSECURITY])

        # allowed address pair checks
        if self._check_update_has_allowed_address_pairs(port):
            if not port_security:
                raise addr_pair.AddressPairAndPortSecurityRequired()
        else:
            # remove ATTR_NOT_SPECIFIED
            attrs[addr_pair.ADDRESS_PAIRS] = []

        if port_security:
            self._ensure_default_security_group_on_port(context, port)
        elif self._check_update_has_security_groups(port):
            raise psec.PortSecurityAndIPRequiredForSecurityGroups()

    def _create_port_db(self, context, port):
        attrs = port[attributes.PORT]
        if not attrs.get('status'):
            attrs['status'] = const.PORT_STATUS_DOWN

        session = context.session
        with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
                session.begin(subtransactions=True):
            dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
            result = super(Ml2Plugin, self).create_port(context, port)
            self.extension_manager.process_create_port(context, attrs, result)
            self._portsec_ext_port_create_processing(context, result, port)

            # sgids must be retrieved only after port security has been
            # checked against the security groups
1050             sgids = self._get_security_groups_on_port(context, port)
1051             self._process_port_create_security_group(context, result, sgids)
1052             network = self.get_network(context, result['network_id'])
1053             binding = db.add_port_binding(session, result['id'])
1054             mech_context = driver_context.PortContext(self, context, result,
1055                                                       network, binding, None)
1056             self._process_port_binding(mech_context, attrs)
1057
1058             result[addr_pair.ADDRESS_PAIRS] = (
1059                 self._process_create_allowed_address_pairs(
1060                     context, result,
1061                     attrs.get(addr_pair.ADDRESS_PAIRS)))
1062             self._process_port_create_extra_dhcp_opts(context, result,
1063                                                       dhcp_opts)
1064             self.mechanism_manager.create_port_precommit(mech_context)
1065
1066         return result, mech_context
1067
1068     def create_port(self, context, port):
1069         result, mech_context = self._create_port_db(context, port)
1070         # notify any plugin that is interested in port create events
1071         kwargs = {'context': context, 'port': result}
1072         registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
1073
1074         try:
1075             self.mechanism_manager.create_port_postcommit(mech_context)
1076         except ml2_exc.MechanismDriverError:
1077             with excutils.save_and_reraise_exception():
1078                 LOG.error(_LE("mechanism_manager.create_port_postcommit "
1079                               "failed, deleting port '%s'"), result['id'])
1080                 self.delete_port(context, result['id'])
1081
1082         # REVISIT(rkukura): Is there any point in calling this before
1083         # a binding has been successfully established?
1084         self.notify_security_groups_member_updated(context, result)
1085
1086         try:
1087             bound_context = self._bind_port_if_needed(mech_context)
1088         except ml2_exc.MechanismDriverError:
1089             with excutils.save_and_reraise_exception():
1090                 LOG.error(_LE("_bind_port_if_needed "
1091                               "failed, deleting port '%s'"), result['id'])
1092                 self.delete_port(context, result['id'])
1093
1094         return bound_context.current
1095
1096     def create_port_bulk(self, context, ports):
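        """Create multiple ports, then notify and bind each as needed.

        If binding any port fails with a mechanism driver error, every
        port created by this request is deleted before re-raising.
        """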
1097         objects = self._create_bulk_ml2(attributes.PORT, context, ports)
1098
1099         # REVISIT(rkukura): Is there any point in calling this before
1100         # a binding has been successfully established?
1101         results = [obj['result'] for obj in objects]
1102         self.notify_security_groups_member_updated_bulk(context, results)
1103
1104         for obj in objects:
1105             attrs = obj['attributes']
1106             if attrs and attrs.get(portbindings.HOST_ID):
1107                 kwargs = {'context': context, 'port': obj['result']}
1108                 registry.notify(
1109                     resources.PORT, events.AFTER_CREATE, self, **kwargs)
1110
1111         try:
1112             for obj in objects:
1113                 obj['bound_context'] = self._bind_port_if_needed(
1114                     obj['mech_context'])
1115             return [obj['bound_context'].current for obj in objects]
1116         except ml2_exc.MechanismDriverError:
1117             with excutils.save_and_reraise_exception():
1118                 resource_ids = [res['result']['id'] for res in objects]
1119                 LOG.error(_LE("_bind_port_if_needed failed. "
1120                               "Deleting all ports from create bulk '%s'"),
1121                           resource_ids)
1122                 self._delete_objects(context, attributes.PORT, objects)
1123
1124     # TODO(yalei) - will be simplified after security group and address pair
1125     # are converted to extension drivers too.
1126     def _portsec_ext_port_update_processing(self, updated_port, context, port,
1127                                             id):
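        """Reject port updates that conflict with disabled port security.

        When port security is (or is being) disabled, the update must
        not add allowed address pairs or security groups, and none may
        remain associated with the port.
        """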
1128         port_security = ((updated_port.get(psec.PORTSECURITY) is None) or
1129                          updated_port[psec.PORTSECURITY])
1130
1131         if port_security:
1132             return
1133
1134         # check the address-pairs
1135         if self._check_update_has_allowed_address_pairs(port):
1136             # the request contains address pairs
1137             raise addr_pair.AddressPairAndPortSecurityRequired()
1138         elif (not
1139               self._check_update_deletes_allowed_address_pairs(port)):
1140             # not a request for deleting the address-pairs
1141             updated_port[addr_pair.ADDRESS_PAIRS] = (
1142                     self.get_allowed_address_pairs(context, id))
1143
1144             # check whether address pairs already exist in the db; once
1145             # address pairs move to an extension driver, this can be refined.
1146             if updated_port[addr_pair.ADDRESS_PAIRS]:
1147                 raise addr_pair.AddressPairAndPortSecurityRequired()
1148
1149         # adding or modifying security groups is not allowed while
1150         # port security is disabled
1151         if self._check_update_has_security_groups(port):
1152             raise psec.PortSecurityAndIPRequiredForSecurityGroups()
1153         elif (not
1154               self._check_update_deletes_security_groups(port)):
1155             # Update did not have security groups passed in. Check
1156             # that port does not have any security groups already on it.
1157             filters = {'port_id': [id]}
1158             security_groups = (
1159                 super(Ml2Plugin, self)._get_port_security_group_bindings(
1160                     context, filters)
1161             )
1162             if security_groups:
1163                 raise psec.PortSecurityPortHasSecurityGroup()
1164
1165     def update_port(self, context, id, port):
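        """Update a port, run the driver hooks and rebind if needed.

        The DB update, extension and port-security processing, binding
        changes and update_port_precommit all run in one transaction;
        the AFTER_UPDATE notification, update_port_postcommit and any
        rebinding happen after that transaction commits.
        """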
1166         attrs = port[attributes.PORT]
1167         need_port_update_notify = False
1168         session = context.session
1169         bound_mech_contexts = []
1170
1171         with db_api.exc_to_retry(os_db_exception.DBDuplicateEntry),\
1172                 session.begin(subtransactions=True):
1173             port_db, binding = db.get_locked_port_and_binding(session, id)
1174             if not port_db:
1175                 raise exc.PortNotFound(port_id=id)
1176             mac_address_updated = self._check_mac_update_allowed(
1177                 port_db, attrs, binding)
1178             need_port_update_notify |= mac_address_updated
1179             original_port = self._make_port_dict(port_db)
1180             updated_port = super(Ml2Plugin, self).update_port(context, id,
1181                                                               port)
1182             self.extension_manager.process_update_port(context, attrs,
1183                                                        updated_port)
1184             self._portsec_ext_port_update_processing(updated_port, context,
1185                                                      port, id)
1186
1187             if (psec.PORTSECURITY in attrs) and (
1188                         original_port[psec.PORTSECURITY] !=
1189                         updated_port[psec.PORTSECURITY]):
1190                 need_port_update_notify = True
1191             # TODO(QoS): Move out to the extension framework somehow.
1192             # Follow https://review.openstack.org/#/c/169223 for a solution.
1193             if (qos_consts.QOS_POLICY_ID in attrs and
1194                     original_port[qos_consts.QOS_POLICY_ID] !=
1195                     updated_port[qos_consts.QOS_POLICY_ID]):
1196                 need_port_update_notify = True
1197
1198             if addr_pair.ADDRESS_PAIRS in attrs:
1199                 need_port_update_notify |= (
1200                     self.update_address_pairs_on_port(context, id, port,
1201                                                       original_port,
1202                                                       updated_port))
1203             need_port_update_notify |= self.update_security_group_on_port(
1204                 context, id, port, original_port, updated_port)
1205             network = self.get_network(context, original_port['network_id'])
1206             need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
1207                 context, id, port, updated_port)
1208             levels = db.get_binding_levels(session, id, binding.host)
1209             mech_context = driver_context.PortContext(
1210                 self, context, updated_port, network, binding, levels,
1211                 original_port=original_port)
1212             need_port_update_notify |= self._process_port_binding(
1213                 mech_context, attrs)
1214             # For DVR router interface ports we need to retrieve the
1215             # DVR port binding contexts instead of the normal port context.
1216             # The normal port binding context does not carry the port
1217             # status that l2pop needs when processing the postcommit
1218             # events.
1219
1220             # NOTE: Sometimes during the update_port call the DVR router
1221             # interface port may not have a port binding yet, so we cannot
1222             # build a generic binding list that covers both the DVR and
1223             # non-DVR cases here.
1224             # TODO(Swami): This code needs to be revisited.
1225             if port_db['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1226                 dvr_binding_list = db.get_dvr_port_bindings(session, id)
1227                 for dvr_binding in dvr_binding_list:
1228                     levels = db.get_binding_levels(session, id,
1229                                                    dvr_binding.host)
1230                     dvr_mech_context = driver_context.PortContext(
1231                         self, context, updated_port, network,
1232                         dvr_binding, levels, original_port=original_port)
1233                     self.mechanism_manager.update_port_precommit(
1234                         dvr_mech_context)
1235                     bound_mech_contexts.append(dvr_mech_context)
1236             else:
1237                 self.mechanism_manager.update_port_precommit(mech_context)
1238                 bound_mech_contexts.append(mech_context)
1239
1240         # Notifications must be sent after the above transaction is complete
1241         kwargs = {
1242             'context': context,
1243             'port': updated_port,
1244             'mac_address_updated': mac_address_updated,
1245             'original_port': original_port,
1246         }
1247         registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
1248
1249         # Note that DVR Interface ports will have bindings on
1250         # multiple hosts, and so will have multiple mech_contexts,
1251         # while other ports typically have just one.
1252         # Since bound_mech_contexts holds both the DVR and non-DVR
1253         # contexts, a single loop handles both cases.
1254         try:
1255             for mech_context in bound_mech_contexts:
1256                 self.mechanism_manager.update_port_postcommit(
1257                     mech_context)
1258         except ml2_exc.MechanismDriverError:
1259             LOG.error(_LE("mechanism_manager.update_port_postcommit "
1260                           "failed for port %s"), id)
1261
1262         self.check_and_notify_security_group_member_changed(
1263             context, original_port, updated_port)
1264         need_port_update_notify |= self.is_security_group_member_updated(
1265             context, original_port, updated_port)
1266
1267         if original_port['admin_state_up'] != updated_port['admin_state_up']:
1268             need_port_update_notify = True
1269         # NOTE: For DVR ports the port binding is done after router
1270         # scheduling, when sync_routers is called, and so the call below
1271         # may not be required for DVR routed interfaces. However, since
1272         # we do not always have the mech_context for the DVR router
1273         # interfaces, we simply pass the port context through and return
1274         # it, so that other methods expecting a return value are not
1275         # disturbed.
1276         bound_context = self._bind_port_if_needed(
1277             mech_context,
1278             allow_notify=True,
1279             need_notify=need_port_update_notify)
1280         return bound_context.current
1281
1282     def _process_dvr_port_binding(self, mech_context, context, attrs):
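        # Reset an already bound DVR binding back to unbound, clearing its
        # binding levels for the old host, then record the requested host
        # and the router id taken from the port's device_id.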
1283         session = mech_context._plugin_context.session
1284         binding = mech_context._binding
1285         port = mech_context.current
1286         port_id = port['id']
1287
1288         if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
1289             binding.vif_details = ''
1290             binding.vif_type = portbindings.VIF_TYPE_UNBOUND
1291             if binding.host:
1292                 db.clear_binding_levels(session, port_id, binding.host)
1293             binding.host = ''
1294
1295         self._update_port_dict_binding(port, binding)
1296         binding.host = attrs and attrs.get(portbindings.HOST_ID)
1297         binding.router_id = attrs and attrs.get('device_id')
1298
1299     def update_dvr_port_binding(self, context, id, port):
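        """Ensure a DVR port binding exists for the requested host.

        The binding is (re)created and the port rebound when no binding
        exists for that host, the previous binding failed, or the bound
        router_id no longer matches the port's device_id.
        """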
1300         attrs = port[attributes.PORT]
1301
1302         host = attrs and attrs.get(portbindings.HOST_ID)
1303         host_set = attributes.is_attr_set(host)
1304
1305         if not host_set:
1306             LOG.error(_LE("No Host supplied to bind DVR Port %s"), id)
1307             return
1308
1309         session = context.session
1310         binding = db.get_dvr_port_binding_by_host(session, id, host)
1311         device_id = attrs and attrs.get('device_id')
1312         router_id = binding and binding.get('router_id')
1313         update_required = (not binding or
1314             binding.vif_type == portbindings.VIF_TYPE_BINDING_FAILED or
1315             router_id != device_id)
1316         if update_required:
1317             with session.begin(subtransactions=True):
1318                 try:
1319                     orig_port = super(Ml2Plugin, self).get_port(context, id)
1320                 except exc.PortNotFound:
1321                     LOG.debug("DVR Port %s has been deleted concurrently", id)
1322                     return
1323                 if not binding:
1324                     binding = db.ensure_dvr_port_binding(
1325                         session, id, host, router_id=device_id)
1326                 network = self.get_network(context, orig_port['network_id'])
1327                 levels = db.get_binding_levels(session, id, host)
1328                 mech_context = driver_context.PortContext(self,
1329                     context, orig_port, network,
1330                     binding, levels, original_port=orig_port)
1331                 self._process_dvr_port_binding(mech_context, context, attrs)
1332             self._bind_port_if_needed(mech_context)
1333
1334     def _pre_delete_port(self, context, port_id, port_check):
1335         """Do some preliminary operations before deleting the port."""
1336         LOG.debug("Deleting port %s", port_id)
1337         try:
1338             # notify interested parties of imminent port deletion;
1339             # a failure here prevents the operation from happening
1340             kwargs = {
1341                 'context': context,
1342                 'port_id': port_id,
1343                 'port_check': port_check
1344             }
1345             registry.notify(
1346                 resources.PORT, events.BEFORE_DELETE, self, **kwargs)
1347         except exceptions.CallbackFailure as e:
1348             # NOTE(armax): preserve old check's behavior
1349             if len(e.errors) == 1:
1350                 raise e.errors[0].error
1351             raise exc.ServicePortInUse(port_id=port_id, reason=e)
1352
1353     def delete_port(self, context, id, l3_port_check=True):
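        """Delete a port after running the pre-delete checks.

        delete_port_precommit runs inside the transaction for every
        binding (one per host for DVR interface ports) and floating IPs
        are disassociated there; notifications and postcommit hooks are
        handled afterwards in _post_delete_port.
        """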
1354         self._pre_delete_port(context, id, l3_port_check)
1355         # TODO(armax): get rid of the l3 dependency in the with block
1356         removed_routers = []
1357         router_ids = []
1358         l3plugin = manager.NeutronManager.get_service_plugins().get(
1359             service_constants.L3_ROUTER_NAT)
1360         is_dvr_enabled = utils.is_extension_supported(
1361             l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)
1362
1363         session = context.session
1364         with session.begin(subtransactions=True):
1365             port_db, binding = db.get_locked_port_and_binding(session, id)
1366             if not port_db:
1367                 LOG.debug("The port '%s' was deleted", id)
1368                 return
1369             port = self._make_port_dict(port_db)
1370
1371             network = self.get_network(context, port['network_id'])
1372             bound_mech_contexts = []
1373             device_owner = port['device_owner']
1374             if device_owner == const.DEVICE_OWNER_DVR_INTERFACE:
1375                 bindings = db.get_dvr_port_bindings(context.session, id)
1376                 for bind in bindings:
1377                     levels = db.get_binding_levels(context.session, id,
1378                                                    bind.host)
1379                     mech_context = driver_context.PortContext(
1380                         self, context, port, network, bind, levels)
1381                     self.mechanism_manager.delete_port_precommit(mech_context)
1382                     bound_mech_contexts.append(mech_context)
1383             else:
1384                 levels = db.get_binding_levels(context.session, id,
1385                                                binding.host)
1386                 mech_context = driver_context.PortContext(
1387                     self, context, port, network, binding, levels)
1388                 if is_dvr_enabled and utils.is_dvr_serviced(device_owner):
1389                     removed_routers = l3plugin.dvr_deletens_if_no_port(
1390                         context, id)
1391                 self.mechanism_manager.delete_port_precommit(mech_context)
1392                 bound_mech_contexts.append(mech_context)
1393             if l3plugin:
1394                 router_ids = l3plugin.disassociate_floatingips(
1395                     context, id, do_notify=False)
1396
1397             LOG.debug("Calling delete_port for %(port_id)s owned by %(owner)s",
1398                       {"port_id": id, "owner": device_owner})
1399             super(Ml2Plugin, self).delete_port(context, id)
1400
1401         self._post_delete_port(
1402             context, port, router_ids, removed_routers, bound_mech_contexts)
1403
1404     def _post_delete_port(
1405         self, context, port, router_ids, removed_routers, bound_mech_contexts):
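        # Emit the AFTER_DELETE notification, run delete_port_postcommit
        # for every bound context, and notify agents and security groups
        # once the delete transaction has committed.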
1406         kwargs = {
1407             'context': context,
1408             'port': port,
1409             'router_ids': router_ids,
1410             'removed_routers': removed_routers
1411         }
1412         registry.notify(resources.PORT, events.AFTER_DELETE, self, **kwargs)
1413         try:
1414             # Note that DVR Interface ports will have bindings on
1415             # multiple hosts, and so will have multiple mech_contexts,
1416             # while other ports typically have just one.
1417             for mech_context in bound_mech_contexts:
1418                 self.mechanism_manager.delete_port_postcommit(mech_context)
1419         except ml2_exc.MechanismDriverError:
1420             # TODO(apech) - One or more mechanism drivers failed to
1421             # delete the port.  Ideally we'd notify the caller of the
1422             # fact that an error occurred.
1423             LOG.error(_LE("mechanism_manager.delete_port_postcommit failed for"
1424                           " port %s"), port['id'])
1425         self.notifier.port_delete(context, port['id'])
1426         self.notify_security_groups_member_updated(context, port)
1427
1428     def get_bound_port_context(self, plugin_context, port_id, host=None,
1429                                cached_networks=None):
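        """Return a bound PortContext for a port id (or id prefix).

        Returns None when the port or its binding cannot be found. For
        DVR interface ports the host-specific DVR binding is used.
        """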
1430         session = plugin_context.session
1431         with session.begin(subtransactions=True):
1432             try:
1433                 port_db = (session.query(models_v2.Port).
1434                            enable_eagerloads(False).
1435                            filter(models_v2.Port.id.startswith(port_id)).
1436                            one())
1437             except sa_exc.NoResultFound:
1438                 LOG.debug("No ports have port_id starting with %s",
1439                           port_id)
1440                 return
1441             except sa_exc.MultipleResultsFound:
1442                 LOG.error(_LE("Multiple ports have port_id starting with %s"),
1443                           port_id)
1444                 return
1445             port = self._make_port_dict(port_db)
1446             network = (cached_networks or {}).get(port['network_id'])
1447
1448             if not network:
1449                 network = self.get_network(plugin_context, port['network_id'])
1450
1451             if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1452                 binding = db.get_dvr_port_binding_by_host(
1453                     session, port['id'], host)
1454                 if not binding:
1455                     LOG.error(_LE("Binding info for DVR port %s not found"),
1456                               port_id)
1457                     return None
1458                 levels = db.get_binding_levels(session, port_db.id, host)
1459                 port_context = driver_context.PortContext(
1460                     self, plugin_context, port, network, binding, levels)
1461             else:
1462                 # Since eager loads are disabled in the port_db query,
1463                 # the related port_binding attribute may disappear if the
1464                 # port is deleted concurrently.
1465                 # This is not an error condition.
1466                 binding = port_db.port_binding
1467                 if not binding:
1468                     LOG.info(_LI("Binding info for port %s was not found, "
1469                                  "it might have been deleted already."),
1470                              port_id)
1471                     return
1472                 levels = db.get_binding_levels(session, port_db.id,
1473                                                port_db.port_binding.host)
1474                 port_context = driver_context.PortContext(
1475                     self, plugin_context, port, network, binding, levels)
1476
1477         return self._bind_port_if_needed(port_context)
1478
1479     @oslo_db_api.wrap_db_retry(
1480         max_retries=db_api.MAX_RETRIES, retry_on_request=True,
1481         exception_checker=lambda e: isinstance(e, (sa_exc.StaleDataError,
1482                                                    os_db_exception.DBDeadlock))
1483     )
1484     def update_port_status(self, context, port_id, status, host=None,
1485                            network=None):
1486         """Update the status of a port and return its full UUID.
1487
1488         Returns port_id (the non-truncated UUID) if the port exists,
1489         otherwise returns None. network can be passed in to avoid
1490         another get_network call if the caller already performed one.
1491         """
1492         updated = False
1493         session = context.session
1494         with session.begin(subtransactions=True):
1495             port = db.get_port(session, port_id)
1496             if not port:
1497                 LOG.debug("Port %(port)s update to %(val)s by agent not found",
1498                           {'port': port_id, 'val': status})
1499                 return None
1500             if (port.status != status and
1501                 port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE):
1502                 original_port = self._make_port_dict(port)
1503                 port.status = status
1504                 updated_port = self._make_port_dict(port)
1505                 network = network or self.get_network(
1506                     context, original_port['network_id'])
1507                 levels = db.get_binding_levels(session, port.id,
1508                                                port.port_binding.host)
1509                 mech_context = driver_context.PortContext(
1510                     self, context, updated_port, network, port.port_binding,
1511                     levels, original_port=original_port)
1512                 self.mechanism_manager.update_port_precommit(mech_context)
1513                 updated = True
1514             elif port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1515                 binding = db.get_dvr_port_binding_by_host(
1516                     session, port['id'], host)
1517                 if not binding:
1518                     return
1519                 binding['status'] = status
1520                 binding.update(binding)
1521                 updated = True
1522
1523         if (updated and
1524             port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE):
1525             with session.begin(subtransactions=True):
1526                 port = db.get_port(session, port_id)
1527                 if not port:
1528                     LOG.warning(_LW("Port %s not found during update"),
1529                                 port_id)
1530                     return
1531                 original_port = self._make_port_dict(port)
1532                 network = network or self.get_network(
1533                     context, original_port['network_id'])
1534                 port.status = db.generate_dvr_port_status(session, port['id'])
1535                 updated_port = self._make_port_dict(port)
1536                 levels = db.get_binding_levels(session, port_id, host)
1537                 mech_context = (driver_context.PortContext(
1538                     self, context, updated_port, network,
1539                     binding, levels, original_port=original_port))
1540                 self.mechanism_manager.update_port_precommit(mech_context)
1541
1542         if updated:
1543             self.mechanism_manager.update_port_postcommit(mech_context)
1544
1545         if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1546             db.delete_dvr_port_binding_if_stale(session, binding)
1547
1548         return port['id']
1549
1550     def port_bound_to_host(self, context, port_id, host):
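        """Return True if the port has a binding on the given host."""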
1551         port = db.get_port(context.session, port_id)
1552         if not port:
1553             LOG.debug("No Port match for: %s", port_id)
1554             return False
1555         if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
1556             bindings = db.get_dvr_port_bindings(context.session, port_id)
1557             for b in bindings:
1558                 if b.host == host:
1559                     return True
1560             LOG.debug("No binding found for DVR port %s", port['id'])
1561             return False
1562         else:
1563             port_host = db.get_port_binding_host(context.session, port_id)
1564             return (port_host == host)
1565
1566     def get_ports_from_devices(self, context, devices):
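        """Return the ports (with security groups) for agent device names.

        Each returned port is annotated under the 'device' key with the
        original device name it was requested by.
        """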
1567         port_ids_to_devices = dict(
1568             (self._device_to_port_id(context, device), device)
1569             for device in devices)
1570         port_ids = list(port_ids_to_devices.keys())
1571         ports = db.get_ports_and_sgs(context, port_ids)
1572         for port in ports:
1573             # map back to original requested id
1574             port_id = next((port_id for port_id in port_ids
1575                            if port['id'].startswith(port_id)), None)
1576             port['device'] = port_ids_to_devices.get(port_id)
1577
1578         return ports
1579
1580     @staticmethod
1581     def _device_to_port_id(context, device):
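        # Map an agent device name to a port id: strip a known interface
        # prefix if present; otherwise, when the name is not a UUID, try
        # a MAC address lookup and fall back to returning the device
        # unchanged.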
1582         # REVISIT(rkukura): Consider calling into MechanismDrivers to
1583         # process device names, or having MechanismDrivers supply a list
1584         # of device prefixes to strip.
1585         for prefix in const.INTERFACE_PREFIXES:
1586             if device.startswith(prefix):
1587                 return device[len(prefix):]
1588         # REVISIT(irenab): Consider calling into bound MD to
1589         # handle the get_device_details RPC
1590         if not uuidutils.is_uuid_like(device):
1591             port = db.get_port_from_device_mac(context, device)
1592             if port:
1593                 return port.id
1594         return device
1595
1596     def get_workers(self):
1597         return self.mechanism_manager.get_workers()