def existing_dhcp_networks(cls, conf, root_helper):
"""Return a list of existing networks ids that we have configs for."""
- raise NotImplementedError
+ raise NotImplementedError()
@classmethod
def check_version(cls):
"""Execute version checks on DHCP server."""
- raise NotImplementedError
+ raise NotImplementedError()
@classmethod
def get_isolated_subnets(cls, network):
"""Returns a dict indicating whether or not a subnet is isolated"""
- raise NotImplementedError
+ raise NotImplementedError()
@classmethod
def should_enable_metadata(cls, conf, network):
"""True if the metadata-proxy should be enabled for the network."""
- raise NotImplementedError
+ raise NotImplementedError()
class DhcpLocalProcess(DhcpBase):
return getattr(self._plugin, name)(*arg_list, **kwargs)
return _handle_action
else:
- raise AttributeError
+ raise AttributeError()
def _get_pagination_helper(self, request):
if self._allow_pagination and self._native_pagination:
This operation is not allow in REST API.
@raise exceptions.BadRequest:
"""
- raise exceptions.BadRequest
+ raise exceptions.BadRequest()
@abc.abstractmethod
def delete_agent(self, context, id):
.. note:: this method is optional, as it was not part of the originally
defined plugin API.
"""
- raise NotImplementedError
+ raise NotImplementedError()
@abc.abstractmethod
def delete_subnet(self, context, id):
NOTE: this method is optional, as it was not part of the originally
defined plugin API.
"""
- raise NotImplementedError
+ raise NotImplementedError()
@abc.abstractmethod
def delete_network(self, context, id):
.. note:: this method is optional, as it was not part of the originally
defined plugin API.
"""
- raise NotImplementedError
+ raise NotImplementedError()
@abc.abstractmethod
def delete_port(self, context, id):
.. note:: this method is optional, as it was not part of the originally
defined plugin API.
"""
- raise NotImplementedError
+ raise NotImplementedError()
def rpc_workers_supported(self):
"""Return whether the plugin supports multiple RPC workers.
except KeyError as e:
LOG.error(_("Missing device parameter:%s. Aborting "
"CSR1kvRoutingDriver initialization"), e)
- raise cfg_exc.CSR1kvInitializationException
+ raise cfg_exc.CSR1kvInitializationException()
###### Public Functions ########
def router_added(self, ri):
self._set_ha_HSRP(subinterface, vrf_name, priority, group, ip)
def _csr_add_ha_VRRP(self, ri, port):
- raise NotImplementedError
+ raise NotImplementedError()
def _csr_add_ha_GBLP(self, ri, port):
- raise NotImplementedError
+ raise NotImplementedError()
def _csr_remove_ha(self, ri, port):
pass
alloc.allocated = True
db_session.add(alloc)
except exc.NoResultFound:
- raise c_exc.VlanIDOutsidePool
+ raise c_exc.VlanIDOutsidePool()
def release_vlan(db_session, physical_network, vlan_id):
alloc.allocated = True
db_session.add(alloc)
except exc.NoResultFound:
- raise c_exc.VxlanIDOutsidePool
+ raise c_exc.VxlanIDOutsidePool()
def release_vxlan(db_session, vxlan_id):
segment2),
encap_profile)
else:
- raise cisco_exceptions.NoClusterFound
+ raise cisco_exceptions.NoClusterFound()
for profile in encap_dict:
n1kvclient.update_encapsulation_profile(context, profile,
seg_min, seg_max = self._get_segment_range(
network_profile['segment_range'])
if not seg_min <= segmentation_id <= seg_max:
- raise cisco_exceptions.VlanIDOutsidePool
+ raise cisco_exceptions.VlanIDOutsidePool()
n1kv_db_v2.reserve_specific_vlan(session,
physical_network,
segmentation_id)
return getattr(plugin, key)
# if no plugin support the method, then raise
- raise AttributeError
+ raise AttributeError()
def _extend_network_dict(self, context, network):
flavor = self._get_flavor_by_network_id(context, network['id'])
def create_port(self, context, port):
p = port['port']
if 'network_id' not in p:
- raise exc.NotFound
+ raise exc.NotFound()
plugin = self._get_plugin_by_network_id(context, p['network_id'])
return plugin.create_port(context, port)
def create_subnet(self, context, subnet):
s = subnet['subnet']
if 'network_id' not in s:
- raise exc.NotFound
+ raise exc.NotFound()
plugin = self._get_plugin_by_network_id(context,
s['network_id'])
return plugin.create_subnet(context, subnet)
LOG.warning(_("Allocate %(type)s segment from pool failed "
"after %(number)s failed attempts"),
{"type": network_type, "number": DB_MAX_RETRIES})
- raise exc.NoNetworkFoundInMaximumAllowedAttempts
+ raise exc.NoNetworkFoundInMaximumAllowedAttempts()
if self.is_partial_segment(segment):
alloc = self.allocate_partially_specified_segment(session)
if not alloc:
- raise exc.NoNetworkAvailable
+ raise exc.NoNetworkAvailable()
else:
segmentation_id = segment.get(api.SEGMENTATION_ID)
alloc = self.allocate_fully_specified_segment(
alloc = self.allocate_partially_specified_segment(
session, **filters)
if not alloc:
- raise exc.NoNetworkAvailable
+ raise exc.NoNetworkAvailable()
else:
alloc = self.allocate_fully_specified_segment(
session, **filters)
def _synchronize_state(self, sp):
# If the plugin has been destroyed, stop the LoopingCall
if not self._plugin:
- raise loopingcall.LoopingCallDone
+ raise loopingcall.LoopingCallDone()
start = timeutils.utcnow()
# Reset page cursor variables if necessary
if sp.current_chunk == 0:
try:
val = int(val)
if val < 0:
- raise ValueError
+ raise ValueError()
except (ValueError, TypeError):
msg = _("'%s' must be a non negative integer.") % val
raise qexception.InvalidInput(error_message=msg)
msg = _("'rpc_workers = %d' ignored because start_rpc_listeners "
"is not implemented.")
LOG.error(msg, cfg.CONF.rpc_workers)
- raise NotImplementedError
+ raise NotImplementedError()
try:
rpc = RpcWorker(plugin)
deferred_br.add_flow(**self.add_flow_dict1)
deferred_br.mod_flow(**self.mod_flow_dict1)
deferred_br.delete_flows(**self.del_flow_dict1)
- raise Exception
+ raise Exception()
except Exception:
self._verify_mock_call([])
else:
LOG.error('Failed to delete port %(p_id)s for vm instance '
'%(v_id)s due to %(err)s',
{'p_id': port['id'], 'v_id': vm_id, 'err': e})
- raise nova_exc.InternalServerError
+ raise nova_exc.InternalServerError()
def _mock_svc_vm_create_delete(self, plugin):
# Mock novaclient methods for creation/deletion of service VMs
if index == 0:
return '/usr/local/bin/neutron-dhcp-agent'
else:
- raise IndexError
+ raise IndexError()
expected = [
'ip',
def test_create_network_gateway_nsx_error_returns_500(self):
def raise_nsx_api_exc(*args, **kwargs):
- raise api_exc.NsxApiException
+ raise api_exc.NsxApiException()
with mock.patch.object(nsxlib.l2gateway,
'create_l2_gw_service',
version.Version(fake_version))
def _faulty_request(*args, **kwargs):
- raise exception.NsxApiException
+ raise exception.NsxApiException()
instance.return_value.request.side_effect = _faulty_request
self.fake_cluster = cluster.NSXCluster(