API version history:
1.0 - Initial version.
-
+ 1.3 - get_device_details rpc signature upgrade to obtain 'host' and
+ return value to include fixed_ips and device_owner for
+ the device port
'''
BASE_RPC_API_VERSION = '1.1'
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
- def get_device_details(self, context, device, agent_id):
+ def get_device_details(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('get_device_details', device=device,
- agent_id=agent_id),
+ agent_id=agent_id, host=host),
topic=self.topic)
- def get_devices_details_list(self, context, devices, agent_id):
+ def get_devices_details_list(self, context, devices, agent_id, host=None):
res = []
try:
res = self.call(context,
self.make_msg('get_devices_details_list',
devices=devices,
- agent_id=agent_id),
- topic=self.topic, version='1.2')
+ agent_id=agent_id,
+ host=host),
+ topic=self.topic, version='1.3')
except messaging.UnsupportedVersion:
+ # If the server has not been upgraded yet, a DVR-enabled agent
+ # may not work correctly, however it can function in 'degraded'
+ # mode, in that DVR routers may not be in the system yet, and
+ # it might be not necessary to retrieve info about the host.
+ LOG.warn(_('DVR functionality requires a server upgrade.'))
res = [
self.call(context,
self.make_msg('get_device_details', device=device,
- agent_id=agent_id),
+ agent_id=agent_id, host=host),
topic=self.topic)
for device in devices
]
--- /dev/null
+# Copyright 2014, Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.common import log
+from neutron.common import topics
+from neutron import manager
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class DVRServerRpcApiMixin(object):
+ """Agent-side RPC (stub) for agent-to-plugin interaction."""
+
+ DVR_RPC_VERSION = "1.0"
+
+ @log.log
+ def get_dvr_mac_address_by_host(self, context, host):
+ return self.call(context,
+ self.make_msg('get_dvr_mac_address_by_host',
+ host=host),
+ version=self.DVR_RPC_VERSION,
+ topic=self.topic)
+
+ @log.log
+ def get_dvr_mac_address_list(self, context):
+ return self.call(context,
+ self.make_msg('get_dvr_mac_address_list'),
+ version=self.DVR_RPC_VERSION,
+ topic=self.topic)
+
+ @log.log
+ def get_compute_ports_on_host_by_subnet(self, context, host, subnet):
+ return self.call(context,
+ self.make_msg('get_compute_ports_on_host_by_subnet',
+ host=host,
+ subnet=subnet),
+ version=self.DVR_RPC_VERSION,
+ topic=self.topic)
+
+ @log.log
+ def get_subnet_for_dvr(self, context, subnet):
+ return self.call(context,
+ self.make_msg('get_subnet_for_dvr',
+ subnet=subnet),
+ version=self.DVR_RPC_VERSION,
+ topic=self.topic)
+
+
+class DVRServerRpcCallbackMixin(object):
+ """Plugin-side RPC (implementation) for agent-to-plugin interaction."""
+
+ @property
+ def plugin(self):
+ if not getattr(self, '_plugin', None):
+ self._plugin = manager.NeutronManager.get_plugin()
+ return self._plugin
+
+ def get_dvr_mac_address_list(self, context):
+ return self.plugin.get_dvr_mac_address_list(context)
+
+ def get_dvr_mac_address_by_host(self, context, host):
+ return self.plugin.get_dvr_mac_address_by_host(context, host)
+
+ def get_compute_ports_on_host_by_subnet(self, context, host, subnet):
+ return self.plugin.get_compute_ports_on_host_by_subnet(context,
+ host,
+ subnet)
+
+ def get_subnet_for_dvr(self, context, subnet):
+ return self.plugin.get_subnet_for_dvr(context, subnet)
+
+
+class DVRAgentRpcApiMixin(object):
+ """Plugin-side RPC (stub) for plugin-to-agent interaction."""
+
+ DVR_RPC_VERSION = "1.0"
+
+ def _get_dvr_update_topic(self):
+ return topics.get_topic_name(self.topic,
+ topics.DVR,
+ topics.UPDATE)
+
+ def dvr_mac_address_update(self, context, dvr_macs):
+ """Notify dvr mac address updates."""
+ if not dvr_macs:
+ return
+ self.fanout_cast(context,
+ self.make_msg('dvr_mac_address_update',
+ dvr_macs=dvr_macs),
+ version=self.DVR_RPC_VERSION,
+ topic=self._get_dvr_update_topic())
+
+
+class DVRAgentRpcCallbackMixin(object):
+ """Agent-side RPC (implementation) for plugin-to-agent interaction."""
+
+ dvr_agent = None
+
+ def dvr_mac_address_update(self, context, **kwargs):
+ """Callback for dvr_mac_addresses update.
+
+ :param dvr_macs: list of updated dvr_macs
+ """
+ dvr_macs = kwargs.get('dvr_macs', [])
+ LOG.debug("dvr_macs updated on remote: %s", dvr_macs)
+ if not self.dvr_agent:
+ LOG.warn(_("DVR agent binding currently not set."))
+ return
+ self.dvr_agent.dvr_mac_address_update(dvr_macs)
PORT = 'port'
SECURITY_GROUP = 'security_group'
L2POPULATION = 'l2population'
+DVR = 'dvr'
CREATE = 'create'
DELETE = 'delete'
return dvrma
def _create_dvr_mac_address(self, context, host):
- """Create dvr mac address for a given host."""
+ """Create DVR mac address for a given host."""
base_mac = cfg.CONF.dvr_base_mac.split(':')
max_retries = cfg.CONF.mac_generation_retries
for attempt in reversed(range(max_retries)):
LOG.debug("Generated DVR mac for host %(host)s "
"is %(mac_address)s",
{'host': host, 'mac_address': mac_address})
+ dvr_macs = self.get_dvr_mac_address_list(context)
+ # TODO(vivek): improve scalability of this fanout by
+ # sending a single mac address rather than the entire set
+ self.notifier.dvr_mac_address_update(context, dvr_macs)
return self._make_dvr_mac_address_dict(dvr_mac_binding)
except db_exc.DBDuplicateEntry:
LOG.debug("Generated DVR mac %(mac)s exists."
LOG.debug(_("Checking router: %(id)s for host: %(host)s"),
{'id': router['id'], 'host': host})
self._ensure_host_set_on_port(context, plugin, host,
- router.get('gw_port'))
+ router.get('gw_port'),
+ router['id'])
for interface in router.get(constants.INTERFACE_KEY, []):
self._ensure_host_set_on_port(context, plugin, host,
- interface)
+ interface, router['id'])
- def _ensure_host_set_on_port(self, context, plugin, host, port):
+ def _ensure_host_set_on_port(self, context, plugin, host, port,
+ router_id=None):
if (port and
- (port.get(portbindings.HOST_ID) != host or
+ (port.get('device_owner') !=
+ constants.DEVICE_OWNER_DVR_INTERFACE and
+ port.get(portbindings.HOST_ID) != host or
port.get(portbindings.VIF_TYPE) ==
portbindings.VIF_TYPE_BINDING_FAILED)):
+ # All ports, including ports created for SNAT'ing for
+ # DVR are handled here
plugin.update_port(context, port['id'],
{'port': {portbindings.HOST_ID: host}})
+ elif (port and
+ port.get('device_owner') ==
+ constants.DEVICE_OWNER_DVR_INTERFACE):
+ # Ports that are DVR interfaces have multiple bindings (based on
+ # of hosts on which DVR router interfaces are spawned). Such
+ # bindings are created/updated here by invoking
+ # update_dvr_port_binding
+ plugin.update_dvr_port_binding(context, port['id'],
+ {'port':
+ {portbindings.HOST_ID: host,
+ 'device_id': router_id}
+ })
def get_external_network_id(self, context, **kwargs):
"""Get one external network id for l3 agent.
return self._bind_port_if_needed(port_context)
- def update_port_status(self, context, port_id, status):
+ def update_port_status(self, context, port_id, status, host=None):
updated = False
session = context.session
# REVISIT: Serialize this operation with a semaphore to
return True
- def port_bound_to_host(self, port_id, host):
+ def port_bound_to_host(self, context, port_id, host):
port_host = db.get_port_binding_host(port_id)
return (port_host == host)
# under the License.
from neutron.agent import securitygroups_rpc as sg_rpc
+from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import manager
from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
+from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
class RpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
+ dvr_rpc.DVRServerRpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
type_tunnel.TunnelRpcCallbackMixin):
- RPC_API_VERSION = '1.2'
+ RPC_API_VERSION = '1.3'
# history
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
+ # 1.3 Support Distributed Virtual Router (DVR)
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
- LOG.debug(_("Device %(device)s details requested by agent "
- "%(agent_id)s"),
- {'device': device, 'agent_id': agent_id})
+ host = kwargs.get('host')
+ LOG.debug("Device %(device)s details requested by agent "
+ "%(agent_id)s with host %(host)s",
+ {'device': device, 'agent_id': agent_id, 'host': host})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin()
if port['status'] != new_status:
plugin.update_port_status(rpc_context,
port_id,
- new_status)
+ new_status,
+ host)
entry = {'device': device,
'network_id': port['network_id'],
'admin_state_up': port['admin_state_up'],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
- 'physical_network': segment[api.PHYSICAL_NETWORK]}
+ 'physical_network': segment[api.PHYSICAL_NETWORK],
+ 'fixed_ips': port['fixed_ips'],
+ 'device_owner': port['device_owner']}
LOG.debug(_("Returning: %s"), entry)
return entry
plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
port_exists = True
- if (host and not plugin.port_bound_to_host(port_id, host)):
+ if (host and not plugin.port_bound_to_host(rpc_context,
+ port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
'exists': port_exists}
port_exists = plugin.update_port_status(rpc_context, port_id,
- q_const.PORT_STATUS_DOWN)
+ q_const.PORT_STATUS_DOWN,
+ host)
return {'device': device,
'exists': port_exists}
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device)
- if (host and not plugin.port_bound_to_host(port_id, host)):
+ if (host and not plugin.port_bound_to_host(rpc_context,
+ port_id, host)):
LOG.debug(_("Device %(device)s not bound to the"
" agent host %(host)s"),
{'device': device, 'host': host})
return
plugin.update_port_status(rpc_context, port_id,
- q_const.PORT_STATUS_ACTIVE)
+ q_const.PORT_STATUS_ACTIVE,
+ host)
+ l3plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ if l3plugin:
+ l3plugin.dvr_vmarp_table_update(rpc_context, port_id, "add")
+
+ def get_dvr_mac_address_by_host(self, rpc_context, **kwargs):
+ host = kwargs.get('host')
+ LOG.debug("DVR Agent requests mac_address for host %s", host)
+ return super(RpcCallbacks, self).get_dvr_mac_address_by_host(
+ rpc_context, host)
+
+ def get_compute_ports_on_host_by_subnet(self, rpc_context, **kwargs):
+ host = kwargs.get('host')
+ subnet = kwargs.get('subnet')
+ LOG.debug("DVR Agent requests list of VM ports on host %s", host)
+ return super(RpcCallbacks, self).get_compute_ports_on_host_by_subnet(
+ rpc_context, host, subnet)
+
+ def get_subnet_for_dvr(self, rpc_context, **kwargs):
+ subnet = kwargs.get('subnet')
+ return super(RpcCallbacks, self).get_subnet_for_dvr(rpc_context,
+ subnet)
class AgentNotifierApi(n_rpc.RpcProxy,
+ dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.
rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
- agent_id='fake_agent_id',
- version='1.2')
+ agent_id='fake_agent_id', host='fake_host',
+ version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
- agent_id='fake_agent_id',
- version='1.2')
+ agent_id='fake_agent_id', host='fake_host',
+ version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
- agent_id='fake_agent_id',
- version='1.2')
+ agent_id='fake_agent_id', host='fake_host',
+ version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_mlnx_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device1'],
- agent_id='fake_agent_id',
- version='1.2')
+ agent_id='fake_agent_id', host='fake_host',
+ version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
- agent_id='fake_agent_id')
+ agent_id='fake_agent_id',
+ host='fake_host')
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_ovs_api(rpcapi, topics.PLUGIN,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
- agent_id='fake_agent_id',
- version='1.2')
+ agent_id='fake_agent_id', host='fake_host',
+ version='1.3')
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)