from sqlalchemy import event
from sqlalchemy import orm
from sqlalchemy.orm import exc
+from sqlalchemy import sql
from neutron.api.v2 import attributes
from neutron.common import constants
if not context.is_admin and hasattr(model, 'tenant_id'):
if hasattr(model, 'shared'):
query_filter = ((model.tenant_id == context.tenant_id) |
- (model.shared == True))
+ (model.shared == sql.true()))
else:
query_filter = (model.tenant_id == context.tenant_id)
# Execute query hooks registered from mixins and plugins
import netaddr
import re
from sqlalchemy.orm import exc
-from sqlalchemy.sql import and_
+from sqlalchemy import sql
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
with db_session.begin(subtransactions=True):
alloc = (db_session.query(n1kv_models_v2.N1kvVlanAllocation).
- filter(and_(
+ filter(sql.and_(
n1kv_models_v2.N1kvVlanAllocation.vlan_id >= seg_min,
n1kv_models_v2.N1kvVlanAllocation.vlan_id <= seg_max,
n1kv_models_v2.N1kvVlanAllocation.physical_network ==
network_profile['physical_network'],
- n1kv_models_v2.N1kvVlanAllocation.allocated == False)
+ n1kv_models_v2.N1kvVlanAllocation.allocated ==
+ sql.false())
)).first()
if alloc:
segment_id = alloc.vlan_id
with db_session.begin(subtransactions=True):
alloc = (db_session.query(n1kv_models_v2.N1kvVxlanAllocation).
- filter(and_(
+ filter(sql.and_(
n1kv_models_v2.N1kvVxlanAllocation.vxlan_id >=
seg_min,
n1kv_models_v2.N1kvVxlanAllocation.vxlan_id <=
seg_max,
- n1kv_models_v2.N1kvVxlanAllocation.allocated == False)
+ n1kv_models_v2.N1kvVxlanAllocation.allocated ==
+ sql.false())
).first())
if alloc:
segment_id = alloc.vxlan_id
profile_type=c_const.POLICY))
a_set = set(i.profile_id for i in a_set_q)
b_set_q = (db_session.query(n1kv_models_v2.ProfileBinding).
- filter(and_(n1kv_models_v2.ProfileBinding.
- tenant_id != c_const.TENANT_ID_NOT_SET,
- n1kv_models_v2.ProfileBinding.
- profile_type == c_const.POLICY)))
+ filter(sql.and_(n1kv_models_v2.ProfileBinding.
+ tenant_id != c_const.TENANT_ID_NOT_SET,
+ n1kv_models_v2.ProfileBinding.
+ profile_type == c_const.POLICY)))
b_set = set(i.profile_id for i in b_set_q)
(db_session.query(n1kv_models_v2.ProfileBinding).
- filter(and_(n1kv_models_v2.ProfileBinding.profile_id.
- in_(a_set & b_set), n1kv_models_v2.ProfileBinding.
- tenant_id == c_const.TENANT_ID_NOT_SET)).
+ filter(sql.and_(n1kv_models_v2.ProfileBinding.profile_id.
+ in_(a_set & b_set),
+ n1kv_models_v2.ProfileBinding.tenant_id ==
+ c_const.TENANT_ID_NOT_SET)).
delete(synchronize_session="fetch"))
def _add_policy_profile(self,
# @author: Francois Eleouet, Orange
# @author: Mathieu Rohon, Orange
+from sqlalchemy import sql
+
from neutron.common import constants as const
from neutron.db import agents_db
from neutron.db import db_base_plugin_v2 as base_db
ml2_models.PortBinding.host)
query = query.join(models_v2.Port)
query = query.filter(models_v2.Port.network_id == network_id,
- models_v2.Port.admin_state_up == True,
+ models_v2.Port.admin_state_up == sql.true(),
agents_db.Agent.agent_type.in_(
l2_const.SUPPORTED_AGENT_TYPES))
return query
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc as sa_exc
+from sqlalchemy import sql
from neutron.api.v2 import attributes
from neutron.db import model_base
query = (context.session.query(nmodels.OFCFilterMapping)
.join(PacketFilter,
nmodels.OFCFilterMapping.neutron_id == PacketFilter.id)
- .filter(PacketFilter.admin_state_up == True))
+ .filter(PacketFilter.admin_state_up == sql.true()))
network_id = port['network_id']
net_pf_query = (query.filter(PacketFilter.network_id == network_id)
- .filter(PacketFilter.in_port == None))
+ .filter(PacketFilter.in_port == sql.null()))
net_filters = [(pf['neutron_id'], pf['ofc_id']) for pf in net_pf_query]
port_pf_query = query.filter(PacketFilter.in_port == port['id'])
import random
from oslo.config import cfg
+from sqlalchemy import sql
from neutron.common import constants
from neutron.db import agents_db
query = query.filter(agents_db.Agent.agent_type ==
constants.AGENT_TYPE_DHCP,
agents_db.Agent.host == host,
- agents_db.Agent.admin_state_up == True)
+ agents_db.Agent.admin_state_up == sql.true())
dhcp_agents = query.all()
for dhcp_agent in dhcp_agents:
if agents_db.AgentDbMixin.is_agent_down(
import six
from sqlalchemy.orm import exc
-from sqlalchemy.sql import exists
+from sqlalchemy import sql
from neutron.common import constants
from neutron.db import agents_db
query = query.filter(agents_db.Agent.agent_type ==
constants.AGENT_TYPE_L3,
agents_db.Agent.host == host,
- agents_db.Agent.admin_state_up == True)
+ agents_db.Agent.admin_state_up == sql.true())
try:
l3_agent = query.one()
except (exc.MultipleResultsFound, exc.NoResultFound):
else:
# get all routers that are not hosted
#TODO(gongysh) consider the disabled agent's router
- stmt = ~exists().where(
+ stmt = ~sql.exists().where(
l3_db.Router.id ==
l3_agentschedulers_db.RouterL3AgentBinding.router_id)
unscheduled_router_ids = [router_id_[0] for router_id_ in
# Port security/IP was updated off. Need to check that no security
# groups are on port.
- if (ret_port[psec.PORTSECURITY] != True or not has_ip):
+ if ret_port[psec.PORTSECURITY] is not True or not has_ip:
if has_security_groups:
raise psec.PortSecurityAndIPRequiredForSecurityGroups()
commands = {posargs}
[flake8]
-# E711/E712 comparison to False should be 'if cond is False:' or 'if not cond:'
-# query = query.filter(Component.disabled == False)
# E125 continuation line does not distinguish itself from next logical line
# H302 import only modules
# TODO(marun) H404 multi line docstring should start with a summary
-ignore = E711,E712,E125,H302,H404
+ignore = E125,H302,H404
show-source = true
builtins = _
exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,tools