]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
OVS flows apply concurrently using a deferred OVSBridge
authorcedric.brandily <zzelle@gmail.com>
Fri, 16 May 2014 20:18:45 +0000 (16:18 -0400)
committerCedric Brandily <zzelle@gmail.com>
Mon, 4 Aug 2014 23:14:47 +0000 (01:14 +0200)
This change is an improvement of the commit
501213686886baccd3280e10b8856a25d3517519 and provides a cleaner
implementation. Previously flows were applied on
OVSBridge.defer_apply_off which could be called by an other
greenthread: it was impossible to ensure that all flows are applied
in a unique OVSBridge.defer_apply_off call. This change ensures that
all flows defined using a DeferredOVSBridge are applied on
DeferredOVSBridge.apply_flows or DeferredOVSBridge.__exit__ if not
exception is raised.

Author:         Cedric Brandily <zzelle@gmail.com>
Co-Authored-By: Edouard Thuleau <edouard.thuleau@cloudwatt.com>
Related-bug: #1263866
Change-Id: I1f260629ef95b98ee80e2ff946c3606da8fe7608

neutron/agent/l2population_rpc.py
neutron/agent/linux/ovs_lib.py
neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/agent/l2population_rpc_base.py
neutron/tests/unit/agent/linux/test_ovs_lib.py
neutron/tests/unit/agent/test_l2population_rpc.py
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py

index 315a478c292d4b40be1e6edcf6a5ab208fb65ac3..b3dbc015e2fd7486d289991d9ebe932598f7856d 100644 (file)
@@ -74,7 +74,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
     '''
 
     @abc.abstractmethod
-    def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
+    def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
         '''Add flow for fdb
 
         This method assumes to be used by method fdb_add_tun.
@@ -82,7 +82,10 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         on bridge.
         And you may edit some information for local arp respond.
 
+        :param br: represent the bridge on which add_fdb_flow should be
+        applied.
         :param port_info: list to include mac and ip.
+
             [mac, ip]
         :remote_ip: remote ip address.
         :param lvm: a local VLAN map of network.
@@ -91,7 +94,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         pass
 
     @abc.abstractmethod
-    def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
+    def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
         '''Delete flow for fdb
 
         This method assumes to be used by method fdb_remove_tun.
@@ -99,6 +102,8 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         from bridge.
         And you may delete some information for local arp respond.
 
+        :param br: represent the bridge on which del_fdb_flow should be
+        applied.
         :param port_info: a list to contain mac and ip.
             [mac, ip]
         :remote_ip: remote ip address.
@@ -108,7 +113,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         pass
 
     @abc.abstractmethod
-    def setup_tunnel_port(self, remote_ip, network_type):
+    def setup_tunnel_port(self, br, remote_ip, network_type):
         '''Setup an added tunnel port.
 
         This method assumes to be used by method fdb_add_tun.
@@ -116,6 +121,8 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         a port to a bridge.
         If you need, you may do some preparation for a bridge.
 
+        :param br: represent the bridge on which setup_tunnel_port should be
+        applied.
         :param remote_ip: an ip for port to setup.
         :param network_type: a type of network.
         :returns: a ofport value. the value 0 means to be unavailable port.
@@ -123,7 +130,7 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         pass
 
     @abc.abstractmethod
-    def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
+    def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
         '''Clean up a deleted tunnel port.
 
         This method assumes to be used by method fdb_remove_tun.
@@ -131,19 +138,23 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
         deleting a port from a bridge.
         If you need, you may do some cleanup for a bridge.
 
+        :param br: represent the bridge on which cleanup_tunnel_port should be
+        applied.
         :param tun_ofport: a port value to cleanup.
         :param tunnel_type: a type of tunnel.
         '''
         pass
 
     @abc.abstractmethod
-    def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
+    def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
                                   ip_address):
         '''Operate the ARP respond information.
 
         Do operation of arp respond information for an action
         In ovs do adding or removing flow entry to edit an arp reply.
 
+        :param br: represent the bridge on which setup_entry_for_arp_reply
+        should be applied.
         :param action: an action to operate for arp respond infomation.
             "add" or "remove"
         :param local_vid: id in local VLAN map of network's ARP entry.
@@ -159,28 +170,29 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
             yield (lvm, agent_ports)
 
     @log.log
-    def fdb_add_tun(self, context, lvm, agent_ports, ofports):
+    def fdb_add_tun(self, context, br, lvm, agent_ports, ofports):
         for remote_ip, ports in agent_ports.items():
             # Ensure we have a tunnel port with this remote agent
             ofport = ofports[lvm.network_type].get(remote_ip)
             if not ofport:
-                ofport = self.setup_tunnel_port(remote_ip, lvm.network_type)
+                ofport = self.setup_tunnel_port(br, remote_ip,
+                                                lvm.network_type)
                 if ofport == 0:
                     continue
             for port in ports:
-                self.add_fdb_flow(port, remote_ip, lvm, ofport)
+                self.add_fdb_flow(br, port, remote_ip, lvm, ofport)
 
     @log.log
-    def fdb_remove_tun(self, context, lvm, agent_ports, ofports):
+    def fdb_remove_tun(self, context, br, lvm, agent_ports, ofports):
         for remote_ip, ports in agent_ports.items():
             ofport = ofports[lvm.network_type].get(remote_ip)
             if not ofport:
                 continue
             for port in ports:
-                self.del_fdb_flow(port, remote_ip, lvm, ofport)
+                self.del_fdb_flow(br, port, remote_ip, lvm, ofport)
                 if port == n_const.FLOODING_ENTRY:
                     # Check if this tunnel port is still used
-                    self.cleanup_tunnel_port(ofport, lvm.network_type)
+                    self.cleanup_tunnel_port(br, ofport, lvm.network_type)
 
     @log.log
     def fdb_update(self, context, fdb_entries):
@@ -198,13 +210,16 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
             getattr(self, method)(context, values)
 
     @log.log
-    def fdb_chg_ip_tun(self, context, fdb_entries, local_ip, local_vlan_map):
+    def fdb_chg_ip_tun(self, context, br, fdb_entries, local_ip,
+                       local_vlan_map):
         '''fdb update when an IP of a port is updated.
 
         The ML2 l2-pop mechanism driver sends an fdb update rpc message when an
         IP of a port is updated.
 
         :param context: RPC context.
+        :param br: represent the bridge on which fdb_chg_ip_tun should be
+        applied.
         :param fdb_entries: fdb dicts that contain all mac/IP informations per
                             agent and network.
                                {'net1':
@@ -231,8 +246,10 @@ class L2populationRpcCallBackTunnelMixin(L2populationRpcCallBackMixin):
 
                 after = state.get('after')
                 for mac, ip in after:
-                    self.setup_entry_for_arp_reply('add', lvm.vlan, mac, ip)
+                    self.setup_entry_for_arp_reply(br, 'add', lvm.vlan, mac,
+                                                   ip)
 
                 before = state.get('before')
                 for mac, ip in before:
-                    self.setup_entry_for_arp_reply('remove', lvm.vlan, mac, ip)
+                    self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan, mac,
+                                                   ip)
index c8421a2124ad34581573751d58654a215a1c6f05..90be0c3b52aa87883e501e70e2672ca3c916f4db 100644 (file)
@@ -13,6 +13,9 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import itertools
+import operator
+
 from oslo.config import cfg
 
 from neutron.agent.linux import ip_lib
@@ -103,8 +106,6 @@ class OVSBridge(BaseOVS):
     def __init__(self, br_name, root_helper):
         super(OVSBridge, self).__init__(root_helper)
         self.br_name = br_name
-        self.defer_apply_flows = False
-        self.deferred_flows = {'add': '', 'mod': '', 'del': ''}
 
     def set_controller(self, controller_names):
         vsctl_command = ['--', 'set-controller', self.br_name]
@@ -188,26 +189,18 @@ class OVSBridge(BaseOVS):
         return self.db_get_val('Bridge',
                                self.br_name, 'datapath_id').strip('"')
 
+    def do_action_flows(self, action, kwargs_list):
+        flow_strs = [_build_flow_expr_str(kw, action) for kw in kwargs_list]
+        self.run_ofctl('%s-flows' % action, ['-'], '\n'.join(flow_strs))
+
     def add_flow(self, **kwargs):
-        flow_str = _build_flow_expr_str(kwargs, 'add')
-        if self.defer_apply_flows:
-            self.deferred_flows['add'] += flow_str + '\n'
-        else:
-            self.run_ofctl("add-flow", [flow_str])
+        self.do_action_flows('add', [kwargs])
 
     def mod_flow(self, **kwargs):
-        flow_str = _build_flow_expr_str(kwargs, 'mod')
-        if self.defer_apply_flows:
-            self.deferred_flows['mod'] += flow_str + '\n'
-        else:
-            self.run_ofctl("mod-flows", [flow_str])
+        self.do_action_flows('mod', [kwargs])
 
     def delete_flows(self, **kwargs):
-        flow_expr_str = _build_flow_expr_str(kwargs, 'del')
-        if self.defer_apply_flows:
-            self.deferred_flows['del'] += flow_expr_str + '\n'
-        else:
-            self.run_ofctl("del-flows", [flow_expr_str])
+        self.do_action_flows('del', [kwargs])
 
     def dump_flows_for_table(self, table):
         retval = None
