def test_fork(self):
output = tests.run_python('tests/hub_test_fork.py')
lines = output.splitlines()
- self.assertEqual(lines, ["accept blocked", "child died ok"], output)
+ self.assertEqual(lines, [b"accept blocked", b"child died ok"], output)
class TestDeadRunLoop(LimitedTestCase):
with eventlet.Timeout(0.5, self.CustomException()):
# we now switch to the hub, there should be no existing timers
# that switch back to this greenlet and so this hub.switch()
- # call should block indefinately.
+ # call should block indefinitely.
self.assertRaises(self.CustomException, hub.switch)
def test_parent(self):
# we now switch to the hub which will allow
# completion of dummyproc.
# this should return execution back to the runloop and not
- # this greenlet so that hub.switch() would block indefinately.
+ # this greenlet so that hub.switch() would block indefinitely.
self.assertRaises(self.CustomException, hub.switch)
assert g.dead # sanity check that dummyproc has completed
except AttributeError:
pass
-import __builtin__
-original_import = __builtin__.__import__
+from eventlet.support.six.moves import builtins
+
+original_import = builtins.__import__
def fail_import(name, *args, **kwargs):
if 'epoll' in name:
print('kqueue tried')
return original_import(name, *args, **kwargs)
-__builtin__.__import__ = fail_import
+builtins.__import__ = fail_import
import eventlet.hubs