# License for the specific language governing permissions and limitations
# under the License.
+import mock
import random
import string
import warnings
"always", category=wtype, module='^neutron\\.')
+class OpenFixture(fixtures.Fixture):
+ """Mock access to a specific file while preserving open for others."""
+
+ def __init__(self, filepath, contents=''):
+ self.path = filepath
+ self.contents = contents
+
+ def _setUp(self):
+ self.mock_open = mock.mock_open(read_data=self.contents)
+ self._orig_open = open
+
+ def replacement_open(name, *args, **kwargs):
+ if name == self.path:
+ return self.mock_open(name, *args, **kwargs)
+ return self._orig_open(name, *args, **kwargs)
+
+ self._patch = mock.patch('six.moves.builtins.open',
+ new=replacement_open)
+ self._patch.start()
+ self.addCleanup(self._patch.stop)
+
+
class SafeCleanupFixture(fixtures.Fixture):
"""Catch errors in daughter fixture cleanup."""
from neutron.agent.linux import daemon
from neutron.common import exceptions
from neutron.tests import base
+from neutron.tests import tools
FAKE_FD = 8
self.assertEqual(34, p.read())
def test_is_running(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- p = daemon.Pidfile('thefile', 'python')
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.readline.return_value = 'python'
+ mock_open = self.useFixture(
+ tools.OpenFixture('/proc/34/cmdline', 'python')).mock_open
+ p = daemon.Pidfile('thefile', 'python')
- with mock.patch.object(p, 'read') as read:
- read.return_value = 34
- self.assertTrue(p.is_running())
+ with mock.patch.object(p, 'read') as read:
+ read.return_value = 34
+ self.assertTrue(p.is_running())
- mock_open.assert_called_once_with('/proc/34/cmdline', 'r')
+ mock_open.assert_called_once_with('/proc/34/cmdline', 'r')
def test_is_running_uuid_true(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- p = daemon.Pidfile('thefile', 'python', uuid='1234')
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.readline.return_value = 'python 1234'
+ mock_open = self.useFixture(
+ tools.OpenFixture('/proc/34/cmdline', 'python 1234')).mock_open
+ p = daemon.Pidfile('thefile', 'python', uuid='1234')
- with mock.patch.object(p, 'read') as read:
- read.return_value = 34
- self.assertTrue(p.is_running())
+ with mock.patch.object(p, 'read') as read:
+ read.return_value = 34
+ self.assertTrue(p.is_running())
- mock_open.assert_called_once_with('/proc/34/cmdline', 'r')
+ mock_open.assert_called_once_with('/proc/34/cmdline', 'r')
def test_is_running_uuid_false(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- p = daemon.Pidfile('thefile', 'python', uuid='6789')
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.readline.return_value = 'python 1234'
+ mock_open = self.useFixture(
+ tools.OpenFixture('/proc/34/cmdline', 'python 1234')).mock_open
+ p = daemon.Pidfile('thefile', 'python', uuid='6789')
- with mock.patch.object(p, 'read') as read:
- read.return_value = 34
- self.assertFalse(p.is_running())
+ with mock.patch.object(p, 'read') as read:
+ read.return_value = 34
+ self.assertFalse(p.is_running())
- mock_open.assert_called_once_with('/proc/34/cmdline', 'r')
+ mock_open.assert_called_once_with('/proc/34/cmdline', 'r')
class TestDaemon(base.BaseTestCase):
from neutron.common import utils
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.tests import base
+from neutron.tests import tools
LOG = logging.getLogger(__name__)
parent.assert_has_calls(expected)
def test_get_interface_name(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.read.return_value = 'tap0'
- lp = LocalChild(self.conf, FakeDualNetwork())
- self.assertEqual(lp.interface_name, 'tap0')
+ net = FakeDualNetwork()
+ path = '/dhcp/%s/interface' % net.id
+ self.useFixture(tools.OpenFixture(path, 'tap0'))
+ lp = LocalChild(self.conf, net)
+ self.assertEqual(lp.interface_name, 'tap0')
def test_set_interface_name(self):
with mock.patch('neutron.agent.linux.utils.replace_file') as replace:
exp_addn_name, exp_addn_data,
exp_opt_name, exp_opt_data,) = self._test_reload_allocation_data
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.readline.return_value = None
-
- test_pm = mock.Mock()
- dm = self._get_dnsmasq(FakeDualNetwork(), test_pm)
- dm.reload_allocations()
- self.assertTrue(test_pm.register.called)
- self.external_process().enable.assert_called_once_with(
- reload_cfg=True)
-
- self.safe.assert_has_calls([
- mock.call(exp_host_name, exp_host_data),
- mock.call(exp_addn_name, exp_addn_data),
- mock.call(exp_opt_name, exp_opt_data),
- ])
+ net = FakeDualNetwork()
+ hpath = '/dhcp/%s/host' % net.id
+ ipath = '/dhcp/%s/interface' % net.id
+ self.useFixture(tools.OpenFixture(hpath))
+ self.useFixture(tools.OpenFixture(ipath))
+ test_pm = mock.Mock()
+ dm = self._get_dnsmasq(net, test_pm)
+ dm.reload_allocations()
+ self.assertTrue(test_pm.register.called)
+ self.external_process().enable.assert_called_once_with(
+ reload_cfg=True)
+
+ self.safe.assert_has_calls([
+ mock.call(exp_host_name, exp_host_data),
+ mock.call(exp_addn_name, exp_addn_data),
+ mock.call(exp_opt_name, exp_opt_data),
+ ])
def test_release_unused_leases(self):
dnsmasq = self._get_dnsmasq(FakeDualNetwork())
def test_read_hosts_file_leases(self):
filename = '/path/to/file'
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- lines = ["00:00:80:aa:bb:cc,inst-name,192.168.0.1",
- "00:00:80:aa:bb:cc,inst-name,[fdca:3ba5:a17a::1]"]
- mock_open.return_value.readlines.return_value = lines
-
- dnsmasq = self._get_dnsmasq(FakeDualNetwork())
- leases = dnsmasq._read_hosts_file_leases(filename)
+ lines = ["00:00:80:aa:bb:cc,inst-name,192.168.0.1",
+ "00:00:80:aa:bb:cc,inst-name,[fdca:3ba5:a17a::1]"]
+ mock_open = self.useFixture(
+ tools.OpenFixture(filename, '\n'.join(lines))).mock_open
+ dnsmasq = self._get_dnsmasq(FakeDualNetwork())
+ leases = dnsmasq._read_hosts_file_leases(filename)
self.assertEqual(set([("192.168.0.1", "00:00:80:aa:bb:cc", None),
("fdca:3ba5:a17a::1", "00:00:80:aa:bb:cc",
def test_read_hosts_file_leases_with_client_id(self):
filename = '/path/to/file'
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- lines = ["00:00:80:aa:bb:cc,id:client1,inst-name,192.168.0.1",
- "00:00:80:aa:bb:cc,id:client2,inst-name,"
- "[fdca:3ba5:a17a::1]"]
- mock_open.return_value.readlines.return_value = lines
-
- dnsmasq = self._get_dnsmasq(FakeDualNetwork())
- leases = dnsmasq._read_hosts_file_leases(filename)
+ lines = ["00:00:80:aa:bb:cc,id:client1,inst-name,192.168.0.1",
+ "00:00:80:aa:bb:cc,id:client2,inst-name,"
+ "[fdca:3ba5:a17a::1]"]
+ mock_open = self.useFixture(
+ tools.OpenFixture(filename, '\n'.join(lines))).mock_open
+ dnsmasq = self._get_dnsmasq(FakeDualNetwork())
+ leases = dnsmasq._read_hosts_file_leases(filename)
self.assertEqual(set([("192.168.0.1", "00:00:80:aa:bb:cc", 'client1'),
("fdca:3ba5:a17a::1", "00:00:80:aa:bb:cc",
from neutron.agent.linux import external_process as ep
from neutron.common import utils as common_utils
from neutron.tests import base
+from neutron.tests import tools
TEST_UUID = 'test-uuid'
self.assertEqual(retval, '/var/path/uuid.pid')
def test_pid(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.read.return_value = '5'
- manager = ep.ProcessManager(self.conf, 'uuid')
- self.assertEqual(manager.pid, 5)
+ self.useFixture(tools.OpenFixture('/var/path/uuid.pid', '5'))
+ manager = ep.ProcessManager(self.conf, 'uuid')
+ self.assertEqual(manager.pid, 5)
def test_pid_no_an_int(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.read.return_value = 'foo'
- manager = ep.ProcessManager(self.conf, 'uuid')
- self.assertIsNone(manager.pid, 5)
+ self.useFixture(tools.OpenFixture('/var/path/uuid.pid', 'foo'))
+ manager = ep.ProcessManager(self.conf, 'uuid')
+ self.assertIsNone(manager.pid)
def test_pid_invalid_file(self):
with mock.patch.object(ep.ProcessManager, 'get_pid_file_name') as name:
self.assertIsNone(manager.pid)
def test_active(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.readline.return_value = \
- 'python foo --router_id=uuid'
- with mock.patch.object(ep.ProcessManager, 'pid') as pid:
- pid.__get__ = mock.Mock(return_value=4)
- manager = ep.ProcessManager(self.conf, 'uuid')
- self.assertTrue(manager.active)
+ mock_open = self.useFixture(
+ tools.OpenFixture('/proc/4/cmdline', 'python foo --router_id=uuid')
+ ).mock_open
+ with mock.patch.object(ep.ProcessManager, 'pid') as pid:
+ pid.__get__ = mock.Mock(return_value=4)
+ manager = ep.ProcessManager(self.conf, 'uuid')
+ self.assertTrue(manager.active)
- mock_open.assert_called_once_with('/proc/4/cmdline', 'r')
+ mock_open.assert_called_once_with('/proc/4/cmdline', 'r')
def test_active_none(self):
dummy_cmd_line = 'python foo --router_id=uuid'
self.assertFalse(manager.active)
def test_active_cmd_mismatch(self):
- with mock.patch('six.moves.builtins.open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.readline.return_value = \
- 'python foo --router_id=anotherid'
- with mock.patch.object(ep.ProcessManager, 'pid') as pid:
- pid.__get__ = mock.Mock(return_value=4)
- manager = ep.ProcessManager(self.conf, 'uuid')
- self.assertFalse(manager.active)
+ mock_open = self.useFixture(
+ tools.OpenFixture('/proc/4/cmdline',
+ 'python foo --router_id=anotherid')
+ ).mock_open
+ with mock.patch.object(ep.ProcessManager, 'pid') as pid:
+ pid.__get__ = mock.Mock(return_value=4)
+ manager = ep.ProcessManager(self.conf, 'uuid')
+ self.assertFalse(manager.active)
- mock_open.assert_called_once_with('/proc/4/cmdline', 'r')
+ mock_open.assert_called_once_with('/proc/4/cmdline', 'r')
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron.tests import base
+from neutron.tests import tools
class IPv6byEUI64TestCase(base.BaseTestCase):
self.addCleanup(reset_detection_flag)
self.mock_exists = mock.patch("os.path.exists",
return_value=True).start()
- mock_open = mock.patch("six.moves.builtins.open").start()
- self.mock_read = mock_open.return_value.__enter__.return_value.read
+ self.proc_path = '/proc/sys/net/ipv6/conf/default/disable_ipv6'
def test_enabled(self):
- self.mock_read.return_value = "0"
+ self.useFixture(tools.OpenFixture(self.proc_path, '0'))
enabled = ipv6_utils.is_enabled()
self.assertTrue(enabled)
def test_disabled(self):
- self.mock_read.return_value = "1"
+ self.useFixture(tools.OpenFixture(self.proc_path, '1'))
enabled = ipv6_utils.is_enabled()
self.assertFalse(enabled)
def test_disabled_non_exists(self):
+ mo = self.useFixture(tools.OpenFixture(self.proc_path, '1')).mock_open
self.mock_exists.return_value = False
enabled = ipv6_utils.is_enabled()
self.assertFalse(enabled)
- self.assertFalse(self.mock_read.called)
+ self.assertFalse(mo.called)
def test_memoize(self):
- self.mock_read.return_value = "0"
+ mo = self.useFixture(tools.OpenFixture(self.proc_path, '0')).mock_open
ipv6_utils.is_enabled()
enabled = ipv6_utils.is_enabled()
self.assertTrue(enabled)
- self.mock_read.assert_called_once_with()
+ mo.assert_called_once_with(self.proc_path, 'r')
class TestIsAutoAddressSubnet(base.BaseTestCase):
if file_heads is None:
file_heads = []
fake_config = self.configs[0]
+ mock_open = self.useFixture(
+ tools.OpenFixture(cli._get_head_file_path(fake_config),
+ '\n'.join(file_heads))
+ ).mock_open
with mock.patch('alembic.script.ScriptDirectory.from_config') as fc,\
mock.patch.object(cli, '_use_separate_migration_branches',
return_value=not branchless):
fc.return_value.get_heads.return_value = heads
- with mock.patch.object(cli, 'open') as mock_open:
- mock_open.return_value.__enter__ = lambda s: s
- mock_open.return_value.__exit__ = mock.Mock()
- mock_open.return_value.read.return_value = (
- '\n'.join(file_heads))
-
- if not branchless or all(head in file_heads for head in heads):
- cli.validate_head_file(fake_config)
- else:
- self.assertRaises(
- SystemExit,
- cli.validate_head_file,
- fake_config
- )
- self.assertTrue(self.mock_alembic_err.called)
- if branchless:
- mock_open.assert_called_with(
- cli._get_head_file_path(fake_config))
- else:
- self.assertFalse(mock_open.called)
+ if not branchless or all(head in file_heads for head in heads):
+ cli.validate_head_file(fake_config)
+ else:
+ self.assertRaises(
+ SystemExit,
+ cli.validate_head_file,
+ fake_config
+ )
+ self.assertTrue(self.mock_alembic_err.called)
if branchless:
+ mock_open.assert_called_with(
+ cli._get_head_file_path(fake_config))
fc.assert_called_once_with(fake_config)
else:
+ self.assertFalse(mock_open.called)
self.assertFalse(fc.called)
def test_validate_head_file_multiple_heads(self):