return port
def _get_dns_by_subnet(self, context, subnet_id):
- try:
- dns_qry = context.session.query(models_v2.DNSNameServer)
- return dns_qry.filter_by(subnet_id=subnet_id).all()
- except exc.NoResultFound:
- return []
+ dns_qry = context.session.query(models_v2.DNSNameServer)
+ return dns_qry.filter_by(subnet_id=subnet_id).all()
def _get_route_by_subnet(self, context, subnet_id):
route_qry = context.session.query(models_v2.SubnetRoute)
def network_id(net_name):
session = get_session()
- try:
- return (session.query(models.Network).
+ networks = (session.query(models.Network).
options(joinedload(models.Network.ports)).
filter_by(name=net_name).
all())
- except exc.NoResultFound:
- raise q_exc.NetworkNotFound(net_name=net_name)
+ if networks:
+ return networks
+
+ raise q_exc.NetworkNotFound(net_name=net_name)
def network_get(net_id):
"""Gets all the vlanids."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
- try:
- vlanids = session.query(l2network_models.VlanID).all()
- return vlanids
- except exc.NoResultFound:
- return []
+ return session.query(l2network_models.VlanID).all()
def is_vlanid_used(vlan_id):
"""Gets all the vlanids used."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
- try:
- vlanids = (session.query(l2network_models.VlanID).
- filter_by(vlan_used=True).all())
- return vlanids
- except exc.NoResultFound:
- return []
+ return (session.query(l2network_models.VlanID).
+ filter_by(vlan_used=True).all())
def get_all_vlan_bindings():
"""Lists all the vlan to network associations."""
LOG.debug(_("get_all_vlan_bindings() called"))
session = db.get_session()
- try:
- bindings = session.query(l2network_models.VlanBinding).all()
- return bindings
- except exc.NoResultFound:
- return []
+ return session.query(l2network_models.VlanBinding).all()
def get_vlan_binding(netid):
"""Lists all the qos to tenant associations."""
LOG.debug(_("get_all_qoss() called"))
session = db.get_session()
- try:
- qoss = (session.query(l2network_models.QoS).
- filter_by(tenant_id=tenant_id).all())
- return qoss
- except exc.NoResultFound:
- return []
+ return (session.query(l2network_models.QoS).
+ filter_by(tenant_id=tenant_id).all())
def get_qos(tenant_id, qos_id):
def get_all_credentials(tenant_id):
"""Lists all the creds for a tenant."""
session = db.get_session()
- try:
- creds = (session.query(l2network_models.Credential).
- filter_by(tenant_id=tenant_id).all())
- return creds
- except exc.NoResultFound:
- return []
+ return (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).all())
def get_credential(tenant_id, credential_id):
"""Gets all the vlanids."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
- try:
- vlanids = session.query(network_models_v2.VlanID).all()
- return vlanids
- except exc.NoResultFound:
- return []
+ return session.query(network_models_v2.VlanID).all()
def is_vlanid_used(vlan_id):
"""Gets all the vlanids used."""
LOG.debug(_("get_all_vlanids() called"))
session = db.get_session()
- try:
- vlanids = (session.query(network_models_v2.VlanID).
- filter_by(vlan_used=True).all())
- return vlanids
- except exc.NoResultFound:
- return []
+ return (session.query(network_models_v2.VlanID).
+ filter_by(vlan_used=True).all())
def get_all_vlan_bindings():
"""Lists all the vlan to network associations."""
LOG.debug(_("get_all_vlan_bindings() called"))
session = db.get_session()
- try:
- bindings = session.query(network_models_v2.Vlan_Binding).all()
- return bindings
- except exc.NoResultFound:
- return []
+ return session.query(network_models_v2.Vlan_Binding).all()
def get_vlan_binding(netid):
"""Lists all the qos to tenant associations."""
LOG.debug(_("get_all_qoss() called"))
session = db.get_session()
- try:
- qoss = (session.query(network_models_v2.QoS).
- filter_by(tenant_id=tenant_id).all())
- return qoss
- except exc.NoResultFound:
- return []
+ return (session.query(network_models_v2.QoS).
+ filter_by(tenant_id=tenant_id).all())
def get_qos(tenant_id, qos_id):
def get_all_credentials(tenant_id):
"""Lists all the creds for a tenant."""
session = db.get_session()
- try:
- creds = (session.query(network_models_v2.Credential).
- filter_by(tenant_id=tenant_id).all())
- return creds
- except exc.NoResultFound:
- return []
+ return (session.query(network_models_v2.Credential).
+ filter_by(tenant_id=tenant_id).all())
def get_credential(tenant_id, credential_id):
def get_ovs_vlans():
session = db.get_session()
- try:
- bindings = (session.query(ovs_models_v2.VlanAllocation).
- filter_by(allocated=True).
- all())
- except exc.NoResultFound:
- return []
+ bindings = (session.query(ovs_models_v2.VlanAllocation.vlan_id).
+ filter_by(allocated=True))
return [binding.vlan_id for binding in bindings]
"""Lists all the nexusport bindings."""
LOG.debug(_("get_all_nexusport_bindings() called"))
session = db.get_session()
- try:
- bindings = session.query(nexus_models_v2.NexusPortBinding).all()
- return bindings
- except exc.NoResultFound:
- return []
+ return session.query(nexus_models_v2.NexusPortBinding).all()
def get_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Lists a nexusport binding."""
LOG.debug(_("get_nexusport_binding() called"))
session = db.get_session()
- try:
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
- filter_by(port_id=port_id).
- filter_by(instance_id=instance_id).all())
- return binding
- except exc.NoResultFound:
- raise c_exc.NexusPortBindingNotFound(vlan_id=vlan_id)
+
+ # FIXME(rpodolyaka): https://bugs.launchpad.net/quantum/+bug/1174323
+ return (session.query(nexus_models_v2.NexusPortBinding).
+ filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
+ filter_by(port_id=port_id).
+ filter_by(instance_id=instance_id).all())
def get_nexusvlan_binding(vlan_id, switch_ip):
"""Lists a vlan and switch binding."""
LOG.debug(_("get_nexusvlan_binding() called"))
session = db.get_session()
- try:
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
- all())
- return binding
- except exc.NoResultFound:
- raise c_exc.NexusPortBindingNotFound(vlan_id=vlan_id)
+
+ # FIXME(rpodolyaka): https://bugs.launchpad.net/quantum/+bug/1174323
+ return (session.query(nexus_models_v2.NexusPortBinding).
+ filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
+ all())
def add_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):
"""Removes a nexusport binding."""
LOG.debug(_("remove_nexusport_binding() called"))
session = db.get_session()
- try:
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
- filter_by(port_id=port_id).
- filter_by(instance_id=instance_id).all())
+ binding = (session.query(nexus_models_v2.NexusPortBinding).
+ filter_by(vlan_id=vlan_id).filter_by(switch_ip=switch_ip).
+ filter_by(port_id=port_id).
+ filter_by(instance_id=instance_id).all())
- for bind in binding:
- session.delete(bind)
- session.flush()
- return binding
- except exc.NoResultFound:
- pass
+ for bind in binding:
+ session.delete(bind)
+ session.flush()
+ return binding
def update_nexusport_binding(port_id, new_vlan_id):
"""Lists nexusvm bindings."""
LOG.debug(_("get_port_vlan_switch_binding() called"))
session = db.get_session()
- try:
- binding = (session.query(nexus_models_v2.NexusPortBinding).
- filter_by(port_id=port_id).filter_by(switch_ip=switch_ip).
- filter_by(vlan_id=vlan_id).all())
- return binding
- except exc.NoResultFound:
- raise c_exc.NexusPortBindingNotFound(vlan_id=vlan_id)
+
+ # FIXME(rpodolyaka): https://bugs.launchpad.net/quantum/+bug/1174323
+ return (session.query(nexus_models_v2.NexusPortBinding).
+ filter_by(port_id=port_id).filter_by(switch_ip=switch_ip).
+ filter_by(vlan_id=vlan_id).all())
def get_tunnel_endpoints():
session = db.get_session()
- try:
- # TODO(rpodolyaka): Query.all() can't raise the NoResultNound exception
- # Fix this later along with other identical cases.
- tunnels = session.query(ovs_models_v2.TunnelEndpoint).all()
- except exc.NoResultFound:
- return []
+
+ tunnels = session.query(ovs_models_v2.TunnelEndpoint)
return [{'id': tunnel.id,
'ip_address': tunnel.ip_address} for tunnel in tunnels]