with context.session.begin(subtransactions=True):
# goto to the plugin DB and fetch the network
network = self._get_network(context, id)
- if fields and 'status' in fields:
+ if (self.nvp_sync_opts.always_read_status or
+ fields and 'status' in fields):
# External networks are not backed by nvp lswitches
if not network.external:
# Perform explicit state synchronization
- self._synchronizer.synchronize_network(
- context, network)
+ self._synchronizer.synchronize_network(context, network)
# Don't do field selection here otherwise we won't be able
# to add provider networks fields
net_result = self._make_network_dict(network)
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
- if fields and 'status' in fields:
+ if (self.nvp_sync_opts.always_read_status or
+ fields and 'status' in fields):
# Perform explicit state synchronization
db_port = self._get_port(context, id)
self._synchronizer.synchronize_port(
return super(NvpPluginV2, self).get_port(context, id, fields)
def get_router(self, context, id, fields=None):
- if fields and 'status' in fields:
+ if (self.nvp_sync_opts.always_read_status or
+ fields and 'status' in fields):
db_router = self._get_router(context, id)
# Perform explicit state synchronization
self._synchronizer.synchronize_router(
args = ['--config-file', get_fake_conf('neutron.conf.test'),
'--config-file', get_fake_conf('nvp.ini.test')]
config.parse(args=args)
+ cfg.CONF.set_override('allow_overlapping_ips', True)
self._plugin = NeutronPlugin.NvpPluginV2()
+ # Mock neutron manager plugin load functions to speed up tests
mock_nm_get_plugin = mock.patch('neutron.manager.NeutronManager.'
'get_plugin')
+ mock_nm_get_service_plugins = mock.patch(
+ 'neutron.manager.NeutronManager.get_service_plugins')
self.mock_nm_get_plugin = mock_nm_get_plugin.start()
self.mock_nm_get_plugin.return_value = self._plugin
+ mock_nm_get_service_plugins.start()
super(NvpSyncTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(patch_sync.stop)
self.addCleanup(mock_nvpapi.stop)
self.addCleanup(mock_nm_get_plugin.stop)
+ self.addCleanup(mock_nm_get_service_plugins.stop)
def tearDown(self):
cfg.CONF.reset()
exp_status = constants.NET_STATUS_ACTIVE
self.assertEqual(exp_status, q_net['status'])
+ def test_synchronize_network_on_get(self):
+ cfg.CONF.set_override('always_read_status', True, 'NVP_SYNC')
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a network down to verify punctual synchronization
+ q_net_id = ls_uuid = self.fc._fake_lswitch_dict.keys()[0]
+ self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false'
+ q_net_data = self._plugin.get_network(ctx, q_net_id)
+ self.assertEqual(constants.NET_STATUS_DOWN, q_net_data['status'])
+
def test_synchronize_port(self):
ctx = context.get_admin_context()
with self._populate_data(ctx):
- # Put a network down to verify synchronization
+ # Put a port down to verify synchronization
lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
lport = self.fc._fake_lswitch_lport_dict[lp_uuid]
q_port_id = self._get_tag_dict(lport['tags'])['q_port_id']
exp_status = constants.PORT_STATUS_ACTIVE
self.assertEqual(exp_status, q_port['status'])
+ def test_synchronize_port_on_get(self):
+ cfg.CONF.set_override('always_read_status', True, 'NVP_SYNC')
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a port down to verify punctual synchronization
+ lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
+ lport = self.fc._fake_lswitch_lport_dict[lp_uuid]
+ q_port_id = self._get_tag_dict(lport['tags'])['q_port_id']
+ lport['status'] = 'false'
+ q_port_data = self._plugin.get_port(ctx, q_port_id)
+ self.assertEqual(constants.PORT_STATUS_DOWN,
+ q_port_data['status'])
+
def test_synchronize_router(self):
ctx = context.get_admin_context()
with self._populate_data(ctx):
- # Put a network down to verify synchronization
+ # Put a router down to verify synchronization
q_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
q_rtr_data = self._plugin._get_router(ctx, q_rtr_id)
exp_status = constants.NET_STATUS_ACTIVE
self.assertEqual(exp_status, q_rtr['status'])
+ def test_synchronize_router_on_get(self):
+ cfg.CONF.set_override('always_read_status', True, 'NVP_SYNC')
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a router down to verify punctual synchronization
+ q_rtr_id = lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
+ self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
+ q_rtr_data = self._plugin.get_router(ctx, q_rtr_id)
+ self.assertEqual(constants.NET_STATUS_DOWN, q_rtr_data['status'])
+
def test_sync_nvp_failure_backoff(self):
self.mock_nvpapi.return_value.request.side_effect = (
NvpApiClient.RequestTimeout)