# See the License for the specific language governing permissions and
# limitations under the License.
-from abc import abstractmethod
+import abc
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
"""
raise exceptions.BadRequest
- @abstractmethod
+ @abc.abstractmethod
def delete_agent(self, context, id):
"""Delete agent.
"""
pass
- @abstractmethod
+ @abc.abstractmethod
def update_agent(self, context, agent):
"""Disable or Enable the agent.
"""
pass
- @abstractmethod
+ @abc.abstractmethod
def get_agents(self, context, filters=None, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_agent(self, context, id, fields=None):
pass
# License for the specific language governing permissions and limitations
# under the License.
-from abc import abstractmethod
+import abc
from neutron.api import extensions
from neutron.api.v2 import base
All of method must be in an admin context.
"""
- @abstractmethod
+ @abc.abstractmethod
def add_network_to_dhcp_agent(self, context, id, network_id):
pass
- @abstractmethod
+ @abc.abstractmethod
def remove_network_from_dhcp_agent(self, context, id, network_id):
pass
- @abstractmethod
+ @abc.abstractmethod
def list_networks_on_dhcp_agent(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def list_dhcp_agents_hosting_network(self, context, network_id):
pass
from neutron.common import exceptions as qexception
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
-from neutron.services.service_base import ServicePluginBase
+from neutron.services import service_base
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
-class FirewallPluginBase(ServicePluginBase):
+class FirewallPluginBase(service_base.ServicePluginBase):
def get_plugin_name(self):
return constants.FIREWALL
# License for the specific language governing permissions and limitations
# under the License.
-from abc import abstractmethod
+import abc
from oslo.config import cfg
class RouterPluginBase(object):
- @abstractmethod
+ @abc.abstractmethod
def create_router(self, context, router):
pass
- @abstractmethod
+ @abc.abstractmethod
def update_router(self, context, id, router):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_router(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_router(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_routers(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
pass
- @abstractmethod
+ @abc.abstractmethod
def add_router_interface(self, context, router_id, interface_info):
pass
- @abstractmethod
+ @abc.abstractmethod
def remove_router_interface(self, context, router_id, interface_info):
pass
- @abstractmethod
+ @abc.abstractmethod
def create_floatingip(self, context, floatingip):
pass
- @abstractmethod
+ @abc.abstractmethod
def update_floatingip(self, context, id, floatingip):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_floatingip(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_floatingip(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_floatingips(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
# License for the specific language governing permissions and limitations
# under the License.
-from abc import abstractmethod
+import abc
import webob.exc
All of method must be in an admin context.
"""
- @abstractmethod
+ @abc.abstractmethod
def add_router_to_l3_agent(self, context, id, router_id):
pass
- @abstractmethod
+ @abc.abstractmethod
def remove_router_from_l3_agent(self, context, id, router_id):
pass
- @abstractmethod
+ @abc.abstractmethod
def list_routers_on_l3_agent(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def list_l3_agents_hosting_router(self, context, router_id):
pass
# License for the specific language governing permissions and limitations
# under the License.
-from abc import abstractmethod
+import abc
from neutron.api import extensions
from neutron.api.v2 import base
All of method must be in an admin context.
"""
- @abstractmethod
+ @abc.abstractmethod
def list_pools_on_lbaas_agent(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_lbaas_agent_hosting_pool(self, context, pool_id):
pass
from neutron.common import exceptions as qexception
from neutron import manager
from neutron.plugins.common import constants
-from neutron.services.service_base import ServicePluginBase
+from neutron.services import service_base
# Loadbalancer Exceptions
@six.add_metaclass(abc.ABCMeta)
-class LoadBalancerPluginBase(ServicePluginBase):
+class LoadBalancerPluginBase(service_base.ServicePluginBase):
def get_plugin_name(self):
return constants.LOADBALANCER
import webob
from neutron.api import extensions
-from neutron.api.v2.attributes import convert_to_int
+from neutron.api.v2 import attributes
from neutron.api.v2 import base
from neutron.api.v2 import resource
from neutron.common import exceptions as n_exc
-from neutron.manager import NeutronManager
+from neutron import manager
from neutron.openstack.common import importutils
from neutron import quota
from neutron import wsgi
def _update_attributes(self):
for quota_resource in QUOTAS.resources.iterkeys():
attr_dict = EXTENDED_ATTRIBUTES_2_0[RESOURCE_COLLECTION]
- attr_dict[quota_resource] = {'allow_post': False,
- 'allow_put': True,
- 'convert_to': convert_to_int,
- 'validate': {'type:range':
- [-1, sys.maxsize]},
- 'is_visible': True}
+ attr_dict[quota_resource] = {
+ 'allow_post': False,
+ 'allow_put': True,
+ 'convert_to': attributes.convert_to_int,
+ 'validate': {'type:range': [-1, sys.maxsize]},
+ 'is_visible': True}
self._update_extended_attributes = False
def _get_quotas(self, request, tenant_id):
def get_resources(cls):
"""Returns Ext Resources."""
controller = resource.Resource(
- QuotaSetsController(NeutronManager.get_plugin()),
+ QuotaSetsController(manager.NeutronManager.get_plugin()),
faults=base.FAULT_MAP)
return [extensions.ResourceExtension(
Quotasv2.get_alias(),
# License for the specific language governing permissions and limitations
# under the License.
-from abc import ABCMeta
-from abc import abstractmethod
+import abc
import netaddr
from oslo.config import cfg
return {}
-@six.add_metaclass(ABCMeta)
+@six.add_metaclass(abc.ABCMeta)
class SecurityGroupPluginBase(object):
- @abstractmethod
+ @abc.abstractmethod
def create_security_group(self, context, security_group):
pass
- @abstractmethod
+ @abc.abstractmethod
def update_security_group(self, context, id, security_group):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_security_group(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_security_groups(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_security_group(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def create_security_group_rule(self, context, security_group_rule):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_security_group_rule(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_security_group_rule(self, context, id, fields=None):
pass
from neutron.api.v2 import resource_helper
from neutron.common import exceptions as qexception
from neutron.plugins.common import constants
-from neutron.services.service_base import ServicePluginBase
+from neutron.services import service_base
class VPNServiceNotFound(qexception.NotFound):
@six.add_metaclass(abc.ABCMeta)
-class VPNPluginBase(ServicePluginBase):
+class VPNPluginBase(service_base.ServicePluginBase):
def get_plugin_name(self):
return constants.VPN