'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
fake_use_fatal_exceptions))
+ self.setup_rpc_mocks()
+
+ if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
+ raise self.skipException('XML Testing Skipped in Py26')
+
+ self.setup_config()
+ self.addOnException(self.check_for_systemexit)
+
+ def setup_rpc_mocks(self):
# don't actually start RPC listeners when testing
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.rpc.Connection.consume_in_threads',
fake_consume_in_threads))
+ # immediately return RPC calls
+ self.useFixture(fixtures.MonkeyPatch(
+ 'neutron.common.rpc.RpcProxy._RpcProxy__call_rpc_method',
+ mock.MagicMock()))
+
self.useFixture(fixtures.MonkeyPatch(
'oslo.messaging.Notifier', fake_notifier.FakeNotifier))
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
- if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
- raise self.skipException('XML Testing Skipped in Py26')
-
- self.setup_config()
- self.addOnException(self.check_for_systemexit)
-
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
self.fail("A SystemExit was raised during the test. %s"
'FixedIntervalLoopingCall',
new=MockFixedIntervalLoopingCall)):
self.agent = ovs_neutron_agent.OVSNeutronAgent(**kwargs)
+ # set back to true because initial report state will succeed due
+ # to mocked out RPC calls
+ self.agent.use_call = True
self.agent.tun_br = mock.Mock()
self.agent.sg_agent = mock.Mock()