@@ -218,39 +211,8 @@ class OVSBridge(BaseOVS):
                                if 'NXST' not in item)
         return retval
 
-    def defer_apply_on(self):
-        # TODO(vivek): when defer_apply_on is used, DVR
-        # flows are only getting partially configured when
-        # run concurrently with l2-pop ON.
-        # Will need make ovs_lib flow API context sensitive
-        # and then use the same across this file, which will
-        # address the race issue here.
-        LOG.debug(_('defer_apply_on'))
-        self.defer_apply_flows = True
-
-    def defer_apply_off(self):
-        # TODO(vivek): when defer_apply_off is used, DVR
-        # flows are only getting partially configured when
-        # run concurrently with l2-pop ON.
-        # Will need make ovs_lib flow API context sensitive
-        # and then use the same across this file, which will
-        # address the race issue here.
-        LOG.debug(_('defer_apply_off'))
-        # Note(ethuleau): stash flows and disable deferred mode. Then apply
-        # flows from the stashed reference to be sure to not purge flows that
-        # were added between two ofctl commands.
-        stashed_deferred_flows, self.deferred_flows = (
-            self.deferred_flows, {'add': '', 'mod': '', 'del': ''}
-        )
-        self.defer_apply_flows = False
-        for action, flows in stashed_deferred_flows.items():
-            if flows:
-                LOG.debug(_('Applying following deferred flows '
-                            'to bridge %s'), self.br_name)
-                for line in flows.splitlines():
-                    LOG.debug(_('%(action)s: %(flow)s'),
-                              {'action': action, 'flow': line})
-                self.run_ofctl('%s-flows' % action, ['-'], flows)
+    def deferred(self, **kwargs):
+        return DeferredOVSBridge(self, **kwargs)
 
     def add_tunnel_port(self, port_name, remote_ip, local_ip,
                         tunnel_type=p_const.TYPE_GRE,
@@ -488,6 +450,77 @@ class OVSBridge(BaseOVS):
         self.destroy()
 
 
+class DeferredOVSBridge(object):
+    '''Deferred OVSBridge.
+
+    This class wraps add_flow, mod_flow and delete_flows calls to an OVSBridge
+    and defers their application until apply_flows call in order to perform
+    bulk calls. It wraps also ALLOWED_PASSTHROUGHS calls to avoid mixing
+    OVSBridge and DeferredOVSBridge uses.
+    This class can be used as a context, in such case apply_flows is called on
+    __exit__ except if an exception is raised.
+    This class is not thread-safe, that's why for every use a new instance
+    must be implemented.
+    '''
+    ALLOWED_PASSTHROUGHS = 'add_port', 'delete_port'
+
+    def __init__(self, br, full_ordered=False,
+                 order=('add', 'mod', 'del')):
+        '''Constructor.
+
+        :param br: wrapped bridge
+        :param full_ordered: Optional, disable flow reordering (slower)
+        :param order: Optional, define in which order flow are applied
+        '''
+
+        self.br = br
+        self.full_ordered = full_ordered
+        self.order = order
+        if not self.full_ordered:
+            self.weights = dict((y, x) for x, y in enumerate(self.order))
+        self.action_flow_tuples = []
+
+    def __getattr__(self, name):
+        if name in self.ALLOWED_PASSTHROUGHS:
+            return getattr(self.br, name)
+        raise AttributeError(name)
+
+    def add_flow(self, **kwargs):
+        self.action_flow_tuples.append(('add', kwargs))
+
+    def mod_flow(self, **kwargs):
+        self.action_flow_tuples.append(('mod', kwargs))
+
+    def delete_flows(self, **kwargs):
+        self.action_flow_tuples.append(('del', kwargs))
+
+    def apply_flows(self):
+        action_flow_tuples = self.action_flow_tuples
+        self.action_flow_tuples = []
+        if not action_flow_tuples:
+            return
+
+        if not self.full_ordered:
+            action_flow_tuples.sort(key=lambda af: self.weights[af[0]])
+
+        grouped = itertools.groupby(action_flow_tuples,
+                                    key=operator.itemgetter(0))
+        itemgetter_1 = operator.itemgetter(1)
+        for action, action_flow_list in grouped:
+            flows = map(itemgetter_1, action_flow_list)
+            self.br.do_action_flows(action, flows)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.apply_flows()
+        else:
+            LOG.exception(_("OVS flows could not be applied on bridge %s"),
+                          self.br.br_name)
+
+
 def get_bridge_for_iface(root_helper, iface):
     args = ["ovs-vsctl", "--timeout=%d" % cfg.CONF.ovs_vsctl_timeout,
             "iface-to-br", iface]
index 33d849b685adfb162c010ed0446ecfbc59b0987e..af4d3bca5a9bea601cb1f5f39df8f7c2e07781c5 100644 (file)
@@ -367,7 +367,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                                                      self.local_vlan_map):
             agent_ports.pop(self.local_ip, None)
             if len(agent_ports):
-                self.fdb_add_tun(context, lvm, agent_ports,
+                self.fdb_add_tun(context, self.tun_br, lvm, agent_ports,
                                  self.tun_br_ofports)
 
     def fdb_remove(self, context, fdb_entries):
@@ -376,11 +376,11 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                                                      self.local_vlan_map):
             agent_ports.pop(self.local_ip, None)
             if len(agent_ports):
-                self.fdb_remove_tun(context, lvm, agent_ports,
+                self.fdb_remove_tun(context, self.tun_br, lvm, agent_ports,
                                     self.tun_br_ofports)
 
-    def _add_fdb_flooding_flow(self, lvm):
-        datapath = self.tun_br.datapath
+    def _add_fdb_flooding_flow(self, br, lvm):
+        datapath = br.datapath
         ofp = datapath.ofproto
         ofpp = datapath.ofproto_parser
         match = ofpp.OFPMatch(
@@ -399,13 +399,13 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                               match=match, instructions=instructions)
         self.ryu_send_msg(msg)
 
-    def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
-        datapath = self.tun_br.datapath
+    def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
+        datapath = br.datapath
         ofp = datapath.ofproto
         ofpp = datapath.ofproto_parser
         if port_info == n_const.FLOODING_ENTRY:
             lvm.tun_ofports.add(ofport)
-            self._add_fdb_flooding_flow(lvm)
+            self._add_fdb_flooding_flow(br, lvm)
         else:
             self.ryuapp.add_arp_table_entry(
                 lvm.vlan, port_info[1], port_info[0])
@@ -425,14 +425,14 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                                   match=match, instructions=instructions)
             self.ryu_send_msg(msg)
 
-    def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
-        datapath = self.tun_br.datapath
+    def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
+        datapath = br.datapath
         ofp = datapath.ofproto
         ofpp = datapath.ofproto_parser
         if port_info == n_const.FLOODING_ENTRY:
             lvm.tun_ofports.remove(ofport)
             if len(lvm.tun_ofports) > 0:
-                self._add_fdb_flooding_flow(lvm)
+                self._add_fdb_flooding_flow(br, lvm)
             else:
                 # This local vlan doesn't require any more tunelling
                 match = ofpp.OFPMatch(
@@ -457,7 +457,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                                   match=match)
             self.ryu_send_msg(msg)
 
-    def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
+    def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
                                   ip_address):
         if action == 'add':
             self.ryuapp.add_arp_table_entry(local_vid, ip_address, mac_address)
