# under the License.
+from oslo import messaging
+
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import constants as n_const
+from neutron.common import rpc as n_rpc
from neutron.common import utils as n_utils
+from neutron.openstack.common import excutils
+from neutron.openstack.common.gettextutils import _LE, _LW, _LI
from neutron.openstack.common import log as logging
from neutron.plugins.openvswitch.common import constants
enable_distributed_routing=False):
self.context = context
self.plugin_rpc = plugin_rpc
- self.int_br = integ_br
- self.tun_br = tun_br
- self.patch_int_ofport = patch_int_ofport
- self.patch_tun_ofport = patch_tun_ofport
self.host = host
self.enable_tunneling = enable_tunneling
self.enable_distributed_routing = enable_distributed_routing
+ self.reset_ovs_parameters(integ_br, tun_br,
+ patch_int_ofport, patch_tun_ofport)
+ self.reset_dvr_parameters()
+ self.dvr_mac_address = None
+ if self.enable_tunneling and self.enable_distributed_routing:
+ self.get_dvr_mac_address()
def reset_ovs_parameters(self, integ_br, tun_br,
patch_int_ofport, patch_tun_ofport):
'''Reset the openvswitch parameters'''
- if not (self.enable_tunneling and self.enable_distributed_routing):
- return
self.int_br = integ_br
self.tun_br = tun_br
self.patch_int_ofport = patch_int_ofport
self.patch_tun_ofport = patch_tun_ofport
- def setup_dvr_flows_on_integ_tun_br(self):
- '''Setup up initial dvr flows into br-int and br-tun'''
- if not (self.enable_tunneling and self.enable_distributed_routing):
- return
- LOG.debug("L2 Agent operating in DVR Mode")
- self.dvr_mac_address = None
+ def reset_dvr_parameters(self):
+ '''Reset the DVR parameters'''
self.local_dvr_map = {}
self.local_csnat_map = {}
self.local_ports = {}
self.registered_dvr_macs = set()
- # get the local DVR MAC Address
+
+ def get_dvr_mac_address(self):
try:
- details = self.plugin_rpc.get_dvr_mac_address_by_host(
- self.context, self.host)
- LOG.debug("L2 Agent DVR: Received response for "
- "get_dvr_mac_address_by_host() from "
- "plugin: %r", details)
- self.dvr_mac_address = details['mac_address']
- except Exception:
- LOG.error(_("DVR: Failed to obtain local DVR Mac address"))
- self.enable_distributed_routing = False
+ self.get_dvr_mac_address_with_retry()
+ except n_rpc.RemoteError as e:
+ LOG.warning(_LW('L2 agent could not get DVR MAC address at '
+ 'startup due to RPC error. It happens when the '
+ 'server does not support this RPC API. Detailed '
+ 'message: %s'), e)
+ except messaging.MessagingTimeout:
+ LOG.error(_LE('DVR: Failed to obtain a valid local '
+ 'DVR MAC address - L2 Agent operating '
+ 'in Non-DVR Mode'))
+
+ if not self.in_distributed_mode():
# switch all traffic using L2 learning
self.int_br.add_flow(table=constants.LOCAL_SWITCHING,
priority=1, actions="normal")
+
+ def get_dvr_mac_address_with_retry(self):
+ # Get the local DVR MAC Address from the Neutron Server.
+ # This is the first place where we contact the server on startup
+ # so retry in case it's not ready to respond
+ for retry_count in reversed(range(5)):
+ try:
+ details = self.plugin_rpc.get_dvr_mac_address_by_host(
+ self.context, self.host)
+ except messaging.MessagingTimeout as e:
+ with excutils.save_and_reraise_exception() as ctx:
+ if retry_count > 0:
+ ctx.reraise = False
+ LOG.warning(_LW('L2 agent could not get DVR MAC '
+ 'address from server. Retrying. '
+ 'Detailed message: %s'), e)
+ else:
+ LOG.debug("L2 Agent DVR: Received response for "
+ "get_dvr_mac_address_by_host() from "
+ "plugin: %r", details)
+ self.dvr_mac_address = details['mac_address']
+ return
+
+ def setup_dvr_flows_on_integ_tun_br(self):
+ '''Setup up initial dvr flows into br-int and br-tun'''
+ if not (self.enable_tunneling and self.enable_distributed_routing):
+ return
+
+ if not self.in_distributed_mode():
return
+ LOG.info(_LI("L2 Agent operating in DVR Mode with MAC %s"),
+ self.dvr_mac_address)
# Remove existing flows in integration bridge
self.int_br.remove_all_flows()
constants.PATCH_LV_TO_TUN)
def dvr_mac_address_update(self, dvr_macs):
- if not (self.enable_tunneling and self.enable_distributed_routing):
- return
-
- LOG.debug("DVR Mac address update with host-mac: %s", dvr_macs)
-
if not self.dvr_mac_address:
LOG.debug("Self mac unknown, ignoring this "
"dvr_mac_address_update() ")
LOG.debug("Added DVR MAC flow for %s", newmac)
self.registered_dvr_macs.add(newmac)
+ def in_distributed_mode(self):
+ return self.dvr_mac_address is not None
+
def is_dvr_router_interface(self, device_owner):
return device_owner == n_const.DEVICE_OWNER_DVR_INTERFACE
def process_tunneled_network(self, network_type, lvid, segmentation_id):
if not (self.enable_tunneling and self.enable_distributed_routing):
return
+
+ if self.in_distributed_mode():
+ table_id = constants.DVR_NOT_LEARN
+ else:
+ table_id = constants.LEARN_FROM_TUN
self.tun_br.add_flow(table=constants.TUN_TABLE[network_type],
priority=1,
tun_id=segmentation_id,
actions="mod_vlan_vid:%s,"
"resubmit(,%s)" %
- (lvid, constants.DVR_NOT_LEARN))
+ (lvid, table_id))
def _bind_distributed_router_interface_port(self, port, fixed_ips,
device_owner, local_vlan):
def bind_port_to_dvr(self, port, network_type, fixed_ips,
device_owner, local_vlan_id):
- # a port coming up as distributed router interface
- if not (self.enable_tunneling and self.enable_distributed_routing):
+ if not self.in_distributed_mode():
return
if network_type not in constants.TUNNEL_NETWORK_TYPES:
self.local_ports.pop(port.vif_id, None)
def unbind_port_from_dvr(self, vif_port, local_vlan_id):
- if not (self.enable_tunneling and self.enable_distributed_routing):
+ if not self.in_distributed_mode():
return
# Handle port removed use-case
if vif_port and vif_port.vif_id not in self.local_ports:
self.patch_tun_ofport = constants.OFPORT_INVALID
if self.enable_tunneling:
# The patch_int_ofport and patch_tun_ofport are updated
- # here inside the call to setup_tunnel_br
- self.setup_tunnel_br(tun_br)
+ # here inside the call to reset_tunnel_br()
+ self.reset_tunnel_br(tun_br)
self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
self.context,
self.enable_tunneling,
self.enable_distributed_routing)
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.FixedIntervalLoopingCall(
+ self._report_state)
+ heartbeat.start(interval=report_interval)
+
+ if self.enable_tunneling:
+ self.setup_tunnel_br()
+
self.dvr_agent.setup_dvr_flows_on_integ_tun_br()
# Collect additional bridges to monitor
# How many devices are likely used by a VM
self.agent_state.get('configurations')['devices'] = (
self.int_br_device_count)
+ self.agent_state.get('configurations')['in_distributed_mode'] = (
+ self.dvr_agent.in_distributed_mode())
+
try:
self.state_rpc.report_state(self.context,
self.agent_state,
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
- report_interval = cfg.CONF.AGENT.report_interval
- if report_interval:
- heartbeat = loopingcall.FixedIntervalLoopingCall(
- self._report_state)
- heartbeat.start(interval=report_interval)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
ancillary_bridges.append(br)
return ancillary_bridges
- def setup_tunnel_br(self, tun_br_name=None):
- '''Setup the tunnel bridge.
+ def reset_tunnel_br(self, tun_br_name=None):
+ '''(re)initialize the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
using a patch port.
exit(1)
self.tun_br.remove_all_flows()
+ def setup_tunnel_br(self):
+ '''Setup the tunnel bridge.
+
+ Add all flows to the tunnel bridge.
+ '''
# Table 0 (default) will sort incoming traffic depending on in_port
self.tun_br.add_flow(priority=1,
in_port=self.patch_int_ofport,
self.setup_integration_br()
self.setup_physical_bridges(self.bridge_mappings)
if self.enable_tunneling:
+ self.reset_tunnel_br()
self.setup_tunnel_br()
tunnel_sync = True
- self.dvr_agent.reset_ovs_parameters(self.int_br,
- self.tun_br,
- self.patch_int_ofport,
- self.patch_tun_ofport)
- self.dvr_agent.setup_dvr_flows_on_integ_tun_br()
+ if self.enable_distributed_routing:
+ self.dvr_agent.reset_ovs_parameters(self.int_br,
+ self.tun_br,
+ self.patch_int_ofport,
+ self.patch_tun_ofport)
+ self.dvr_agent.reset_dvr_parameters()
+ self.dvr_agent.setup_dvr_flows_on_integ_tun_br()
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
LOG.info(_("Agent tunnel out of sync with plugin!"))
import mock
import netaddr
from oslo.config import cfg
+from oslo import messaging
import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
+from neutron.common import rpc as n_rpc
from neutron.openstack.common import log
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
def test_setup_dvr_flows_on_int_br(self):
self._setup_for_dvr_test()
with contextlib.nested(
- mock.patch.object(
- self.agent.dvr_agent.plugin_rpc,
- 'get_dvr_mac_address_by_host',
- return_value={'host': 'cn1',
- 'mac_address': 'aa:bb:cc:dd:ee:ff'}),
- mock.patch.object(self.agent.dvr_agent.int_br, 'add_flow'),
- mock.patch.object(self.agent.dvr_agent.tun_br, 'add_flow'),
mock.patch.object(self.agent.dvr_agent.int_br,
'remove_all_flows'),
+ mock.patch.object(self.agent.dvr_agent.int_br, 'add_flow'),
+ mock.patch.object(self.agent.dvr_agent.tun_br, 'add_flow'),
mock.patch.object(
self.agent.dvr_agent.plugin_rpc,
'get_dvr_mac_address_list',
'mac_address': 'aa:bb:cc:dd:ee:ff'},
{'host': 'cn2',
'mac_address': '11:22:33:44:55:66'}])) as \
- (get_subnet_fn, get_cphost_fn, get_vif_fn,
- add_flow_fn, delete_flows_fn):
+ (remove_flows_fn, add_int_flow_fn, add_tun_flow_fn,
+ get_mac_list_fn):
self.agent.dvr_agent.setup_dvr_flows_on_integ_tun_br()
+ self.assertTrue(self.agent.dvr_agent.in_distributed_mode())
+ self.assertTrue(remove_flows_fn.called)
+ self.assertEqual(add_int_flow_fn.call_count, 5)
+ self.assertEqual(add_tun_flow_fn.call_count, 5)
+
+ def test_get_dvr_mac_address(self):
+ self._setup_for_dvr_test()
+ self.agent.dvr_agent.dvr_mac_address = None
+ with mock.patch.object(self.agent.dvr_agent.plugin_rpc,
+ 'get_dvr_mac_address_by_host',
+ return_value={'host': 'cn1',
+ 'mac_address': 'aa:22:33:44:55:66'}):
+
+ self.agent.dvr_agent.get_dvr_mac_address()
+ self.assertEqual('aa:22:33:44:55:66',
+ self.agent.dvr_agent.dvr_mac_address)
+ self.assertTrue(self.agent.dvr_agent.in_distributed_mode())
+
+ def test_get_dvr_mac_address_exception(self):
+ self._setup_for_dvr_test()
+ self.agent.dvr_agent.dvr_mac_address = None
+ with contextlib.nested(
+ mock.patch.object(self.agent.dvr_agent.plugin_rpc,
+ 'get_dvr_mac_address_by_host',
+ side_effect=n_rpc.RemoteError),
+ mock.patch.object(self.agent.dvr_agent.int_br,
+ 'add_flow')) as (gd_mac, add_int_flow_fn):
+
+ self.agent.dvr_agent.get_dvr_mac_address()
+ self.assertIsNone(self.agent.dvr_agent.dvr_mac_address)
+ self.assertFalse(self.agent.dvr_agent.in_distributed_mode())
+ self.assertEqual(add_int_flow_fn.call_count, 1)
+
+ def test_get_dvr_mac_address_retried(self):
+ valid_entry = {'host': 'cn1', 'mac_address': 'aa:22:33:44:55:66'}
+ raise_timeout = messaging.MessagingTimeout()
+ # Raise a timeout the first 2 times it calls get_dvr_mac_address()
+ self._setup_for_dvr_test()
+ self.agent.dvr_agent.dvr_mac_address = None
+ with mock.patch.object(self.agent.dvr_agent.plugin_rpc,
+ 'get_dvr_mac_address_by_host',
+ side_effect=(raise_timeout, raise_timeout,
+ valid_entry)):
+
+ self.agent.dvr_agent.get_dvr_mac_address()
+ self.assertEqual('aa:22:33:44:55:66',
+ self.agent.dvr_agent.dvr_mac_address)
+ self.assertTrue(self.agent.dvr_agent.in_distributed_mode())
+ self.assertEqual(self.agent.dvr_agent.plugin_rpc.
+ get_dvr_mac_address_by_host.call_count, 3)
+
+ def test_get_dvr_mac_address_retried_max(self):
+ raise_timeout = messaging.MessagingTimeout()
+ # Raise a timeout every time until we give up, currently 5 tries
+ self._setup_for_dvr_test()
+ self.agent.dvr_agent.dvr_mac_address = None
+ with mock.patch.object(self.agent.dvr_agent.plugin_rpc,
+ 'get_dvr_mac_address_by_host',
+ side_effect=raise_timeout):
+
+ self.agent.dvr_agent.get_dvr_mac_address()
+ self.assertIsNone(self.agent.dvr_agent.dvr_mac_address)
+ self.assertFalse(self.agent.dvr_agent.in_distributed_mode())
+ self.assertEqual(self.agent.dvr_agent.plugin_rpc.
+ get_dvr_mac_address_by_host.call_count, 5)
def _test_port_dead(self, cur_tag=None):
port = mock.Mock()
mock.patch.object(sys, "exit")
) as (intbr_patch_fn, tunbr_patch_fn, remove_all_fn,
add_flow_fn, ovs_br_fn, reset_br_fn, exit_fn):
- self.agent.setup_tunnel_br(None)
+ self.agent.reset_tunnel_br(None)
+ self.agent.setup_tunnel_br()
self.assertTrue(intbr_patch_fn.called)
def test_setup_tunnel_port(self):