import random
-from sqlalchemy.orm import exc
-
from neutron.common import constants
from neutron.common import exceptions
from neutron import context
from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import models_v2
+from neutron.extensions import l3
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.openstack.common import loopingcall
try:
network = self._plugin._get_network(context,
neutron_network_data['id'])
- except exc.NoResultFound:
+ except exceptions.NetworkNotFound:
pass
else:
network.status = status
try:
router = self._plugin._get_router(context,
neutron_router_data['id'])
- except exc.NoResultFound:
+ except l3.RouterNotFound:
pass
else:
router.status = status
try:
port = self._plugin._get_port(context,
neutron_port_data['id'])
- except exc.NoResultFound:
+ except exceptions.PortNotFound:
pass
else:
port.status = status
from neutron.api.v2 import attributes as attr
from neutron.common import config
from neutron.common import constants
+from neutron.common import exceptions as n_exc
from neutron import context
+from neutron.extensions import l3
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import client
exp_status = constants.NET_STATUS_ACTIVE
self.assertEqual(exp_status, q_net['status'])
+ def test_synchronize_network_not_found_in_db_no_raise(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a network down to verify synchronization
+ ls_uuid = self.fc._fake_lswitch_dict.keys()[0]
+ q_net_id = self._get_tag_dict(
+ self.fc._fake_lswitch_dict[ls_uuid]['tags'])['quantum_net_id']
+ self.fc._fake_lswitch_dict[ls_uuid]['status'] = 'false'
+ q_net_data = self._plugin._get_network(ctx, q_net_id)
+ with mock.patch.object(self._plugin,
+ '_get_network') as _get_network:
+ _get_network.side_effect = n_exc.NetworkNotFound(
+ net_id=q_net_data['id'])
+ self._plugin._synchronizer.synchronize_network(ctx, q_net_data)
+
def test_synchronize_network_on_get(self):
cfg.CONF.set_override('always_read_status', True, 'NSX_SYNC')
ctx = context.get_admin_context()
q_net_data = self._plugin.get_network(ctx, q_net_id)
self.assertEqual(constants.NET_STATUS_DOWN, q_net_data['status'])
+ def test_synchronize_port_not_found_in_db_no_raise(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a port down to verify synchronization
+ lp_uuid = self.fc._fake_lswitch_lport_dict.keys()[0]
+ lport = self.fc._fake_lswitch_lport_dict[lp_uuid]
+ q_port_id = self._get_tag_dict(lport['tags'])['q_port_id']
+ lport['status'] = 'true'
+ q_port_data = self._plugin._get_port(ctx, q_port_id)
+ with mock.patch.object(self._plugin,
+ '_get_port') as _get_port:
+ _get_port.side_effect = n_exc.PortNotFound(
+ port_id=q_port_data['id'])
+ self._plugin._synchronizer.synchronize_port(ctx, q_port_data)
+
def test_synchronize_port(self):
ctx = context.get_admin_context()
with self._populate_data(ctx):
self.assertEqual(constants.PORT_STATUS_DOWN,
q_port_data['status'])
+ def test_synchronize_routernot_found_in_db_no_raise(self):
+ ctx = context.get_admin_context()
+ with self._populate_data(ctx):
+ # Put a router down to verify synchronization
+ lr_uuid = self.fc._fake_lrouter_dict.keys()[0]
+ q_rtr_id = self._get_tag_dict(
+ self.fc._fake_lrouter_dict[lr_uuid]['tags'])['q_router_id']
+ self.fc._fake_lrouter_dict[lr_uuid]['status'] = 'false'
+ q_rtr_data = self._plugin._get_router(ctx, q_rtr_id)
+ with mock.patch.object(self._plugin,
+ '_get_router') as _get_router:
+ _get_router.side_effect = l3.RouterNotFound(
+ router_id=q_rtr_data['id'])
+ self._plugin._synchronizer.synchronize_router(ctx, q_rtr_data)
+
def test_synchronize_router(self):
ctx = context.get_admin_context()
with self._populate_data(ctx):