--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011, Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
+
+import logging as LOG
+
+from sqlalchemy.orm import exc
+
+import quantum.plugins.cisco.db.api as db
+import ucs_models
+
+from quantum.plugins.cisco.common import cisco_exceptions as c_exc
+
+
+def get_all_ucsmbinding():
+ """Lists all the ucsm bindings"""
+ LOG.debug("get_all_ucsmbinding() called")
+ session = db.get_session()
+ try:
+ bindings = session.query(ucs_models.UcsmBinding).\
+ all()
+ return bindings
+ except exc.NoResultFound:
+ return []
+
+
+def get_ucsmbinding(ucsm_ip):
+ """Lists a ucsm binding"""
+ LOG.debug("get_ucsmbinding() called")
+ session = db.get_session()
+ try:
+ binding = session.query(ucs_models.UcsmBinding).\
+ filter_by(ucsm_ip=ucsm_ip).\
+ one()
+ return binding
+ except exc.NoResultFound:
+ raise c_exc.UcsmBindingNotFound(ucsm_ip=ucsm_ip)
+
+
+def add_ucsmbinding(ucsm_ip, network_id):
+ """Adds a ucsm binding"""
+ LOG.debug("add_ucsmbinding() called")
+ session = db.get_session()
+ try:
+ binding = session.query(ucs_models.UcsmBinding).\
+ filter_by(ucsm_ip=ucsm_ip).\
+ one()
+ raise c_exc.UcsmBindingAlreadyExists(ucsm_ip=ucsm_ip)
+ except exc.NoResultFound:
+ binding = ucs_models.UcsmBinding(ucsm_ip, network_id)
+ session.add(binding)
+ session.flush()
+ return binding
+
+
+def remove_ucsmbinding(ucsm_ip):
+ """Removes a ucsm binding"""
+ LOG.debug("remove_ucsmbinding() called")
+ session = db.get_session()
+ try:
+ binding = session.query(ucs_models.UcsmBinding).\
+ filter_by(ucsm_ip=ucsm_ip).\
+ one()
+ session.delete(binding)
+ session.flush()
+ return binding
+ except exc.NoResultFound:
+ pass
+
+
+def update_ucsmbinding(ucsm_ip, new_network_id):
+ """Updates ucsm binding"""
+ LOG.debug("update_ucsmbinding() called")
+ session = db.get_session()
+ try:
+ binding = session.query(ucs_models.UcsmBinding).\
+ filter_by(ucsm_ip=ucsm_ip).\
+ one()
+ if new_network_id:
+ binding.network_id = new_network_id
+ session.merge(binding)
+ session.flush()
+ return binding
+ except exc.NoResultFound:
+ raise c_exc.UcsmBindingNotFound(ucsm_ip=ucsm_ip)
+
+
+def get_all_dynamicvnics():
+ """Lists all the dynamic vnics"""
+ LOG.debug("get_all_dynamicvnics() called")
+ session = db.get_session()
+ try:
+ vnics = session.query(ucs_models.DynamicVnic).\
+ all()
+ return vnics
+ except exc.NoResultFound:
+ return []
+
+
+def get_dynamicvnic(vnic_id):
+ """Lists a dynamic vnic"""
+ LOG.debug("get_dynamicvnic() called")
+ session = db.get_session()
+ try:
+ vnic = session.query(ucs_models.DynamicVnic).\
+ filter_by(uuid=vnic_id).\
+ one()
+ return vnic
+ except exc.NoResultFound:
+ raise c_exc.DynamicVnicNotFound(vnic_id=vnic_id)
+
+
+def add_dynamicvnic(device_name, blade_id, vnic_state):
+ """Adds a dynamic vnic"""
+ LOG.debug("add_dynamicvnic() called")
+ session = db.get_session()
+ try:
+ vnic = session.query(ucs_models.DynamicVnic).\
+ filter_by(device_name=device_name).\
+ one()
+ raise c_exc.DynamicVnicAlreadyExists(device_name=device_name)
+ except exc.NoResultFound:
+ vnic = ucs_models.DynamicVnic(device_name, blade_id, vnic_state)
+ session.add(vnic)
+ session.flush()
+ return vnic
+
+
+def remove_dynamicvnic(vnic_id):
+ """Removes a dynamic vnic"""
+ LOG.debug("remove_dynamicvnic() called")
+ session = db.get_session()
+ try:
+ vnic = session.query(ucs_models.DynamicVnic).\
+ filter_by(uuid=vnic_id).\
+ one()
+ session.delete(vnic)
+ session.flush()
+ return vnic
+ except exc.NoResultFound:
+ pass
+
+
+def update_dynamicvnic(vnic_id, new_device_name=None, new_blade_id=None,
+ vnic_state=None):
+ """Updates dynamic vnic"""
+ LOG.debug("update_dynamicvnic() called")
+ session = db.get_session()
+ try:
+ vnic = session.query(ucs_models.DynamicVnic).\
+ filter_by(uuid=vnic_id).\
+ one()
+ if new_device_name:
+ vnic.device_name = new_device_name
+ if new_blade_id:
+ vnic.blade_id = new_blade_id
+ if vnic_state:
+ vnic.vnic_state = vnic_state
+ session.merge(vnic)
+ session.flush()
+ return vnic
+ except exc.NoResultFound:
+ raise c_exc.DynamicVnicNotFound(vnic_id=vnic_id)
+
+
+def get_all_blades():
+ """Lists all the blades details"""
+ LOG.debug("get_all_blades() called")
+ session = db.get_session()
+ try:
+ blades = session.query(ucs_models.UcsBlade).\
+ all()
+ return blades
+ except exc.NoResultFound:
+ return []
+
+
+def get_blade(blade_id):
+ """Lists a blade details"""
+ LOG.debug("get_blade() called")
+ session = db.get_session()
+ try:
+ blade = session.query(ucs_models.UcsBlade).\
+ filter_by(uuid=blade_id).\
+ one()
+ return blade
+ except exc.NoResultFound:
+ raise c_exc.BladeNotFound(blade_id=blade_id)
+
+
+def add_blade(mgmt_ip, mac_addr, chassis_id, ucsm_ip, blade_state,
+ vnics_used, hostname):
+ """Adds a blade"""
+ LOG.debug("add_blade() called")
+ session = db.get_session()
+ try:
+ blade = session.query(ucs_models.UcsBlade).\
+ filter_by(mgmt_ip=mgmt_ip).\
+ one()
+ raise c_exc.BladeAlreadyExists(mgmt_ip=mgmt_ip)
+ except exc.NoResultFound:
+ blade = ucs_models.UcsBlade(mgmt_ip, mac_addr, chassis_id, ucsm_ip,
+ blade_state, vnics_used, hostname)
+ session.add(blade)
+ session.flush()
+ return blade
+
+
+def remove_blade(blade_id):
+ """Removes a blade"""
+ LOG.debug("remove_blade() called")
+ session = db.get_session()
+ try:
+ blade = session.query(ucs_models.UcsBlade).\
+ filter_by(uuid=blade_id).\
+ one()
+ session.delete(blade)
+ session.flush()
+ return blade
+ except exc.NoResultFound:
+ pass
+
+
+def update_blade(blade_id, new_mgmt_ip=None, new_mac_addr=None,
+ new_chassis_id=None, new_ucsm_ip=None,
+ new_blade_state=None, new_vnics_used=None,
+ new_hostname=None):
+ """Updates details of a blade"""
+ LOG.debug("update_blade() called")
+ session = db.get_session()
+ try:
+ blade = session.query(ucs_models.UcsBlade).\
+ filter_by(uuid=blade_id).\
+ one()
+ if new_mgmt_ip:
+ blade.mgmt_ip = new_mgmt_ip
+ if new_mac_addr:
+ blade.mac_addr = new_mac_addr
+ if new_chassis_id:
+ blade.chassis_id = new_chassis_id
+ if new_ucsm_ip:
+ blade.ucsm_ip = new_ucsm_ip
+ if new_blade_state:
+ blade.blade_state = new_blade_state
+ if new_vnics_used:
+ blade.vnics_used = new_vnics_used
+ if new_hostname:
+ blade.hostname = new_hostname
+ session.merge(blade)
+ session.flush()
+ return blade
+ except exc.NoResultFound:
+ raise c_exc.BladeNotFound(blade_id=blade_id)
+
+
+def get_all_portbindings():
+ """Lists all the port bindings"""
+ LOG.debug("db get_all_portbindings() called")
+ session = db.get_session()
+ try:
+ port_bindings = session.query(ucs_models.PortBinding).\
+ all()
+ return port_bindings
+ except exc.NoResultFound:
+ return []
+
+
+def get_portbinding(port_id):
+ """Lists a port binding"""
+ LOG.debug("get_portbinding() called")
+ session = db.get_session()
+ try:
+ port_binding = session.query(ucs_models.PortBinding).\
+ filter_by(port_id=port_id).\
+ one()
+ return port_binding
+ except exc.NoResultFound:
+ raise c_exc.PortVnicNotFound(port_id=port_id)
+
+
+def add_portbinding(port_id, dynamic_vnic_id, portprofile_name,
+ vlan_name, vlan_id, qos):
+ """Adds a port binding"""
+ LOG.debug("add_portbinding() called")
+ session = db.get_session()
+ try:
+ port_binding = session.query(ucs_models.PortBinding).\
+ filter_by(port_id=port_id).\
+ one()
+ raise c_exc.PortVnicBindingAlreadyExists(port_id=port_id)
+ except exc.NoResultFound:
+ port_binding = ucs_models.PortBinding(port_id, dynamic_vnic_id, \
+ portprofile_name, vlan_name, vlan_id, qos)
+ session.add(port_binding)
+ session.flush()
+ return port_binding
+
+
+def remove_portbinding(port_id):
+ """Removes a port binding"""
+ LOG.debug("db remove_portbinding() called")
+ session = db.get_session()
+ try:
+ port_binding = session.query(ucs_models.PortBinding).\
+ filter_by(port_id=port_id).\
+ one()
+ session.delete(port_binding)
+ session.flush()
+ return port_binding
+ except exc.NoResultFound:
+ pass
+
+
+def update_portbinding(port_id, dynamic_vnic_id=None, portprofile_name=None,
+ vlan_name=None, vlan_id=None, qos=None):
+ """Updates port binding"""
+ LOG.debug("db update_portbinding() called")
+ session = db.get_session()
+ try:
+ port_binding = session.query(ucs_models.PortBinding).\
+ filter_by(port_id=port_id).\
+ one()
+ if dynamic_vnic_id:
+ port_binding.dynamic_vnic_id = dynamic_vnic_id
+ if portprofile_name:
+ port_binding.portprofile_name = portprofile_name
+ if vlan_name:
+ port_binding.vlan_name = vlan_name
+ if vlan_name:
+ port_binding.vlan_id = vlan_id
+ if qos:
+ port_binding.qos = qos
+ session.merge(port_binding)
+ session.flush()
+ return port_binding
+ except exc.NoResultFound:
+ raise c_exc.PortVnicNotFound(port_id=port_id)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011, Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Rohit Agarwalla, Cisco Systems, Inc.
+
+import uuid
+
+from sqlalchemy import Column, Integer, String, ForeignKey
+from sqlalchemy.orm import relation
+
+from quantum.plugins.cisco.db.l2network_models import L2NetworkBase
+from quantum.plugins.cisco.db import models
+from quantum.plugins.cisco.db.models import BASE
+
+
+class UcsmBinding(BASE, L2NetworkBase):
+ """Represents a binding of ucsm to network_id"""
+ __tablename__ = 'ucsm_bindings'
+
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ ucsm_ip = Column(String(255))
+ network_id = Column(String(255), ForeignKey("networks.uuid"),
+ nullable=False)
+ network = relation(models.Network)
+
+ def __init__(self, ucsm_ip, network_id):
+ self.ucsm_ip = ucsm_ip
+ self.network_id = network_id
+
+ def __repr__(self):
+ return "<UcsmBinding(%s,%s)>" % \
+ (self.ucsm_ip, self.network_id)
+
+
+class DynamicVnic(BASE, L2NetworkBase):
+ """Represents Cisco UCS Dynamic Vnics"""
+ __tablename__ = 'dynamic_vnics'
+
+ uuid = Column(String(255), primary_key=True)
+ device_name = Column(String(255))
+ blade_id = Column(String(255), ForeignKey("ucs_blades.uuid"),
+ nullable=False)
+ vnic_state = Column(String(255))
+
+ def __init__(self, device_name, blade_id, vnic_state):
+ self.uuid = uuid.uuid4()
+ self.device_name = device_name
+ self.blade_id = blade_id
+ self.vnic_state = vnic_state
+
+ def __repr__(self):
+ return "<Dyanmic Vnic(%s,%s,%s,%s)>" % \
+ (self.uuid, self.device_name, self.blade_id,
+ self.vnic_state)
+
+
+class UcsBlade(BASE, L2NetworkBase):
+ """Represents details of ucs blades"""
+ __tablename__ = 'ucs_blades'
+
+ uuid = Column(String(255), primary_key=True)
+ mgmt_ip = Column(String(255))
+ mac_addr = Column(String(255))
+ chassis_id = Column(String(255))
+ ucsm_ip = Column(String(255))
+ blade_state = Column(String(255))
+ vnics_used = Column(Integer)
+ hostname = Column(String(255))
+ dynamic_vnics = relation(DynamicVnic, order_by=DynamicVnic.uuid,
+ backref="blade")
+
+ def __init__(self, mgmt_ip, mac_addr, chassis_id, ucsm_ip,
+ blade_state, vnics_used, hostname):
+ self.uuid = uuid.uuid4()
+ self.mgmt_ip = mgmt_ip
+ self.mac_addr = mac_addr
+ self.chassis_id = chassis_id
+ self.ucsm_ip = ucsm_ip
+ self.blade_state = blade_state
+ self.vnics_used = vnics_used
+ self.hostname = hostname
+
+ def __repr__(self):
+ return "<UcsBlades (%s,%s,%s,%s,%s,%s,%s,%s)>" % \
+ (self.uuid, self.mgmt_ip, self.mac_addr, self.chassis_id,
+ self.ucsm_ip, self.blade_state, self.vnics_used, self.hostname)
+
+
+class PortBinding(BASE, L2NetworkBase):
+ """Represents Port binding to device interface"""
+ __tablename__ = 'port_bindings'
+
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ port_id = Column(String(255), ForeignKey("ports.uuid"),
+ nullable=False)
+ dynamic_vnic_id = Column(String(255), ForeignKey("dynamic_vnics.uuid"),
+ nullable=False)
+ portprofile_name = Column(String(255))
+ vlan_name = Column(String(255))
+ vlan_id = Column(Integer)
+ qos = Column(String(255))
+ ports = relation(models.Port, uselist=False)
+ dynamic_vnics = relation(DynamicVnic, uselist=False)
+
+ def __init__(self, port_id, dynamic_vnic_id, portprofile_name,
+ vlan_name, vlan_id, qos):
+ self.port_id = port_id
+ self.dynamic_vnic_id = dynamic_vnic_id
+ self.portprofile_name = portprofile_name
+ self.vlan_name = vlan_name
+ self.vlan_id = vlan_id
+ self.qos = qos
+
+ def __repr__(self):
+ return "<PortProfile Binding(%s,%s,%s,%s,%s,%s)>" % \
+ (self.port_id, self.dynamic_vnic_id, self.portprofile_name,
+ self.vlan_name, self.vlan_id, self.qos)
import quantum.plugins.cisco.db.api as db
import quantum.plugins.cisco.db.l2network_db as l2network_db
import quantum.plugins.cisco.db.nexus_db as nexus_db
+import quantum.plugins.cisco.db.ucs_db as ucs_db
LOG.getLogger(const.LOGGER_COMPONENT_NAME)
+class UcsDB(object):
+ """Class consisting of methods to call ucs db methods"""
+ def get_all_ucsmbindings(self):
+ """get all ucsm bindings"""
+ bindings = []
+ try:
+ for res in ucs_db.get_all_ucsmbinding():
+ LOG.debug("Getting ucsm binding : %s" % res.ucsm_ip)
+ bind_dict = {}
+ bind_dict["ucsm-ip"] = str(res.ucsm_ip)
+ bind_dict["network-id"] = str(res.network_id)
+ bindings.append(bind_dict)
+ except Exception, exc:
+ LOG.error("Failed to get all bindings: %s" % str(exc))
+ return bindings
+
+ def get_ucsmbinding(self, ucsm_ip):
+ """get ucsm binding"""
+ binding = []
+ try:
+ for res in ucs_db.get_ucsmbinding(ucsm_ip):
+ LOG.debug("Getting ucsm binding : %s" % res.ucsm_ip)
+ bind_dict = {}
+ bind_dict["ucsm-ip"] = str(res.ucsm_ip)
+ bind_dict["network-id"] = str(res.network_id)
+ binding.append(bind_dict)
+ except Exception, exc:
+ LOG.error("Failed to get binding: %s" % str(exc))
+ return binding
+
+ def create_ucsmbinding(self, ucsm_ip, network_id):
+ """create ucsm binding"""
+ bind_dict = {}
+ try:
+ res = ucs_db.add_ucsmbinding(ucsm_ip, network_id)
+ LOG.debug("Created ucsm binding: %s" % res.ucsm_ip)
+ bind_dict["ucsm-ip"] = str(res.ucsm_ip)
+ bind_dict["network-id"] = str(res.network_id)
+ return bind_dict
+ except Exception, exc:
+ LOG.error("Failed to create ucsm binding: %s" % str(exc))
+
+ def delete_ucsmbinding(self, ucsm_ip):
+ """delete ucsm binding"""
+ try:
+ res = ucs_db.remove_ucsmbinding(ucsm_ip)
+ LOG.debug("Deleted ucsm binding : %s" % res.ucsm_ip)
+ bind_dict = {}
+ bind_dict["ucsm-ip"] = str(res.ucsm_ip)
+ return bind_dict
+ except Exception, exc:
+ raise Exception("Failed to delete dynamic vnic: %s" % str(exc))
+
+ def update_ucsmbinding(self, ucsm_ip, network_id):
+ """update ucsm binding"""
+ try:
+ res = ucs_db.update_ucsmbinding(ucsm_ip, network_id)
+ LOG.debug("Updating ucsm binding : %s" % res.ucsm_ip)
+ bind_dict = {}
+ bind_dict["ucsm-ip"] = str(res.ucsm_ip)
+ bind_dict["network-id"] = str(res.network_id)
+ return bind_dict
+ except Exception, exc:
+ raise Exception("Failed to update dynamic vnic: %s" % str(exc))
+
+ def get_all_dynamicvnics(self):
+ """get all dynamic vnics"""
+ vnics = []
+ try:
+ for res in ucs_db.get_all_dynamicvnics():
+ LOG.debug("Getting dynamic vnic : %s" % res.uuid)
+ vnic_dict = {}
+ vnic_dict["vnic-id"] = str(res.uuid)
+ vnic_dict["device-name"] = res.device_name
+ vnic_dict["blade-id"] = str(res.blade_id)
+ vnic_dict["vnic_state"] = res.vnic_state
+ vnics.append(vnic_dict)
+ except Exception, exc:
+ LOG.error("Failed to get all dynamic vnics: %s" % str(exc))
+ return vnics
+
+ def get_dynamicvnic(self, vnic_id):
+ """get dynamic vnic"""
+ vnic = []
+ try:
+ for res in ucs_db.get_dynamicvnic(vnic_id):
+ LOG.debug("Getting dynamic vnic : %s" % res.uuid)
+ vnic_dict = {}
+ vnic_dict["vnic-id"] = str(res.uuid)
+ vnic_dict["device-name"] = res.device_name
+ vnic_dict["blade-id"] = str(res.blade_id)
+ vnic_dict["vnic_state"] = res.vnic_state
+ vnic.append(vnic_dict)
+ except Exception, exc:
+ LOG.error("Failed to get dynamic vnic: %s" % str(exc))
+ return vnic
+
+ def create_dynamicvnic(self, device_name, blade_id, vnic_state):
+ """create dynamic vnic"""
+ vnic_dict = {}
+ try:
+ res = ucs_db.add_dynamicvnic(device_name, blade_id, vnic_state)
+ LOG.debug("Created dynamic vnic: %s" % res.uuid)
+ vnic_dict["vnic-id"] = str(res.uuid)
+ vnic_dict["device-name"] = res.device_name
+ vnic_dict["blade-id"] = str(res.blade_id)
+ vnic_dict["vnic_state"] = res.vnic_state
+ return vnic_dict
+ except Exception, exc:
+ LOG.error("Failed to create dynamic vnic: %s" % str(exc))
+
+ def delete_dynamicvnic(self, vnic_id):
+ """delete dynamic vnic"""
+ try:
+ res = ucs_db.remove_dynamicvnic(vnic_id)
+ LOG.debug("Deleted dynamic vnic : %s" % res.uuid)
+ vnic_dict = {}
+ vnic_dict["vnic-id"] = str(res.uuid)
+ return vnic_dict
+ except Exception, exc:
+ raise Exception("Failed to delete dynamic vnic: %s" % str(exc))
+
+ def update_dynamicvnic(self, vnic_id, device_name=None, blade_id=None,
+ vnic_state=None):
+ """update dynamic vnic"""
+ try:
+ res = ucs_db.update_dynamicvnic(vnic_id, device_name, blade_id,
+ vnic_state)
+ LOG.debug("Updating dynamic vnic : %s" % res.uuid)
+ vnic_dict = {}
+ vnic_dict["vnic-id"] = str(res.uuid)
+ vnic_dict["device-name"] = res.device_name
+ vnic_dict["blade-id"] = str(res.blade_id)
+ vnic_dict["vnic_state"] = res.vnic_state
+ return vnic_dict
+ except Exception, exc:
+ raise Exception("Failed to update dynamic vnic: %s" % str(exc))
+
+ def get_all_blades(self):
+ """get all blades"""
+ blades = []
+ try:
+ for res in ucs_db.get_all_blades():
+ LOG.debug("Getting blade : %s" % res.uuid)
+ blade_dict = {}
+ blade_dict["blade-id"] = str(res.uuid)
+ blade_dict["mgmt-ip"] = str(res.mgmt_ip)
+ blade_dict["mac-addr"] = str(res.mac_addr)
+ blade_dict["chassis-id"] = str(res.chassis_id)
+ blade_dict["ucsm-ip"] = str(res.ucsm_ip)
+ blade_dict["blade_state"] = str(res.blade_state)
+ blade_dict["vnics_used"] = str(res.vnics_used)
+ blade_dict["hostname"] = str(res.hostname)
+ blades.append(blade_dict)
+ except Exception, exc:
+ LOG.error("Failed to get all blades: %s" % str(exc))
+ return blades
+
+ def get_blade(self, blade_id):
+ """get blade"""
+ blade = []
+ try:
+ for res in ucs_db.get_blade(blade_id):
+ LOG.debug("Getting blade : %s" % res.uuid)
+ blade_dict = {}
+ blade_dict["blade-id"] = str(res.uuid)
+ blade_dict["mgmt-ip"] = str(res.mgmt_ip)
+ blade_dict["mac-addr"] = str(res.mac_addr)
+ blade_dict["chassis-id"] = str(res.chassis_id)
+ blade_dict["ucsm-ip"] = str(res.ucsm_ip)
+ blade_dict["blade_state"] = str(res.blade_state)
+ blade_dict["vnics_used"] = str(res.vnics_used)
+ blade_dict["hostname"] = str(res.hostname)
+ blade.append(blade_dict)
+ except Exception, exc:
+ LOG.error("Failed to get all blades: %s" % str(exc))
+ return blade
+
+ def create_blade(self, mgmt_ip, mac_addr, chassis_id, ucsm_ip,
+ blade_state, vnics_used, hostname):
+ """create blade"""
+ blade_dict = {}
+ try:
+ res = ucs_db.add_blade(mgmt_ip, mac_addr, chassis_id, ucsm_ip,
+ blade_state, vnics_used, hostname)
+ LOG.debug("Created blade: %s" % res.uuid)
+ blade_dict["blade-id"] = str(res.uuid)
+ blade_dict["mgmt-ip"] = str(res.mgmt_ip)
+ blade_dict["mac-addr"] = str(res.mac_addr)
+ blade_dict["chassis-id"] = str(res.chassis_id)
+ blade_dict["ucsm-ip"] = str(res.ucsm_ip)
+ blade_dict["blade_state"] = str(res.blade_state)
+ blade_dict["vnics_used"] = str(res.vnics_used)
+ blade_dict["hostname"] = str(res.hostname)
+ return blade_dict
+ except Exception, exc:
+ LOG.error("Failed to create blade: %s" % str(exc))
+
+ def delete_blade(self, blade_id):
+ """delete blade"""
+ try:
+ res = ucs_db.remove_blade(blade_id)
+ LOG.debug("Deleted blade : %s" % res.uuid)
+ blade_dict = {}
+ blade_dict["blade-id"] = str(res.uuid)
+ return blade_dict
+ except Exception, exc:
+ raise Exception("Failed to delete blade: %s" % str(exc))
+
+ def update_blade(self, blade_id, mgmt_ip=None, mac_addr=None,
+ chassis_id=None, ucsm_ip=None, blade_state=None,
+ vnics_used=None, hostname=None):
+ """update blade"""
+ try:
+ res = ucs_db.update_blade(blade_id, mgmt_ip, mac_addr,
+ chassis_id, ucsm_ip, blade_state,
+ vnics_used, hostname)
+ LOG.debug("Updating blade : %s" % res.uuid)
+ blade_dict = {}
+ blade_dict["blade-id"] = str(res.uuid)
+ blade_dict["mgmt-ip"] = str(res.mgmt_ip)
+ blade_dict["mac-addr"] = str(res.mac_addr)
+ blade_dict["chassis-id"] = str(res.chassis_id)
+ blade_dict["ucsm-ip"] = str(res.ucsm_ip)
+ blade_dict["blade_state"] = str(res.blade_state)
+ blade_dict["vnics_used"] = str(res.vnics_used)
+ blade_dict["hostname"] = str(res.hostname)
+ return blade_dict
+ except Exception, exc:
+ raise Exception("Failed to update blade: %s" % str(exc))
+
+ def get_all_port_bindings(self):
+ """get all port binding"""
+ port_bindings = []
+ try:
+ for bind in ucs_db.get_all_portbindings():
+ LOG.debug("Getting port binding for port: %s" % bind.port_id)
+ port_bind_dict = {}
+ port_bind_dict["port-id"] = bind.port_id
+ port_bind_dict["dynamic-vnic-id"] = str(bind.dynamic_vnic_id)
+ port_bind_dict["portprofile-name"] = bind.portprofile_name
+ port_bind_dict["vlan-name"] = bind.vlan_name
+ port_bind_dict["vlan-id"] = str(bind.vlan_id)
+ port_bind_dict["qos"] = bind.qos
+ port_bindings.append(port_bind_dict)
+ except Exception, exc:
+ LOG.error("Failed to get all port bindings: %s" % str(exc))
+ return port_bindings
+
+ def get_port_binding(self, port_id):
+ """get port binding"""
+ port_binding = []
+ try:
+ for bind in ucs_db.get_portbinding(port_id):
+ LOG.debug("Getting port binding for port: %s" % bind.port_id)
+ port_bind_dict = {}
+ port_bind_dict["port-id"] = bind.port_id
+ port_bind_dict["dynamic-vnic-id"] = str(bind.dynamic_vnic_id)
+ port_bind_dict["portprofile-name"] = bind.portprofile_name
+ port_bind_dict["vlan-name"] = bind.vlan_name
+ port_bind_dict["vlan-id"] = str(bind.vlan_id)
+ port_bind_dict["qos"] = bind.qos
+ port_binding.append(port_bind_dict)
+ except Exception, exc:
+ LOG.error("Failed to get port binding: %s" % str(exc))
+ return port_binding
+
+ def create_port_binding(self, port_id, dynamic_vnic_id, portprofile_name, \
+ vlan_name, vlan_id, qos):
+ """create port binding"""
+ port_bind_dict = {}
+ try:
+ res = ucs_db.add_portbinding(port_id, dynamic_vnic_id, \
+ portprofile_name, vlan_name, vlan_id, qos)
+ LOG.debug("Created port binding: %s" % res.port_id)
+ port_bind_dict["port-id"] = res.port_id
+ port_bind_dict["dynamic-vnic-id"] = str(res.dynamic_vnic_id)
+ port_bind_dict["portprofile-name"] = res.portprofile_name
+ port_bind_dict["vlan-name"] = res.vlan_name
+ port_bind_dict["vlan-id"] = str(res.vlan_id)
+ port_bind_dict["qos"] = res.qos
+ return port_bind_dict
+ except Exception, exc:
+ LOG.error("Failed to create port binding: %s" % str(exc))
+
+ def delete_port_binding(self, port_id):
+ """delete port binding"""
+ try:
+ res = ucs_db.remove_portbinding(port_id)
+ LOG.debug("Deleted port binding : %s" % res.port_id)
+ port_bind_dict = {}
+ port_bind_dict["port-id"] = res.port_id
+ return port_bind_dict
+ except Exception, exc:
+ raise Exception("Failed to delete port profile: %s" % str(exc))
+
+ def update_port_binding(self, port_id, dynamic_vnic_id, \
+ portprofile_name, vlan_name, vlan_id, qos):
+ """update port binding"""
+ try:
+ res = ucs_db.update_portbinding(port_id, dynamic_vnic_id, \
+ portprofile_name, vlan_name, vlan_id, qos)
+ LOG.debug("Updating port binding: %s" % res.port_id)
+ port_bind_dict = {}
+ port_bind_dict["port-id"] = res.port_id
+ port_bind_dict["dynamic-vnic-id"] = str(res.dynamic_vnic_id)
+ port_bind_dict["portprofile-name"] = res.portprofile_name
+ port_bind_dict["vlan-name"] = res.vlan_name
+ port_bind_dict["vlan-id"] = str(res.vlan_id)
+ port_bind_dict["qos"] = res.qos
+ return port_bind_dict
+ except Exception, exc:
+ raise Exception("Failed to update portprofile binding:%s"
+ % str(exc))
+
+
class NexusDB(object):
"""Class consisting of methods to call nexus db methods"""
def get_all_nexusportbindings(self):
vlans = []
try:
for vlan_bind in l2network_db.get_all_vlan_bindings():
- LOG.debug("Getting vlan bindings for vlan: %s" % \
+ LOG.debug("Getting vlan bindings for vlan: %s" %
vlan_bind.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
vlan = []
try:
for vlan_bind in l2network_db.get_vlan_binding(network_id):
- LOG.debug("Getting vlan binding for vlan: %s" \
+ LOG.debug("Getting vlan binding for vlan: %s"
% vlan_bind.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
def update_vlan_binding(self, network_id, vlan_id, vlan_name):
"""Update a vlan binding"""
try:
- res = l2network_db.update_vlan_binding(network_id, vlan_id, \
+ res = l2network_db.update_vlan_binding(network_id, vlan_id,
vlan_name)
LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
vlan_dict = {}
pp_bindings = []
try:
for pp_bind in l2network_db.get_all_pp_bindings():
- LOG.debug("Getting port profile binding: %s" % \
+ LOG.debug("Getting port profile binding: %s" %
pp_bind.portprofile_id)
ppbinding_dict = {}
ppbinding_dict["portprofile-id"] = str(pp_bind.portprofile_id)
pp_binding = []
try:
for pp_bind in l2network_db.get_pp_binding(tenant_id, pp_id):
- LOG.debug("Getting port profile binding: %s" % \
+ LOG.debug("Getting port profile binding: %s" %
pp_bind.portprofile_id)
ppbinding_dict = {}
ppbinding_dict["portprofile-id"] = str(pp_bind.portprofile_id)
"""Add a portprofile binding"""
ppbinding_dict = {}
try:
- res = l2network_db.add_pp_binding(tenant_id, port_id, pp_id, \
+ res = l2network_db.add_pp_binding(tenant_id, port_id, pp_id,
default)
LOG.debug("Created port profile binding: %s" % res.portprofile_id)
ppbinding_dict["portprofile-id"] = str(res.portprofile_id)
except Exception, exc:
raise Exception("Failed to delete port profile: %s" % str(exc))
- def update_pp_binding(self, tenant_id, pp_id, newtenant_id, \
+ def update_pp_binding(self, tenant_id, pp_id, newtenant_id,
port_id, default):
"""Update portprofile binding"""
try:
ppbinding_dict["default"] = res.default
return ppbinding_dict
except Exception, exc:
- raise Exception("Failed to update portprofile binding:%s" \
+ raise Exception("Failed to update portprofile binding:%s"
% str(exc))
raise Exception("Failed to unplug interface: %s" % str(exc))
+class UcsDBTest(unittest.TestCase):
+ """Class conisting of ucs DB unit tests"""
+ def setUp(self):
+ """Setup for ucs db tests"""
+ l2network_db.initialize()
+ self.quantum = QuantumDB()
+ self.dbtest = UcsDB()
+ LOG.debug("Setup")
+
+ def tearDown(self):
+ """Tear Down"""
+ db.clear_db()
+
+ def testa_create_ucsmbinding(self):
+ """create ucsm binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", net1["net-id"])
+ self.assertTrue(binding1["ucsm-ip"] == "1.2.3.4")
+ self.teardown_ucsmbinding()
+ self.teardown_network()
+
+ def testb_getall_ucsmbindings(self):
+ """get all ucsm bindings"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", net1["net-id"])
+ binding2 = self.dbtest.create_ucsmbinding("2.3.4.5", net1["net-id"])
+ bindings = self.dbtest.get_all_ucsmbindings()
+ count = 0
+ for bind in bindings:
+ if net1["net-id"] == bind["network-id"]:
+ count += 1
+ self.assertTrue(count == 2)
+ self.teardown_ucsmbinding()
+ self.teardown_network()
+
+ def testc_delete_ucsmbinding(self):
+ """delete ucsm binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", net1["net-id"])
+ self.dbtest.delete_ucsmbinding(binding1["ucsm-ip"])
+ bindings = self.dbtest.get_all_ucsmbindings()
+ count = 0
+ for bind in bindings:
+ if "net " in bind["network-id"]:
+ count += 1
+ self.assertTrue(count == 0)
+ self.teardown_ucsmbinding()
+ self.teardown_network()
+
+ def testd_update_ucsmbinding(self):
+ """update ucsm binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ net2 = self.quantum.create_network("t1", "netid2")
+ binding1 = self.dbtest.create_ucsmbinding("1.2.3.4", net1["net-id"])
+ binding1 = self.dbtest.update_ucsmbinding(binding1["ucsm-ip"],
+ net2["net-id"])
+ bindings = self.dbtest.get_all_ucsmbindings()
+ count = 0
+ for bind in bindings:
+ if net2["net-id"] == bind["network-id"]:
+ count += 1
+ self.assertTrue(count == 1)
+ self.teardown_ucsmbinding()
+ self.teardown_network()
+
+ def teste_create_dynamicvnic(self):
+ """create dynamic vnic"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ self.assertTrue(vnic1["device-name"] == "eth1")
+ self.teardown_dyanmicvnic()
+
+ def testf_getall_dyanmicvnics(self):
+ """get all dynamic vnics"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ vnic2 = self.dbtest.create_dynamicvnic("eth2", blade1["blade-id"],
+ "UP")
+ vnics = self.dbtest.get_all_dynamicvnics()
+ count = 0
+ for vnic in vnics:
+ if "eth" in vnic["device-name"]:
+ count += 1
+ self.assertTrue(count == 2)
+ self.teardown_dyanmicvnic()
+
+ def testg_delete_dyanmicvnic(self):
+ """delete dynamic vnic"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ self.dbtest.delete_dynamicvnic(vnic1["vnic-id"])
+ vnics = self.dbtest.get_all_dynamicvnics()
+ count = 0
+ for vnic in vnics:
+ if "eth " in vnic["device-name"]:
+ count += 1
+ self.assertTrue(count == 0)
+ self.teardown_dyanmicvnic()
+
+ def testh_updatedynamicvnic(self):
+ """update dynamic vnic"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ vnic1 = self.dbtest.update_dynamicvnic(vnic1["vnic-id"], "neweth1",
+ blade1["blade-id"], "DOWN")
+ vnics = self.dbtest.get_all_dynamicvnics()
+ count = 0
+ for vnic in vnics:
+ if "new" in vnic["device-name"]:
+ count += 1
+ self.assertTrue(count == 1)
+ self.teardown_dyanmicvnic()
+
+ def testi_create_ucsblade(self):
+ """create ucs blade"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ self.assertTrue(blade1["mgmt-ip"] == "1.2.3.4")
+ self.teardown_ucsblade()
+
+ def testj_getall_ucsblade(self):
+ """get all ucs blades"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ blade2 = self.dbtest.create_blade("2.3.4.5", "efgh", "chassis1",
+ "9.8.7.6", "UP", 3, "blade2")
+ blades = self.dbtest.get_all_blades()
+ count = 0
+ for blade in blades:
+ if "chassis" in blade["chassis-id"]:
+ count += 1
+ self.assertTrue(count == 2)
+ self.teardown_ucsblade()
+
+ def testk_delete_ucsblade(self):
+ """delete ucs blades"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ self.dbtest.delete_blade(blade1["blade-id"])
+ blades = self.dbtest.get_all_blades()
+ count = 0
+ for blade in blades:
+ if "chassis " in blade["chassis-id"]:
+ count += 1
+ self.assertTrue(count == 0)
+ self.teardown_ucsblade()
+
+ def testl_update_ucsblade(self):
+ """update ucs blade"""
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ blade2 = self.dbtest.update_blade(blade1["blade-id"], "2.3.4.5",
+ "newabcd", "chassis1", "9.8.7.6",
+ "UP", 3, "blade1")
+ blades = self.dbtest.get_all_blades()
+ count = 0
+ for blade in blades:
+ if "new" in blade["mac-addr"]:
+ count += 1
+ self.assertTrue(count == 1)
+ self.teardown_ucsblade()
+
+ def testm_create_portbinding(self):
+ """create port binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ port1 = self.quantum.create_port(net1["net-id"])
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ port_bind1 = self.dbtest.create_port_binding(port1["port-id"],
+ vnic1["vnic-id"], "pp1", "vlan1", 10, "qos1")
+ self.assertTrue(port_bind1["port-id"] == port1["port-id"])
+ self.teardown_portbinding()
+ self.teardown_dyanmicvnic()
+ self.teardown_ucsblade()
+ self.teardown_network_port()
+
+ def testn_getall_portbindings(self):
+ """get all port binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ port1 = self.quantum.create_port(net1["net-id"])
+ port2 = self.quantum.create_port(net1["net-id"])
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ vnic2 = self.dbtest.create_dynamicvnic("eth2", blade1["blade-id"],
+ "UP")
+ port_bind1 = self.dbtest.create_port_binding(port1["port-id"],
+ vnic1["vnic-id"], "pp1", "vlan1", 10, "qos1")
+ port_bind2 = self.dbtest.create_port_binding(port2["port-id"],
+ vnic2["vnic-id"], "pp2", "vlan2", 20, "qos2")
+ port_bindings = self.dbtest.get_all_port_bindings()
+ count = 0
+ for pbind in port_bindings:
+ if "vlan" in pbind["vlan-name"]:
+ count += 1
+ self.assertTrue(count == 2)
+ self.teardown_portbinding()
+ self.teardown_dyanmicvnic()
+ self.teardown_ucsblade()
+ self.teardown_network_port()
+
+ def testo_delete_portbinding(self):
+ """delete port binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ port1 = self.quantum.create_port(net1["net-id"])
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ port_bind1 = self.dbtest.create_port_binding(port1["port-id"],
+ vnic1["vnic-id"], "pp1", "vlan1", 10, "qos1")
+ self.dbtest.delete_port_binding(port1["port-id"])
+ port_bindings = self.dbtest.get_all_port_bindings()
+ count = 0
+ for pbind in port_bindings:
+ if "vlan " in pbind["vlan-name"]:
+ count += 1
+ self.assertTrue(count == 0)
+ self.teardown_portbinding()
+ self.teardown_dyanmicvnic()
+ self.teardown_ucsblade()
+ self.teardown_network_port()
+
+ def testp_update_portbinding(self):
+ """update port binding"""
+ net1 = self.quantum.create_network("t1", "netid1")
+ port1 = self.quantum.create_port(net1["net-id"])
+ blade1 = self.dbtest.create_blade("1.2.3.4", "abcd", "chassis1",
+ "9.8.7.6", "UP", 2, "blade1")
+ vnic1 = self.dbtest.create_dynamicvnic("eth1", blade1["blade-id"],
+ "UP")
+ port_bind1 = self.dbtest.create_port_binding(port1["port-id"],
+ vnic1["vnic-id"], "pp1", "vlan1", 10, "qos1")
+ port_bind1 = self.dbtest.update_port_binding(port1["port-id"],
+ vnic1["vnic-id"], "newpp1", "newvlan1", 11, "newqos1")
+ port_bindings = self.dbtest.get_all_port_bindings()
+ count = 0
+ for pbind in port_bindings:
+ if "new" in pbind["vlan-name"]:
+ count += 1
+ self.assertTrue(count == 1)
+ self.teardown_portbinding()
+ self.teardown_dyanmicvnic()
+ self.teardown_ucsblade()
+ self.teardown_network_port()
+
+ def teardown_ucsmbinding(self):
+ """tear down ucsm binding"""
+ LOG.debug("Tearing Down Ucsm Bindings")
+ binds = self.dbtest.get_all_ucsmbindings()
+ for bind in binds:
+ ucsmip = bind["ucsm-ip"]
+ self.dbtest.delete_ucsmbinding(ucsmip)
+
+ def teardown_dyanmicvnic(self):
+ """tear down dynamic vnics"""
+ LOG.debug("Tearing Down Dynamic Vnics")
+ vnics = self.dbtest.get_all_dynamicvnics()
+ for vnic in vnics:
+ vnicid = vnic["vnic-id"]
+ self.dbtest.delete_dynamicvnic(vnicid)
+ self.teardown_ucsblade()
+
+ def teardown_ucsblade(self):
+ """tear down ucs blades"""
+ LOG.debug("Tearing Down Blades")
+ blades = self.dbtest.get_all_blades()
+ for blade in blades:
+ bladeid = blade["blade-id"]
+ self.dbtest.delete_blade(bladeid)
+
+ def teardown_portbinding(self):
+ """tear down port binding"""
+ LOG.debug("Tearing Down Port Binding")
+ port_bindings = self.dbtest.get_all_port_bindings()
+ for port_binding in port_bindings:
+ portid = port_binding["port-id"]
+ self.dbtest.delete_port_binding(portid)
+
+ def teardown_network(self):
+ """tearDown Network table"""
+ LOG.debug("Tearing Down Network")
+ nets = self.quantum.get_all_networks("t1")
+ for net in nets:
+ netid = net["net-id"]
+ self.quantum.delete_network(netid)
+
+ def teardown_network_port(self):
+ """tearDown for Network and Port table"""
+ networks = self.quantum.get_all_networks("t1")
+ for net in networks:
+ netid = net["net-id"]
+ name = net["net-name"]
+ if "net" in name:
+ ports = self.quantum.get_all_ports(netid)
+ for por in ports:
+ self.quantum.delete_port(netid, por["port-id"])
+ self.quantum.delete_network(netid)
+
+
class NexusDBTest(unittest.TestCase):
"""Class conisting of nexus DB unit tests"""
def setUp(self):