import os
import random
+from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.plugins.common import constants as q_const
from neutron.tests import base
+BR_PREFIX = 'test-br'
+
+
class BaseLinuxTestCase(base.BaseTestCase):
+ def setUp(self, root_helper='sudo'):
+ super(BaseLinuxTestCase, self).setUp()
+
+ self.root_helper = root_helper
def check_command(self, cmd, error_text, skip_msg):
try:
name = prefix + str(random.randint(1, 0x7fffffff))
return name[:max_length]
- def create_ovs_resource(self, name_prefix, creation_func):
- """Create a new ovs resource that does not already exist.
+ def create_resource(self, name_prefix, creation_func, *args, **kwargs):
+ """Create a new resource that does not already exist.
:param name_prefix: The prefix for a randomly generated name
:param creation_func: A function taking the name of the resource
- to be created. An error is assumed to indicate a name
- collision.
+ to be created as it's first argument. An error is assumed
+ to indicate a name collision.
+ :param *args *kwargs: These will be passed to the create function.
"""
while True:
name = self.get_rand_name(q_const.MAX_DEV_NAME_LEN, name_prefix)
try:
- return creation_func(name)
+ return creation_func(name, *args, **kwargs)
except RuntimeError:
continue
+
+
+class BaseOVSLinuxTestCase(BaseLinuxTestCase):
+ def setUp(self, root_helper='sudo'):
+ super(BaseOVSLinuxTestCase, self).setUp(root_helper)
+ self.ovs = ovs_lib.BaseOVS(self.root_helper)
+
+ def create_ovs_bridge(self, br_prefix=BR_PREFIX):
+ br = self.create_resource(br_prefix, self.ovs.add_bridge)
+ self.addCleanup(br.destroy)
+ return br
--- /dev/null
+# Copyright 2014 Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.agent.linux import ovs_lib
+from neutron.plugins.common import constants as p_const
+from neutron.tests.functional.agent.linux import base as base_agent
+
+
+PORT_PREFIX = 'testp-'
+INVALID_OFPORT_ID = '-1'
+
+
+class TestOVSAgentVXLAN(base_agent.BaseOVSLinuxTestCase):
+
+ def setUp(self):
+ super(TestOVSAgentVXLAN, self).setUp()
+
+ self._check_test_requirements()
+
+ def _check_test_requirements(self):
+ self.check_sudo_enabled()
+ self.check_command(['which', 'ovs-vsctl'],
+ 'Exit code: 1', 'ovs-vsctl is not installed')
+ self.check_command(['sudo', '-n', 'ovs-vsctl', 'show'],
+ 'Exit code: 1',
+ 'password-less sudo not granted for ovs-vsctl')
+
+ def test_ovs_lib_vxlan_version_check(self):
+ """Verify VXLAN versions match
+
+ This function compares the return values of functionally checking if
+ VXLAN is supported with the ovs_lib programmatic check. It will fail
+ if the two do not align.
+ """
+ expected = self.is_vxlan_supported()
+ actual = self.is_ovs_lib_vxlan_supported()
+ self.assertEqual(actual, expected)
+
+ def is_ovs_lib_vxlan_supported(self):
+ try:
+ ovs_lib.check_ovs_vxlan_version(self.root_helper)
+ except SystemError:
+ return False
+ else:
+ return True
+
+ def is_vxlan_supported(self):
+ bridge = self.create_ovs_bridge()
+ vxlan_port = self.create_resource(
+ PORT_PREFIX,
+ bridge.add_tunnel_port,
+ "10.10.10.10",
+ "10.10.10.20",
+ p_const.TYPE_VXLAN)
+
+ return vxlan_port != INVALID_OFPORT_ID
import eventlet
-from neutron.agent.linux import ovs_lib
from neutron.agent.linux import ovsdb_monitor
from neutron.tests.functional.agent.linux import base as base_agent
-class BaseMonitorTest(base_agent.BaseLinuxTestCase):
+class BaseMonitorTest(base_agent.BaseOVSLinuxTestCase):
def setUp(self):
- super(BaseMonitorTest, self).setUp()
-
- self._check_test_requirements()
-
# Emulate using a rootwrap script with sudo
- self.root_helper = 'sudo sudo'
- self.ovs = ovs_lib.BaseOVS(self.root_helper)
- self.bridge = self.create_ovs_resource('test-br-', self.ovs.add_bridge)
+ super(BaseMonitorTest, self).setUp(root_helper='sudo sudo')
- def cleanup_bridge():
- self.bridge.destroy()
- self.addCleanup(cleanup_bridge)
+ self._check_test_requirements()
+ self.bridge = self.create_ovs_bridge()
def _check_test_requirements(self):
self.check_sudo_enabled()
self.check_command(['which', 'ovsdb-client'],
- 'Exit code: 1',
- 'ovsdb-client is not installed')
+ 'Exit code: 1', 'ovsdb-client is not installed')
self.check_command(['sudo', '-n', 'ovsdb-client', 'list-dbs'],
'Exit code: 1',
'password-less sudo not granted for ovsdb-client')
'Initial call should always be true')
self.assertFalse(self.monitor.has_updates,
'has_updates without port addition should be False')
- self.create_ovs_resource('test-port-', self.bridge.add_port)
+ self.create_resource('test-port-', self.bridge.add_port)
with self.assert_max_execution_time():
# has_updates after port addition should become True
while not self.monitor.has_updates: