1.3 - get_device_details rpc signature upgrade to obtain 'host' and
return value to include fixed_ips and device_owner for
the device port
+ 1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
'''
def __init__(self, topic):
return cctxt.call(context, 'update_device_up', device=device,
agent_id=agent_id, host=host)
- def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
- cctxt = self.client.prepare()
- return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
- tunnel_type=tunnel_type)
+ def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
+ try:
+ cctxt = self.client.prepare(version='1.4')
+ res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
+ tunnel_type=tunnel_type, host=host)
+ except oslo_messaging.UnsupportedVersion:
+ LOG.warn(_LW('Tunnel synchronization requires a server upgrade.'))
+ cctxt = self.client.prepare()
+ res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
+ tunnel_type=tunnel_type)
+ return res
cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)
- # TODO(romilg): Add tunnel_delete rpc in dependent patch-set
+ def _get_tunnel_delete_topic(self):
+ return topics.get_topic_name(self.topic,
+ TUNNEL,
+ topics.DELETE)
+
+ def tunnel_delete(self, context, tunnel_ip, tunnel_type):
+ cctxt = self.client.prepare(topic=self._get_tunnel_delete_topic(),
+ fanout=True)
+ cctxt.cast(context, 'tunnel_delete', tunnel_ip=tunnel_ip,
+ tunnel_type=tunnel_type)
# 1.3 get_device_details rpc signature upgrade to obtain 'host' and
# return value to include fixed_ips and device_owner for
# the device port
- target = oslo_messaging.Target(version='1.3')
+ # 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
+ target = oslo_messaging.Target(version='1.4')
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
for tunnel_type in self.tunnel_types:
self.plugin_rpc.tunnel_sync(self.context,
self.local_ip,
- tunnel_type)
+ tunnel_type,
+ cfg.CONF.host)
except Exception as e:
LOG.debug("Unable to sync tunnel IP %(local_ip)s: %(e)s",
{'local_ip': self.local_ip, 'e': e})
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
+ [constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE]]
if self.l2_pop:
self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
tunnel_type)
+ def tunnel_delete(self, context, **kwargs):
+ LOG.debug("tunnel_delete received")
+ if not self.enable_tunneling:
+ return
+ tunnel_ip = kwargs.get('tunnel_ip')
+ if not tunnel_ip:
+ LOG.error(_LE("No tunnel_ip specified, cannot delete tunnels"))
+ return
+ tunnel_type = kwargs.get('tunnel_type')
+ if not tunnel_type:
+ LOG.error(_LE("No tunnel_type specified, cannot delete tunnels"))
+ return
+ if tunnel_type not in self.tunnel_types:
+ LOG.error(_LE("tunnel_type %s not supported by agent"),
+ tunnel_type)
+ return
+ ofport = self.tun_br_ofports[tunnel_type].get(tunnel_ip)
+ self.cleanup_tunnel_port(self.tun_br, ofport, tunnel_type)
+
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
try:
return '%08x' % netaddr.IPAddress(ip_address, version=4)
except Exception:
- LOG.warn(_LW("Unable to create tunnel port. "
- "Invalid remote IP: %s"), ip_address)
+ LOG.warn(_LW("Invalid remote IP: %s"), ip_address)
return
def tunnel_sync(self):
for tunnel_type in self.tunnel_types:
details = self.plugin_rpc.tunnel_sync(self.context,
self.local_ip,
- tunnel_type)
+ tunnel_type,
+ cfg.CONF.host)
if not self.l2_pop:
tunnels = details['tunnels']
for tunnel in tunnels:
rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
- tunnel_type=None)
+ tunnel_type=None,
+ host='fake_host',
+ version='1.4')
from neutron.common import exceptions
from neutron.common import topics
from neutron.plugins.ml2.drivers import type_tunnel
+from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc as plugin_rpc
from neutron.tests import base
def setUp(self):
super(RpcCallbacksTestCase, self).setUp()
- self.callbacks = plugin_rpc.RpcCallbacks(mock.Mock(), mock.Mock())
+ self.type_manager = managers.TypeManager()
+ self.notifier = plugin_rpc.AgentNotifierApi(topics.AGENT)
+ self.callbacks = plugin_rpc.RpcCallbacks(self.notifier,
+ self.type_manager)
self.manager = mock.patch.object(
plugin_rpc.manager, 'NeutronManager').start()
self.l3plugin = mock.Mock()
fanout=True,
tunnel_ip='fake_ip', tunnel_type='gre')
+ def test_tunnel_delete(self):
+ rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
+ self._test_rpc_api(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ type_tunnel.TUNNEL,
+ topics.DELETE),
+ 'tunnel_delete', rpc_method='cast',
+ fanout=True,
+ tunnel_ip='fake_ip', tunnel_type='gre')
+
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, None,
self._test_rpc_api(rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
- tunnel_type=None)
+ tunnel_type=None,
+ host='fake_host',
+ version='1.4')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_gre
+from neutron.tests.unit.ml2 import test_rpcapi
from neutron.tests.unit.ml2 import test_type_tunnel
from neutron.tests.unit import testlib_api
class GreTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
- testlib_api.SqlTestCase):
+ testlib_api.SqlTestCase):
DRIVER_CLASS = type_gre.GreTypeDriver
+
+
+class GreTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
+ test_rpcapi.RpcCallbacksTestCase,
+ testlib_api.SqlTestCase):
+ DRIVER_CLASS = type_gre.GreTypeDriver
+ TYPE = p_const.TYPE_GRE
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
+import mock
from six import moves
import testtools
from testtools import matchers
from neutron.db import api as db
from neutron.plugins.ml2 import driver_api as api
+TUNNEL_IP_ONE = "10.10.10.10"
+TUNNEL_IP_TWO = "10.10.10.20"
+HOST_ONE = 'fake_host_one'
+HOST_TWO = 'fake_host_two'
TUN_MIN = 100
TUN_MAX = 109
TUNNEL_RANGES = [(TUN_MIN, TUN_MAX)]
self.TUN_MIN1, self.TUN_MAX1):
alloc = self.driver.get_allocation(self.session, key)
self.assertFalse(alloc.allocated)
+
+
+class TunnelRpcCallbackTestMixin(object):
+
+ DRIVER_CLASS = None
+ TYPE = None
+
+ def setUp(self):
+ super(TunnelRpcCallbackTestMixin, self).setUp()
+ self.driver = self.DRIVER_CLASS()
+
+ def _test_tunnel_sync(self, kwargs, delete_tunnel=False):
+ with contextlib.nested(
+ mock.patch.object(self.notifier, 'tunnel_update'),
+ mock.patch.object(self.notifier, 'tunnel_delete')
+ ) as (tunnel_update, tunnel_delete):
+ details = self.callbacks.tunnel_sync('fake_context', **kwargs)
+ tunnels = details['tunnels']
+ for tunnel in tunnels:
+ self.assertEqual(kwargs['tunnel_ip'], tunnel['ip_address'])
+ self.assertEqual(kwargs['host'], tunnel['host'])
+ self.assertTrue(tunnel_update.called)
+ if delete_tunnel:
+ self.assertTrue(tunnel_delete.called)
+ else:
+ self.assertFalse(tunnel_delete.called)
+
+ def _test_tunnel_sync_raises(self, kwargs):
+ with contextlib.nested(
+ mock.patch.object(self.notifier, 'tunnel_update'),
+ mock.patch.object(self.notifier, 'tunnel_delete')
+ ) as (tunnel_update, tunnel_delete):
+ self.assertRaises(exc.InvalidInput,
+ self.callbacks.tunnel_sync,
+ 'fake_context', **kwargs)
+ self.assertFalse(tunnel_update.called)
+ self.assertFalse(tunnel_delete.called)
+
+ def test_tunnel_sync_called_without_host_passed(self):
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+ 'host': None}
+ self._test_tunnel_sync(kwargs)
+
+ def test_tunnel_sync_called_with_host_passed_for_existing_tunnel_ip(self):
+ self.driver.add_endpoint(TUNNEL_IP_ONE, None)
+
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+ 'host': HOST_ONE}
+ self._test_tunnel_sync(kwargs)
+
+ def test_tunnel_sync_called_with_host_passed(self):
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+ 'host': HOST_ONE}
+ self._test_tunnel_sync(kwargs)
+
+ def test_tunnel_sync_called_for_existing_endpoint(self):
+ self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
+
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+ 'host': HOST_ONE}
+ self._test_tunnel_sync(kwargs)
+
+ def test_tunnel_sync_called_for_existing_host_with_tunnel_ip_changed(self):
+ self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
+
+ kwargs = {'tunnel_ip': TUNNEL_IP_TWO, 'tunnel_type': self.TYPE,
+ 'host': HOST_ONE}
+ self._test_tunnel_sync(kwargs, True)
+
+ def test_tunnel_sync_called_with_used_tunnel_ip_case_one(self):
+ self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
+
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+ 'host': HOST_TWO}
+ self._test_tunnel_sync_raises(kwargs)
+
+ def test_tunnel_sync_called_with_used_tunnel_ip_case_two(self):
+ self.driver.add_endpoint(TUNNEL_IP_ONE, None)
+ self.driver.add_endpoint(TUNNEL_IP_TWO, HOST_TWO)
+
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
+ 'host': HOST_TWO}
+ self._test_tunnel_sync_raises(kwargs)
+
+ def test_tunnel_sync_called_without_tunnel_ip(self):
+ kwargs = {'tunnel_type': self.TYPE, 'host': None}
+ self._test_tunnel_sync_raises(kwargs)
+
+ def test_tunnel_sync_called_without_tunnel_type(self):
+ kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'host': None}
+ self._test_tunnel_sync_raises(kwargs)
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_vxlan
+from neutron.tests.unit.ml2 import test_rpcapi
from neutron.tests.unit.ml2 import test_type_tunnel
from neutron.tests.unit import testlib_api
class VxlanTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
testlib_api.SqlTestCase):
DRIVER_CLASS = type_vxlan.VxlanTypeDriver
+
+
+class VxlanTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
+ test_rpcapi.RpcCallbacksTestCase,
+ testlib_api.SqlTestCase):
+ DRIVER_CLASS = type_vxlan.VxlanTypeDriver
+ TYPE = p_const.TYPE_VXLAN
self.agent.local_ip = 'agent_ip'
self.agent.context = 'fake_context'
self.agent.tunnel_types = ['vxlan']
+ self.agent.host = cfg.CONF.host
with mock.patch.object(
self.agent.plugin_rpc, 'tunnel_sync'
) as tunnel_sync_rpc_fn:
tunnel_sync_rpc_fn.assert_called_once_with(
self.agent.context,
self.agent.local_ip,
- self.agent.tunnel_types[0])
+ self.agent.tunnel_types[0],
+ self.agent.host)
def test__get_ports(self):
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
+ def test_tunnel_delete(self):
+ kwargs = {'tunnel_ip': '10.10.10.10',
+ 'tunnel_type': 'gre'}
+ self.agent.enable_tunneling = True
+ self.agent.tunnel_types = ['gre']
+ self.agent.tun_br_ofports = {'gre': {'10.10.10.10': '1'}}
+ with mock.patch.object(
+ self.agent, 'cleanup_tunnel_port'
+ ) as clean_tun_fn:
+ self.agent.tunnel_delete(context=None, **kwargs)
+ self.assertTrue(clean_tun_fn.called)
+
def test_ovs_status(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),