return value to include fixed_ips and device_owner for
the device port
1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
+ 1.5 - Support update_device_list and
+ get_devices_details_list_and_failed_devices
'''
def __init__(self, topic):
]
return res
+ def get_devices_details_list_and_failed_devices(self, context, devices,
+ agent_id, host=None):
+ """Get devices details and the list of devices that failed.
+
+ This method returns the devices details. If an error is thrown when
+ retrieving the devices details, the device is put in a list of
+ failed devices.
+ """
+ try:
+ cctxt = self.client.prepare(version='1.5')
+ res = cctxt.call(
+ context,
+ 'get_devices_details_list_and_failed_devices',
+ devices=devices, agent_id=agent_id, host=host)
+ except oslo_messaging.UnsupportedVersion:
+ #TODO(rossella_s): Remove this failback logic in M
+ res = self._device_list_rpc_call_with_failed_dev(
+ self.get_device_details, context, agent_id, host, devices)
+ return res
+
def update_device_down(self, context, device, agent_id, host=None):
cctxt = self.client.prepare()
return cctxt.call(context, 'update_device_down', device=device,
return cctxt.call(context, 'update_device_up', device=device,
agent_id=agent_id, host=host)
+ def _device_list_rpc_call_with_failed_dev(self, rpc_call, context,
+ agent_id, host, devices):
+ succeeded_devices = []
+ failed_devices = []
+ for device in devices:
+ try:
+ rpc_device = rpc_call(context, device, agent_id, host)
+ except Exception:
+ failed_devices.append(device)
+ else:
+ # update_device_up doesn't return the device
+ succeeded_dev = rpc_device or device
+ succeeded_devices.append(succeeded_dev)
+ return {'devices': succeeded_devices, 'failed_devices': failed_devices}
+
+ def update_device_list(self, context, devices_up, devices_down,
+ agent_id, host):
+ try:
+ cctxt = self.client.prepare(version='1.5')
+ res = cctxt.call(context, 'update_device_list',
+ devices_up=devices_up, devices_down=devices_down,
+ agent_id=agent_id, host=host)
+ except oslo_messaging.UnsupportedVersion:
+ #TODO(rossella_s): Remove this failback logic in M
+ dev_up = self._device_list_rpc_call_with_failed_dev(
+ self.update_device_up, context, agent_id, host, devices_up)
+ dev_down = self._device_list_rpc_call_with_failed_dev(
+ self.update_device_down, context, agent_id, host, devices_down)
+
+ res = {'devices_up': dev_up.get('devices'),
+ 'failed_devices_up': dev_up.get('failed_devices'),
+ 'devices_down': dev_down.get('devices'),
+ 'failed_devices_down': dev_down.get('failed_devices')}
+ return res
+
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
try:
cctxt = self.client.prepare(version='1.4')
class DeviceListRetrievalError(exceptions.NeutronException):
- message = _("Unable to retrieve port details for devices: %(devices)s "
- "because of error: %(error)s")
+ message = _("Unable to retrieve port details for devices: %(devices)s ")
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
port_other_config)
def _bind_devices(self, need_binding_ports):
+ devices_up = []
+ devices_down = []
port_info = self.int_br.db_list(
"Port", columns=["name", "tag"])
tags_by_name = {x['name']: x['tag'] for x in port_info}
# API server, thus possibly preventing instance spawn.
if port_detail.get('admin_state_up'):
LOG.debug("Setting status for %s to UP", device)
- self.plugin_rpc.update_device_up(
- self.context, device, self.agent_id, self.conf.host)
+ devices_up.append(device)
else:
LOG.debug("Setting status for %s to DOWN", device)
- self.plugin_rpc.update_device_down(
- self.context, device, self.agent_id, self.conf.host)
- LOG.info(_LI("Configuration for device %s completed."), device)
+ devices_down.append(device)
+ failed_devices = []
+ if devices_up or devices_down:
+ devices_set = self.plugin_rpc.update_device_list(
+ self.context, devices_up, devices_down, self.agent_id,
+ self.conf.host)
+ failed_devices = (devices_set.get('failed_devices_up') +
+ devices_set.get('failed_devices_down'))
+ if failed_devices:
+ LOG.error(_LE("Configuration for devices %s failed!"),
+ failed_devices)
+ #TODO(rossella_s) handle better the resync in next patches,
+ # this is just to preserve the current behavior
+ raise DeviceListRetrievalError(devices=failed_devices)
+ LOG.info(_LI("Configuration for devices up %(up)s and devices "
+ "down %(down)s completed."),
+ {'up': devices_up, 'down': devices_down})
@staticmethod
def setup_arp_spoofing_protection(bridge, vif, port_details):
def treat_devices_added_or_updated(self, devices, ovs_restarted):
skipped_devices = []
need_binding_devices = []
- try:
- devices_details_list = self.plugin_rpc.get_devices_details_list(
+ devices_details_list = (
+ self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
devices,
self.agent_id,
- self.conf.host)
- except Exception as e:
- raise DeviceListRetrievalError(devices=devices, error=e)
+ self.conf.host))
+ if devices_details_list.get('failed_devices'):
+ #TODO(rossella_s) handle better the resync in next patches,
+ # this is just to preserve the current behavior
+ raise DeviceListRetrievalError(devices=devices)
+
+ devices = devices_details_list.get('devices')
vif_by_id = self.int_br.get_vifs_by_ids(
- [vif['device'] for vif in devices_details_list])
- for details in devices_details_list:
+ [vif['device'] for vif in devices])
+ for details in devices:
device = details['device']
LOG.debug("Processing port: %s", device)
port = vif_by_id.get(device)
return skipped_devices, need_binding_devices
def treat_ancillary_devices_added(self, devices):
- try:
- devices_details_list = self.plugin_rpc.get_devices_details_list(
+ devices_details_list = (
+ self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
devices,
self.agent_id,
- self.conf.host)
- except Exception as e:
- raise DeviceListRetrievalError(devices=devices, error=e)
-
- for details in devices_details_list:
- device = details['device']
- LOG.info(_LI("Ancillary Port %s added"), device)
-
- # update plugin about port status
- self.plugin_rpc.update_device_up(self.context,
- device,
- self.agent_id,
- self.conf.host)
+ self.conf.host))
+ if devices_details_list.get('failed_devices'):
+ #TODO(rossella_s) handle better the resync in next patches,
+ # this is just to preserve the current behavior
+ raise DeviceListRetrievalError(devices=devices)
+ devices_added = [
+ d['device'] for d in devices_details_list.get('devices')]
+ LOG.info(_LI("Ancillary Ports %s added"), devices_added)
+
+ # update plugin about port status
+ devices_set_up = (
+ self.plugin_rpc.update_device_list(self.context,
+ devices_added,
+ [],
+ self.agent_id,
+ self.conf.host))
+ if devices_set_up.get('failed_devices_up'):
+ #TODO(rossella_s) handle better the resync in next patches,
+ # this is just to preserve the current behavior
+ raise DeviceListRetrievalError()
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
+ LOG.info(_LI("Ports %s removed"), devices)
+ devices_down = self.plugin_rpc.update_device_list(self.context,
+ [],
+ devices,
+ self.agent_id,
+ self.conf.host)
+ failed_devices = devices_down.get('failed_devices_down')
+ if failed_devices:
+ LOG.debug("Port removal failed for %(devices)s ", failed_devices)
+ resync = True
for device in devices:
- LOG.info(_LI("Attachment %s removed"), device)
- try:
- self.plugin_rpc.update_device_down(self.context,
- device,
- self.agent_id,
- self.conf.host)
- except Exception as e:
- LOG.debug("port_removed failed for %(device)s: %(e)s",
- {'device': device, 'e': e})
- resync = True
- continue
self.port_unbound(device)
return resync
def treat_ancillary_devices_removed(self, devices):
resync = False
- for device in devices:
- LOG.info(_LI("Attachment %s removed"), device)
- try:
- details = self.plugin_rpc.update_device_down(self.context,
- device,
- self.agent_id,
- self.conf.host)
- except Exception as e:
- LOG.debug("port_removed failed for %(device)s: %(e)s",
- {'device': device, 'e': e})
- resync = True
- continue
- if details['exists']:
- LOG.info(_LI("Port %s updated."), device)
+ LOG.info(_LI("Ancillary ports %s removed"), devices)
+ devices_down = self.plugin_rpc.update_device_list(self.context,
+ [],
+ devices,
+ self.agent_id,
+ self.conf.host)
+ failed_devices = devices_down.get('failed_devices_down')
+ if failed_devices:
+ LOG.debug("Port removal failed for %(devices)s ", failed_devices)
+ resync = True
+ for detail in devices_down.get('devices_down'):
+ if detail['exists']:
+ LOG.info(_LI("Port %s updated."), detail['device'])
# Nothing to do regarding local networking
else:
- LOG.debug("Device %s not defined on plugin", device)
+ LOG.debug("Device %s not defined on plugin", detail['device'])
return resync
def process_network_ports(self, port_info, ovs_restarted):
port_info.get('updated', set()))
self._bind_devices(need_binding_devices)
- if 'removed' in port_info:
+ if 'removed' in port_info and port_info['removed']:
start = time.time()
resync_b = self.treat_devices_removed(port_info['removed'])
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
def process_ancillary_network_ports(self, port_info):
resync_a = False
resync_b = False
- if 'added' in port_info:
+ if 'added' in port_info and port_info['added']:
start = time.time()
try:
self.treat_ancillary_devices_added(port_info['added'])
LOG.debug("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_added "
"completed in %(elapsed).3f",
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
except DeviceListRetrievalError:
# Need to resync as there was an error with server
# communication.
"iteration:%d - failure while retrieving "
"port details from server"), self.iter_num)
resync_a = True
- if 'removed' in port_info:
+ if 'removed' in port_info and port_info['removed']:
start = time.time()
resync_b = self.treat_ancillary_devices_removed(
port_info['removed'])
from neutron.common import topics
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
-from neutron.i18n import _LW
+from neutron.i18n import _LE, _LW
from neutron import manager
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
# return value to include fixed_ips and device_owner for
# the device port
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
- target = oslo_messaging.Target(version='1.4')
+ # 1.5 Support update_device_list and
+ # get_devices_details_list_and_failed_devices
+ target = oslo_messaging.Target(version='1.5')
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
for device in kwargs.pop('devices', [])
]
+ def get_devices_details_list_and_failed_devices(self,
+ rpc_context,
+ **kwargs):
+ devices = []
+ failed_devices = []
+ cached_networks = {}
+ for device in kwargs.pop('devices', []):
+ try:
+ devices.append(self.get_device_details(
+ rpc_context,
+ device=device,
+ cached_networks=cached_networks,
+ **kwargs))
+ except Exception:
+ LOG.error(_LE("Failed to get details for device %s"),
+ device)
+ failed_devices.append(device)
+
+ return {'devices': devices,
+ 'failed_devices': failed_devices}
+
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
registry.notify(
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
+ def update_device_list(self, rpc_context, **kwargs):
+ devices_up = []
+ failed_devices_up = []
+ devices_down = []
+ failed_devices_down = []
+ devices = kwargs.get('devices_up')
+ if devices:
+ for device in devices:
+ try:
+ self.update_device_up(
+ rpc_context,
+ device=device,
+ **kwargs)
+ except Exception:
+ failed_devices_up.append(device)
+ LOG.error(_LE("Failed to update device %s up"), device)
+ else:
+ devices_up.append(device)
+
+ devices = kwargs.get('devices_down')
+ if devices:
+ for device in devices:
+ try:
+ dev = self.update_device_down(
+ rpc_context,
+ device=device,
+ **kwargs)
+ except Exception:
+ failed_devices_down.append(device)
+ LOG.error(_LE("Failed to update device %s down"), device)
+ else:
+ devices_down.append(dev)
+
+ return {'devices_up': devices_up,
+ 'failed_devices_up': failed_devices_up,
+ 'devices_down': devices_down,
+ 'failed_devices_down': failed_devices_down}
+
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
for port in [self.patch_tun, self.patch_int]:
self.assertTrue(self.ovs.port_exists(port))
- def assert_no_vlan_tags(self, ports, agent):
- for port in ports:
- res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
- self.assertEqual([], res)
-
def assert_vlan_tags(self, ports, agent):
for port in ports:
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
class TestOVSAgent(OVSAgentTestFramework):
- def _expected_plugin_rpc_call(self, call, expected_devices):
+ def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
"""Helper to check expected rpc call are received
:param call: The call to check
:param expected_devices The device for which call is expected
+ :param is_up True if expected_devices are devices that are set up,
+ False if expected_devices are devices that are set down
"""
- args = (args[0][1] for args in call.call_args_list)
- return not (set(expected_devices) - set(args))
+ if is_up:
+ rpc_devices = [
+ dev for args in call.call_args_list for dev in args[0][1]]
+ else:
+ rpc_devices = [
+ dev for args in call.call_args_list for dev in args[0][2]]
+ return not (set(expected_devices) - set(rpc_devices))
- def _create_ports(self, network, agent):
+ def _create_ports(self, network, agent, trigger_resync=False):
ports = []
for x in range(3):
ports.append(self._create_test_port_dict())
+ def mock_device_raise_exception(context, devices_up, devices_down,
+ agent_id, host=None):
+ agent.plugin_rpc.update_device_list.side_effect = (
+ mock_update_device)
+ raise Exception('Exception to trigger resync')
+
def mock_device_details(context, devices, agent_id, host=None):
+
details = []
for port in ports:
if port['id'] in devices:
dev = OVSAgentTestFramework._get_device_details(
port, network)
details.append(dev)
- return details
+ return {'devices': details, 'failed_devices': []}
- agent.plugin_rpc.get_devices_details_list.side_effect = (
- mock_device_details)
+ def mock_update_device(context, devices_up, devices_down, agent_id,
+ host=None):
+ dev_up = []
+ dev_down = []
+ for port in ports:
+ if devices_up and port['id'] in devices_up:
+ dev_up.append(port['id'])
+ if devices_down and port['id'] in devices_down:
+ dev_down.append({'device': port['id'], 'exists': True})
+ return {'devices_up': dev_up,
+ 'failed_devices_up': [],
+ 'devices_down': dev_down,
+ 'failed_devices_down': []}
+
+ (agent.plugin_rpc.get_devices_details_list_and_failed_devices.
+ side_effect) = mock_device_details
+ if trigger_resync:
+ agent.plugin_rpc.update_device_list.side_effect = (
+ mock_device_raise_exception)
+ else:
+ agent.plugin_rpc.update_device_list.side_effect = (
+ mock_update_device)
return ports
def test_port_creation_and_deletion(self):
up_ports_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
- agent.plugin_rpc.update_device_up, up_ports_ids))
+ agent.plugin_rpc.update_device_list, up_ports_ids))
down_ports_ids = [p['id'] for p in ports]
for port in ports:
agent.int_br.delete_port(port['vif_name'])
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
- agent.plugin_rpc.update_device_down, down_ports_ids))
+ agent.plugin_rpc.update_device_list, down_ports_ids, False))
def test_resync_devices_set_up_after_exception(self):
agent = self.create_agent()
self.start_agent(agent)
network = self._create_test_network_dict()
- ports = self._create_ports(network, agent)
- agent.plugin_rpc.update_device_up.side_effect = [
- Exception('Exception to trigger resync'),
- None, None, None]
+ ports = self._create_ports(network, agent, True)
self._plug_ports(network, ports, agent)
ports_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
- agent.plugin_rpc.update_device_up, ports_ids))
+ agent.plugin_rpc.update_device_list, ports_ids))
def test_port_vlan_tags(self):
agent = self.create_agent()
self.start_agent(agent)
- ports = []
- for x in range(3):
- ports.append(self._create_test_port_dict())
network = self._create_test_network_dict()
+ ports = self._create_ports(network, agent)
+ ports_ids = [p['id'] for p in ports]
self._plug_ports(network, ports, agent)
- agent.provision_local_vlan(network['id'], 'vlan', 'physnet', 1)
- self.assert_no_vlan_tags(ports, agent)
- self._bind_ports(ports, network, agent)
+ agent_utils.wait_until_true(
+ lambda: self._expected_plugin_rpc_call(
+ agent.plugin_rpc.update_device_list, ports_ids))
self.assert_vlan_tags(ports, agent)
def test_assert_bridges_ports_vxlan(self):
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
- def test_treat_devices_added_returns_raises_for_missing_device(self):
- with mock.patch.object(self.agent.plugin_rpc,
- 'get_devices_details_list',
- side_effect=Exception()),\
- mock.patch.object(self.agent.int_br,
- 'get_vif_port_by_id',
- return_value=mock.Mock()):
- self.assertRaises(
- self.mod_agent.DeviceListRetrievalError,
- self.agent.treat_devices_added_or_updated, [{}], False)
+ def test_bind_devices(self):
+ devices_up = ['tap1']
+ devices_down = ['tap2']
+ self.agent.local_vlan_map["net1"] = mock.Mock()
+ port_details = [
+ {'network_id': 'net1', 'vif_port': mock.Mock(),
+ 'device': devices_up[0],
+ 'admin_state_up': True},
+ {'network_id': 'net1', 'vif_port': mock.Mock(),
+ 'device': devices_down[0],
+ 'admin_state_up': False}]
+ with mock.patch.object(
+ self.agent.plugin_rpc, 'update_device_list',
+ return_value={'devices_up': devices_up,
+ 'devices_down': devices_down,
+ 'failed_devices_up': [],
+ 'failed_devices_down': []}) as update_devices, \
+ mock.patch.object(self.agent,
+ 'int_br') as int_br:
+ int_br.db_list.return_value = []
+ self.agent._bind_devices(port_details)
+ update_devices.assert_called_once_with(mock.ANY, devices_up,
+ devices_down,
+ mock.ANY, mock.ANY)
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
:returns: whether the named function was called
"""
with mock.patch.object(self.agent.plugin_rpc,
- 'get_devices_details_list',
- return_value=[details]),\
+ 'get_devices_details_list_and_failed_devices',
+ return_value={'devices': [details],
+ 'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={details['device']: port}),\
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+ return_value={'devices_up': [],
+ 'devices_down': details,
+ 'failed_devices_up': [],
+ 'failed_devices_down': []}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices = (
self.agent.treat_devices_added_or_updated([{}], False))
mock.MagicMock(), port, 'port_dead'))
def test_treat_devices_added_does_not_process_missing_port(self):
- with mock.patch.object(self.agent.plugin_rpc,
- 'get_device_details') as get_dev_fn,\
+ with mock.patch.object(
+ self.agent.plugin_rpc,
+ 'get_devices_details_list_and_failed_devices') as get_dev_fn,\
mock.patch.object(self.agent.int_br,
'get_vif_port_by_id',
return_value=None):
dev_mock = mock.MagicMock()
dev_mock.__getitem__.return_value = 'the_skipped_one'
with mock.patch.object(self.agent.plugin_rpc,
- 'get_devices_details_list',
- return_value=[dev_mock]),\
+ 'get_devices_details_list_and_failed_devices',
+ return_value={'devices': [dev_mock],
+ 'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={}),\
}
with mock.patch.object(self.agent.plugin_rpc,
- 'get_devices_details_list',
- return_value=[fake_details_dict]),\
+ 'get_devices_details_list_and_failed_devices',
+ return_value={'devices': [fake_details_dict],
+ 'failed_devices': None}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={'xxx': mock.MagicMock()}),\
self.assertFalse(skip_devs)
self.assertTrue(treat_vif_port.called)
- def test_treat_devices_removed_returns_true_for_missing_device(self):
- with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
- side_effect=Exception()):
- self.assertTrue(self.agent.treat_devices_removed([{}]))
-
def _mock_treat_devices_removed(self, port_exists):
details = dict(exists=port_exists)
- with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
- return_value=details):
+ with mock.patch.object(self.agent.plugin_rpc,
+ 'update_device_list',
+ return_value={'devices_up': [],
+ 'devices_down': details,
+ 'failed_devices_up': [],
+ 'failed_devices_down': []}):
with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
self.assertFalse(self.agent.treat_devices_removed([{}]))
self.assertTrue(port_unbound.called)
'physical_network', 'segmentation_id',
'admin_state_up', 'fixed_ips', 'device',
'device_owner')}]
- self.agent.plugin_rpc.get_devices_details_list.return_value = plist
+ (self.agent.plugin_rpc.get_devices_details_list_and_failed_devices.
+ return_value) = {'devices': plist, 'failed_devices': []}
+ self.agent.plugin_rpc.update_device_list.return_value = {
+ 'devices_up': plist, 'devices_down': [], 'failed_devices_up': [],
+ 'failed_devices_down': []}
self.agent.setup_arp_spoofing_protection = mock.Mock()
self.agent.treat_devices_added_or_updated([], False)
self.assertFalse(self.agent.setup_arp_spoofing_protection.called)
int_br.reset_mock()
tun_br.reset_mock()
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
- mock.patch.object(self.agent.plugin_rpc,
- 'update_device_down',
- return_value=None),\
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+ return_value={
+ 'devices_up': [],
+ 'devices_down': [self._port.vif_id],
+ 'failed_devices_up': [],
+ 'failed_devices_down': []}),\
mock.patch.object(self.agent, 'int_br', new=int_br),\
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
int_br.reset_mock()
tun_br.reset_mock()
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
- mock.patch.object(self.agent.plugin_rpc,
- 'update_device_down',
- return_value=None),\
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+ return_value={
+ 'devices_up': [],
+ 'devices_down': [
+ self._compute_port.vif_id],
+ 'failed_devices_up': [],
+ 'failed_devices_down': []}),\
mock.patch.object(self.agent, 'int_br', new=int_br),\
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
int_br.reset_mock()
tun_br.reset_mock()
with mock.patch.object(self.agent, 'reclaim_local_vlan'),\
- mock.patch.object(self.agent.plugin_rpc,
- 'update_device_down',
- return_value=None),\
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_list',
+ return_value={
+ 'devices_up': [],
+ 'devices_down': [self._port.vif_id],
+ 'failed_devices_up': [],
+ 'failed_devices_down': []}),\
mock.patch.object(self.agent, 'int_br', new=int_br),\
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
import mock
from oslo_config import cfg
from oslo_context import context as oslo_context
+import oslo_messaging
from sqlalchemy.orm import exc
from neutron.agent import rpc as agent_rpc
self.callbacks.get_device_details(mock.Mock())
self.assertTrue(self.plugin.update_port_status.called)
- def test_get_devices_details_list(self):
+ def _test_get_devices_list(self, callback, side_effect, expected):
devices = [1, 2, 3, 4, 5]
kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
with mock.patch.object(self.callbacks, 'get_device_details',
- side_effect=devices) as f:
- res = self.callbacks.get_devices_details_list('fake_context',
- devices=devices,
- **kwargs)
- self.assertEqual(devices, res)
+ side_effect=side_effect) as f:
+ res = callback('fake_context', devices=devices, **kwargs)
+ self.assertEqual(expected, res)
self.assertEqual(len(devices), f.call_count)
calls = [mock.call('fake_context', device=i,
cached_networks={}, **kwargs)
for i in devices]
f.assert_has_calls(calls)
+ def test_get_devices_details_list(self):
+ devices = [1, 2, 3, 4, 5]
+ expected = devices
+ callback = self.callbacks.get_devices_details_list
+ self._test_get_devices_list(callback, devices, expected)
+
def test_get_devices_details_list_with_empty_devices(self):
with mock.patch.object(self.callbacks, 'get_device_details') as f:
res = self.callbacks.get_devices_details_list('fake_context')
self.assertFalse(f.called)
self.assertEqual([], res)
+ def test_get_devices_details_list_and_failed_devices(self):
+ devices = [1, 2, 3, 4, 5]
+ expected = {'devices': devices, 'failed_devices': []}
+ callback = (
+ self.callbacks.get_devices_details_list_and_failed_devices)
+ self._test_get_devices_list(callback, devices, expected)
+
+ def test_get_devices_details_list_and_failed_devices_failures(self):
+ devices = [1, Exception('testdevice'), 3,
+ Exception('testdevice'), 5]
+ expected = {'devices': [1, 3, 5], 'failed_devices': [2, 4]}
+ callback = (
+ self.callbacks.get_devices_details_list_and_failed_devices)
+ self._test_get_devices_list(callback, devices, expected)
+
+ def test_get_devices_details_list_and_failed_devices_empty_dev(self):
+ with mock.patch.object(self.callbacks, 'get_device_details') as f:
+ res = self.callbacks.get_devices_details_list_and_failed_devices(
+ 'fake_context')
+ self.assertFalse(f.called)
+ self.assertEqual({'devices': [], 'failed_devices': []}, res)
+
def _test_update_device_not_bound_to_host(self, func):
self.plugin.port_bound_to_host.return_value = False
self.plugin._device_to_port_id.return_value = 'fake_port_id'
self.callbacks.update_device_down(
mock.Mock(), device='fake_device'))
+ def _test_update_device_list(self, devices_up_side_effect,
+ devices_down_side_effect, expected):
+ devices_up = [1, 2, 3]
+ devices_down = [4, 5]
+ kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
+ with mock.patch.object(self.callbacks, 'update_device_up',
+ side_effect=devices_up_side_effect) as f_up, \
+ mock.patch.object(self.callbacks, 'update_device_down',
+ side_effect=devices_down_side_effect) as f_down:
+ res = self.callbacks.update_device_list(
+ 'fake_context', devices_up=devices_up,
+ devices_down=devices_down, **kwargs)
+ self.assertEqual(expected, res)
+ self.assertEqual(len(devices_up), f_up.call_count)
+ self.assertEqual(len(devices_down), f_down.call_count)
+
+ def test_update_device_list_no_failure(self):
+ devices_up_side_effect = [1, 2, 3]
+ devices_down_side_effect = [
+ {'device': 4, 'exists': True},
+ {'device': 5, 'exists': True}]
+ expected = {'devices_up': devices_up_side_effect,
+ 'failed_devices_up': [],
+ 'devices_down':
+ [{'device': 4, 'exists': True},
+ {'device': 5, 'exists': True}],
+ 'failed_devices_down': []}
+ self._test_update_device_list(devices_up_side_effect,
+ devices_down_side_effect,
+ expected)
+
+ def test_update_device_list_failed_devices(self):
+
+ devices_up_side_effect = [1, Exception('testdevice'), 3]
+ devices_down_side_effect = [{'device': 4, 'exists': True},
+ Exception('testdevice')]
+ expected = {'devices_up': [1, 3],
+ 'failed_devices_up': [2],
+ 'devices_down':
+ [{'device': 4, 'exists': True}],
+ 'failed_devices_down': [5]}
+
+ self._test_update_device_list(devices_up_side_effect,
+ devices_down_side_effect,
+ expected)
+
+ def test_update_device_list_empty_devices(self):
+
+ expected = {'devices_up': [],
+ 'failed_devices_up': [],
+ 'devices_down': [],
+ 'failed_devices_down': []}
+
+ kwargs = {'host': 'fake_host', 'agent_id': 'fake_agent_id'}
+ res = self.callbacks.update_device_list(
+ 'fake_context', devices_up=[], devices_down=[], **kwargs)
+ self.assertEqual(expected, res)
+
class RpcApiTestCase(base.BaseTestCase):
device='fake_device',
agent_id='fake_agent_id',
host='fake_host')
+
+ def test_update_device_list(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ self._test_rpc_api(rpcapi, None,
+ 'update_device_list', rpc_method='call',
+ devices_up=['fake_device1', 'fake_device2'],
+ devices_down=['fake_device3', 'fake_device4'],
+ agent_id='fake_agent_id',
+ host='fake_host',
+ version='1.5')
+
+ def test_update_device_list_unsupported(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
+ devices_up = ['fake_device1', 'fake_device2']
+ devices_down = ['fake_device3', 'fake_device4']
+ expected_ret_val = {'devices_up': ['fake_device2'],
+ 'failed_devices_up': ['fake_device1'],
+ 'devices_down': [
+ {'device': 'fake_device3', 'exists': True}],
+ 'failed_devices_down': ['fake_device4']}
+ rpcapi.update_device_up = mock.Mock(
+ side_effect=[Exception('fake_device1 fails'), None])
+ rpcapi.update_device_down = mock.Mock(
+ side_effect=[{'device': 'fake_device3', 'exists': True},
+ Exception('fake_device4 fails')])
+ with mock.patch.object(rpcapi.client, 'call'),\
+ mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
+ prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
+ 'test')
+ res = rpcapi.update_device_list(ctxt, devices_up, devices_down,
+ 'fake_agent_id', 'fake_host')
+ self.assertEqual(expected_ret_val, res)
+
+ def test_get_devices_details_list_and_failed_devices(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ self._test_rpc_api(rpcapi, None,
+ 'get_devices_details_list_and_failed_devices',
+ rpc_method='call',
+ devices=['fake_device1', 'fake_device2'],
+ agent_id='fake_agent_id',
+ host='fake_host',
+ version='1.5')
+
+ def test_devices_details_list_and_failed_devices(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ self._test_rpc_api(rpcapi, None,
+ 'get_devices_details_list_and_failed_devices',
+ rpc_method='call',
+ devices=['fake_device1', 'fake_device2'],
+ agent_id='fake_agent_id', host='fake_host',
+ version='1.5')
+
+ def test_get_devices_details_list_and_failed_devices_unsupported(self):
+ rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
+ ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
+ devices = ['fake_device1', 'fake_device2']
+ dev2_details = {'device': 'fake_device2', 'network_id': 'net_id',
+ 'port_id': 'port_id', 'admin_state_up': True}
+ expected_ret_val = {'devices': [dev2_details],
+ 'failed_devices': ['fake_device1']}
+ rpcapi.get_device_details = mock.Mock(
+ side_effect=[Exception('fake_device1 fails'), dev2_details])
+ with mock.patch.object(rpcapi.client, 'call'),\
+ mock.patch.object(rpcapi.client, 'prepare') as prepare_mock:
+ prepare_mock.side_effect = oslo_messaging.UnsupportedVersion(
+ 'test')
+ res = rpcapi.get_devices_details_list_and_failed_devices(
+ ctxt, devices, 'fake_agent_id', 'fake_host')
+ self.assertEqual(expected_ret_val, res)