+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2011 Cisco Systems, Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Ying Liu, Cisco Systems, Inc.
-#
-
-import logging
-import routes
-import webob.dec
-import webob.exc
-
-from quantum import manager
-from quantum.api import faults
-from quantum.api import networks
-from quantum.api import ports
-from quantum.common import flags
-from quantum.common import wsgi
-from cisco_extensions import portprofiles
-from cisco_extensions import extensions
-
-
-LOG = logging.getLogger('quantum_extension.api')
-FLAGS = flags.FLAGS
-
-
-class ExtRouterV01(wsgi.Router):
- """
- Routes requests on the Quantum API to the appropriate controller
- """
-
- def __init__(self, ext_mgr=None):
- uri_prefix = '/tenants/{tenant_id}/'
-
- mapper = routes.Mapper()
- plugin = manager.QuantumManager().get_plugin()
- controller = portprofiles.Controller(plugin)
- ext_controller = extensions.Controller(plugin)
- mapper.connect("home", "/", controller=ext_controller,
- action="list_extension",
- conditions=dict(method=['GET']))
- #mapper.redirect("/", "www.google.com")
- mapper.resource("portprofiles", "portprofiles",
- controller=controller,
- path_prefix=uri_prefix)
- mapper.connect("associate_portprofile",
- uri_prefix
- + 'portprofiles/{portprofile_id}/assignment{.format}',
- controller=controller,
- action="associate_portprofile",
- conditions=dict(method=['PUT']))
- mapper.connect("disassociate_portprofile",
- uri_prefix
- + 'portprofiles/{portprofile_id}/assignment{.format}',
- controller=controller,
- action="disassociate_portprofile",
- conditions=dict(method=['DELETE']))
-
- super(ExtRouterV01, self).__init__(mapper)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2011 Cisco Systems, Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Ying Liu, Cisco Systems, Inc.
-#
-import logging
-
-
-class ExtensionException(Exception):
- """Quantum Cisco api Exception
-
- Taken from nova.exception.NovaException
- To correctly use this class, inherit from it and define
- a 'message' property. That message will get printf'd
- with the keyword arguments provided to the constructor.
-
- """
- message = _("An unknown exception occurred.")
-
- def __init__(self, **kwargs):
- try:
- self._error_string = self.message % kwargs
-
- except Exception:
- # at least get the core message out if something happened
- self._error_string = self.message
-
- def __str__(self):
- return self._error_string
-
-
-class ProcessExecutionError(IOError):
- def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
- description=None):
- if description is None:
- description = "Unexpected error while running command."
- if exit_code is None:
- exit_code = '-'
- message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
- description, cmd, exit_code, stdout, stderr)
- IOError.__init__(self, message)
-
-
-class Error(Exception):
- def __init__(self, message=None):
- super(Error, self).__init__(message)
-
-
-class ApiError(Error):
- def __init__(self, message='Unknown', code='Unknown'):
- self.message = message
- self.code = code
- super(ApiError, self).__init__('%s: %s' % (code, message))
-
-
-class NotFound(ExtensionException):
- pass
-
-
-class ClassNotFound(NotFound):
- message = _("Class %(class_name)s could not be found")
-
-
-class PortprofileNotFound(NotFound):
- message = _("Portprofile %(_id)s could not be found")
-
-
-class PortNotFound(NotFound):
- message = _("Port %(port_id)s could not be found " \
- "on Network %(net_id)s")
-
-
-"""
-
-
-class PortprofileInUse(ExtensionException):
- message = _("Unable to complete operation on Portprofile %(net_id)s. " \
- "There is one or more attachments plugged into its ports.")
-
-
-class PortInUse(ExtensionException):
- message = _("Unable to complete operation on port %(port_id)s " \
- "for Portprofile %(net_id)s. The attachment '%(att_id)s" \
- "is plugged into the logical port.")
-
-class AlreadyAttached(ExtensionException):
- message = _("Unable to plug the attachment %(att_id)s into port " \
- "%(port_id)s for Portprofile %(net_id)s. The attachment is " \
- "already plugged into port %(att_port_id)s")
-
-"""
-
-
-class Duplicate(Error):
- pass
-
-
-class NotAuthorized(Error):
- pass
-
-
-class NotEmpty(Error):
- pass
-
-
-class Invalid(Error):
- pass
-
-
-class InvalidContentType(Invalid):
- message = _("Invalid content type %(content_type)s.")
-
-
-class BadInputError(Exception):
- """Error resulting from a client sending bad input to a server"""
- pass
-
-
-class MissingArgumentError(Error):
- pass
-
-
-def wrap_exception(f):
- def _wrap(*args, **kw):
- try:
- return f(*args, **kw)
- except Exception, e:
- if not isinstance(e, Error):
- #exc_type, exc_value, exc_traceback = sys.exc_info()
- logging.exception('Uncaught exception')
- #logging.error(traceback.extract_stack(exc_traceback))
- raise Error(str(e))
- raise
- _wrap.func_name = f.func_name
- return _wrap
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2011 Cisco Systems, Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Ying Liu, Cisco Systems, Inc.
-#
-import logging
-import webob.dec
-
-from quantum.common import wsgi
-from quantum.api import api_common as common
-
-
-LOG = logging.getLogger('quantum.api.cisco_extension.extensions')
-
-
-class Controller(common.QuantumController):
-
- def __init__(self, plugin):
- #self._plugin = plugin
- #super(QuantumController, self).__init__()
- self._resource_name = 'extensions'
- super(Controller, self).__init__(plugin)
-
- def list_extension(self, req):
- """Respond to a request for listing all extension api."""
- response = "extensions api list"
- return response
-
-
\ No newline at end of file
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2011 Cisco Systems, Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Ying Liu, Cisco Systems, Inc.
-#
-import webob.dec
-import webob.exc
-
-from quantum.api import api_common as common
-from quantum.common import wsgi
-
-
-class Fault(webob.exc.HTTPException):
- """Error codes for API faults"""
-
- _fault_names = {
- 400: "malformedRequest",
- 401: "unauthorized",
- 420: "networkNotFound",
- 421: "PortprofileInUse",
- 430: "portNotFound",
- 431: "requestedStateInvalid",
- 432: "portInUse",
- 440: "alreadyAttached",
- 450: "PortprofileNotFound",
- 470: "serviceUnavailable",
- 471: "pluginFault"}
-
- def __init__(self, exception):
- """Create a Fault for the given webob.exc.exception."""
- self.wrapped_exc = exception
-
- @webob.dec.wsgify(RequestClass=wsgi.Request)
- def __call__(self, req):
- """Generate a WSGI response based on the exception passed to ctor."""
- #print ("*********TEST2")
- # Replace the body with fault details.
- code = self.wrapped_exc.status_int
- fault_name = self._fault_names.get(code, "quantumServiceFault")
- fault_data = {
- fault_name: {
- 'code': code,
- 'message': self.wrapped_exc.explanation,
- 'detail': self.wrapped_exc.detail}}
- # 'code' is an attribute on the fault tag itself
- metadata = {'application/xml': {'attributes': {fault_name: 'code'}}}
- default_xmlns = common.XML_NS_V10
- serializer = wsgi.Serializer(metadata, default_xmlns)
- content_type = req.best_match_content_type()
- self.wrapped_exc.body = serializer.serialize(fault_data, content_type)
- self.wrapped_exc.content_type = content_type
- return self.wrapped_exc
-
-
-class PortprofileNotFound(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server did not find the Portprofile specified
- in the HTTP request
-
- code: 450, title: Portprofile not Found
- """
- #print ("*********TEST1")
- code = 450
- title = 'Portprofile Not Found'
- explanation = ('Unable to find a Portprofile with'
- + ' the specified identifier.')
-
-
-class PortNotFound(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server did not find the port specified
- in the HTTP request for a given network
-
- code: 430, title: Port not Found
- """
- code = 430
- title = 'Port not Found'
- explanation = ('Unable to find a port with the specified identifier.')
-
-
-class RequestedStateInvalid(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server could not update the port state to
- to the request value
-
- code: 431, title: Requested State Invalid
- """
- code = 431
- title = 'Requested State Invalid'
- explanation = ('Unable to update port state with specified value.')
-
-
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2011 Cisco Systems, Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# @author: Ying Liu, Cisco Systems, Inc.
-#
-
-import logging
-import webob.dec
-from quantum.common import wsgi
-from webob import exc
-
-from quantum.api import api_common as common
-
-from cisco_extensions import pprofiles as pprofiles_view
-from cisco_extensions import exceptions as exception
-from cisco_extensions import faults as faults
-
-LOG = logging.getLogger('quantum.api.portprofiles')
-
-
-class Controller(common.QuantumController):
- """ portprofile API controller
- based on QuantumController """
-
- _portprofile_ops_param_list = [{
- 'param-name': 'portprofile-name',
- 'required': True}, {
- 'param-name': 'vlan-id',
- 'required': True}, {
- 'param-name': 'assignment',
- 'required': False}]
-
- _assignprofile_ops_param_list = [{
- 'param-name': 'network-id',
- 'required': True}, {
- 'param-name': 'port-id',
- 'required': True}]
-
- _serialization_metadata = {
- "application/xml": {
- "attributes": {
- "portprofile": ["id", "name"],
- },
- },
- }
-
- def __init__(self, plugin):
- self._resource_name = 'portprofile'
- super(Controller, self).__init__(plugin)
-
- def index(self, request, tenant_id):
- """ Returns a list of portprofile ids """
- #TODO: this should be for a given tenant!!!
- return self._items(request, tenant_id, is_detail=False)
-
- def _items(self, request, tenant_id, is_detail):
- """ Returns a list of portprofiles. """
- portprofiles = self._plugin.get_all_portprofiles(tenant_id)
- builder = pprofiles_view.get_view_builder(request)
- result = [builder.build(portprofile, is_detail)['portprofile']
- for portprofile in portprofiles]
- return dict(portprofiles=result)
-
- def show(self, request, tenant_id, id):
- """ Returns portprofile details for the given portprofile id """
- try:
- portprofile = self._plugin.get_portprofile_details(
- tenant_id, id)
- builder = pprofiles_view.get_view_builder(request)
- #build response with details
- result = builder.build(portprofile, True)
- return dict(portprofiles=result)
- except exception.PortprofileNotFound as e:
- return faults.Fault(faults.PortprofileNotFound(e))
- #return faults.Fault(e)
-
- def create(self, request, tenant_id):
- """ Creates a new portprofile for a given tenant """
- #look for portprofile name in request
- try:
- req_params = \
- self._parse_request_params(request,
- self._portprofile_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- portprofile = self._plugin.\
- create_portprofile(tenant_id,
- req_params['portprofile-name'],
- req_params['vlan-id'])
- builder = pprofiles_view.get_view_builder(request)
- result = builder.build(portprofile)
- return dict(portprofiles=result)
-
- def update(self, request, tenant_id, id):
- """ Updates the name for the portprofile with the given id """
- try:
- req_params = \
- self._parse_request_params(request,
- self._portprofile_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- try:
- portprofile = self._plugin.\
- rename_portprofile(tenant_id,
- id, req_params['portprofile-name'])
-
- builder = pprofiles_view.get_view_builder(request)
- result = builder.build(portprofile, True)
- return dict(portprofiles=result)
- except exception.PortprofileNotFound as e:
- return faults.Fault(faults.PortprofileNotFound(e))
-
- def delete(self, request, tenant_id, id):
- """ Destroys the portprofile with the given id """
- try:
- self._plugin.delete_portprofile(tenant_id, id)
- return exc.HTTPAccepted()
- except exception.PortprofileNotFound as e:
- return faults.Fault(faults.PortprofileNotFound(e))
-
- #added for cisco's extension
- def associate_portprofile(self, request, tenant_id, portprofile_id):
- content_type = request.best_match_content_type()
- print "Content type:%s" % content_type
-
- try:
- req_params = \
- self._parse_request_params(request,
- self._assignprofile_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- net_id = req_params['network-id'].strip()
- #print "*****net id "+net_id
- port_id = req_params['port-id'].strip()
- try:
- self._plugin.associate_portprofile(tenant_id,
- net_id, port_id,
- portprofile_id)
- return exc.HTTPAccepted()
- except exception.PortprofileNotFound as e:
- return faults.Fault(faults.PortprofileNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
-
- #added for Cisco extension
- def disassociate_portprofile(self, request, tenant_id, portprofile_id):
- content_type = request.best_match_content_type()
- print "Content type:%s" % content_type
-
- try:
- req_params = \
- self._parse_request_params(request,
- self._assignprofile_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- net_id = req_params['network-id'].strip()
- #print "*****net id "+net_id
- port_id = req_params['port-id'].strip()
- try:
- self._plugin. \
- disassociate_portprofile(tenant_id,
- net_id, port_id, portprofile_id)
- return exc.HTTPAccepted()
- except exception.PortprofileNotFound as e:
- return faults.Fault(faults.PortprofileNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
+++ /dev/null
-
-
-import os
-
-
-def get_view_builder(req):
- base_url = req.application_url
- return ViewBuilder(base_url)
-
-
-class ViewBuilder(object):
- """
- ViewBuilder for Portprofile,
- derived from quantum.views.networks
- """
- def __init__(self, base_url):
- """
- :param base_url: url of the root wsgi application
- """
- self.base_url = base_url
-
- def build(self, portprofile_data, is_detail=False):
- """Generic method used to generate a portprofile entity."""
- print "portprofile-DATA:%s" %portprofile_data
- if is_detail:
- portprofile = self._build_detail(portprofile_data)
- else:
- portprofile = self._build_simple(portprofile_data)
- return portprofile
-
- def _build_simple(self, portprofile_data):
- """Return a simple model of a server."""
- return dict(portprofile=dict(id=portprofile_data['profile-id']))
-
- def _build_detail(self, portprofile_data):
- """Return a simple model of a server."""
- if (portprofile_data['assignment']==None):
- return dict(portprofile=dict(id=portprofile_data['profile-id'],
- name=portprofile_data['profile-name'],
- vlan_id=portprofile_data['vlan-id']))
- else:
- return dict(portprofile=dict(id=portprofile_data['profile-id'],
- name=portprofile_data['profile-name'],
- vlan_id=portprofile_data['vlan-id'],
- assignment=portprofile_data['assignment']))
+++ /dev/null
-[DATABASE]
-name = quantum_l2network
-user = root
-pass = nova
-host = 127.0.0.1
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011, Cisco Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# @author: Rohit Agarwalla, Cisco Systems, Inc.
-
-import ConfigParser
-import os
-import logging as LOG
-import unittest
-
-from optparse import OptionParser
-from quantum.plugins.cisco.common import cisco_constants as const
-
-import quantum.db.api as db
-import quantum.db.models
-import quantum.plugins.cisco.db.l2network_db as l2network_db
-import quantum.plugins.cisco.db.l2network_models
-
-CONF_FILE = "db_conn.ini"
-LOG.getLogger(const.LOGGER_COMPONENT_NAME)
-
-
-def find_config(basepath):
- for root, dirs, files in os.walk(basepath):
- if CONF_FILE in files:
- return os.path.join(root, CONF_FILE)
- return None
-
-
-def db_conf(configfile=None):
- config = ConfigParser.ConfigParser()
- if configfile == None:
- if os.path.exists(CONF_FILE):
- configfile = CONF_FILE
- else:
- configfile = \
- find_config(os.path.abspath(os.path.dirname(__file__)))
- if configfile == None:
- raise Exception("Configuration file \"%s\" doesn't exist" %
- (configfile))
- LOG.debug("Using configuration file: %s" % configfile)
- config.read(configfile)
-
- DB_NAME = config.get("DATABASE", "name")
- DB_USER = config.get("DATABASE", "user")
- DB_PASS = config.get("DATABASE", "pass")
- DB_HOST = config.get("DATABASE", "host")
- options = {"sql_connection": "mysql://%s:%s@%s/%s" % (DB_USER,
- DB_PASS, DB_HOST, DB_NAME)}
- db.configure_db(options)
-
-
-class UcsDB(object):
- def get_all_ucsmbindings(self):
- bindings = []
- try:
- for x in ucs_db.get_all_ucsmbinding():
- LOG.debug("Getting ucsm binding : %s" % x.ucsm_ip)
- bind_dict = {}
- bind_dict["ucsm-ip"] = str(x.ucsm_ip)
- bind_dict["network-id"] = str(x.network_id)
- bindings.append(bind_dict)
- except Exception, e:
- LOG.error("Failed to get all bindings: %s" % str(e))
- return bindings
-
- def get_ucsmbinding(self, ucsm_ip):
- binding = []
- try:
- for x in ucs_db.get_ucsmbinding(ucsm_ip):
- LOG.debug("Getting ucsm binding : %s" % x.ucsm_ip)
- bind_dict = {}
- bind_dict["ucsm-ip"] = str(res.ucsm_ip)
- bind_dict["network-id"] = str(res.network_id)
- binding.append(bind_dict)
- except Exception, e:
- LOG.error("Failed to get binding: %s" % str(e))
- return binding
-
- def create_ucsmbinding(self, ucsm_ip, networ_id):
- bind_dict = {}
- try:
- res = ucs_db.add_ucsmbinding(ucsm_ip, networ_id)
- LOG.debug("Created ucsm binding: %s" % res.ucsm_ip)
- bind_dict["ucsm-ip"] = str(res.ucsm_ip)
- bind_dict["network-id"] = str(res.network_id)
- return bind_dict
- except Exception, e:
- LOG.error("Failed to create ucsm binding: %s" % str(e))
-
- def delete_ucsmbinding(self, ucsm_ip):
- try:
- res = ucs_db.remove_ucsmbinding(ucsm_ip)
- LOG.debug("Deleted ucsm binding : %s" % res.ucsm_ip)
- bind_dict = {}
- bind_dict["ucsm-ip"] = str(res.ucsm_ip)
- return bind_dict
- except Exception, e:
- raise Exception("Failed to delete dynamic vnic: %s" % str(e))
-
- def update_ucsmbinding(self, ucsm_ip, network_id):
- try:
- res = ucs_db.update_ucsmbinding(ucsm_ip, network_id)
- LOG.debug("Updating ucsm binding : %s" % res.ucsm_ip)
- bind_dict = {}
- bind_dict["ucsm-ip"] = str(res.ucsm_ip)
- bind_dict["network-id"] = str(res.network_id)
- return bind_dict
- except Exception, e:
- raise Exception("Failed to update dynamic vnic: %s" % str(e))
-
- def get_all_dynamicvnics(self):
- vnics = []
- try:
- for x in ucs_db.get_all_dynamicvnics():
- LOG.debug("Getting dynamic vnic : %s" % x.uuid)
- vnic_dict = {}
- vnic_dict["vnic-id"] = str(x.uuid)
- vnic_dict["device-name"] = x.device_name
- vnic_dict["blade-id"] = str(x.blade_id)
- vnics.append(vnic_dict)
- except Exception, e:
- LOG.error("Failed to get all dynamic vnics: %s" % str(e))
- return vnics
-
- def get_dynamicvnic(self, vnic_id):
- vnic = []
- try:
- for x in ucs_db.get_dynamicvnic(vnic_id):
- LOG.debug("Getting dynamic vnic : %s" % x.uuid)
- vnic_dict = {}
- vnic_dict["vnic-id"] = str(x.uuid)
- vnic_dict["device-name"] = x.device_name
- vnic_dict["blade-id"] = str(x.blade_id)
- vnic.append(vnic_dict)
- except Exception, e:
- LOG.error("Failed to get dynamic vnic: %s" % str(e))
- return vnic
-
- def create_dynamicvnic(self, device_name, blade_id):
- vnic_dict = {}
- try:
- res = ucs_db.add_dynamicvnic(device_name, blade_id)
- LOG.debug("Created dynamic vnic: %s" % res.uuid)
- vnic_dict["vnic-id"] = str(res.uuid)
- vnic_dict["device-name"] = res.device_name
- vnic_dict["blade-id"] = str(res.blade_id)
- return vnic_dict
- except Exception, e:
- LOG.error("Failed to create dynamic vnic: %s" % str(e))
-
- def delete_dynamicvnic(self, vnic_id):
- try:
- res = ucs_db.remove_dynamicvnic(vnic_id)
- LOG.debug("Deleted dynamic vnic : %s" % res.uuid)
- vnic_dict = {}
- vnic_dict["vnic-id"] = str(res.uuid)
- return vnic_dict
- except Exception, e:
- raise Exception("Failed to delete dynamic vnic: %s" % str(e))
-
- def update_dynamicvnic(self, vnic_id, device_name=None, blade_id=None):
- try:
- res = ucs_db.update_dynamicvnic(vnic_id, device_name, blade_id)
- LOG.debug("Updating dynamic vnic : %s" % res.uuid)
- vnic_dict = {}
- vnic_dict["vnic-id"] = str(res.uuid)
- vnic_dict["device-name"] = res.device_name
- vnic_dict["blade-id"] = str(res.blade_id)
- return vnic_dict
- except Exception, e:
- raise Exception("Failed to update dynamic vnic: %s" % str(e))
-
- def get_all_blades(self):
- blades = []
- try:
- for x in ucs_db.get_all_blades():
- LOG.debug("Getting blade : %s" % x.uuid)
- blade_dict = {}
- blade_dict["blade-id"] = str(x.uuid)
- blade_dict["mgmt-ip"] = str(x.mgmt_ip)
- blade_dict["mac-addr"] = str(x.mac_addr)
- blade_dict["chassis-id"] = str(x.chassis_id)
- blade_dict["ucsm-ip"] = str(x.ucsm_ip)
- blades.append(blade_dict)
- except Exception, e:
- LOG.error("Failed to get all blades: %s" % str(e))
- return blades
-
- def get_blade(self, blade_id):
- blade = []
- try:
- for x in ucs_db.get_blade(blade_id):
- LOG.debug("Getting blade : %s" % x.uuid)
- blade_dict = {}
- blade_dict["blade-id"] = str(x.uuid)
- blade_dict["mgmt-ip"] = str(x.mgmt_ip)
- blade_dict["mac-addr"] = str(x.mac_addr)
- blade_dict["chassis-id"] = str(x.chassis_id)
- blade_dict["ucsm-ip"] = str(x.ucsm_ip)
- blade.append(blade_dict)
- except Exception, e:
- LOG.error("Failed to get all blades: %s" % str(e))
- return blade
-
- def create_blade(self, mgmt_ip, mac_addr, chassis_id, ucsm_ip):
- blade_dict = {}
- try:
- res = ucs_db.add_blade(mgmt_ip, mac_addr, chassis_id, ucsm_ip)
- LOG.debug("Created blade: %s" % res.uuid)
- blade_dict["blade-id"] = str(res.uuid)
- blade_dict["mgmt-ip"] = str(res.mgmt_ip)
- blade_dict["mac-addr"] = str(res.mac_addr)
- blade_dict["chassis-id"] = str(res.chassis_id)
- blade_dict["ucsm-ip"] = str(res.ucsm_ip)
- return blade_dict
- except Exception, e:
- LOG.error("Failed to create blade: %s" % str(e))
-
- def delete_blade(self, blade_id):
- try:
- res = ucs_db.remove_blade(blade_id)
- LOG.debug("Deleted blade : %s" % res.uuid)
- blade_dict = {}
- blade_dict["blade-id"] = str(res.uuid)
- return blade_dict
- except Exception, e:
- raise Exception("Failed to delete blade: %s" % str(e))
-
- def update_blade(self, blade_id, mgmt_ip=None, mac_addr=None,\
- chassis_id=None, ucsm_ip=None):
- try:
- res = ucs_db.update_blade(blade_id, mgmt_ip, mac_addr, \
- chassis_id, ucsm_ip)
- LOG.debug("Updating blade : %s" % res.uuid)
- blade_dict = {}
- blade_dict["blade-id"] = str(res.uuid)
- blade_dict["mgmt-ip"] = str(res.mgmt_ip)
- blade_dict["mac-addr"] = str(res.mac_addr)
- blade_dict["chassis-id"] = str(res.chassis_id)
- blade_dict["ucsm-ip"] = str(res.ucsm_ip)
- return blade_dict
- except Exception, e:
- raise Exception("Failed to update blade: %s" % str(e))
-
- def get_all_port_bindings(self):
- port_bindings = []
- try:
- for x in ucs_db.get_all_portbindings():
- LOG.debug("Getting port binding for port: %s" % x.port_id)
- port_bind_dict = {}
- port_bind_dict["port-id"] = x.port_id
- port_bind_dict["dynamic-vnic-id"] = str(x.dynamic_vnic_id)
- port_bind_dict["portprofile-name"] = x.portprofile_name
- port_bind_dict["vlan-name"] = x.vlan_name
- port_bind_dict["vlan-id"] = str(x.vlan_id)
- port_bind_dict["qos"] = x.qos
- port_bindings.append(port_bind_dict)
- except Exception, e:
- LOG.error("Failed to get all port bindings: %s" % str(e))
- return port_bindings
-
- def get_port_binding(self):
- port_binding = []
- try:
- for x in ucs_db.get_portbinding(port_id):
- LOG.debug("Getting port binding for port: %s" % x.port_id)
- port_bind_dict = {}
- port_bind_dict["port-id"] = x.port_id
- port_bind_dict["dynamic-vnic-id"] = str(x.dynamic_vnic_id)
- port_bind_dict["portprofile-name"] = x.portprofile_name
- port_bind_dict["vlan-name"] = x.vlan_name
- port_bind_dict["vlan-id"] = str(x.vlan_id)
- port_bind_dict["qos"] = x.qos
- port_bindings.append(port_bind_dict)
- except Exception, e:
- LOG.error("Failed to get port binding: %s" % str(e))
- return port_binding
-
- def create_port_binding(self, port_id, dynamic_vnic_id, portprofile_name, \
- vlan_name, vlan_id, qos):
- port_bind_dict = {}
- try:
- res = ucs_db.add_portbinding(port_id, dynamic_vnic_id, \
- portprofile_name, vlan_name, vlan_id, qos)
- LOG.debug("Created port binding: %s" % res.port_id)
- port_bind_dict["port-id"] = res.port_id
- port_bind_dict["dynamic-vnic-id"] = str(res.dynamic_vnic_id)
- port_bind_dict["portprofile-name"] = res.portprofile_name
- port_bind_dict["vlan-name"] = res.vlan_name
- port_bind_dict["vlan-id"] = str(res.vlan_id)
- port_bind_dict["qos"] = res.qos
- return port_bind_dict
- except Exception, e:
- LOG.error("Failed to create port binding: %s" % str(e))
-
- def delete_port_binding(self, port_id):
- try:
- res = ucs_db.remove_portbinding(port_id)
- LOG.debug("Deleted port binding : %s" % res.port_id)
- port_bind_dict = {}
- port_bind_dict["port-id"] = res.port_id
- return port_bind_dict
- except Exception, e:
- raise Exception("Failed to delete port profile: %s" % str(e))
-
- def update_port_binding(self, port_id, dynamic_vnic_id, \
- portprofile_name, vlan_name, vlan_id, qos):
- try:
- res = ucs_db.update_portbinding(port_id, dynamic_vnic_id, \
- portprofile_name, vlan_name, vlan_id, qos)
- LOG.debug("Updating port binding: %s" % res.port_id)
- port_bind_dict = {}
- port_bind_dict["port-id"] = res.port_id
- port_bind_dict["dynamic-vnic-id"] = str(res.dynamic_vnic_id)
- port_bind_dict["portprofile-name"] = res.portprofile_name
- port_bind_dict["vlan-name"] = res.vlan_name
- port_bind_dict["vlan-id"] = str(res.vlan_id)
- port_bind_dict["qos"] = res.qos
- return port_bind_dict
- except Exception, e:
- raise Exception("Failed to update portprofile binding:%s" % str(e))
-
-
-class QuantumDB(object):
- def get_all_networks(self, tenant_id):
- nets = []
- try:
- for x in db.network_list(tenant_id):
- LOG.debug("Getting network: %s" % x.uuid)
- net_dict = {}
- net_dict["tenant-id"] = x.tenant_id
- net_dict["net-id"] = str(x.uuid)
- net_dict["net-name"] = x.name
- nets.append(net_dict)
- except Exception, e:
- LOG.error("Failed to get all networks: %s" % str(e))
- return nets
-
- def get_network(self, network_id):
- net = []
- try:
- for x in db.network_get(network_id):
- LOG.debug("Getting network: %s" % x.uuid)
- net_dict = {}
- net_dict["tenant-id"] = x.tenant_id
- net_dict["net-id"] = str(x.uuid)
- net_dict["net-name"] = x.name
- nets.append(net_dict)
- except Exception, e:
- LOG.error("Failed to get network: %s" % str(e))
- return net
-
- def create_network(self, tenant_id, net_name):
- net_dict = {}
- try:
- res = db.network_create(tenant_id, net_name)
- LOG.debug("Created network: %s" % res.uuid)
- net_dict["tenant-id"] = res.tenant_id
- net_dict["net-id"] = str(res.uuid)
- net_dict["net-name"] = res.name
- return net_dict
- except Exception, e:
- LOG.error("Failed to create network: %s" % str(e))
-
- def delete_network(self, net_id):
- try:
- net = db.network_destroy(net_id)
- LOG.debug("Deleted network: %s" % net.uuid)
- net_dict = {}
- net_dict["net-id"] = str(net.uuid)
- return net_dict
- except Exception, e:
- raise Exception("Failed to delete port: %s" % str(e))
-
- def rename_network(self, tenant_id, net_id, new_name):
- try:
- net = db.network_rename(net_id, tenant_id, new_name)
- LOG.debug("Renamed network: %s" % net.uuid)
- net_dict = {}
- net_dict["net-id"] = str(net.uuid)
- net_dict["net-name"] = net.name
- return net_dict
- except Exception, e:
- raise Exception("Failed to rename network: %s" % str(e))
-
- def get_all_ports(self, net_id):
- ports = []
- try:
- for x in db.port_list(net_id):
- LOG.debug("Getting port: %s" % x.uuid)
- port_dict = {}
- port_dict["port-id"] = str(x.uuid)
- port_dict["net-id"] = str(x.network_id)
- port_dict["int-id"] = x.interface_id
- port_dict["state"] = x.state
- ports.append(port_dict)
- return ports
- except Exception, e:
- LOG.error("Failed to get all ports: %s" % str(e))
-
- def get_port(self, port_id):
- port = []
- try:
- for x in db.port_get(port_id):
- LOG.debug("Getting port: %s" % x.uuid)
- port_dict = {}
- port_dict["port-id"] = str(x.uuid)
- port_dict["net-id"] = str(x.network_id)
- port_dict["int-id"] = x.interface_id
- port_dict["state"] = x.state
- port.append(port_dict)
- return port
- except Exception, e:
- LOG.error("Failed to get port: %s" % str(e))
-
- def create_port(self, net_id):
- port_dict = {}
- try:
- port = db.port_create(net_id)
- LOG.debug("Creating port %s" % port.uuid)
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception, e:
- LOG.error("Failed to create port: %s" % str(e))
-
- def delete_port(self, port_id):
- try:
- port = db.port_destroy(port_id)
- LOG.debug("Deleted port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- return port_dict
- except Exception, e:
- raise Exception("Failed to delete port: %s" % str(e))
-
- def update_port(self, port_id, port_state):
- try:
- port = db.port_set_state(port_id, port_state)
- LOG.debug("Updated port %s" % port.uuid)
- port_dict = {}
- port_dict["port-id"] = str(port.uuid)
- port_dict["net-id"] = str(port.network_id)
- port_dict["int-id"] = port.interface_id
- port_dict["state"] = port.state
- return port_dict
- except Exception, e:
- raise Exception("Failed to update port state: %s" % str(e))
-
-
-class L2networkDB(object):
- def get_all_vlan_bindings(self):
- vlans = []
- try:
- for x in l2network_db.get_all_vlan_bindings():
- LOG.debug("Getting vlan bindings for vlan: %s" % x.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(x.vlan_id)
- vlan_dict["vlan-name"] = x.vlan_name
- vlan_dict["net-id"] = str(x.network_id)
- vlans.append(vlan_dict)
- except Exception, e:
- LOG.error("Failed to get all vlan bindings: %s" % str(e))
- return vlans
-
- def get_vlan_binding(self, network_id):
- vlan = []
- try:
- for x in l2network_db.get_vlan_binding(network_id):
- LOG.debug("Getting vlan binding for vlan: %s" % x.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(x.vlan_id)
- vlan_dict["vlan-name"] = x.vlan_name
- vlan_dict["net-id"] = str(x.network_id)
- vlan.append(vlan_dict)
- except Exception, e:
- LOG.error("Failed to get vlan binding: %s" % str(e))
- return vlan
-
- def create_vlan_binding(self, vlan_id, vlan_name, network_id):
- vlan_dict = {}
- try:
- res = l2network_db.add_vlan_binding(vlan_id, vlan_name, network_id)
- LOG.debug("Created vlan binding for vlan: %s" % res.vlan_id)
- vlan_dict["vlan-id"] = str(res.vlan_id)
- vlan_dict["vlan-name"] = res.vlan_name
- vlan_dict["net-id"] = str(res.network_id)
- return vlan_dict
- except Exception, e:
- LOG.error("Failed to create vlan binding: %s" % str(e))
-
- def delete_vlan_binding(self, network_id):
- try:
- res = l2network_db.remove_vlan_binding(network_id)
- LOG.debug("Deleted vlan binding for vlan: %s" % res.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(res.vlan_id)
- return vlan_dict
- except Exception, e:
- raise Exception("Failed to delete vlan binding: %s" % str(e))
-
- def update_vlan_binding(self, network_id, vlan_id, vlan_name):
- try:
- res = l2network_db.update_vlan_binding(network_id, vlan_id, \
- vlan_name)
- LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
- vlan_dict = {}
- vlan_dict["vlan-id"] = str(res.vlan_id)
- vlan_dict["vlan-name"] = res.vlan_name
- vlan_dict["net-id"] = str(res.network_id)
- return vlan_dict
- except Exception, e:
- raise Exception("Failed to update vlan binding: %s" % str(e))
-
- def get_all_portprofiles(self):
- pps = []
- try:
- for x in l2network_db.get_all_portprofiles():
- LOG.debug("Getting port profile : %s" % x.uuid)
- pp_dict = {}
- pp_dict["portprofile-id"] = str(x.uuid)
- pp_dict["portprofile-name"] = x.name
- pp_dict["vlan-id"] = str(x.vlan_id)
- pp_dict["qos"] = x.qos
- pps.append(pp_dict)
- except Exception, e:
- LOG.error("Failed to get all port profiles: %s" % str(e))
- return pps
-
- def get_portprofile(self, port_id):
- pp = []
- try:
- for x in l2network_db.get_portprofile(port_id):
- LOG.debug("Getting port profile : %s" % x.uuid)
- pp_dict = {}
- pp_dict["portprofile-id"] = str(x.uuid)
- pp_dict["portprofile-name"] = x.name
- pp_dict["vlan-id"] = str(x.vlan_id)
- pp_dict["qos"] = x.qos
- pp.append(pp_dict)
- except Exception, e:
- LOG.error("Failed to get port profile: %s" % str(e))
- return pp
-
- def create_portprofile(self, name, vlan_id, qos):
- pp_dict = {}
- try:
- res = l2network_db.add_portprofile(name, vlan_id, qos)
- LOG.debug("Created port profile: %s" % res.uuid)
- pp_dict["portprofile-id"] = str(res.uuid)
- pp_dict["portprofile-name"] = res.name
- pp_dict["vlan-id"] = str(res.vlan_id)
- pp_dict["qos"] = res.qos
- return pp_dict
- except Exception, e:
- LOG.error("Failed to create port profile: %s" % str(e))
-
- def delete_portprofile(self, pp_id):
- try:
- res = l2network_db.remove_portprofile(pp_id)
- LOG.debug("Deleted port profile : %s" % res.uuid)
- pp_dict = {}
- pp_dict["pp-id"] = str(res.uuid)
- return pp_dict
- except Exception, e:
- raise Exception("Failed to delete port profile: %s" % str(e))
-
- def update_portprofile(self, pp_id, name, vlan_id, qos):
- try:
- res = l2network_db.update_portprofile(pp_id, name, vlan_id, qos)
- LOG.debug("Updating port profile : %s" % res.uuid)
- pp_dict = {}
- pp_dict["portprofile-id"] = str(res.uuid)
- pp_dict["portprofile-name"] = res.name
- pp_dict["vlan-id"] = str(res.vlan_id)
- pp_dict["qos"] = res.qos
- return pp_dict
- except Exception, e:
- raise Exception("Failed to update port profile: %s" % str(e))
-
- def get_all_pp_bindings(self):
- pp_bindings = []
- try:
- for x in l2network_db.get_all_pp_bindings():
- LOG.debug("Getting port profile binding: %s" % \
- x.portprofile_id)
- ppbinding_dict = {}
- ppbinding_dict["portprofile-id"] = str(x.portprofile_id)
- ppbinding_dict["net-id"] = str(x.network_id)
- ppbinding_dict["tenant-id"] = x.tenant_id
- ppbinding_dict["default"] = x.default
- pp_bindings.append(ppbinding_dict)
- except Exception, e:
- LOG.error("Failed to get all port profiles: %s" % str(e))
- return pp_bindings
-
- def get_pp_binding(self, pp_id):
- pp_binding = []
- try:
- for x in l2network_db.get_pp_binding(pp_id):
- LOG.debug("Getting port profile binding: %s" % \
- x.portprofile_id)
- ppbinding_dict = {}
- ppbinding_dict["portprofile-id"] = str(x.portprofile_id)
- ppbinding_dict["net-id"] = str(x.network_id)
- ppbinding_dict["tenant-id"] = x.tenant_id
- ppbinding_dict["default"] = x.default
- pp_bindings.append(ppbinding_dict)
- except Exception, e:
- LOG.error("Failed to get port profile binding: %s" % str(e))
- return pp_binding
-
- def create_pp_binding(self, tenant_id, net_id, pp_id, default):
- ppbinding_dict = {}
- try:
- res = l2network_db.add_pp_binding(tenant_id, net_id, pp_id, \
- default)
- LOG.debug("Created port profile binding: %s" % res.portprofile_id)
- ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
- ppbinding_dict["net-id"] = str(res.network_id)
- ppbinding_dict["tenant-id"] = res.tenant_id
- ppbinding_dict["default"] = res.default
- return ppbinding_dict
- except Exception, e:
- LOG.error("Failed to create port profile binding: %s" % str(e))
-
- def delete_pp_binding(self, pp_id):
- try:
- res = l2network_db.remove_pp_binding(pp_id)
- LOG.debug("Deleted port profile binding : %s" % res.portprofile_id)
- ppbinding_dict = {}
- ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
- return ppbinding_dict
- except Exception, e:
- raise Exception("Failed to delete port profile: %s" % str(e))
-
- def update_pp_binding(self, pp_id, tenant_id, net_id, default):
- try:
- res = l2network_db.update_pp_binding(pp_id, tenant_id, net_id,\
- default)
- LOG.debug("Updating port profile binding: %s" % res.portprofile_id)
- ppbinding_dict = {}
- ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
- ppbinding_dict["net-id"] = str(res.network_id)
- ppbinding_dict["tenant-id"] = res.tenant_id
- ppbinding_dict["default"] = res.default
- return ppbinding_dict
- except Exception, e:
- raise Exception("Failed to update portprofile binding:%s" % str(e))
-
-
-class UcsDBTest(unittest.TestCase):
- def setUp(self):
- self.dbtest = UcsDB()
- LOG.debug("Setup")
-
- def testACreateUcsmBinding(self):
- binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", "net1")
- self.assertTrue(binding1["ucsm-ip"] == "1.2.3.4")
- self.tearDownUcsmBinding()
-
- def testBGetAllUcsmBindings(self):
- binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", "net1")
- binding2 = self.dbtest.create_ucsmbinding("2.3.4.5", "net1")
- bindings = self.dbtest.get_all_ucsmbindings()
- count = 0
- for x in bindings:
- if "net" in x["network-id"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownUcsmBinding()
-
- def testCDeleteUcsmBinding(self):
- binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", "net1")
- self.dbtest.delete_ucsmbinding(binding1["ucsm-ip"])
- bindings = self.dbtest.get_all_ucsmbindings()
- count = 0
- for x in bindings:
- if "net " in x["network-id"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownUcsmBinding()
-
- def testDUpdateUcsmBinding(self):
- binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", "net1")
- binding1 = self.dbtest.update_ucsmbinding(binding1["ucsm-ip"], \
- "newnet1")
- bindings = self.dbtest.get_all_ucsmbindings()
- count = 0
- for x in bindings:
- if "new" in x["network-id"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownUcsmBinding()
-
- def testECreateDynamicVnic(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"])
- self.assertTrue(vnic1["device-name"] == "eth1")
- self.tearDownDyanmicVnic()
-
- def testFGetAllDyanmicVnics(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"])
- vnic2 = self.dbtest.create_dynamicvnic("eth2", blade1["blade-id"])
- vnics = self.dbtest.get_all_dynamicvnics()
- count = 0
- for x in vnics:
- if "eth" in x["device-name"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownDyanmicVnic()
-
- def testGDeleteDyanmicVnic(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"])
- self.dbtest.delete_dynamicvnic(vnic1["vnic-id"])
- vnics = self.dbtest.get_all_dynamicvnics()
- count = 0
- for x in vnics:
- if "eth " in x["device-name"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownDyanmicVnic()
-
- def testHUpdateDynamicVnic(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"])
- vnic1 = self.dbtest.update_dynamicvnic(vnic1["vnic-id"], "neweth1", \
- "newblade2")
- vnics = self.dbtest.get_all_dynamicvnics()
- count = 0
- for x in vnics:
- if "new" in x["device-name"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownDyanmicVnic()
-
- def testICreateUcsBlade(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- self.assertTrue(blade1["mgmt-ip"] == "1.2.3.4")
- self.tearDownUcsBlade()
-
- def testJGetAllUcsBlade(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- blade2 = self.dbtest.create_blade("2.3.4.5", "efgh", "chassis1", \
- "9.8.7.6")
- blades = self.dbtest.get_all_blades()
- count = 0
- for x in blades:
- if "chassis" in x["chassis-id"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownUcsBlade()
-
- def testKDeleteUcsBlade(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- self.dbtest.delete_blade(blade1["blade-id"])
- blades = self.dbtest.get_all_blades()
- count = 0
- for x in blades:
- if "chassis " in x["chassis-id"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownUcsBlade()
-
- def testLUpdateUcsBlade(self):
- blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1", \
- "9.8.7.6")
- blade2 = self.dbtest.update_blade(blade1["blade-id"], "2.3.4.5", \
- "newabcd", "chassis1", "9.8.7.6")
- blades = self.dbtest.get_all_blades()
- count = 0
- for x in blades:
- if "new" in x["mac-addr"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownUcsBlade()
-
- def testMCreatePortBinding(self):
- port_bind1 = self.dbtest.create_port_binding("port1", "dv1", "pp1", \
- "vlan1", 10, "qos1")
- self.assertTrue(port_bind1["port-id"] == "port1")
- self.tearDownPortBinding()
-
- def testNGetAllPortBinding(self):
- port_bind1 = self.dbtest.create_port_binding("port1", "dv1", "pp1", \
- "vlan1", 10, "qos1")
- port_bind2 = self.dbtest.create_port_binding("port2", "dv2", "pp2", \
- "vlan2", 20, "qos2")
- port_bindings = self.dbtest.get_all_port_bindings()
- count = 0
- for x in port_bindings:
- if "port" in x["port-id"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownPortBinding()
-
- def testODeletePortBinding(self):
- port_bind1 = self.dbtest.create_port_binding("port1", "dv1", "pp1", \
- "vlan1", 10, "qos1")
- self.dbtest.delete_port_binding("port1")
- port_bindings = self.dbtest.get_all_port_bindings()
- count = 0
- for x in port_bindings:
- if "port " in x["port-id"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownPortBinding()
-
- def testPUpdatePortBinding(self):
- port_bind1 = self.dbtest.create_port_binding("port1", "dv1", "pp1", \
- "vlan1", 10, "qos1")
- port_bind1 = self.dbtest.update_port_binding("port1", "newdv1", \
- "newpp1", "newvlan1", 11, "newqos1")
- port_bindings = self.dbtest.get_all_port_bindings()
- count = 0
- for x in port_bindings:
- if "new" in x["dynamic-vnic-id"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownPortBinding()
-
- def tearDownUcsmBinding(self):
- print "Tearing Down Ucsm Bindings"
- binds = self.dbtest.get_all_ucsmbindings()
- for bind in binds:
- ip = bind["ucsm-ip"]
- self.dbtest.delete_ucsmbinding(ip)
-
- def tearDownDyanmicVnic(self):
- print "Tearing Down Dynamic Vnics"
- vnics = self.dbtest.get_all_dynamicvnics()
- for vnic in vnics:
- id = vnic["vnic-id"]
- self.dbtest.delete_dynamicvnic(id)
- self.tearDownUcsBlade()
-
- def tearDownUcsBlade(self):
- print "Tearing Down Blades"
- blades = self.dbtest.get_all_blades()
- for blade in blades:
- id = blade["blade-id"]
- self.dbtest.delete_blade(id)
-
- def tearDownPortBinding(self):
- print "Tearing Down Port Binding"
- port_bindings = self.dbtest.get_all_port_bindings()
- for port_binding in port_bindings:
- id = port_binding["port-id"]
- self.dbtest.delete_port_binding(id)
-
-
-class L2networkDBTest(unittest.TestCase):
- def setUp(self):
- self.dbtest = L2networkDB()
- LOG.debug("Setup")
-
- def testACreateVlanBinding(self):
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", "netid1")
- self.assertTrue(vlan1["vlan-id"] == "10")
- self.tearDownVlanBinding()
-
- def testBGetAllVlanBindings(self):
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", "netid1")
- vlan2 = self.dbtest.create_vlan_binding(20, "vlan2", "netid2")
- vlans = self.dbtest.get_all_vlan_bindings()
- count = 0
- for x in vlans:
- if "netid" in x["net-id"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownVlanBinding()
-
- def testCDeleteVlanBinding(self):
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", "netid1")
- self.dbtest.delete_vlan_binding("netid1")
- vlans = self.dbtest.get_all_vlan_bindings()
- count = 0
- for x in vlans:
- if "netid " in x["net-id"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownVlanBinding()
-
- def testDUpdateVlanBinding(self):
- vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", "netid1")
- vlan1 = self.dbtest.update_vlan_binding("netid1", 11, "newvlan1")
- vlans = self.dbtest.get_all_vlan_bindings()
- count = 0
- for x in vlans:
- if "new" in x["vlan-name"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownVlanBinding()
-
- def testICreatePortProfile(self):
- pp1 = self.dbtest.create_portprofile("portprofile1", 10, "qos1")
- self.assertTrue(pp1["portprofile-name"] == "portprofile1")
- self.tearDownPortProfile()
-
- def testJGetAllPortProfile(self):
- pp1 = self.dbtest.create_portprofile("portprofile1", 10, "qos1")
- pp2 = self.dbtest.create_portprofile("portprofile2", 20, "qos2")
- pps = self.dbtest.get_all_portprofiles()
- count = 0
- for x in pps:
- if "portprofile" in x["portprofile-name"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownPortProfile()
-
- def testKDeletePortProfile(self):
- pp1 = self.dbtest.create_portprofile("portprofile1", 10, "qos1")
- self.dbtest.delete_portprofile(pp1["portprofile-id"])
- pps = self.dbtest.get_all_portprofiles()
- count = 0
- for x in pps:
- if "portprofile " in x["portprofile-name"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownPortProfile()
-
- def testLUpdatePortProfile(self):
- pp1 = self.dbtest.create_portprofile("portprofile1", 10, "qos1")
- pp1 = self.dbtest.update_portprofile(pp1["portprofile-id"], \
- "newportprofile1", 20, "qos2")
- pps = self.dbtest.get_all_portprofiles()
- count = 0
- for x in pps:
- if "new" in x["portprofile-name"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownPortProfile()
-
- def testMCreatePortProfileBinding(self):
- pp_binding1 = self.dbtest.create_pp_binding("t1", "net1", \
- "portprofile1", "0")
- self.assertTrue(pp_binding1["portprofile-id"] == "portprofile1")
- self.tearDownPortProfileBinding()
-
- def testNGetAllPortProfileBinding(self):
- pp_binding1 = self.dbtest.create_pp_binding("t1", "net1", \
- "portprofile1", "0")
- pp_binding2 = self.dbtest.create_pp_binding("t2", "net2", \
- "portprofile2", "0")
- pp_bindings = self.dbtest.get_all_pp_bindings()
- count = 0
- for x in pp_bindings:
- if "portprofile" in x["portprofile-id"]:
- count += 1
- self.assertTrue(count == 2)
- self.tearDownPortProfileBinding()
-
- def testODeletePortProfileBinding(self):
- pp_binding1 = self.dbtest.create_pp_binding("t1", "net1", \
- "portprofile1", "0")
- self.dbtest.delete_pp_binding(pp_binding1["portprofile-id"])
- pp_bindings = self.dbtest.get_all_pp_bindings()
- count = 0
- for x in pp_bindings:
- if "portprofile " in x["portprofile-id"]:
- count += 1
- self.assertTrue(count == 0)
- self.tearDownPortProfileBinding()
-
- def testPUpdatePortProfileBinding(self):
- pp_binding1 = self.dbtest.create_pp_binding("t1", "net1", \
- "portprofile1", "0")
- pp_binding1 = self.dbtest.update_pp_binding("portprofile1", \
- "newt1", "newnet1", "1")
- pp_bindings = self.dbtest.get_all_pp_bindings()
- count = 0
- for x in pp_bindings:
- if "new" in x["net-id"]:
- count += 1
- self.assertTrue(count == 1)
- self.tearDownPortProfileBinding()
-
- def tearDownVlanBinding(self):
- print "Tearing Down Vlan Binding"
- vlans = self.dbtest.get_all_vlan_bindings()
- for vlan in vlans:
- id = vlan["net-id"]
- self.dbtest.delete_vlan_binding(id)
-
- def tearDownPortProfile(self):
- print "Tearing Down Port Profile"
- pps = self.dbtest.get_all_portprofiles()
- for pp in pps:
- id = pp["portprofile-id"]
- self.dbtest.delete_portprofile(id)
-
- def tearDownPortProfileBinding(self):
- print "Tearing Down Port Profile Binding"
- pp_bindings = self.dbtest.get_all_pp_bindings()
- for pp_binding in pp_bindings:
- id = pp_binding["portprofile-id"]
- self.dbtest.delete_pp_binding(id)
-
-if __name__ == "__main__":
- usagestr = "Usage: %prog [OPTIONS] <command> [args]"
- parser = OptionParser(usage=usagestr)
- parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true", default=False, help="turn on verbose logging")
-
- options, args = parser.parse_args()
-
- if options.verbose:
- LOG.basicConfig(level=LOG.DEBUG)
- else:
- LOG.basicConfig(level=LOG.WARN)
-
- #load the models and db based on the 2nd level plugin argument
- if args[0] == "ucs":
- ucs_db = __import__("quantum.plugins.cisco.db.ucs_db", \
- fromlist=["ucs_db"])
- ucs_model = __import__("quantum.plugins.cisco.db.ucs_models", \
- fromlist=["ucs_models"])
-
- db_conf()
-
- # Run the tests
- suite = unittest.TestLoader().loadTestsFromTestCase(L2networkDBTest)
- unittest.TextTestRunner(verbosity=2).run(suite)
- suite = unittest.TestLoader().loadTestsFromTestCase(UcsDBTest)
- unittest.TextTestRunner(verbosity=2).run(suite)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011, Cisco Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# @author: Rohit Agarwalla, Cisco Systems, Inc.
-
-from sqlalchemy.orm import exc
-
-import l2network_models
-import quantum.db.api as db
-import quantum.db.models as models
-
-
-def get_all_vlan_bindings():
- """Lists all the vlan to network associations"""
- session = db.get_session()
- try:
- bindings = session.query(l2network_models.VlanBinding).\
- all()
- return bindings
- except exc.NoResultFound:
- return []
-
-
-def get_vlan_binding(netid):
- """Lists the vlan given a network_id"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(network_id=netid).\
- one()
- return binding
- except exc.NoResultFound:
- raise Exception("No network found with net-id = %s" % network_id)
-
-
-def add_vlan_binding(vlanid, vlanname, netid):
- """Adds a vlan to network association"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(vlan_id=vlanid).\
- one()
- raise Exception("Vlan with id \"%s\" already exists" % vlanid)
- except exc.NoResultFound:
- binding = l2network_models.VlanBinding(vlanid, vlanname, netid)
- session.add(binding)
- session.flush()
- return binding
-
-
-def remove_vlan_binding(netid):
- """Removes a vlan to network association"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(network_id=netid).\
- one()
- session.delete(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- pass
-
-
-def update_vlan_binding(netid, newvlanid=None, newvlanname=None):
- """Updates a vlan to network association"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(network_id=netid).\
- one()
- if newvlanid:
- binding.vlan_id = newvlanid
- if newvlanname:
- binding.vlan_name = newvlanname
- session.merge(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- raise Exception("No vlan binding found with network_id = %s" % netid)
-
-
-def get_all_portprofiles():
- """Lists all the port profiles"""
- session = db.get_session()
- try:
- pps = session.query(l2network_models.PortProfile).\
- all()
- return pps
- except exc.NoResultFound:
- return []
-
-
-def get_portprofile(ppid):
- """Lists a port profile"""
- session = db.get_session()
- try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(uuid=ppid).\
- one()
- return pp
- except exc.NoResultFound:
- raise Exception("No portprofile found with id = %s" % ppid)
-
-
-def add_portprofile(ppname, vlanid, qos):
- """Adds a port profile"""
- session = db.get_session()
- try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(name=ppname).\
- one()
- raise Exception("Port profile with name %s already exists" % ppname)
- except exc.NoResultFound:
- pp = l2network_models.PortProfile(ppname, vlanid, qos)
- session.add(pp)
- session.flush()
- return pp
-
-
-def remove_portprofile(ppid):
- """Removes a port profile"""
- session = db.get_session()
- try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(uuid=ppid).\
- one()
- session.delete(pp)
- session.flush()
- return pp
- except exc.NoResultFound:
- pass
-
-
-def update_portprofile(ppid, newppname=None, newvlanid=None, newqos=None):
- """Updates port profile"""
- session = db.get_session()
- try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(uuid=ppid).\
- one()
- if newppname:
- pp.name = newppname
- if newvlanid:
- pp.vlan_id = newvlanid
- if newqos:
- pp.qos = newqos
- session.merge(pp)
- session.flush()
- return pp
- except exc.NoResultFound:
- raise Exception("No port profile with id = %s" % ppid)
-
-
-def get_all_pp_bindings():
- """Lists all the port profiles"""
- session = db.get_session()
- try:
- bindings = session.query(l2network_models.PortProfileBinding).\
- all()
- return bindings
- except exc.NoResultFound:
- return []
-
-
-def get_pp_binding(ppid):
- """Lists a port profile binding"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
- return binding
- except exc.NoResultFound:
- raise Exception("No portprofile binding found with id = %s" % ppid)
-
-
-def add_pp_binding(tenantid, networkid, ppid, default):
- """Adds a port profile binding"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
- raise Exception("Port profile binding with id \"%s\" already \
- exists" % ppid)
- except exc.NoResultFound:
- binding = l2network_models.PortProfileBinding(tenantid, networkid, \
- ppid, default)
- session.add(binding)
- session.flush()
- return binding
-
-
-def remove_pp_binding(ppid):
- """Removes a port profile binding"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
- session.delete(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- pass
-
-
-def update_pp_binding(ppid, newtenantid=None, newnetworkid=None, \
- newdefault=None):
- """Updates port profile binding"""
- session = db.get_session()
- try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
- if newtenantid:
- binding.tenant_id = newtenantid
- if newnetworkid:
- binding.network_id = newnetworkid
- if newdefault:
- binding.default = newdefault
- session.merge(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- raise Exception("No port profile binding with id = %s" % ppid)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011, Cisco Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# @author: Rohit Agarwalla, Cisco Systems, Inc.
-
-import uuid
-
-from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
-from sqlalchemy.orm import relation
-
-from quantum.db.models import BASE
-
-
-class VlanBinding(BASE):
- """Represents a binding of vlan_id to network_id"""
- __tablename__ = 'vlan_bindings'
-
- vlan_id = Column(Integer, primary_key=True)
- vlan_name = Column(String(255))
- network_id = Column(String(255), nullable=False)
- #foreign key to networks.uuid
-
- def __init__(self, vlan_id, vlan_name, network_id):
- self.vlan_id = vlan_id
- self.vlan_name = vlan_name
- self.network_id = network_id
-
- def __repr__(self):
- return "<VlanBinding(%d,%s,%s)>" % \
- (self.vlan_id, self.vlan_name, self.network_id)
-
-
-class PortProfile(BASE):
- """Represents Cisco plugin level PortProfile for a network"""
- __tablename__ = 'portprofiles'
-
- uuid = Column(String(255), primary_key=True)
- name = Column(String(255))
- vlan_id = Column(Integer)
- qos = Column(String(255))
-
- def __init__(self, name, vlan_id, qos=None):
- self.uuid = uuid.uuid4()
- self.name = name
- self.vlan_id = vlan_id
- self.qos = qos
-
- def __repr__(self):
- return "<PortProfile(%s,%s,%d,%s)>" % \
- (self.uuid, self.name, self.vlan_id, self.qos)
-
-
-class PortProfileBinding(BASE):
- """Represents PortProfile binding to tenant and network"""
- __tablename__ = 'portprofile_bindings'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- tenant_id = Column(String(255))
-
- network_id = Column(String(255), nullable=False)
- #foreign key to networks.uuid
- portprofile_id = Column(String(255), nullable=False)
- #foreign key to portprofiles.uuid
- default = Column(Boolean)
-
- def __init__(self, tenant_id, network_id, portprofile_id, default):
- self.tenant_id = tenant_id
- self.network_id = network_id
- self.portprofile_id = portprofile_id
- self.default = default
-
- def __repr__(self):
- return "<PortProfile Binding(%s,%s,%s,%s)>" % \
- (self.tenant_id, self.network_id, self.portprofile_id, self.default)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011, Cisco Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# @author: Rohit Agarwalla, Cisco Systems, Inc.
-
-from sqlalchemy.orm import exc
-
-import quantum.db.api as db
-import ucs_models
-
-
-def get_all_ucsmbinding():
- """Lists all the ucsm bindings"""
- session = db.get_session()
- try:
- bindings = session.query(ucs_models.UcsmBinding).\
- all()
- return bindings
- except exc.NoResultFound:
- return []
-
-
-def get_ucsmbinding(ucsm_ip):
- """Lists a ucsm binding"""
- session = db.get_session()
- try:
- binding = session.query(ucs_models.UcsmBinding).\
- filter_by(ucsm_ip=ucsm_ip).\
- one()
- return binding
- except exc.NoResultFound:
- raise Exception("No binding found with ip = %s" % ucsm_ip)
-
-
-def add_ucsmbinding(ucsm_ip, network_id):
- """Adds a ucsm binding"""
- session = db.get_session()
- try:
- ip = session.query(ucs_models.UcsmBinding).\
- filter_by(ucsm_ip=ucsm_ip).\
- one()
- raise Exception("Binding with ucsm ip \"%s\" already exists" % ucsm_ip)
- except exc.NoResultFound:
- binding = ucs_models.UcsmBinding(ucsm_ip, network_id)
- session.add(binding)
- session.flush()
- return binding
-
-
-def remove_ucsmbinding(ucsm_ip):
- """Removes a ucsm binding"""
- session = db.get_session()
- try:
- binding = session.query(ucs_models.UcsmBinding).\
- filter_by(ucsm_ip=ucsm_ip).\
- one()
- session.delete(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- pass
-
-
-def update_ucsmbinding(ucsm_ip, new_network_id):
- """Updates ucsm binding"""
- session = db.get_session()
- try:
- binding = session.query(ucs_models.UcsmBinding).\
- filter_by(ucsm_ip=ucsm_ip).\
- one()
- if new_network_id:
- binding.network_id = new_network_id
- session.merge(binding)
- session.flush()
- return binding
- except exc.NoResultFound:
- raise Exception("No binding with ip = %s" % ucsm_ip)
-
-
-def get_all_dynamicvnics():
- """Lists all the dynamic vnics"""
- session = db.get_session()
- try:
- vnics = session.query(ucs_models.DynamicVnic).\
- all()
- return vnics
- except exc.NoResultFound:
- return []
-
-
-def get_dynamicvnic(vnic_id):
- """Lists a dynamic vnic"""
- session = db.get_session()
- try:
- vnic = session.query(ucs_models.DynamicVnic).\
- filter_by(uuid=vnic_id).\
- one()
- return vnic
- except exc.NoResultFound:
- raise Exception("No dynamic vnic found with id = %s" % vnic_id)
-
-
-def add_dynamicvnic(device_name, blade_id):
- """Adds a dynamic vnic"""
- session = db.get_session()
- try:
- name = session.query(ucs_models.DynamicVnic).\
- filter_by(device_name=device_name).\
- one()
- raise Exception("Dynamic vnic with device name %s already exists" % \
- device_name)
- except exc.NoResultFound:
- vnic = ucs_models.DynamicVnic(device_name, blade_id)
- session.add(vnic)
- session.flush()
- return vnic
-
-
-def remove_dynamicvnic(vnic_id):
- """Removes a dynamic vnic"""
- session = db.get_session()
- try:
- vnic = session.query(ucs_models.DynamicVnic).\
- filter_by(uuid=vnic_id).\
- one()
- session.delete(vnic)
- session.flush()
- return vnic
- except exc.NoResultFound:
- pass
-
-
-def update_dynamicvnic(vnic_id, new_device_name=None, new_blade_id=None):
- """Updates dynamic vnic"""
- session = db.get_session()
- try:
- vnic = session.query(ucs_models.DynamicVnic).\
- filter_by(uuid=vnic_id).\
- one()
- if new_device_name:
- vnic.device_name = new_device_name
- if new_blade_id:
- vnic.blade_id = new_blade_id
- session.merge(vnic)
- session.flush()
- return vnic
- except exc.NoResultFound:
- raise Exception("No dynamic vnic with id = %s" % vnic_id)
-
-
-def get_all_blades():
- """Lists all the blades details"""
- session = db.get_session()
- try:
- blades = session.query(ucs_models.UcsBlade).\
- all()
- return blades
- except exc.NoResultFound:
- return []
-
-
-def get_blade(blade_id):
- """Lists a blade details"""
- session = db.get_session()
- try:
- blade = session.query(ucs_models.UcsBlade).\
- filter_by(uuid=blade_id).\
- one()
- return blade
- except exc.NoResultFound:
- raise Exception("No blade found with id = %s" % blade_id)
-
-
-def add_blade(mgmt_ip, mac_addr, chassis_id, ucsm_ip):
- """Adds a blade"""
- session = db.get_session()
- try:
- ip = session.query(ucs_models.UcsBlade).\
- filter_by(mgmt_ip=mgmt_ip).\
- one()
- raise Exception("Blade with ip \"%s\" already exists" % mgmt_ip)
- except exc.NoResultFound:
- blade = ucs_models.UcsBlade(mgmt_ip, mac_addr, chassis_id, ucsm_ip)
- session.add(blade)
- session.flush()
- return blade
-
-
-def remove_blade(blade_id):
- """Removes a blade"""
- session = db.get_session()
- try:
- blade = session.query(ucs_models.UcsBlade).\
- filter_by(uuid=blade_id).\
- one()
- session.delete(blade)
- session.flush()
- return blade
- except exc.NoResultFound:
- pass
-
-
-def update_blade(blade_id, new_mgmt_ip=None, new_mac_addr=None, \
- new_chassis_id=None, new_ucsm_ip=None):
- """Updates details of a blade"""
- session = db.get_session()
- try:
- blade = session.query(ucs_models.UcsBlade).\
- filter_by(uuid=blade_id).\
- one()
- if new_mgmt_ip:
- blade.mgmt_ip = new_mgmt_ip
- if new_mac_addr:
- blade.mac_addr = new_mac_addr
- if new_chassis_id:
- blade.chassis_id = new_chassis_id
- if new_ucsm_ip:
- blade.ucsm_ip = new_ucsm_ip
- session.merge(blade)
- session.flush()
- return blade
- except exc.NoResultFound:
- raise Exception("No blade with id = %s" % blade_id)
-
-
-def get_all_portbindings():
- """Lists all the port bindings"""
- session = db.get_session()
- try:
- port_bindings = session.query(ucs_models.PortBinding).\
- all()
- return port_bindings
- except exc.NoResultFound:
- return []
-
-
-def get_portbinding(port_id):
- """Lists a port binding"""
- session = db.get_session()
- try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
- return port_binding
- except exc.NoResultFound:
- raise Exception("No port binding found with port id = %s" % port_id)
-
-
-def add_portbinding(port_id, dynamic_vnic_id, portprofile_name, \
- vlan_name, vlan_id, qos):
- """Adds a port binding"""
- session = db.get_session()
- try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
- raise Exception("Port Binding with portid %s already exists" % port_id)
- except exc.NoResultFound:
- port_binding = ucs_models.PortBinding(port_id, dynamic_vnic_id, \
- portprofile_name, vlan_name, vlan_id, qos)
- session.add(port_binding)
- session.flush()
- return port_binding
-
-
-def remove_portbinding(port_id):
- """Removes a port binding"""
- session = db.get_session()
- try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
- session.delete(port_binding)
- session.flush()
- return port_binding
- except exc.NoResultFound:
- pass
-
-
-def update_portbinding(port_id, dynamic_vnic_id=None, portprofile_name=None, \
- vlan_name=None, vlan_id=None, qos=None):
- """Updates port binding"""
- session = db.get_session()
- try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
- if dynamic_vnic_id:
- port_binding.dynamic_vnic_id = dynamic_vnic_id
- if portprofile_name:
- port_binding.portprofile_name = portprofile_name
- if vlan_name:
- port_binding.vlan_name = vlan_name
- if vlan_name:
- port_binding.vlan_id = vlan_id
- if qos:
- port_binding.qos = qos
- session.merge(port_binding)
- session.flush()
- return port_binding
- except exc.NoResultFound:
- raise Exception("No port binding with port id = %s" % port_id)
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011, Cisco Systems, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# @author: Rohit Agarwalla, Cisco Systems, Inc.
-
-import uuid
-
-from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
-from sqlalchemy.orm import relation
-
-from quantum.db.models import BASE
-
-
-class UcsmBinding(BASE):
- """Represents a binding of ucsm to network_id"""
- __tablename__ = 'ucsm_bindings'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- ucsm_ip = Column(String(255))
- network_id = Column(String(255), nullable=False)
- #foreign key to networks.uuid
-
- def __init__(self, ucsm_ip, network_id):
- self.ucsm_ip = ucsm_ip
- self.network_id = network_id
-
- def __repr__(self):
- return "<UcsmBinding(%s,%s)>" % \
- (self.ucsm_ip, self.network_id)
-
-
-class DynamicVnic(BASE):
- """Represents Cisco UCS Dynamic Vnics"""
- __tablename__ = 'dynamic_vnics'
-
- uuid = Column(String(255), primary_key=True)
- device_name = Column(String(255))
- blade_id = Column(String(255), ForeignKey("ucs_blades.uuid"), \
- nullable=False)
-
- def __init__(self, device_name, blade_id):
- self.uuid = uuid.uuid4()
- self.device_name = device_name
- self.blade_id = blade_id
-
- def __repr__(self):
- return "<Dyanmic Vnic(%s,%s,%s)>" % \
- (self.uuid, self.device_name, self.blade_id)
-
-
-class UcsBlade(BASE):
- """Represents details of ucs blades"""
- __tablename__ = 'ucs_blades'
-
- uuid = Column(String(255), primary_key=True)
- mgmt_ip = Column(String(255))
- mac_addr = Column(String(255))
- chassis_id = Column(String(255))
- ucsm_ip = Column(String(255))
- dynamic_vnics = relation(DynamicVnic, order_by=DynamicVnic.uuid, \
- backref="blade")
-
- def __init__(self, mgmt_ip, mac_addr, chassis_id, ucsm_ip):
- self.uuid = uuid.uuid4()
- self.mgmt_ip = mgmt_ip
- self.mac_addr = mac_addr
- self.chassis_id = chassis_id
- self.ucsm_ip = ucsm_ip
-
- def __repr__(self):
- return "<UcsBlades (%s,%s,%s,%s,%s)>" % \
- (self.uuid, self.mgmt_ip, self.mac_addr, self.chassis_id, self.ucsm_ip)
-
-
-class PortBinding(BASE):
- """Represents Port binding to device interface"""
- __tablename__ = 'port_bindings'
-
- id = Column(Integer, primary_key=True, autoincrement=True)
- port_id = Column(String(255), nullable=False)
- #foreign key to ports.uuid
- dynamic_vnic_id = Column(String(255), nullable=False)
- #foreign key to dynamic_vnics.uuid
- portprofile_name = Column(String(255))
- vlan_name = Column(String(255))
- vlan_id = Column(Integer)
- qos = Column(String(255))
-
- def __init__(self, port_id, dynamic_vnic_id, portprofile_name, vlan_name, \
- vlan_id, qos):
- self.port_id = port_id
- self.dynamic_vnic_id = dynamic_vnic_id
- self.portprofile_name = portprofile_name
- self.vlan_name = vlan_name
- self.vlan_id = vlan_id
- self.qos = qos
-
- def __repr__(self):
- return "<PortProfile Binding(%s,%s,%s,%s,%s,%s)>" % \
- (self.port_id, self.dynamic_vnic_id, self.portprofile_name, \
- self.vlan_name, self.vlan_id, self.qos)