@@ -466,8 +466,8 @@ class OFANeutronAgent(n_rpc.RpcCallback,
 
     def _fdb_chg_ip(self, context, fdb_entries):
         LOG.debug("update chg_ip received")
-        self.fdb_chg_ip_tun(
-            context, fdb_entries, self.local_ip, self.local_vlan_map)
+        self.fdb_chg_ip_tun(context, self.tun_br, fdb_entries, self.local_ip,
+                            self.local_vlan_map)
 
     def _provision_local_vlan_inbound_for_tunnel(self, lvid, network_type,
                                                  segmentation_id):
@@ -673,7 +673,8 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                 self.ryu_send_msg(msg)
                 # Try to remove tunnel ports if not used by other networks
                 for ofport in lvm.tun_ofports:
-                    self.cleanup_tunnel_port(ofport, lvm.network_type)
+                    self.cleanup_tunnel_port(self.tun_br, ofport,
+                                             lvm.network_type)
         elif lvm.network_type in (p_const.TYPE_FLAT, p_const.TYPE_VLAN):
             if lvm.physical_network in self.phys_brs:
                 self._reclaim_local_vlan_outbound(lvm)
@@ -1078,13 +1079,13 @@ class OFANeutronAgent(n_rpc.RpcCallback,
         else:
             LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
 
-    def _setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
-        ofport = self.tun_br.add_tunnel_port(port_name,
-                                             remote_ip,
-                                             self.local_ip,
-                                             tunnel_type,
-                                             self.vxlan_udp_port,
-                                             self.dont_fragment)
+    def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
+        ofport = br.add_tunnel_port(port_name,
+                                    remote_ip,
+                                    self.local_ip,
+                                    tunnel_type,
+                                    self.vxlan_udp_port,
+                                    self.dont_fragment)
         ofport_int = -1
         try:
             ofport_int = int(ofport)
@@ -1099,27 +1100,28 @@ class OFANeutronAgent(n_rpc.RpcCallback,
         self.tun_br_ofports[tunnel_type][remote_ip] = ofport
         # Add flow in default table to resubmit to the right
         # tunelling table (lvid will be set in the latter)
-        match = self.tun_br.ofparser.OFPMatch(in_port=int(ofport))
-        instructions = [self.tun_br.ofparser.OFPInstructionGotoTable(
+        match = br.ofparser.OFPMatch(in_port=int(ofport))
+        instructions = [br.ofparser.OFPInstructionGotoTable(
             table_id=constants.TUN_TABLE[tunnel_type])]
-        msg = self.tun_br.ofparser.OFPFlowMod(self.tun_br.datapath,
-                                              priority=1,
-                                              match=match,
-                                              instructions=instructions)
+        msg = br.ofparser.OFPFlowMod(br.datapath,
+                                     priority=1,
+                                     match=match,
+                                     instructions=instructions)
         self.ryu_send_msg(msg)
         return ofport
 
-    def setup_tunnel_port(self, remote_ip, network_type):
+    def setup_tunnel_port(self, br, remote_ip, network_type):
         port_name = self._create_tunnel_port_name(network_type, remote_ip)
         if not port_name:
             return 0
-        ofport = self._setup_tunnel_port(port_name,
+        ofport = self._setup_tunnel_port(br,
+                                         port_name,
                                          remote_ip,
                                          network_type)
         return ofport
 
-    def _remove_tunnel_port(self, tun_ofport, tunnel_type):
-        datapath = self.tun_br.datapath
+    def _remove_tunnel_port(self, br, tun_ofport, tunnel_type):
+        datapath = br.datapath
         ofp = datapath.ofproto
         ofpp = datapath.ofproto_parser
         for remote_ip, ofport in self.tun_br_ofports[tunnel_type].items():
@@ -1127,7 +1129,7 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                 port_name = self._create_tunnel_port_name(tunnel_type,
                                                           remote_ip)
                 if port_name:
-                    self.tun_br.delete_port(port_name)
+                    br.delete_port(port_name)
                 match = ofpp.OFPMatch(in_port=int(ofport))
                 msg = ofpp.OFPFlowMod(datapath,
                                       command=ofp.OFPFC_DELETE,
@@ -1137,14 +1139,14 @@ class OFANeutronAgent(n_rpc.RpcCallback,
                 self.ryu_send_msg(msg)
                 self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
 
-    def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
+    def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
         # Check if this tunnel port is still used
         for lvm in self.local_vlan_map.values():
             if tun_ofport in lvm.tun_ofports:
                 break
         # If not, remove it
         else:
-            self._remove_tunnel_port(tun_ofport, tunnel_type)
+            self._remove_tunnel_port(br, tun_ofport, tunnel_type)
 
     def treat_devices_added_or_updated(self, devices):
         resync = False
index f0c90be672075aa2161bf86f549590b7e2be693f..d373126cfbd103f10c6efda126fe2d914d21b86f 100644 (file)
@@ -332,7 +332,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
             return
         tun_name = '%s-%s' % (tunnel_type, tunnel_id)
         if not self.l2_pop:
-            self._setup_tunnel_port(tun_name, tunnel_ip, tunnel_type)
+            self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
+                                    tunnel_type)
 
     def fdb_add(self, context, fdb_entries):
         LOG.debug("fdb_add received")
@@ -341,11 +342,12 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
             agent_ports.pop(self.local_ip, None)
             if len(agent_ports):
                 if not self.enable_distributed_routing:
-                    self.tun_br.defer_apply_on()
-                self.fdb_add_tun(context, lvm, agent_ports,
-                                 self.tun_br_ofports)
-                if not self.enable_distributed_routing:
-                    self.tun_br.defer_apply_off()
+                    with self.tun_br.deferred() as deferred_br:
+                        self.fdb_add_tun(context, deferred_br, lvm,
+                                         agent_ports, self.tun_br_ofports)
+                else:
+                    self.fdb_add_tun(context, self.tun_br, lvm,
+                                     agent_ports, self.tun_br_ofports)
 
     def fdb_remove(self, context, fdb_entries):
         LOG.debug("fdb_remove received")
@@ -354,59 +356,58 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
             agent_ports.pop(self.local_ip, None)
             if len(agent_ports):
                 if not self.enable_distributed_routing:
-                    self.tun_br.defer_apply_on()
-                self.fdb_remove_tun(context, lvm, agent_ports,
-                                    self.tun_br_ofports)
-                if not self.enable_distributed_routing:
-                    self.tun_br.defer_apply_off()
+                    with self.tun_br.deferred() as deferred_br:
+                        self.fdb_remove_tun(context, deferred_br, lvm,
+                                            agent_ports, self.tun_br_ofports)
+                else:
+                    self.fdb_remove_tun(context, self.tun_br, lvm,
+                                        agent_ports, self.tun_br_ofports)
 
-    def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
+    def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
         if port_info == q_const.FLOODING_ENTRY:
             lvm.tun_ofports.add(ofport)
             ofports = ','.join(lvm.tun_ofports)
-            self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
-                                 dl_vlan=lvm.vlan,
-                                 actions="strip_vlan,set_tunnel:%s,"
-                                 "output:%s" % (lvm.segmentation_id, ofports))
+            br.mod_flow(table=constants.FLOOD_TO_TUN,
+                        dl_vlan=lvm.vlan,
+                        actions="strip_vlan,set_tunnel:%s,output:%s" %
+                        (lvm.segmentation_id, ofports))
         else:
-            self.setup_entry_for_arp_reply('add', lvm.vlan, port_info[0],
+            self.setup_entry_for_arp_reply(br, 'add', lvm.vlan, port_info[0],
                                            port_info[1])
             if not self.dvr_agent.is_dvr_router_interface(port_info[1]):
-                self.tun_br.add_flow(table=constants.UCAST_TO_TUN,
-                                     priority=2,
-                                     dl_vlan=lvm.vlan,
-                                     dl_dst=port_info[0],
-                                     actions="strip_vlan,set_tunnel:%s,"
-                                     "output:%s" %
-                                     (lvm.segmentation_id, ofport))
-
-    def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
+                br.add_flow(table=constants.UCAST_TO_TUN,
+                            priority=2,
+                            dl_vlan=lvm.vlan,
+                            dl_dst=port_info[0],
+                            actions="strip_vlan,set_tunnel:%s,output:%s" %
+                            (lvm.segmentation_id, ofport))
+
+    def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
         if port_info == q_const.FLOODING_ENTRY:
             lvm.tun_ofports.remove(ofport)
             if len(lvm.tun_ofports) > 0:
                 ofports = ','.join(lvm.tun_ofports)
-                self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
-                                     dl_vlan=lvm.vlan,
-                                     actions="strip_vlan,"
-                                     "set_tunnel:%s,output:%s" %
-                                     (lvm.segmentation_id, ofports))
+                br.mod_flow(table=constants.FLOOD_TO_TUN,
+                            dl_vlan=lvm.vlan,
+                            actions="strip_vlan,set_tunnel:%s,output:%s" %
+                            (lvm.segmentation_id, ofports))
             else:
                 # This local vlan doesn't require any more tunnelling
-                self.tun_br.delete_flows(table=constants.FLOOD_TO_TUN,
-                                         dl_vlan=lvm.vlan)
+                br.delete_flows(table=constants.FLOOD_TO_TUN, dl_vlan=lvm.vlan)
         else:
-            self.setup_entry_for_arp_reply('remove', lvm.vlan, port_info[0],
-                                           port_info[1])
-            self.tun_br.delete_flows(table=constants.UCAST_TO_TUN,
-                                     dl_vlan=lvm.vlan,
-                                     dl_dst=port_info[0])
+            self.setup_entry_for_arp_reply(br, 'remove', lvm.vlan,
+                                           port_info[0], port_info[1])
+            br.delete_flows(table=constants.UCAST_TO_TUN,
+                            dl_vlan=lvm.vlan,
+                            dl_dst=port_info[0])
 
     def _fdb_chg_ip(self, context, fdb_entries):
         LOG.debug("update chg_ip received")
-        self.fdb_chg_ip_tun(
-            context, fdb_entries, self.local_ip, self.local_vlan_map)
+        with self.tun_br.deferred() as deferred_br:
+            self.fdb_chg_ip_tun(context, deferred_br, fdb_entries,
+                                self.local_ip, self.local_vlan_map)
 
-    def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
+    def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
                                   ip_address):
         '''Set the ARP respond entry.
 
@@ -422,17 +423,17 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
 
         if action == 'add':
             actions = constants.ARP_RESPONDER_ACTIONS % {'mac': mac, 'ip': ip}
-            self.tun_br.add_flow(table=constants.ARP_RESPONDER,
-                                 priority=1,
-                                 proto='arp',
-                                 dl_vlan=local_vid,
-                                 nw_dst='%s' % ip,
-                                 actions=actions)
+            br.add_flow(table=constants.ARP_RESPONDER,
+                        priority=1,
+                        proto='arp',
+                        dl_vlan=local_vid,
+                        nw_dst='%s' % ip,
+                        actions=actions)
         elif action == 'remove':
-            self.tun_br.delete_flows(table=constants.ARP_RESPONDER,
-                                     proto='arp',
-                                     dl_vlan=local_vid,
-                                     nw_dst='%s' % ip)
+            br.delete_flows(table=constants.ARP_RESPONDER,
+                            proto='arp',
+                            dl_vlan=local_vid,
+                            nw_dst='%s' % ip)
         else:
             LOG.warning(_('Action %s not supported'), action)
 
@@ -570,7 +571,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
                 if self.l2_pop:
                     # Try to remove tunnel ports if not used by other networks
                     for ofport in lvm.tun_ofports:
-                        self.cleanup_tunnel_port(ofport, lvm.network_type)
+                        self.cleanup_tunnel_port(self.tun_br, ofport,
+                                                 lvm.network_type)
         elif lvm.network_type == p_const.TYPE_FLAT:
             if lvm.physical_network in self.phys_brs:
                 # outbound
@@ -728,16 +730,16 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
             ancillary_bridges.append(br)
         return ancillary_bridges
 
-    def setup_tunnel_br(self, tun_br=None):
+    def setup_tunnel_br(self, tun_br_name=None):
         '''Setup the tunnel bridge.
 
         Creates tunnel bridge, and links it to the integration bridge
         using a patch port.
 
-        :param tun_br: the name of the tunnel bridge.
+        :param tun_br_name: the name of the tunnel bridge.
         '''
         if not self.tun_br:
-            self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
+            self.tun_br = ovs_lib.OVSBridge(tun_br_name, self.root_helper)
 
         self.tun_br.reset_bridge()
         self.patch_tun_ofport = self.int_br.add_patch_port(
@@ -1011,13 +1013,13 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
         else:
             LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
 
-    def _setup_tunnel_port(self, port_name, remote_ip, tunnel_type):
-        ofport = self.tun_br.add_tunnel_port(port_name,
-                                             remote_ip,
-                                             self.local_ip,
-                                             tunnel_type,
-                                             self.vxlan_udp_port,
-                                             self.dont_fragment)
+    def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
+        ofport = br.add_tunnel_port(port_name,
+                                    remote_ip,
+                                    self.local_ip,
+                                    tunnel_type,
+                                    self.vxlan_udp_port,
+                                    self.dont_fragment)
         ofport_int = -1
         try:
             ofport_int = int(ofport)
@@ -1032,35 +1034,34 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
         self.tun_br_ofports[tunnel_type][remote_ip] = ofport
         # Add flow in default table to resubmit to the right
         # tunnelling table (lvid will be set in the latter)
-        self.tun_br.add_flow(priority=1,
-                             in_port=ofport,
-                             actions="resubmit(,%s)" %
-                             constants.TUN_TABLE[tunnel_type])
+        br.add_flow(priority=1,
+                    in_port=ofport,
+                    actions="resubmit(,%s)" %
+                    constants.TUN_TABLE[tunnel_type])
 
         ofports = ','.join(self.tun_br_ofports[tunnel_type].values())
         if ofports and not self.l2_pop:
             # Update flooding flows to include the new tunnel
             for network_id, vlan_mapping in self.local_vlan_map.iteritems():
                 if vlan_mapping.network_type == tunnel_type:
-                    self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
-                                         dl_vlan=vlan_mapping.vlan,
-                                         actions="strip_vlan,"
-                                         "set_tunnel:%s,output:%s" %
-                                         (vlan_mapping.segmentation_id,
-                                          ofports))
+                    br.mod_flow(table=constants.FLOOD_TO_TUN,
+                                dl_vlan=vlan_mapping.vlan,
+                                actions="strip_vlan,set_tunnel:%s,output:%s" %
+                                (vlan_mapping.segmentation_id, ofports))
         return ofport
 
-    def setup_tunnel_port(self, remote_ip, network_type):
+    def setup_tunnel_port(self, br, remote_ip, network_type):
         remote_ip_hex = self.get_ip_in_hex(remote_ip)
         if not remote_ip_hex:
             return 0
         port_name = '%s-%s' % (network_type, remote_ip_hex)
-        ofport = self._setup_tunnel_port(port_name,
+        ofport = self._setup_tunnel_port(br,
+                                         port_name,
                                          remote_ip,
                                          network_type)
         return ofport
 
-    def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
+    def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
         # Check if this tunnel port is still used
         for lvm in self.local_vlan_map.values():
             if tun_ofport in lvm.tun_ofports:
@@ -1071,8 +1072,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
                 if ofport == tun_ofport:
                     port_name = '%s-%s' % (tunnel_type,
                                            self.get_ip_in_hex(remote_ip))
-                    self.tun_br.delete_port(port_name)
-                    self.tun_br.delete_flows(in_port=ofport)
+                    br.delete_port(port_name)
+                    br.delete_flows(in_port=ofport)
                     self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
 
     def treat_devices_added_or_updated(self, devices, ovs_restarted):
@@ -1283,7 +1284,6 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
             return
 
     def tunnel_sync(self):
-        resync = False
         try:
             for tunnel_type in self.tunnel_types:
                 details = self.plugin_rpc.tunnel_sync(self.context,
@@ -1303,13 +1303,15 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
                                 continue
                             tun_name = '%s-%s' % (tunnel_type,
                                                   tunnel_id or remote_ip_hex)
-                            self._setup_tunnel_port(
-                                tun_name, tunnel['ip_address'], tunnel_type)
+                            self._setup_tunnel_port(self.tun_br,
+                                                    tun_name,
+                                                    tunnel['ip_address'],
+                                                    tunnel_type)
         except Exception as e:
             LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
                       {'local_ip': self.local_ip, 'e': e})
-            resync = True
-        return resync
+            return True
+        return False
 
     def _agent_has_updates(self, polling_manager):
         return (polling_manager.is_polling_required or
index 6460626283b263c823298ea4c053bddd3e3534de..135f0395ec410e9b46b34e7414fa50f3ae66f0f8 100644 (file)
@@ -14,6 +14,7 @@
 # @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
 
 import collections
+import mock
 
 from neutron.agent import l2population_rpc
 from neutron.plugins.openvswitch.agent import ovs_neutron_agent
@@ -28,19 +29,19 @@ class FakeNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin):
     def fdb_remove(self, context, fdb_entries):
         pass
 
-    def add_fdb_flow(self, port_info, remote_ip, lvm, ofport):
+    def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
         pass
 
-    def del_fdb_flow(self, port_info, remote_ip, lvm, ofport):
+    def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
         pass
 
-    def setup_tunnel_port(self, remote_ip, network_type):
+    def setup_tunnel_port(self, br, remote_ip, network_type):
         pass
 
-    def cleanup_tunnel_port(self, tun_ofport, tunnel_type):
+    def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
         pass
 
-    def setup_entry_for_arp_reply(self, action, local_vid, mac_address,
+    def setup_entry_for_arp_reply(self, br, action, local_vid, mac_address,
                                   ip_address):
         pass
 
@@ -50,6 +51,7 @@ class TestL2populationRpcCallBackTunnelMixinBase(base.BaseTestCase):
     def setUp(self):
         super(TestL2populationRpcCallBackTunnelMixinBase, self).setUp()
         self.fakeagent = FakeNeutronAgent()
+        self.fakebr = mock.Mock()
         Port = collections.namedtuple('Port', 'ip, ofport')
         LVM = collections.namedtuple(
             'LVM', 'net, vlan, phys, segid, mac, ip, vif, port')
index 24ae10c3f0f3a0cef0a81b2363c47242b439db8e..6742a3b3bf9cdece3c7d0444480d98bc26c25c1a 100644 (file)
@@ -260,39 +260,38 @@ class OVS_Lib_Test(base.BaseTestCase):
         self.br.add_flow(**flow_dict_6)
         self.br.add_flow(**flow_dict_7)
         expected_calls = [
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=2,dl_src=ca:fe:de:ad:be:ef"
-                       ",actions=strip_vlan,output:0"],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=1,actions=normal"],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=2,actions=drop"],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=2,in_port=%s,actions=drop" % ofport],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=4,dl_vlan=%s,in_port=%s,"
-                       "actions=strip_vlan,set_tunnel:%s,normal"
-                       % (vid, ofport, lsw_id)],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=3,tun_id=%s,actions="
-                       "mod_vlan_vid:%s,output:%s"
-                       % (lsw_id, vid, ofport)],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "add-flow", self.BR_NAME,
-                       "hard_timeout=0,idle_timeout=0,"
-                       "priority=4,nw_src=%s,arp,actions=drop" % cidr],
-                      process_input=None, root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,"
+                                    "priority=2,dl_src=ca:fe:de:ad:be:ef"
+                                    ",actions=strip_vlan,output:0",
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,"
+                                    "priority=1,actions=normal",
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,"
+                                    "priority=2,actions=drop",
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,priority=2,"
+                                    "in_port=%s,actions=drop" % ofport,
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,"
+                                    "priority=4,dl_vlan=%s,in_port=%s,"
+                                    "actions=strip_vlan,set_tunnel:%s,normal"
+                                    % (vid, ofport, lsw_id),
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,priority=3,"
+                                    "tun_id=%s,actions=mod_vlan_vid:%s,"
+                                    "output:%s" % (lsw_id, vid, ofport),
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+                      process_input="hard_timeout=0,idle_timeout=0,priority=4,"
+                                    "nw_src=%s,arp,actions=drop" % cidr,
+                      root_helper=self.root_helper),
         ]
         self.execute.assert_has_calls(expected_calls)
 
@@ -304,9 +303,9 @@ class OVS_Lib_Test(base.BaseTestCase):
 
         self.br.add_flow(**flow_dict)
         self.execute.assert_called_once_with(
-            ["ovs-ofctl", "add-flow", self.BR_NAME,
-             "hard_timeout=1000,idle_timeout=2000,priority=1,actions=normal"],
-            process_input=None,
+            ["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+            process_input="hard_timeout=1000,idle_timeout=2000,priority=1,"
+                          "actions=normal",
             root_helper=self.root_helper)
 
     def test_add_flow_default_priority(self):
@@ -314,9 +313,9 @@ class OVS_Lib_Test(base.BaseTestCase):
 
         self.br.add_flow(**flow_dict)
         self.execute.assert_called_once_with(
-            ["ovs-ofctl", "add-flow", self.BR_NAME,
-             "hard_timeout=0,idle_timeout=0,priority=1,actions=normal"],
-            process_input=None,
+            ["ovs-ofctl", "add-flows", self.BR_NAME, '-'],
+            process_input="hard_timeout=0,idle_timeout=0,priority=1,"
+                          "actions=normal",
             root_helper=self.root_helper)
 
     def _test_get_port_ofport(self, ofport, expected_result):
@@ -362,15 +361,15 @@ class OVS_Lib_Test(base.BaseTestCase):
         self.br.delete_flows(tun_id=lsw_id)
         self.br.delete_flows(dl_vlan=vid)
         expected_calls = [
-            mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
-                       "in_port=" + ofport],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
-                       "tun_id=%s" % lsw_id],
-                      process_input=None, root_helper=self.root_helper),
-            mock.call(["ovs-ofctl", "del-flows", self.BR_NAME,
-                       "dl_vlan=%s" % vid],
-                      process_input=None, root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "del-flows", self.BR_NAME, '-'],
+                      process_input="in_port=" + ofport,
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "del-flows", self.BR_NAME, '-'],
+                      process_input="tun_id=%s" % lsw_id,
+                      root_helper=self.root_helper),
+            mock.call(["ovs-ofctl", "del-flows", self.BR_NAME, '-'],
+                      process_input="dl_vlan=%s" % vid,
+                      root_helper=self.root_helper),
         ]
         self.execute.assert_has_calls(expected_calls)
 
@@ -425,75 +424,6 @@ class OVS_Lib_Test(base.BaseTestCase):
                           self.br.mod_flow,
                           **params)
 
-    def test_defer_apply_flows(self):
-
-        flow_expr = mock.patch.object(ovs_lib, '_build_flow_expr_str').start()
-        flow_expr.side_effect = ['added_flow_1', 'added_flow_2',
-                                 'deleted_flow_1']
-        run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
-
-        self.br.defer_apply_on()
-        self.br.add_flow(flow='add_flow_1')
-        self.br.defer_apply_on()
-        self.br.add_flow(flow='add_flow_2')
-        self.br.delete_flows(flow='delete_flow_1')
-        self.br.defer_apply_off()
-
-        flow_expr.assert_has_calls([
-            mock.call({'flow': 'add_flow_1'}, 'add'),
-            mock.call({'flow': 'add_flow_2'}, 'add'),
-            mock.call({'flow': 'delete_flow_1'}, 'del')
-        ])
-
-        run_ofctl.assert_has_calls([
-            mock.call('add-flows', ['-'], 'added_flow_1\nadded_flow_2\n'),
-            mock.call('del-flows', ['-'], 'deleted_flow_1\n')
-        ])
-
-    def test_defer_apply_flows_concurrently(self):
-        flow_expr = mock.patch.object(ovs_lib, '_build_flow_expr_str').start()
-        flow_expr.side_effect = ['added_flow_1', 'deleted_flow_1',
-                                 'modified_flow_1', 'added_flow_2',
-                                 'deleted_flow_2', 'modified_flow_2']
-
-        run_ofctl = mock.patch.object(self.br, 'run_ofctl').start()
-
-        def run_ofctl_fake(cmd, args, process_input=None):
-            self.br.defer_apply_on()
-            if cmd == 'add-flows':
-                self.br.add_flow(flow='added_flow_2')
-            elif cmd == 'del-flows':
-                self.br.delete_flows(flow='deleted_flow_2')
-            elif cmd == 'mod-flows':
-                self.br.mod_flow(flow='modified_flow_2')
-        run_ofctl.side_effect = run_ofctl_fake
-
-        self.br.defer_apply_on()
-        self.br.add_flow(flow='added_flow_1')
-        self.br.delete_flows(flow='deleted_flow_1')
-        self.br.mod_flow(flow='modified_flow_1')
-        self.br.defer_apply_off()
-
-        run_ofctl.side_effect = None
-        self.br.defer_apply_off()
-
-        flow_expr.assert_has_calls([
-            mock.call({'flow': 'added_flow_1'}, 'add'),
-            mock.call({'flow': 'deleted_flow_1'}, 'del'),
-            mock.call({'flow': 'modified_flow_1'}, 'mod'),
-            mock.call({'flow': 'added_flow_2'}, 'add'),
-            mock.call({'flow': 'deleted_flow_2'}, 'del'),
-            mock.call({'flow': 'modified_flow_2'}, 'mod')
-        ])
-        run_ofctl.assert_has_calls([
-            mock.call('add-flows', ['-'], 'added_flow_1\n'),
-            mock.call('del-flows', ['-'], 'deleted_flow_1\n'),
-            mock.call('mod-flows', ['-'], 'modified_flow_1\n'),
-            mock.call('add-flows', ['-'], 'added_flow_2\n'),
-            mock.call('del-flows', ['-'], 'deleted_flow_2\n'),
-            mock.call('mod-flows', ['-'], 'modified_flow_2\n')
-        ])
-
     def test_add_tunnel_port(self):
         pname = "tap99"
         local_ip = "1.1.1.1"
@@ -932,3 +862,111 @@ class OVS_Lib_Test(base.BaseTestCase):
         data = [[["map", external_ids], "tap99", 1]]
         self.assertIsNone(self._test_get_vif_port_by_id('tap99id', data,
                                                         "br-ext"))
+
+
+class TestDeferredOVSBridge(base.BaseTestCase):
+
+    def setUp(self):
+        super(TestDeferredOVSBridge, self).setUp()
+
+        self.br = mock.Mock()
+        self.mocked_do_action_flows = mock.patch.object(
+            self.br, 'do_action_flows').start()
+
+        self.add_flow_dict1 = dict(in_port=11, actions='drop')
+        self.add_flow_dict2 = dict(in_port=12, actions='drop')
+        self.mod_flow_dict1 = dict(in_port=21, actions='drop')
+        self.mod_flow_dict2 = dict(in_port=22, actions='drop')
+        self.del_flow_dict1 = dict(in_port=31)
+        self.del_flow_dict2 = dict(in_port=32)
+
+    def _verify_mock_call(self, expected_calls):
+        self.mocked_do_action_flows.assert_has_calls(expected_calls)
+        self.assertEqual(len(expected_calls),
+                         len(self.mocked_do_action_flows.mock_calls))
+
+    def test_apply_on_exit(self):
+        expected_calls = [
+            mock.call('add', [self.add_flow_dict1]),
+            mock.call('mod', [self.mod_flow_dict1]),
+            mock.call('del', [self.del_flow_dict1]),
+        ]
+
+        with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
+            deferred_br.add_flow(**self.add_flow_dict1)
+            deferred_br.mod_flow(**self.mod_flow_dict1)
+            deferred_br.delete_flows(**self.del_flow_dict1)
+            self._verify_mock_call([])
+        self._verify_mock_call(expected_calls)
+
+    def test_apply_on_exit_with_errors(self):
+        try:
+            with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
+                deferred_br.add_flow(**self.add_flow_dict1)
+                deferred_br.mod_flow(**self.mod_flow_dict1)
+                deferred_br.delete_flows(**self.del_flow_dict1)
+                raise Exception
+        except Exception:
+            self._verify_mock_call([])
+        else:
+            self.fail('Exception would be reraised')
+
+    def test_apply(self):
+        expected_calls = [
+            mock.call('add', [self.add_flow_dict1]),
+            mock.call('mod', [self.mod_flow_dict1]),
+            mock.call('del', [self.del_flow_dict1]),
+        ]
+
+        with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
+            deferred_br.add_flow(**self.add_flow_dict1)
+            deferred_br.mod_flow(**self.mod_flow_dict1)
+            deferred_br.delete_flows(**self.del_flow_dict1)
+            self._verify_mock_call([])
+            deferred_br.apply_flows()
+            self._verify_mock_call(expected_calls)
+        self._verify_mock_call(expected_calls)
+
+    def test_apply_order(self):
+        expected_calls = [
+            mock.call('del', [self.del_flow_dict1, self.del_flow_dict2]),
+            mock.call('mod', [self.mod_flow_dict1, self.mod_flow_dict2]),
+            mock.call('add', [self.add_flow_dict1, self.add_flow_dict2]),
+        ]
+
+        order = 'del', 'mod', 'add'
+        with ovs_lib.DeferredOVSBridge(self.br, order=order) as deferred_br:
+            deferred_br.add_flow(**self.add_flow_dict1)
+            deferred_br.mod_flow(**self.mod_flow_dict1)
+            deferred_br.delete_flows(**self.del_flow_dict1)
+            deferred_br.delete_flows(**self.del_flow_dict2)
+            deferred_br.add_flow(**self.add_flow_dict2)
+            deferred_br.mod_flow(**self.mod_flow_dict2)
+        self._verify_mock_call(expected_calls)
+
+    def test_apply_full_ordered(self):
+        expected_calls = [
+            mock.call('add', [self.add_flow_dict1]),
+            mock.call('mod', [self.mod_flow_dict1]),
+            mock.call('del', [self.del_flow_dict1, self.del_flow_dict2]),
+            mock.call('add', [self.add_flow_dict2]),
+            mock.call('mod', [self.mod_flow_dict2]),
+        ]
+
+        with ovs_lib.DeferredOVSBridge(self.br,
+                                       full_ordered=True) as deferred_br:
+            deferred_br.add_flow(**self.add_flow_dict1)
+            deferred_br.mod_flow(**self.mod_flow_dict1)
+            deferred_br.delete_flows(**self.del_flow_dict1)
+            deferred_br.delete_flows(**self.del_flow_dict2)
+            deferred_br.add_flow(**self.add_flow_dict2)
+            deferred_br.mod_flow(**self.mod_flow_dict2)
+        self._verify_mock_call(expected_calls)
+
+    def test_getattr_unallowed_attr(self):
+        with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
+            self.assertEqual(self.br.add_port, deferred_br.add_port)
+
+    def test_getattr_unallowed_attr(self):
+        with ovs_lib.DeferredOVSBridge(self.br) as deferred_br:
+            self.assertRaises(AttributeError, getattr, deferred_br, 'failure')
index 2b9bbfcab3a8cedb04f75966738403c1d37fa6a0..e6b626a22bf1c7a6fe05d56deecf1ca65c7d44e5 100644 (file)
@@ -69,15 +69,15 @@ class TestL2populationRpcCallBackTunnelMixin(
             mock.patch.object(self.fakeagent, 'setup_tunnel_port'),
             mock.patch.object(self.fakeagent, 'add_fdb_flow'),
         ) as (mock_setup_tunnel_port, mock_add_fdb_flow):
-            self.fakeagent.fdb_add_tun('context', self.lvm1,
+            self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
                                        self.agent_ports, self.ofports)
         expected = [
-            mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
-                      self.lvm1, self.ports[0].ofport),
-            mock.call([self.lvms[1].mac, self.lvms[1].ip], self.ports[1].ip,
-                      self.lvm1, self.ports[1].ofport),
-            mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
-                      self.lvm1, self.ports[2].ofport),
+            mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
+                      self.ports[0].ip, self.lvm1, self.ports[0].ofport),
+            mock.call(self.fakebr, [self.lvms[1].mac, self.lvms[1].ip],
+                      self.ports[1].ip, self.lvm1, self.ports[1].ofport),
+            mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
+                      self.ports[2].ip, self.lvm1, self.ports[2].ofport),
         ]
         self.assertEqual(sorted(expected),
                          sorted(mock_add_fdb_flow.call_args_list))
@@ -90,17 +90,17 @@ class TestL2populationRpcCallBackTunnelMixin(
                               return_value=ofport),
             mock.patch.object(self.fakeagent, 'add_fdb_flow'),
         ) as (mock_setup_tunnel_port, mock_add_fdb_flow):
-            self.fakeagent.fdb_add_tun('context', self.lvm1,
+            self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
                                        self.agent_ports, self.ofports)
         mock_setup_tunnel_port.assert_called_once_with(
-            self.ports[1].ip, self.lvm1.network_type)
+            self.fakebr, self.ports[1].ip, self.lvm1.network_type)
         expected = [
-            mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
-                      self.lvm1, self.ports[0].ofport),
-            mock.call([self.lvms[1].mac, self.lvms[1].ip], self.ports[1].ip,
-                      self.lvm1, ofport),
-            mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
-                      self.lvm1, self.ports[2].ofport),
+            mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
+                      self.ports[0].ip, self.lvm1, self.ports[0].ofport),
+            mock.call(self.fakebr, [self.lvms[1].mac, self.lvms[1].ip],
+                      self.ports[1].ip, self.lvm1, ofport),
+            mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
+                      self.ports[2].ip, self.lvm1, self.ports[2].ofport),
         ]
         self.assertEqual(sorted(expected),
                          sorted(mock_add_fdb_flow.call_args_list))
@@ -112,15 +112,15 @@ class TestL2populationRpcCallBackTunnelMixin(
                               return_value=0),
             mock.patch.object(self.fakeagent, 'add_fdb_flow'),
         ) as (mock_setup_tunnel_port, mock_add_fdb_flow):
-            self.fakeagent.fdb_add_tun('context', self.lvm1,
+            self.fakeagent.fdb_add_tun('context', self.fakebr, self.lvm1,
                                        self.agent_ports, self.ofports)
         mock_setup_tunnel_port.assert_called_once_with(
-            self.ports[1].ip, self.lvm1.network_type)
+            self.fakebr, self.ports[1].ip, self.lvm1.network_type)
         expected = [
-            mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
-                      self.lvm1, self.ports[0].ofport),
-            mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
-                      self.lvm1, self.ports[2].ofport),
+            mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
+                      self.ports[0].ip, self.lvm1, self.ports[0].ofport),
+            mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
+                      self.ports[2].ip, self.lvm1, self.ports[2].ofport),
         ]
         self.assertEqual(sorted(expected),
                          sorted(mock_add_fdb_flow.call_args_list))
@@ -128,15 +128,15 @@ class TestL2populationRpcCallBackTunnelMixin(
     def test_fdb_remove_tun(self):
         with mock.patch.object(
             self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
-            self.fakeagent.fdb_remove_tun('context', self.lvm1,
+            self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
                                           self.agent_ports, self.ofports)
         expected = [
-            mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
-                      self.lvm1, self.ports[0].ofport),
-            mock.call([self.lvms[1].mac, self.lvms[1].ip], self.ports[1].ip,
-                      self.lvm1, self.ports[1].ofport),
-            mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
-                      self.lvm1, self.ports[2].ofport),
+            mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
+                      self.ports[0].ip, self.lvm1, self.ports[0].ofport),
+            mock.call(self.fakebr, [self.lvms[1].mac, self.lvms[1].ip],
+                      self.ports[1].ip, self.lvm1, self.ports[1].ofport),
+            mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
+                      self.ports[2].ip, self.lvm1, self.ports[2].ofport),
         ]
         self.assertEqual(sorted(expected),
                          sorted(mock_del_fdb_flow.call_args_list))
@@ -147,32 +147,33 @@ class TestL2populationRpcCallBackTunnelMixin(
             mock.patch.object(self.fakeagent, 'del_fdb_flow'),
             mock.patch.object(self.fakeagent, 'cleanup_tunnel_port'),
         ) as (mock_del_fdb_flow, mock_cleanup_tunnel_port):
-            self.fakeagent.fdb_remove_tun('context', self.lvm1,
+            self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
                                           self.agent_ports, self.ofports)
         expected = [
-            mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
-                      self.lvm1, self.ports[0].ofport),
-            mock.call([n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]],
+            mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
+                      self.ports[0].ip, self.lvm1, self.ports[0].ofport),
+            mock.call(self.fakebr,
+                      [n_const.FLOODING_ENTRY[0], n_const.FLOODING_ENTRY[1]],
                       self.ports[1].ip, self.lvm1, self.ports[1].ofport),
-            mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
-                      self.lvm1, self.ports[2].ofport),
+            mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
+                      self.ports[2].ip, self.lvm1, self.ports[2].ofport),
         ]
         self.assertEqual(sorted(expected),
                          sorted(mock_del_fdb_flow.call_args_list))
         mock_cleanup_tunnel_port.assert_called_once_with(
-            self.ports[1].ofport, self.lvm1.network_type)
+            self.fakebr, self.ports[1].ofport, self.lvm1.network_type)
 
     def test_fdb_remove_tun_non_existence_key_in_ofports(self):
         del self.ofports[self.type_gre][self.ports[1].ip]
         with mock.patch.object(
             self.fakeagent, 'del_fdb_flow') as mock_del_fdb_flow:
-            self.fakeagent.fdb_remove_tun('context', self.lvm1,
+            self.fakeagent.fdb_remove_tun('context', self.fakebr, self.lvm1,
                                           self.agent_ports, self.ofports)
         expected = [
-            mock.call([self.lvms[0].mac, self.lvms[0].ip], self.ports[0].ip,
-                      self.lvm1, self.ports[0].ofport),
-            mock.call([self.lvms[2].mac, self.lvms[2].ip], self.ports[2].ip,
-                      self.lvm1, self.ports[2].ofport),
+            mock.call(self.fakebr, [self.lvms[0].mac, self.lvms[0].ip],
+                      self.ports[0].ip, self.lvm1, self.ports[0].ofport),
+            mock.call(self.fakebr, [self.lvms[2].mac, self.lvms[2].ip],
+                      self.ports[2].ip, self.lvm1, self.ports[2].ofport),
         ]
         self.assertEqual(sorted(expected),
                          sorted(mock_del_fdb_flow.call_args_list))
@@ -192,20 +193,21 @@ class TestL2populationRpcCallBackTunnelMixin(
     def test__fdb_chg_ip(self):
         m_setup_entry_for_arp_reply = mock.Mock()
         self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
-        self.fakeagent.fdb_chg_ip_tun('context', self.upd_fdb_entry1_val,
-                                      self.local_ip, self.local_vlan_map1)
+        self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
+                                      self.upd_fdb_entry1_val, self.local_ip,
+                                      self.local_vlan_map1)
         expected = [
-            mock.call('remove', self.lvm1.vlan, self.lvms[0].mac,
+            mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
                       self.lvms[0].ip),
-            mock.call('add', self.lvm1.vlan, self.lvms[1].mac,
+            mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
                       self.lvms[1].ip),
-            mock.call('remove', self.lvm1.vlan, self.lvms[0].mac,
+            mock.call(self.fakebr, 'remove', self.lvm1.vlan, self.lvms[0].mac,
                       self.lvms[0].ip),
-            mock.call('add', self.lvm1.vlan, self.lvms[1].mac,
+            mock.call(self.fakebr, 'add', self.lvm1.vlan, self.lvms[1].mac,
                       self.lvms[1].ip),
-            mock.call('remove', self.lvm2.vlan, self.lvms[0].mac,
+            mock.call(self.fakebr, 'remove', self.lvm2.vlan, self.lvms[0].mac,
                       self.lvms[0].ip),
-            mock.call('add', self.lvm2.vlan, self.lvms[2].mac,
+            mock.call(self.fakebr, 'add', self.lvm2.vlan, self.lvms[2].mac,
                       self.lvms[2].ip),
         ]
         m_setup_entry_for_arp_reply.assert_has_calls(expected, any_order=True)
@@ -214,7 +216,7 @@ class TestL2populationRpcCallBackTunnelMixin(
         m_setup_entry_for_arp_reply = mock.Mock()
         self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
         self.fakeagent.fdb_chg_ip_tun(
-            'context', self.upd_fdb_entry1, self.local_ip, {})
+            'context', self.fakebr, self.upd_fdb_entry1, self.local_ip, {})
         self.assertFalse(m_setup_entry_for_arp_reply.call_count)
 
     def test__fdb_chg_ip_ip_is_local_ip(self):
@@ -228,6 +230,7 @@ class TestL2populationRpcCallBackTunnelMixin(
         }
         m_setup_entry_for_arp_reply = mock.Mock()
         self.fakeagent.setup_entry_for_arp_reply = m_setup_entry_for_arp_reply
-        self.fakeagent.fdb_chg_ip_tun('context', upd_fdb_entry_val,
-                                      self.local_ip, self.local_vlan_map1)
+        self.fakeagent.fdb_chg_ip_tun('context', self.fakebr,
+                                      upd_fdb_entry_val, self.local_ip,
+                                      self.local_vlan_map1)
         self.assertFalse(m_setup_entry_for_arp_reply.call_count)
index 1180b1de66e43506780ec4e187ddf38f46b2470b..b9690da0065c6947582315e3c42b80a464a416a6 100644 (file)
@@ -736,7 +736,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
             fdb_entry[self.lvms[0].net]['ports'][tunnel_ip] = [['mac', 'ip']]
             self.agent.fdb_add(None, fdb_entry)
             add_tun_fn.assert_called_with(
-                tun_name, tunnel_ip, self.tunnel_type)
+                self.agent.tun_br, tun_name, tunnel_ip, self.tunnel_type)
 
     def test_fdb_del_port(self):
         self._prepare_l2_pop_ofports()
@@ -831,7 +831,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
             mock.patch.object(self.mod_agent.LOG, 'error')
         ) as (add_tunnel_port_fn, log_error_fn):
             ofport = self.agent._setup_tunnel_port(
-                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+                self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
             add_tunnel_port_fn.assert_called_once_with(
                 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
                 self.agent.vxlan_udp_port, self.agent.dont_fragment)
@@ -848,7 +848,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
             mock.patch.object(self.mod_agent.LOG, 'error')
         ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
             ofport = self.agent._setup_tunnel_port(
-                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+                self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
             add_tunnel_port_fn.assert_called_once_with(
                 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
                 self.agent.vxlan_udp_port, self.agent.dont_fragment)
index 084bd020e99322dcb0a6ccf9d3abab76aeedc45c..b7e8f3c1335b2373df5f90af7f2ff37a84363948 100644 (file)
@@ -931,7 +931,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                               return_value='6'),
             mock.patch.object(self.agent.tun_br, "add_flow")
         ) as (add_tun_port_fn, add_flow_fn):
-            self.agent._setup_tunnel_port('portname', '1.2.3.4', 'vxlan')
+            self.agent._setup_tunnel_port(self.agent.tun_br, 'portname',
+                                          '1.2.3.4', 'vxlan')
             self.assertTrue(add_tun_port_fn.called)
 
     def test_port_unbound(self):
@@ -996,7 +997,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                        [[FAKE_MAC, FAKE_IP1],
                         n_const.FLOODING_ENTRY]}}}
         with mock.patch.object(self.agent.tun_br,
-                               "defer_apply_on") as defer_fn:
+                               "deferred") as defer_fn:
             self.agent.fdb_add(None, fdb_entry)
             self.assertFalse(defer_fn.called)
 
@@ -1013,33 +1014,36 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                        [[FAKE_MAC, FAKE_IP1],
                         n_const.FLOODING_ENTRY]}}}
         with contextlib.nested(
-            mock.patch.object(self.agent.tun_br, 'add_flow'),
-            mock.patch.object(self.agent.tun_br, 'mod_flow'),
+            mock.patch.object(self.agent.tun_br, 'deferred'),
+            mock.patch.object(self.agent.tun_br, 'do_action_flows'),
             mock.patch.object(self.agent, '_setup_tunnel_port'),
-        ) as (add_flow_fn, mod_flow_fn, add_tun_fn):
+        ) as (deferred_fn, do_action_flows_fn, add_tun_fn):
+            deferred_fn.return_value = ovs_lib.DeferredOVSBridge(
+                self.agent.tun_br)
             self.agent.fdb_add(None, fdb_entry)
             self.assertFalse(add_tun_fn.called)
             actions = (constants.ARP_RESPONDER_ACTIONS %
                        {'mac': netaddr.EUI(FAKE_MAC, dialect=netaddr.mac_unix),
                         'ip': netaddr.IPAddress(FAKE_IP1)})
-            add_flow_fn.assert_has_calls([
-                mock.call(table=constants.ARP_RESPONDER,
-                          priority=1,
-                          proto='arp',
-                          dl_vlan='vlan1',
-                          nw_dst=FAKE_IP1,
-                          actions=actions),
-                mock.call(table=constants.UCAST_TO_TUN,
-                          priority=2,
-                          dl_vlan='vlan1',
-                          dl_dst=FAKE_MAC,
-                          actions='strip_vlan,'
-                          'set_tunnel:seg1,output:2')
-            ])
-            mod_flow_fn.assert_called_with(table=constants.FLOOD_TO_TUN,
-                                           dl_vlan='vlan1',
-                                           actions='strip_vlan,'
-                                           'set_tunnel:seg1,output:1,2')
+            expected_calls = [
+                mock.call('add', [dict(table=constants.ARP_RESPONDER,
+                                       priority=1,
+                                       proto='arp',
+                                       dl_vlan='vlan1',
+                                       nw_dst=FAKE_IP1,
+                                       actions=actions),
+                                  dict(table=constants.UCAST_TO_TUN,
+                                       priority=2,
+                                       dl_vlan='vlan1',
+                                       dl_dst=FAKE_MAC,
+                                       actions='strip_vlan,'
+                                       'set_tunnel:seg1,output:2')]),
+                mock.call('mod', [dict(table=constants.FLOOD_TO_TUN,
+                                       dl_vlan='vlan1',
+                                       actions='strip_vlan,'
+                                       'set_tunnel:seg1,output:1,2')]),
+            ]
+            do_action_flows_fn.assert_has_calls(expected_calls)
 
     def test_fdb_del_flows(self):
         self._prepare_l2_pop_ofports()
@@ -1051,23 +1055,27 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                        [[FAKE_MAC, FAKE_IP1],
                         n_const.FLOODING_ENTRY]}}}
         with contextlib.nested(
-            mock.patch.object(self.agent.tun_br, 'mod_flow'),
-            mock.patch.object(self.agent.tun_br, 'delete_flows'),
-        ) as (mod_flow_fn, del_flow_fn):
+            mock.patch.object(self.agent.tun_br, 'deferred'),
+            mock.patch.object(self.agent.tun_br, 'do_action_flows'),
+        ) as (deferred_fn, do_action_flows_fn):
+            deferred_fn.return_value = ovs_lib.DeferredOVSBridge(
+                self.agent.tun_br)
             self.agent.fdb_remove(None, fdb_entry)
-            mod_flow_fn.assert_called_with(table=constants.FLOOD_TO_TUN,
-                                           dl_vlan='vlan2',
-                                           actions='strip_vlan,'
-                                           'set_tunnel:seg2,output:1')
-            expected = [mock.call(table=constants.ARP_RESPONDER,
-                                  proto='arp',
-                                  dl_vlan='vlan2',
-                                  nw_dst=FAKE_IP1),
-                        mock.call(table=constants.UCAST_TO_TUN,
-                                  dl_vlan='vlan2',
-                                  dl_dst=FAKE_MAC),
-                        mock.call(in_port='2')]
-            del_flow_fn.assert_has_calls(expected)
+            expected_calls = [
+                mock.call('mod', [dict(table=constants.FLOOD_TO_TUN,
+                                       dl_vlan='vlan2',
+                                       actions='strip_vlan,'
+                                       'set_tunnel:seg2,output:1')]),
+                mock.call('del', [dict(table=constants.ARP_RESPONDER,
+                                       proto='arp',
+                                       dl_vlan='vlan2',
+                                       nw_dst=FAKE_IP1),
+                                  dict(table=constants.UCAST_TO_TUN,
+                                       dl_vlan='vlan2',
+                                       dl_dst=FAKE_MAC),
+                                  dict(in_port='2')]),
+            ]
+            do_action_flows_fn.assert_has_calls(expected_calls)
 
     def test_fdb_add_port(self):
         self._prepare_l2_pop_ofports()
@@ -1076,15 +1084,18 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                       'segment_id': 'tun1',
                       'ports': {'1.1.1.1': [[FAKE_MAC, FAKE_IP1]]}}}
         with contextlib.nested(
-            mock.patch.object(self.agent.tun_br, 'add_flow'),
-            mock.patch.object(self.agent.tun_br, 'mod_flow'),
+            mock.patch.object(self.agent.tun_br, 'deferred'),
+            mock.patch.object(self.agent.tun_br, 'do_action_flows'),
             mock.patch.object(self.agent, '_setup_tunnel_port')
-        ) as (add_flow_fn, mod_flow_fn, add_tun_fn):
+        ) as (deferred_fn, do_action_flows_fn, add_tun_fn):
+            deferred_br = ovs_lib.DeferredOVSBridge(self.agent.tun_br)
+            deferred_fn.return_value = deferred_br
             self.agent.fdb_add(None, fdb_entry)
             self.assertFalse(add_tun_fn.called)
             fdb_entry['net1']['ports']['10.10.10.10'] = [[FAKE_MAC, FAKE_IP1]]
             self.agent.fdb_add(None, fdb_entry)
-            add_tun_fn.assert_called_with('gre-0a0a0a0a', '10.10.10.10', 'gre')
+            add_tun_fn.assert_called_with(
+                deferred_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')
 
     def test_fdb_del_port(self):
         self._prepare_l2_pop_ofports()
@@ -1093,11 +1104,14 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                       'segment_id': 'tun2',
                       'ports': {'2.2.2.2': [n_const.FLOODING_ENTRY]}}}
         with contextlib.nested(
-            mock.patch.object(self.agent.tun_br, 'delete_flows'),
+            mock.patch.object(self.agent.tun_br, 'deferred'),
+            mock.patch.object(self.agent.tun_br, 'do_action_flows'),
             mock.patch.object(self.agent.tun_br, 'delete_port')
-        ) as (del_flow_fn, del_port_fn):
+        ) as (deferred_fn, do_action_flows_fn, delete_port_fn):
+            deferred_br = ovs_lib.DeferredOVSBridge(self.agent.tun_br)
+            deferred_fn.return_value = deferred_br
             self.agent.fdb_remove(None, fdb_entry)
-            del_port_fn.assert_called_once_with('gre-02020202')
+            delete_port_fn.assert_called_once_with('gre-02020202')
 
     def test_fdb_update_chg_ip(self):
         self._prepare_l2_pop_ofports()
@@ -1107,23 +1121,30 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                          {'before': [[FAKE_MAC, FAKE_IP1]],
                           'after': [[FAKE_MAC, FAKE_IP2]]}}}}
         with contextlib.nested(
-            mock.patch.object(self.agent.tun_br, 'add_flow'),
-            mock.patch.object(self.agent.tun_br, 'delete_flows')
-        ) as (add_flow_fn, del_flow_fn):
+            mock.patch.object(self.agent.tun_br, 'deferred'),
+            mock.patch.object(self.agent.tun_br, 'do_action_flows'),
+        ) as (deferred_fn, do_action_flows_fn):
+            deferred_br = ovs_lib.DeferredOVSBridge(self.agent.tun_br)
+            deferred_fn.return_value = deferred_br
             self.agent.fdb_update(None, fdb_entries)
             actions = (constants.ARP_RESPONDER_ACTIONS %
                        {'mac': netaddr.EUI(FAKE_MAC, dialect=netaddr.mac_unix),
                         'ip': netaddr.IPAddress(FAKE_IP2)})
-            add_flow_fn.assert_called_once_with(table=constants.ARP_RESPONDER,
-                                                priority=1,
-                                                proto='arp',
-                                                dl_vlan='vlan1',
-                                                nw_dst=FAKE_IP2,
-                                                actions=actions)
-            del_flow_fn.assert_called_once_with(table=constants.ARP_RESPONDER,
-                                                proto='arp',
-                                                dl_vlan='vlan1',
-                                                nw_dst=FAKE_IP1)
+            expected_calls = [
+                mock.call('add', [dict(table=constants.ARP_RESPONDER,
+                                       priority=1,
+                                       proto='arp',
+                                       dl_vlan='vlan1',
+                                       nw_dst=FAKE_IP2,
+                                       actions=actions)]),
+                mock.call('del', [dict(table=constants.ARP_RESPONDER,
+                                       proto='arp',
+                                       dl_vlan='vlan1',
+                                       nw_dst=FAKE_IP1)])
+            ]
+            do_action_flows_fn.assert_has_calls(expected_calls)
+            self.assertEqual(len(expected_calls),
+                             len(do_action_flows_fn.mock_calls))
 
     def test_recl_lv_port_to_preserve(self):
         self._prepare_l2_pop_ofports()
@@ -1191,7 +1212,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(ovs_neutron_agent.LOG, 'error')
         ) as (add_tunnel_port_fn, log_error_fn):
             ofport = self.agent._setup_tunnel_port(
-                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+                self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
             add_tunnel_port_fn.assert_called_once_with(
                 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
                 self.agent.vxlan_udp_port, self.agent.dont_fragment)
@@ -1208,7 +1229,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(ovs_neutron_agent.LOG, 'error')
         ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
             ofport = self.agent._setup_tunnel_port(
-                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+                self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
             add_tunnel_port_fn.assert_called_once_with(
                 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
                 self.agent.vxlan_udp_port, self.agent.dont_fragment)
@@ -1228,7 +1249,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         ) as (add_tunnel_port_fn, log_error_fn):
             self.agent.dont_fragment = False
             ofport = self.agent._setup_tunnel_port(
-                'gre-1', 'remote_ip', p_const.TYPE_GRE)
+                self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
             add_tunnel_port_fn.assert_called_once_with(
                 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
                 self.agent.vxlan_udp_port, self.agent.dont_fragment)
@@ -1247,7 +1268,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         ) as (tunnel_sync_rpc_fn, _setup_tunnel_port_fn):
             self.agent.tunnel_types = ['gre']
             self.agent.tunnel_sync()
-            expected_calls = [mock.call('gre-42', '100.101.102.103', 'gre')]
+            expected_calls = [mock.call(self.agent.tun_br, 'gre-42',
+                                        '100.101.102.103', 'gre')]
             _setup_tunnel_port_fn.assert_has_calls(expected_calls)
 
     def test_tunnel_sync_with_ml2_plugin(self):
@@ -1259,7 +1281,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         ) as (tunnel_sync_rpc_fn, _setup_tunnel_port_fn):
             self.agent.tunnel_types = ['vxlan']
             self.agent.tunnel_sync()
-            expected_calls = [mock.call('vxlan-64651f0f',
+            expected_calls = [mock.call(self.agent.tun_br, 'vxlan-64651f0f',
                                         '100.101.31.15', 'vxlan')]
             _setup_tunnel_port_fn.assert_has_calls(expected_calls)
 
@@ -1273,7 +1295,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         ) as (tunnel_sync_rpc_fn, _setup_tunnel_port_fn):
             self.agent.tunnel_types = ['vxlan']
             self.agent.tunnel_sync()
-            _setup_tunnel_port_fn.assert_called_once_with('vxlan-64646464',
+            _setup_tunnel_port_fn.assert_called_once_with(self.agent.tun_br,
+                                                          'vxlan-64646464',
                                                           '100.100.100.100',
                                                           'vxlan')
 
@@ -1285,7 +1308,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         self.agent.tunnel_types = ['gre']
         self.agent.l2_pop = False
         self.agent.tunnel_update(context=None, **kwargs)
-        expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')]
+        expected_calls = [
+            mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
         self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
 
     def test_ovs_restart(self):