return kwargs
+def validate_local_ip(local_ip):
+ """Verify if the ip exists on the agent's host."""
+ if not ip_lib.IPWrapper().get_device_by_ip(local_ip):
+ LOG.error(_LE("Tunneling can't be enabled with invalid local_ip '%s'."
+ " IP couldn't be found on this host's interfaces."),
+ local_ip)
+ raise SystemExit(1)
+
+
def prepare_xen_compute():
is_xen_compute_host = 'rootwrap-xen-dom0' in cfg.CONF.AGENT.root_helper
if is_xen_compute_host:
LOG.exception(_LE("Agent failed to create agent config map"))
raise SystemExit(1)
prepare_xen_compute()
+ validate_local_ip(agent_config['local_ip'])
try:
agent = OVSNeutronAgent(bridge_classes, **agent_config)
except (RuntimeError, ValueError) as e:
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
as ovs_agent
+from neutron.tests import base
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
import ovs_test_base
class TestOvsDvrNeutronAgentOFCtl(TestOvsDvrNeutronAgent,
ovs_test_base.OVSOFCtlTestBase):
pass
+
+
+class TestValidateTunnelLocalIP(base.BaseTestCase):
+ def test_validate_local_ip_with_valid_ip(self):
+ mock_get_device_by_ip = mock.patch.object(
+ ip_lib.IPWrapper, 'get_device_by_ip').start()
+ ovs_agent.validate_local_ip(FAKE_IP1)
+ mock_get_device_by_ip.assert_called_once_with(FAKE_IP1)
+
+ def test_validate_local_ip_with_invalid_ip(self):
+ mock_get_device_by_ip = mock.patch.object(
+ ip_lib.IPWrapper, 'get_device_by_ip').start()
+ mock_get_device_by_ip.return_value = None
+ with testtools.ExpectedException(SystemExit):
+ ovs_agent.validate_local_ip(FAKE_IP1)
+ mock_get_device_by_ip.assert_called_once_with(FAKE_IP1)