agent_id=agent_id),
topic=self.topic)
- def update_device_down(self, context, device, agent_id):
+ def update_device_down(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_down', device=device,
- agent_id=agent_id),
+ agent_id=agent_id, host=host),
topic=self.topic)
- def update_device_up(self, context, device, agent_id):
+ def update_device_up(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_up', device=device,
- agent_id=agent_id),
+ agent_id=agent_id, host=host),
topic=self.topic)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
try:
self.plugin_rpc.update_device_down(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
except Exception as e:
LOG.debug(
_("Removing port failed for device %(device)s: %(e)s"),
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
tap_device_name,
- self.agent.agent_id)
+ self.agent.agent_id,
+ cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
tap_device_name,
- self.agent.agent_id)
+ self.agent.agent_id,
+ cfg.CONF.host)
else:
bridge_name = self.agent.br_mgr.get_bridge_name(
port['network_id'])
# update plugin about port status
self.agent.plugin_rpc.update_device_down(self.context,
tap_device_name,
- self.agent.agent_id)
+ self.agent.agent_id,
+ cfg.CONF.host)
except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
else:
self.plugin_rpc.update_device_down(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
else:
self.remove_port_binding(details['network_id'],
details['port_id'])
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
+from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
# TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
+ host = kwargs.get('host')
+ port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = self.get_port_from_device(device)
+ plugin = manager.NeutronManager.get_plugin()
if port:
entry = {'device': device,
'exists': True}
- if port['status'] != q_const.PORT_STATUS_DOWN:
+ if (host and not
+ plugin.get_port_host(rpc_context, port['id']) == host):
+ LOG.debug(_("Device %(device)s not bound to the"
+ " agent host %(host)s"),
+ {'device': device, 'host': host})
+ elif port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN
db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
else:
"""Device is up on agent."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
- LOG.debug(_("Device %(device)s up %(agent_id)s"),
+ host = kwargs.get('host')
+ port = self.get_port_from_device.get_port(device)
+ LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = self.get_port_from_device(device)
+ plugin = manager.NeutronManager.get_plugin()
if port:
- if port['status'] != q_const.PORT_STATUS_ACTIVE:
- # Set port status to ACTIVE
- db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE)
+ if (host and
+ not plugin.get_port_host(rpc_context, port['id']) == host):
+ LOG.debug(_("Device %(device)s not bound to the"
+ " agent host %(host)s"),
+ {'device': device, 'host': host})
+ return
+ elif port['status'] != q_const.PORT_STATUS_ACTIVE:
+ db.set_port_status(port['id'],
+ q_const.PORT_STATUS_ACTIVE)
else:
LOG.debug(_("%s can not be found in database"), device)
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict
+
+
+def get_port_binding_host(port_id):
+ session = db_api.get_session()
+ with session.begin(subtransactions=True):
+ try:
+ query = (session.query(models.PortBinding).
+ filter(models.PortBinding.port_id.startswith(port_id)).
+ one())
+ except exc.NoResultFound:
+ LOG.debug(_("No binding found for port %(port_id)s"),
+ {'port_id': port_id})
+ return
+ return query.host
LOG.warning(_("Port %(port)s updated up by agent not found"),
{'port': port_id})
return False
-
if port.status != status:
original_port = self._make_port_dict(port)
port.status = status
self.mechanism_manager.update_port_postcommit(mech_context)
return True
+
+ def port_bound_to_host(self, port_id, host):
+ port_host = db.get_port_binding_host(port_id)
+ return (port_host == host)
# TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
+ host = kwargs.get('host')
LOG.debug(_("Device %(device)s no longer exists at agent "
"%(agent_id)s"),
{'device': device, 'agent_id': agent_id})
+ plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
+ port_exists = True
+ if (host and not plugin.port_bound_to_host(port_id, host)):
+ LOG.debug(_("Device %(device)s not bound to the"
+ " agent host %(host)s"),
+ {'device': device, 'host': host})
+ return {'device': device,
+ 'exists': port_exists}
- plugin = manager.NeutronManager.get_plugin()
port_exists = plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_DOWN)
"""Device is up on agent."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
+ host = kwargs.get('host')
LOG.debug(_("Device %(device)s up at agent %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
+ plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
+ if (host and not plugin.port_bound_to_host(port_id, host)):
+ LOG.debug(_("Device %(device)s not bound to the"
+ " agent host %(host)s"),
+ {'device': device, 'host': host})
+ return
- plugin = manager.NeutronManager.get_plugin()
plugin.update_port_status(rpc_context, port_id,
q_const.PORT_STATUS_ACTIVE)
if port['admin_state_up']:
# update plugin about port status
self.plugin_rpc.update_device_up(self.context, port['id'],
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
else:
# update plugin about port status
self.plugin_rpc.update_device_down(self.context, port['id'],
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
# update plugin about port status
self.plugin_rpc.update_device_up(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
return resync
def treat_devices_removed(self, devices):
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
- self.agent_id)
+ self.agent_id,
+ cfg.CONF.host)
except Exception as e:
LOG.debug(_("port_removed failed for %(device)s: %(e)s"),
{'device': device, 'e': e})
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
+from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
- # TODO(garyk) - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
+ host = kwargs.get('host')
+ port = ovs_db_v2.get_port(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = ovs_db_v2.get_port(device)
if port:
entry = {'device': device,
'exists': True}
- if port['status'] != q_const.PORT_STATUS_DOWN:
+ plugin = manager.NeutronManager.get_plugin()
+ if (host and
+ not plugin.get_port_host(rpc_context, port['id']) == host):
+ LOG.debug(_("Device %(device)s not bound to the"
+ " agent host %(host)s"),
+ {'device': device, 'host': host})
+ elif port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN
- ovs_db_v2.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
+ ovs_db_v2.set_port_status(port['id'],
+ q_const.PORT_STATUS_DOWN)
else:
entry = {'device': device,
'exists': False}
"""Device is up on agent."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
+ host = kwargs.get('host')
+ port = ovs_db_v2.get_port(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = ovs_db_v2.get_port(device)
+ plugin = manager.NeutronManager.get_plugin()
if port:
- if port['status'] != q_const.PORT_STATUS_ACTIVE:
+ if (host and
+ not plugin.get_port_host(rpc_context, port['id']) == host):
+ LOG.debug(_("Device %(device)s not bound to the"
+ " agent host %(host)s"),
+ {'device': device, 'host': host})
+ return
+ elif port['status'] != q_const.PORT_STATUS_ACTIVE:
ovs_db_v2.set_port_status(port['id'],
q_const.PORT_STATUS_ACTIVE)
else:
rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
rpc_obj.update_device_down.assert_called_with(
self.lb_rpc.context,
"tap123",
- self.lb_rpc.agent.agent_id
+ self.lb_rpc.agent.agent_id,
+ cfg.CONF.host
)
def test_port_update_plugin_rpc_failed(self):
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
self._test_rpc_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
"124", "vlan", "physnet",
"1", False)
upddown_fn.assert_called_with(self.agent.context,
- "123", self.agent.agent_id)
+ "123", self.agent.agent_id,
+ cfg.CONF.host)
port["admin_state_up"] = True
self.agent.port_update("unused_context",
segmentation_id="1",
physical_network="physnet")
updup_fn.assert_called_with(self.agent.context,
- "123", self.agent.agent_id)
+ "123", self.agent.agent_id,
+ cfg.CONF.host)
def test_port_update_plugin_rpc_failed(self):
port = {'id': 1,
self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'update_device_up', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')