from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import common
from neutron.openstack.common import service
from neutron import service as neutron_service
% {'net_id': network.id, 'action': action})
except Exception as e:
self.schedule_resync(e)
- if (isinstance(e, common.RemoteError)
+ if (isinstance(e, rpc_compat.RemoteError)
and e.exc_type == 'NetworkNotFound'
or isinstance(e, exceptions.NetworkNotFound)):
LOG.warning(_("Network %s has been deleted."), network.id)
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.openstack.common import processutils
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common import service
from neutron import service as neutron_service
from neutron.services.firewall.agents.l3reference import firewall_l3_agent
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
- @raise common.RemoteError: with TooManyExternalNetworks
- as exc_type if there are
- more than one external network
+ @raise rpc_compat.RemoteError: with TooManyExternalNetworks
+ as exc_type if there are
+ more than one external network
"""
return self.call(context,
self.make_msg('get_external_network_id',
self.target_ex_net_id = self.plugin_rpc.get_external_network_id(
self.context)
return self.target_ex_net_id
- except rpc_common.RemoteError as e:
+ except rpc_compat.RemoteError as e:
with excutils.save_and_reraise_exception() as ctx:
if e.exc_type == 'TooManyExternalNetworks':
ctx.reraise = False
self._process_routers(routers, all_routers=True)
self.fullsync = False
LOG.debug(_("_sync_routers_task successfully completed"))
- except rpc_common.RPCException:
+ except rpc_compat.RPCException:
LOG.exception(_("Failed synchronizing routers due to RPC error"))
self.fullsync = True
return
# License for the specific language governing permissions and limitations
# under the License.
+from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import proxy
emulate RpcProxy class behaviour using oslo.messaging API once the
migration is applied.
'''
+
+
+# exceptions
+RPCException = rpc_common.RPCException
+RemoteError = rpc_common.RemoteError
+MessagingTimeout = rpc_common.Timeout
from neutron.common import config as logging_config
from neutron.common import constants
from neutron.common import exceptions
+from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.common import config # noqa
tap_device_name,
self.agent.agent_id,
cfg.CONF.host)
- except rpc_common.Timeout:
+ except rpc_compat.MessagingTimeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
def fdb_add(self, context, fdb_entries):
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as logging_config
from neutron.common import constants as q_constants
+from neutron.common import rpc_compat
from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.mlnx.agent import utils
port['mac_address'],
self.agent.agent_id,
cfg.CONF.host)
- except rpc_common.Timeout:
+ except rpc_compat.MessagingTimeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.common import exceptions
-from neutron.openstack.common.rpc import common as rpc_common
+from neutron.common import rpc_compat
from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.agent import linuxbridge_neutron_agent
from neutron.plugins.linuxbridge.common import constants as lconst
port = {"admin_state_up": True,
"id": "1234-5678",
"network_id": "123-123"}
- plugin_rpc.update_device_up.side_effect = rpc_common.Timeout
+ timeout_class = rpc_compat.MessagingTimeout
+ plugin_rpc.update_device_up.side_effect = timeout_class
self.lb_rpc.port_update(mock.Mock(), port=port)
self.assertTrue(plugin_rpc.update_device_up.called)
self.assertEqual(log.call_count, 1)
log.reset_mock()
port["admin_state_up"] = False
- plugin_rpc.update_device_down.side_effect = rpc_common.Timeout
+ plugin_rpc.update_device_down.side_effect = timeout_class
self.lb_rpc.port_update(mock.Mock(), port=port)
self.assertTrue(plugin_rpc.update_device_down.called)
self.assertEqual(log.call_count, 1)
from neutron.agent.linux import interface
from neutron.common import constants as const
from neutron.common import exceptions
-from neutron.openstack.common.rpc import common
+from neutron.common import rpc_compat
from neutron.tests import base
def test_call_driver_remote_error_net_not_found(self):
self._test_call_driver_failure(
- exc=common.RemoteError(exc_type='NetworkNotFound'),
+ exc=rpc_compat.RemoteError(exc_type='NetworkNotFound'),
trace_level='warning')
def test_call_driver_network_not_found(self):