# LinuxBridge
#interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
-# The Quantum user information for accessing the Quantum API.
-auth_url = http://localhost:35357/v2.0
-auth_region = RegionOne
-admin_tenant_name = %SERVICE_TENANT_NAME%
-admin_user = %SERVICE_USER%
-admin_password = %SERVICE_PASSWORD%
-
# Use "sudo quantum-rootwrap /etc/quantum/rootwrap.conf" to use the real
# root filter facility.
# Change to "sudo" to skip the filtering and just run the comand directly
# TCP Port used by Quantum metadata server
# metadata_port = 9697
-# The time in seconds between state poll requests
-# polling_interval = 3
-
# Send this many gratuitous ARPs for HA setup. Set it below or equal to 0
# to disable this feature.
# send_arp_for_ha = 3
+
+# seconds between re-sync routers' data if needed
+# periodic_interval = 40
+
+# seconds to start to sync routers' data after
+# starting agent
+# periodic_fuzzy_delay = 5
[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
+modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
# The base module to hold the copy of openstack.common
base=quantum
"""
import sys
-import time
+import eventlet
+from eventlet import semaphore
import netaddr
from quantum.agent.common import config
from quantum.agent.linux import ip_lib
from quantum.agent.linux import iptables_manager
from quantum.agent.linux import utils
-from quantum.db import l3_db
+from quantum.common import constants as l3_constants
+from quantum.common import topics
+from quantum import context
+from quantum import manager
from quantum.openstack.common import cfg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
-from quantumclient.v2_0 import client
+from quantum.openstack.common import periodic_task
+from quantum.openstack.common.rpc import common as rpc_common
+from quantum.openstack.common.rpc import proxy
+from quantum.openstack.common import service
+from quantum import service as quantum_service
+
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qrouter-'
EXTERNAL_DEV_PREFIX = 'qg-'
+class L3PluginApi(proxy.RpcProxy):
+ """Agent side of the l3 agent RPC API.
+
+ API version history:
+ 1.0 - Initial version.
+
+ """
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic, host):
+ super(L3PluginApi, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.host = host
+
+ def get_routers(self, context, fullsync=True, router_id=None):
+ """Make a remote process call to retrieve the sync data for routers."""
+ router_ids = [router_id] if router_id else None
+ return self.call(context,
+ self.make_msg('sync_routers', host=self.host,
+ fullsync=fullsync,
+ router_ids=router_ids),
+ topic=self.topic)
+
+ def get_external_network_id(self, context):
+ """Make a remote process call to retrieve the external network id.
+
+ @raise common.RemoteError: with TooManyExternalNetworks
+ as exc_type if there are
+ more than one external network
+ """
+ return self.call(context,
+ self.make_msg('get_external_network_id',
+ host=self.host),
+ topic=self.topic)
+
+
class RouterInfo(object):
- def __init__(self, router_id, root_helper, use_namespaces):
+ def __init__(self, router_id, root_helper, use_namespaces, router=None):
self.router_id = router_id
self.ex_gw_port = None
self.internal_ports = []
self.floating_ips = []
self.root_helper = root_helper
self.use_namespaces = use_namespaces
-
+ self.router = router
self.iptables_manager = iptables_manager.IptablesManager(
root_helper=root_helper,
#FIXME(danwent): use_ipv6=True,
return NS_PREFIX + self.router_id
-class L3NATAgent(object):
+class L3NATAgent(manager.Manager):
OPTS = [
- cfg.StrOpt('admin_user'),
- cfg.StrOpt('admin_password'),
- cfg.StrOpt('admin_tenant_name'),
- cfg.StrOpt('auth_url'),
- cfg.StrOpt('auth_strategy', default='keystone'),
- cfg.StrOpt('auth_region'),
cfg.StrOpt('root_helper', default='sudo'),
cfg.StrOpt('external_network_bridge', default='br-ex',
help="Name of bridge used for external network traffic."),
cfg.StrOpt('interface_driver',
help="The driver used to manage the virtual interface."),
- cfg.IntOpt('polling_interval',
- default=3,
- help="The time in seconds between state poll requests."),
cfg.IntOpt('metadata_port',
default=9697,
help="TCP Port used by Quantum metadata namespace proxy."),
cfg.StrOpt('gateway_external_network_id', default='',
help="UUID of external network for routers implemented "
"by the agents."),
+ cfg.StrOpt('l3_agent_manager',
+ default='quantum.agent.l3_agent.L3NATAgent'),
]
- def __init__(self, conf):
- self.conf = conf
+ def __init__(self, host, conf=None):
+ if conf:
+ self.conf = conf
+ else:
+ self.conf = cfg.CONF
self.router_info = {}
- if not conf.interface_driver:
- LOG.error(_('You must specify an interface driver'))
+ if not self.conf.interface_driver:
+ LOG.error(_('An interface driver must be specified'))
sys.exit(1)
try:
- self.driver = importutils.import_object(conf.interface_driver,
- conf)
+ self.driver = importutils.import_object(self.conf.interface_driver,
+ self.conf)
except:
- LOG.exception(_("Error importing interface driver '%s'"),
- conf.interface_driver)
+ LOG.exception(_("Error importing interface driver '%s'"
+ % self.conf.interface_driver))
sys.exit(1)
-
- self.polling_interval = conf.polling_interval
-
- self.qclient = client.Client(
- username=self.conf.admin_user,
- password=self.conf.admin_password,
- tenant_name=self.conf.admin_tenant_name,
- auth_url=self.conf.auth_url,
- auth_strategy=self.conf.auth_strategy,
- region_name=self.conf.auth_region
- )
-
+ self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
+ self.fullsync = True
+ self.sync_sem = semaphore.Semaphore(1)
if self.conf.use_namespaces:
self._destroy_all_router_namespaces()
+ super(L3NATAgent, self).__init__(host=self.conf.host)
def _destroy_all_router_namespaces(self):
"""Destroy all router namespaces on the host to eliminate
try:
self._destroy_router_namespace(ns)
except:
- LOG.exception(_("Couldn't delete namespace '%s'"), ns)
+ LOG.exception(_("Failed deleting namespace '%s'") % ns)
def _destroy_router_namespace(self, namespace):
ns_ip = ip_lib.IPWrapper(self.conf.root_helper,
ip_wrapper = ip_wrapper_root.ensure_namespace(ri.ns_name())
ip_wrapper.netns.execute(['sysctl', '-w', 'net.ipv4.ip_forward=1'])
- def daemon_loop(self):
- #TODO(danwent): this simple diff logic does not handle if
- # details of a router port (e.g., IP, mac) are changed behind
- # our back. Will fix this properly with update notifications.
-
- while True:
- try:
- self.do_single_loop()
- except:
- LOG.exception(_("Error running l3_nat daemon_loop"))
-
- time.sleep(self.polling_interval)
-
def _fetch_external_net_id(self):
"""Find UUID of single external network for this agent"""
if self.conf.gateway_external_network_id:
return self.conf.gateway_external_network_id
-
- params = {'router:external': True}
- ex_nets = self.qclient.list_networks(**params)['networks']
- if len(ex_nets) > 1:
- raise Exception(_("Must configure 'gateway_external_network_id' "
- "if Quantum has more than one external "
- "network."))
- if len(ex_nets) == 0:
- return None
- return ex_nets[0]['id']
-
- def do_single_loop(self):
-
- if (self.conf.external_network_bridge and
- not ip_lib.device_exists(self.conf.external_network_bridge)):
- LOG.error(_("External network bridge '%s' does not exist"),
- self.conf.external_network_bridge)
- return
-
- prev_router_ids = set(self.router_info)
- cur_router_ids = set()
-
- target_ex_net_id = self._fetch_external_net_id()
-
- # identify and update new or modified routers
- for r in self.qclient.list_routers()['routers']:
- if not r['admin_state_up']:
- continue
-
- ex_net_id = (r['external_gateway_info'] and
- r['external_gateway_info'].get('network_id'))
- if not ex_net_id and not self.conf.handle_internal_only_routers:
- continue
-
- if ex_net_id and ex_net_id != target_ex_net_id:
- continue
-
- # If namespaces are disabled, only process the router associated
- # with the configured agent id.
- if (self.conf.use_namespaces or
- r['id'] == self.conf.router_id):
- cur_router_ids.add(r['id'])
+ try:
+ return self.plugin_rpc.get_external_network_id(
+ context.get_admin_context())
+ except rpc_common.RemoteError as e:
+ if e.exc_type == 'TooManyExternalNetworks':
+ msg = _(
+ "The 'gateway_external_network_id' must be configured"
+ " if Quantum has more than one external network.")
+ raise Exception(msg)
else:
- continue
- if r['id'] not in self.router_info:
- self._router_added(r['id'])
-
- ri = self.router_info[r['id']]
- self.process_router(ri)
+ raise
- # identify and remove routers that no longer exist
- for router_id in prev_router_ids - cur_router_ids:
- self._router_removed(router_id)
- prev_router_ids = cur_router_ids
-
- def _router_added(self, router_id):
+ def _router_added(self, router_id, router=None):
ri = RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces, router)
self.router_info[router_id] = ri
if self.conf.use_namespaces:
self._create_router_namespace(ri)
if not ips:
raise Exception(_("Router port %s has no IP address") % port['id'])
if len(ips) > 1:
- LOG.error(_("Ignoring multiple IPs on router port %s"), port['id'])
- port['subnet'] = self.qclient.show_subnet(
- ips[0]['subnet_id'])['subnet']
+ LOG.error(_("Ignoring multiple IPs on router port %s") %
+ port['id'])
prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
def process_router(self, ri):
ex_gw_port = self._get_ex_gw_port(ri)
-
- internal_ports = self.qclient.list_ports(
- device_id=ri.router_id,
- device_owner=l3_db.DEVICE_OWNER_ROUTER_INTF)['ports']
-
+ internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
existing_port_ids = set([p['id'] for p in ri.internal_ports])
current_port_ids = set([p['id'] for p in internal_ports
if p['admin_state_up']])
ri.ex_gw_port = ex_gw_port
def process_router_floating_ips(self, ri, ex_gw_port):
- floating_ips = self.qclient.list_floatingips(
- router_id=ri.router_id)['floatingips']
+ floating_ips = ri.router.get(l3_constants.FLOATINGIP_KEY, [])
existing_floating_ip_ids = set([fip['id'] for fip in ri.floating_ips])
cur_floating_ip_ids = set([fip['id'] for fip in floating_ips])
ri.floating_ips.append(new_fip)
def _get_ex_gw_port(self, ri):
- ports = self.qclient.list_ports(
- device_id=ri.router_id,
- device_owner=l3_db.DEVICE_OWNER_ROUTER_GW)['ports']
- if not ports:
- return None
- elif len(ports) == 1:
- return ports[0]
- else:
- LOG.error(_("Ignoring multiple gateway ports for router %s"),
- ri.router_id)
+ return ri.router.get('gw_port')
def _send_gratuitous_arp_packet(self, ri, interface_name, ip_address):
if self.conf.send_arp_for_ha > 0:
('float-snat', '-s %s -j SNAT --to %s' %
(fixed_ip, floating_ip))]
+ def router_deleted(self, context, router_id):
+ """Deal with router deletion RPC message."""
+ with self.sync_sem:
+ if router_id in self.router_info:
+ try:
+ self._router_removed(router_id)
+ except Exception:
+ msg = _("Failed dealing with router "
+ "'%s' deletion RPC message")
+ LOG.debug(msg, router_id)
+ self.fullsync = True
+
+ def routers_updated(self, context, routers):
+ """Deal with routers modification and creation RPC message."""
+ if not routers:
+ return
+ with self.sync_sem:
+ try:
+ self._process_routers(routers)
+ except Exception:
+ msg = _("Failed dealing with routers update RPC message")
+ LOG.debug(msg)
+ self.fullsync = True
+
+ def _process_routers(self, routers):
+ if (self.conf.external_network_bridge and
+ not ip_lib.device_exists(self.conf.external_network_bridge)):
+ LOG.error(_("The external network bridge '%s' does not exist")
+ % self.conf.external_network_bridge)
+ return
+
+ target_ex_net_id = self._fetch_external_net_id()
+
+ for r in routers:
+ if not r['admin_state_up']:
+ continue
+
+ # If namespaces are disabled, only process the router associated
+ # with the configured agent id.
+ if (not self.conf.use_namespaces and
+ r['id'] != self.conf.router_id):
+ continue
+
+ ex_net_id = (r['external_gateway_info'] or {}).get('network_id')
+ if not ex_net_id and not self.conf.handle_internal_only_routers:
+ continue
+
+ if ex_net_id and ex_net_id != target_ex_net_id:
+ continue
+
+ if r['id'] not in self.router_info:
+ self._router_added(r['id'])
+
+ ri = self.router_info[r['id']]
+ ri.router = r
+ self.process_router(ri)
+
+ @periodic_task.periodic_task
+ def _sync_routers_task(self, context):
+ # we need to sync with router deletion RPC message
+ with self.sync_sem:
+ if self.fullsync:
+ try:
+ if not self.conf.use_namespaces:
+ router_id = self.conf.router_id
+ else:
+ router_id = None
+ routers = self.plugin_rpc.get_routers(
+ context, router_id)
+ self.router_info = {}
+ self._process_routers(routers)
+ self.fullsync = False
+ except Exception:
+ LOG.exception(_("Failed synchronizing routers"))
+ self.fullsync = True
+
+ def after_start(self):
+ LOG.info(_("L3 agent started"))
+
def main():
- conf = config.setup_conf()
+ eventlet.monkey_patch()
+ conf = cfg.CONF
conf.register_opts(L3NATAgent.OPTS)
conf.register_opts(interface.OPTS)
conf.register_opts(external_process.OPTS)
conf(sys.argv)
config.setup_logging(conf)
-
- mgr = L3NATAgent(conf)
- mgr.daemon_loop()
+ server = quantum_service.Service.create(binary='quantum-l3-agent',
+ topic=topics.L3_AGENT)
+ service.launch(server).wait()
# See the License for the specific language governing permissions and
# limitations under the License.
-import socket
-
import netaddr
import webob.exc
QUOTAS = quota.QUOTAS
-def _get_hostname():
- return socket.gethostname()
-
-
-# Register the configuration options
-cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
-
-
def _fields(request):
"""
Extracts the list of fields to return
from paste import deploy
from quantum.api.v2 import attributes
+from quantum.common import utils
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.version import version_info as quantum_version
cfg.BoolOpt('allow_overlapping_ips', default=False),
cfg.StrOpt('control_exchange',
default='quantum',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid')
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.StrOpt('host', default=utils.get_hostname()),
]
PORT_STATUS_BUILD = 'BUILD'
PORT_STATUS_DOWN = 'DOWN'
PORT_STATUS_ERROR = 'ERROR'
+
+DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
+DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
+DEVICE_OWNER_FLOATINGIP = "network:floatingip"
+FLOATINGIP_KEY = '_floatingips'
+INTERFACE_KEY = '_interfaces'
class InvalidExtenstionEnv(QuantumException):
message = _("Invalid extension environment: %(reason)s")
+
+
+class TooManyExternalNetworks(QuantumException):
+ message = _("More than one external network exists")
PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer'
+L3_AGENT = 'l3_agent'
+
def get_topic_name(prefix, table, operation):
"""Create a topic name.
import os
import signal
+import socket
from eventlet.green import subprocess
(value, mapping))
mappings[key] = value
return mappings
+
+
+def get_hostname():
+ return socket.getfqdn()
import webob.exc as w_exc
from quantum.api.v2 import attributes
+from quantum.common import constants as l3_constants
from quantum.common import exceptions as q_exc
from quantum.db import db_base_plugin_v2
+from quantum.db import l3_rpc_agent_api
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import l3
-from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum import policy
LOG = logging.getLogger(__name__)
-l3_opts = [
- cfg.StrOpt('metadata_ip_address', default='127.0.0.1'),
- cfg.IntOpt('metadata_port', default=8775)
-]
-# Register the configuration options
-cfg.CONF.register_opts(l3_opts)
-
-DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
-DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
-DEVICE_OWNER_FLOATINGIP = "network:floatingip"
+DEVICE_OWNER_ROUTER_INTF = l3_constants.DEVICE_OWNER_ROUTER_INTF
+DEVICE_OWNER_ROUTER_GW = l3_constants.DEVICE_OWNER_ROUTER_GW
+DEVICE_OWNER_FLOATINGIP = l3_constants.DEVICE_OWNER_FLOATINGIP
class Router(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
# Ensure we actually have something to update
if r.keys():
router_db.update(r)
+ routers = self.get_sync_data(context.elevated(),
+ [router_db['id']])
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return self._make_router_dict(router_db)
def _update_router_gw_info(self, context, router_id, info):
self._delete_port(context.elevated(), ports[0]['id'])
context.session.delete(router)
+ l3_rpc_agent_api.L3AgentNofity.router_deleted(context, id)
def get_router(self, context, id, fields=None):
router = self._get_router(context, id)
self._check_for_dup_router_subnet(context, router_id,
port['network_id'],
fixed_ips[0]['subnet_id'])
- with context.session.begin(subtransactions=True):
- port.update({'device_id': router_id,
- 'device_owner': DEVICE_OWNER_ROUTER_INTF})
+ port.update({'device_id': router_id,
+ 'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
subnet = self._get_subnet(context, subnet_id)
'device_id': router_id,
'device_owner': DEVICE_OWNER_ROUTER_INTF,
'name': ''}})
+
+ routers = self.get_sync_data(context.elevated(), [router_id])
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return {'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']}
raise w_exc.HTTPNotFound("Router %(router_id)s has no "
"interface on subnet %(subnet_id)s"
% locals())
+ routers = self.get_sync_data(context.elevated(), [router_id])
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
def _get_floatingip(self, context, id):
try:
except Exception:
LOG.exception(_("Floating IP association failed"))
raise
-
+ router_id = floatingip_db['router_id']
+ if router_id:
+ routers = self.get_sync_data(context.elevated(), [router_id])
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return self._make_floatingip_dict(floatingip_db)
def update_floatingip(self, context, id, floatingip):
fip['tenant_id'] = floatingip_db['tenant_id']
fip['id'] = id
fip_port_id = floatingip_db['floating_port_id']
+ before_router_id = floatingip_db['router_id']
self._update_fip_assoc(context, fip, floatingip_db,
self.get_port(context.elevated(),
fip_port_id))
+ router_ids = []
+ if before_router_id:
+ router_ids.append(before_router_id)
+ router_id = floatingip_db['router_id']
+ if router_id and router_id != before_router_id:
+ router_ids.append(router_id)
+ if router_ids:
+ routers = self.get_sync_data(context.elevated(), router_ids)
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
return self._make_floatingip_dict(floatingip_db)
def delete_floatingip(self, context, id):
floatingip = self._get_floatingip(context, id)
+ router_id = floatingip['router_id']
with context.session.begin(subtransactions=True):
context.session.delete(floatingip)
self.delete_port(context.elevated(),
floatingip['floating_port_id'],
l3_port_check=False)
+ if router_id:
+ routers = self.get_sync_data(context.elevated(), [router_id])
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id)
try:
fip_qry = context.session.query(FloatingIP)
floating_ip = fip_qry.filter_by(fixed_port_id=port_id).one()
+ router_id = floating_ip['router_id']
floating_ip.update({'fixed_port_id': None,
'fixed_ip_address': None,
'router_id': None})
# should never happen
raise Exception('Multiple floating IPs found for port %s'
% port_id)
+ if router_id:
+ routers = self.get_sync_data(context.elevated(), [router_id])
+ l3_rpc_agent_api.L3AgentNofity.routers_updated(context, routers)
def _check_l3_view_auth(self, context, network):
return policy.check(context,
return [n for n in nets if n['id'] in ext_nets]
else:
return [n for n in nets if n['id'] not in ext_nets]
+
+ def _get_sync_routers(self, context, router_ids=None):
+ """Query routers and their gw ports for l3 agent.
+
+ Query routers with the router_ids. The gateway ports, if any,
+ will be queried too.
+ l3 agent has an option to deal with only one router id. In addition,
+ when we need to notify the agent the data about only one router
+ (when modification of router, its interfaces, gw_port and floatingips),
+ we will have router_ids.
+ @param router_ids: the list of router ids which we want to query.
+ if it is None, all of routers will be queried.
+ @return: a list of dicted routers with dicted gw_port populated if any
+ """
+ router_query = context.session.query(Router)
+ if router_ids:
+ router_query = router_query.filter(Router.id.in_(router_ids))
+ routers = router_query.all()
+ gw_port_ids = []
+ if not routers:
+ return []
+ for router in routers:
+ gw_port_id = router.gw_port_id
+ if gw_port_id:
+ gw_port_ids.append(gw_port_id)
+ gw_ports = []
+ if gw_port_ids:
+ gw_ports = self._get_sync_gw_ports(context, gw_port_ids)
+ gw_port_id_gw_port_dict = {}
+ for gw_port in gw_ports:
+ gw_port_id_gw_port_dict[gw_port['id']] = gw_port
+ router_id_gw_port_id_dict = {}
+ for router in routers:
+ router_id_gw_port_id_dict[router.id] = router.gw_port_id
+ routers_list = [self._make_router_dict(c, None) for c in routers]
+ for router in routers_list:
+ gw_port_id = router_id_gw_port_id_dict[router['id']]
+ if gw_port_id:
+ router['gw_port'] = gw_port_id_gw_port_dict[gw_port_id]
+ return routers_list
+
+ def _get_sync_floating_ips(self, context, router_ids):
+ """Query floating_ips that relate to list of router_ids."""
+ if not router_ids:
+ return []
+ return self.get_floatingips(context, {'router_id': router_ids})
+
+ def _get_sync_gw_ports(self, context, gw_port_ids):
+ if not gw_port_ids:
+ return []
+ filters = {'id': gw_port_ids}
+ gw_ports = self.get_ports(context, filters)
+ if gw_ports:
+ self._populate_subnet_for_ports(context, gw_ports)
+ return gw_ports
+
+ def _get_sync_interfaces(self, context, router_ids):
+ """Query router interfaces that relate to list of router_ids."""
+ if not router_ids:
+ return []
+ filters = {'device_id': router_ids,
+ 'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
+ interfaces = self.get_ports(context, filters)
+ if interfaces:
+ self._populate_subnet_for_ports(context, interfaces)
+ return interfaces
+
+ def _populate_subnet_for_ports(self, context, ports):
+ """Populate ports with subnet.
+
+ These ports already have fixed_ips populated.
+ """
+ if not ports:
+ return
+ subnet_id_ports_dict = {}
+ for port in ports:
+ fixed_ips = port.get('fixed_ips', [])
+ if len(fixed_ips) > 1:
+ LOG.error(_("Ignoring multiple IPs on router port %s") %
+ port['id'])
+ ports.remove(port)
+ continue
+ # Empty fixed_ips should not happen
+ fixed_ip = fixed_ips[0]
+ my_ports = subnet_id_ports_dict.get(fixed_ip['subnet_id'], [])
+ my_ports.append(port)
+ subnet_id_ports_dict[fixed_ip['subnet_id']] = my_ports
+ filters = {'id': subnet_id_ports_dict.keys()}
+ fields = ['id', 'cidr', 'gateway_ip']
+ subnet_dicts = self.get_subnets(context, filters, fields)
+ for subnet_dict in subnet_dicts:
+ ports = subnet_id_ports_dict.get(subnet_dict['id'], [])
+ for port in ports:
+ # TODO(gongysh) stash the subnet into fixed_ips
+ # to make the payload smaller.
+ port['subnet'] = {'id': subnet_dict['id'],
+ 'cidr': subnet_dict['cidr'],
+ 'gateway_ip': subnet_dict['gateway_ip']}
+
+ def _process_sync_data(self, routers, interfaces, floating_ips):
+ routers_dict = {}
+ for router in routers:
+ routers_dict[router['id']] = router
+ for floating_ip in floating_ips:
+ router = routers_dict.get(floating_ip['router_id'])
+ if router:
+ router_floatingips = router.get(l3_constants.FLOATINGIP_KEY,
+ [])
+ router_floatingips.append(floating_ip)
+ router[l3_constants.FLOATINGIP_KEY] = router_floatingips
+ for interface in interfaces:
+ router = routers_dict.get(interface['device_id'])
+ if router:
+ router_interfaces = router.get(l3_constants.INTERFACE_KEY, [])
+ router_interfaces.append(interface)
+ router[l3_constants.INTERFACE_KEY] = router_interfaces
+ return routers_dict.values()
+
+ def get_sync_data(self, context, router_ids=None):
+ """Query routers and their related floating_ips, interfaces."""
+ with context.session.begin(subtransactions=True):
+ routers = self._get_sync_routers(context,
+ router_ids)
+ router_ids = [router['id'] for router in routers]
+ floating_ips = self._get_sync_floating_ips(context, router_ids)
+ interfaces = self._get_sync_interfaces(context, router_ids)
+ return self._process_sync_data(routers, interfaces, floating_ips)
+
+ def get_external_network_id(self, context):
+ nets = self.get_networks(context, {'router:external': [True]})
+ if len(nets) > 1:
+ raise q_exc.TooManyExternalNetworks()
+ else:
+ return nets[0]['id'] if nets else None
--- /dev/null
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import topics
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
+class L3AgentNotifyAPI(proxy.RpcProxy):
+ """API for plugin to notify L3 agent."""
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic=topics.L3_AGENT):
+ super(L3AgentNotifyAPI, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def router_deleted(self, context, router_id):
+ LOG.debug(_('Nofity agent the router %s is deleted'), router_id)
+ self.cast(context,
+ self.make_msg('router_deleted',
+ router_id=router_id),
+ topic=self.topic)
+
+ def routers_updated(self, context, routers):
+ if routers:
+ LOG.debug(_('Nofity agent routers were updated:\n %s'),
+ jsonutils.dumps(routers, indent=5))
+ self.cast(context,
+ self.make_msg('routers_updated',
+ routers=routers),
+ topic=self.topic)
+
+
+L3AgentNofity = L3AgentNotifyAPI()
--- /dev/null
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum import context as quantum_context
+from quantum import manager
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class L3RpcCallbackMixin(object):
+ """A mix-in that enable L3 agent rpc support in plugin implementations."""
+
+ def sync_routers(self, context, **kwargs):
+ """Sync routers according to filters to a specific agent.
+
+ @param context: contain user information
+ @param kwargs: host, or router_id
+ @return: a list of routers
+ with their interfaces and floating_ips
+ """
+ router_id = kwargs.get('router_id')
+ # TODO(gongysh) we will use host in kwargs for multi host BP
+ context = quantum_context.get_admin_context()
+ plugin = manager.QuantumManager.get_plugin()
+ routers = plugin.get_sync_data(context, router_id)
+ LOG.debug(_("Routers returned to l3 agent:\n %s"),
+ jsonutils.dumps(routers, indent=5))
+ return routers
+
+ def get_external_network_id(self, context, **kwargs):
+ """Get one external network id for l3 agent.
+
+ l3 agent expects only on external network when it performs
+ this query.
+ """
+ context = quantum_context.get_admin_context()
+ plugin = manager.QuantumManager.get_plugin()
+ net_id = plugin.get_external_network_id(context)
+ LOG.debug(_("External network ID returned to l3 agent: %s"),
+ net_id)
+ return net_id
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
-"""
-Quantum's Manager class is responsible for parsing a config file and
-instantiating the correct plugin that concretely implement quantum_plugin_base
-class.
-The caller should make sure that QuantumManager is a singleton.
-"""
-
from quantum.common.exceptions import ClassNotFound
from quantum.openstack.common import cfg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
+from quantum.openstack.common import periodic_task
from quantum.plugins.common import constants
LOG = logging.getLogger(__name__)
-class QuantumManager(object):
+class Manager(periodic_task.PeriodicTasks):
+
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, host=None):
+ if not host:
+ host = cfg.CONF.host
+ self.host = host
+ super(Manager, self).__init__()
+
+ def periodic_tasks(self, context, raise_on_error=False):
+ self.run_periodic_tasks(context, raise_on_error=raise_on_error)
+
+ def init_host(self):
+ """Handle initialization if this is a standalone service.
+ Child classes should override this method.
+
+ """
+ pass
+
+ def after_start(self):
+ """Handler post initialization stuff.
+
+ Child classes can override this method.
+ """
+ pass
+
+
+class QuantumManager(object):
+ """
+ Quantum's Manager class is responsible for parsing a config file and
+ instantiating the correct plugin that concretely implement
+ quantum_plugin_base class.
+ The caller should make sure that QuantumManager is a singleton.
+ """
_instance = None
def __init__(self, options=None, config_file=None):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def periodic_task(*args, **kwargs):
+ """Decorator to indicate that a method is a periodic task.
+
+ This decorator can be used in two ways:
+
+ 1. Without arguments '@periodic_task', this will be run on every tick
+ of the periodic scheduler.
+
+ 2. With arguments, @periodic_task(ticks_between_runs=N), this will be
+ run on every N ticks of the periodic scheduler.
+ """
+ def decorator(f):
+ f._periodic_task = True
+ f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
+ return f
+
+ # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
+ # and without parens.
+ #
+ # In the 'with-parens' case (with kwargs present), this function needs to
+ # return a decorator function since the interpreter will invoke it like:
+ #
+ # periodic_task(*args, **kwargs)(f)
+ #
+ # In the 'without-parens' case, the original function will be passed
+ # in as the first argument, like:
+ #
+ # periodic_task(f)
+ if kwargs:
+ return decorator
+ else:
+ return decorator(args[0])
+
+
+class _PeriodicTasksMeta(type):
+ def __init__(cls, names, bases, dict_):
+ """Metaclass that allows us to collect decorated periodic tasks."""
+ super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
+
+ # NOTE(sirp): if the attribute is not present then we must be the base
+ # class, so, go ahead and initialize it. If the attribute is present,
+ # then we're a subclass so make a copy of it so we don't step on our
+ # parent's toes.
+ try:
+ cls._periodic_tasks = cls._periodic_tasks[:]
+ except AttributeError:
+ cls._periodic_tasks = []
+
+ try:
+ cls._ticks_to_skip = cls._ticks_to_skip.copy()
+ except AttributeError:
+ cls._ticks_to_skip = {}
+
+ # This uses __dict__ instead of
+ # inspect.getmembers(cls, inspect.ismethod) so only the methods of the
+ # current class are added when this class is scanned, and base classes
+ # are not added redundantly.
+ for value in cls.__dict__.values():
+ if getattr(value, '_periodic_task', False):
+ task = value
+ name = task.__name__
+ cls._periodic_tasks.append((name, task))
+ cls._ticks_to_skip[name] = task._ticks_between_runs
+
+
+class PeriodicTasks(object):
+ __metaclass__ = _PeriodicTasksMeta
+
+ def run_periodic_tasks(self, context, raise_on_error=False):
+ """Tasks to be run at a periodic interval."""
+ for task_name, task in self._periodic_tasks:
+ full_task_name = '.'.join([self.__class__.__name__, task_name])
+
+ ticks_to_skip = self._ticks_to_skip[task_name]
+ if ticks_to_skip > 0:
+ LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
+ " ticks left until next run"), locals())
+ self._ticks_to_skip[task_name] -= 1
+ continue
+
+ self._ticks_to_skip[task_name] = task._ticks_between_runs
+ LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
+
+ try:
+ task(self, context)
+ except Exception as e:
+ if raise_on_error:
+ raise
+ LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+ locals())
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
+from quantum.db import l3_rpc_base
from quantum.extensions import providernet as provider
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
+class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
+ l3_rpc_base.L3RpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
+from quantum.db import l3_rpc_base
from quantum.extensions import providernet as provider
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin):
+class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
+ l3_rpc_base.L3RpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
# License for the specific language governing permissions and limitations
# under the License.
+import inspect
import logging as std_logging
+import os
+import random
from quantum.common import config
+from quantum import context
from quantum.openstack.common import cfg
+from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
+from quantum.openstack.common.rpc import service
from quantum import wsgi
+LOG = logging.getLogger(__name__)
+
+service_opts = [
+ cfg.IntOpt('report_interval',
+ default=10,
+ help='seconds between nodes reporting state to datastore'),
+ cfg.IntOpt('periodic_interval',
+ default=40,
+ help='seconds between running periodic tasks'),
+ cfg.IntOpt('periodic_fuzzy_delay',
+ default=5,
+ help='range of seconds to randomly delay when starting the'
+ ' periodic task scheduler to reduce stampeding.'
+ ' (Disable by setting to 0)'),
+]
+CONF = cfg.CONF
+CONF.register_opts(service_opts)
+
LOG = logging.getLogger(__name__)
{'host': cfg.CONF.bind_host,
'port': cfg.CONF.bind_port})
return server
+
+
+class Service(service.Service):
+ """Service object for binaries running on hosts.
+
+ A service takes a manager and enables rpc by listening to queues based
+ on topic. It also periodically runs tasks on the manager."""
+
+ def __init__(self, host, binary, topic, manager, report_interval=None,
+ periodic_interval=None, periodic_fuzzy_delay=None,
+ *args, **kwargs):
+
+ self.binary = binary
+ self.manager_class_name = manager
+ manager_class = importutils.import_class(self.manager_class_name)
+ self.manager = manager_class(host=host, *args, **kwargs)
+ self.report_interval = report_interval
+ self.periodic_interval = periodic_interval
+ self.periodic_fuzzy_delay = periodic_fuzzy_delay
+ self.saved_args, self.saved_kwargs = args, kwargs
+ self.timers = []
+ super(Service, self).__init__(host, topic, manager=self.manager)
+
+ def start(self):
+ self.manager.init_host()
+ super(Service, self).start()
+ if self.report_interval:
+ pulse = loopingcall.LoopingCall(self.report_state)
+ pulse.start(interval=self.report_interval,
+ initial_delay=self.report_interval)
+ self.timers.append(pulse)
+
+ if self.periodic_interval:
+ if self.periodic_fuzzy_delay:
+ initial_delay = random.randint(0, self.periodic_fuzzy_delay)
+ else:
+ initial_delay = None
+
+ periodic = loopingcall.LoopingCall(self.periodic_tasks)
+ periodic.start(interval=self.periodic_interval,
+ initial_delay=initial_delay)
+ self.timers.append(periodic)
+ self.manager.after_start()
+
+ def __getattr__(self, key):
+ manager = self.__dict__.get('manager', None)
+ return getattr(manager, key)
+
+ @classmethod
+ def create(cls, host=None, binary=None, topic=None, manager=None,
+ report_interval=None, periodic_interval=None,
+ periodic_fuzzy_delay=None):
+ """Instantiates class and passes back application object.
+
+ :param host: defaults to CONF.host
+ :param binary: defaults to basename of executable
+ :param topic: defaults to bin_name - 'nova-' part
+ :param manager: defaults to CONF.<topic>_manager
+ :param report_interval: defaults to CONF.report_interval
+ :param periodic_interval: defaults to CONF.periodic_interval
+ :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
+
+ """
+ if not host:
+ host = CONF.host
+ if not binary:
+ binary = os.path.basename(inspect.stack()[-1][1])
+ if not topic:
+ topic = binary.rpartition('quantum-')[2]
+ topic = topic.replace("-", "_")
+ if not manager:
+ manager = CONF.get('%s_manager' % topic, None)
+ if report_interval is None:
+ report_interval = CONF.report_interval
+ if periodic_interval is None:
+ periodic_interval = CONF.periodic_interval
+ if periodic_fuzzy_delay is None:
+ periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
+ service_obj = cls(host, binary, topic, manager,
+ report_interval=report_interval,
+ periodic_interval=periodic_interval,
+ periodic_fuzzy_delay=periodic_fuzzy_delay)
+
+ return service_obj
+
+ def kill(self):
+ """Destroy the service object."""
+ self.stop()
+
+ def stop(self):
+ super(Service, self).stop()
+ for x in self.timers:
+ try:
+ x.stop()
+ except Exception:
+ LOG.exception("exception occurs when timer stops")
+ pass
+ self.timers = []
+
+ def wait(self):
+ super(Service, self).wait()
+ for x in self.timers:
+ try:
+ x.wait()
+ except Exception:
+ LOG.exception("exception occurs when waiting for timer")
+ pass
+
+ def periodic_tasks(self, raise_on_error=False):
+ """Tasks to be run at a periodic interval."""
+ ctxt = context.get_admin_context()
+ self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
+
+ def report_state(self):
+ """Update the state of this service."""
+ # Todo(gongysh) report state to quantum server
+ pass
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
- def port(self, subnet=None, fixed_ips=None, fmt='json', no_delete=False,
+ def port(self, subnet=None, fmt='json', no_delete=False,
**kwargs):
if not subnet:
with self.subnet() as subnet:
# under the License.
import copy
-import time
-import unittest
+import unittest2
import mock
-from quantum.agent.common import config
from quantum.agent import l3_agent
from quantum.agent.linux import interface
-from quantum.db import l3_db
+from quantum.common import config as base_config
+from quantum.common import constants as l3_constants
+from quantum.openstack.common import cfg
from quantum.openstack.common import uuidutils
_uuid = uuidutils.generate_uuid
+HOSTNAME = 'myhost'
-class TestBasicRouterOperations(unittest.TestCase):
+class TestBasicRouterOperations(unittest2.TestCase):
def setUp(self):
- self.conf = config.setup_conf()
+ self.conf = cfg.CommonConfigOpts()
+ self.conf.register_opts(base_config.core_opts)
self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
self.conf.register_opts(interface.OPTS)
self.conf.set_override('interface_driver',
self.mock_ip = mock.MagicMock()
ip_cls.return_value = self.mock_ip
- self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
- client_cls = self.client_cls_p.start()
- self.client_inst = mock.Mock()
- client_cls.return_value = self.client_inst
+ self.l3pluginApi_cls_p = mock.patch(
+ 'quantum.agent.l3_agent.L3PluginApi')
+ l3pluginApi_cls = self.l3pluginApi_cls_p.start()
+ self.plugin_api = mock.Mock()
+ l3pluginApi_cls.return_value = self.plugin_api
def tearDown(self):
self.device_exists_p.stop()
- self.client_cls_p.stop()
+ self.l3pluginApi_cls_p.stop()
self.ip_cls_p.stop()
self.dvr_cls_p.stop()
self.utils_exec_p.stop()
self.assertTrue(ri.ns_name().endswith(id))
def testAgentCreate(self):
- agent = l3_agent.L3NATAgent(self.conf)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
def _test_internal_network_action(self, action):
port_id = _uuid()
network_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces)
- agent = l3_agent.L3NATAgent(self.conf)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
interface_name = agent.get_internal_device_name(port_id)
cidr = '99.0.1.9/24'
mac = 'ca:fe:de:ad:be:ef'
router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces)
- agent = l3_agent.L3NATAgent(self.conf)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16']
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
'subnet_id': _uuid()}],
router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
self.conf.use_namespaces)
- agent = l3_agent.L3NATAgent(self.conf)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
floating_ip = '20.0.0.100'
fixed_ip = '10.0.0.23'
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
def testProcessRouter(self):
- agent = l3_agent.L3NATAgent(self.conf)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router_id = _uuid()
- ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
-
- # return data so that state is built up
ex_gw_port = {'id': _uuid(),
'network_id': _uuid(),
'fixed_ips': [{'ip_address': '19.4.4.4',
- 'subnet_id': _uuid()}]}
+ 'subnet_id': _uuid()}],
+ 'subnet': {'cidr': '19.4.4.0/24',
+ 'gateway_ip': '19.4.4.1'}}
internal_port = {'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.4.4',
'subnet_id': _uuid()}],
- 'mac_address': 'ca:fe:de:ad:be:ef'}
-
- def fake_list_ports1(**kwargs):
- if kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_GW:
- return {'ports': [ex_gw_port]}
- elif kwargs['device_owner'] == l3_db.DEVICE_OWNER_ROUTER_INTF:
- return {'ports': [internal_port]}
-
- fake_subnet = {'subnet': {'cidr': '19.4.4.0/24',
- 'gateway_ip': '19.4.4.1'}}
+ 'mac_address': 'ca:fe:de:ad:be:ef',
+ 'subnet': {'cidr': '35.4.4.0/24',
+ 'gateway_ip': '35.4.4.1'}}
fake_floatingips1 = {'floatingips': [
{'id': _uuid(),
'floating_ip_address': '8.8.8.8',
'fixed_ip_address': '7.7.7.7',
'port_id': _uuid()}]}
-
- self.client_inst.list_ports.side_effect = fake_list_ports1
- self.client_inst.show_subnet.return_value = fake_subnet
- self.client_inst.list_floatingips.return_value = fake_floatingips1
+ router = {
+ 'id': router_id,
+ l3_constants.FLOATINGIP_KEY: fake_floatingips1['floatingips'],
+ l3_constants.INTERFACE_KEY: [internal_port],
+ 'gw_port': ex_gw_port}
+ ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
+ self.conf.use_namespaces, router=router)
agent.process_router(ri)
# remap floating IP to a new fixed ip
fake_floatingips2 = copy.deepcopy(fake_floatingips1)
fake_floatingips2['floatingips'][0]['fixed_ip_address'] = '7.7.7.8'
- self.client_inst.list_floatingips.return_value = fake_floatingips2
+ router[l3_constants.FLOATINGIP_KEY] = fake_floatingips2['floatingips']
agent.process_router(ri)
# remove just the floating ips
- self.client_inst.list_floatingips.return_value = {'floatingips': []}
+ del router[l3_constants.FLOATINGIP_KEY]
agent.process_router(ri)
- # now return no ports so state is torn down
- self.client_inst.list_ports.return_value = {'ports': []}
+ # now no ports so state is torn down
+ del router[l3_constants.INTERFACE_KEY]
+ del router['gw_port']
agent.process_router(ri)
- def testSingleLoopRouterRemoval(self):
- agent = l3_agent.L3NATAgent(self.conf)
- router_id = _uuid()
-
- self.client_inst.list_ports.return_value = {'ports': []}
+ def testRoutersWithAdminStateDown(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ self.plugin_api.get_external_network_id.return_value = None
- self.client_inst.list_networks.return_value = {'networks': []}
+ routers = [
+ {'id': _uuid(),
+ 'admin_state_up': False,
+ 'external_gateway_info': {}}]
+ agent._process_routers(routers)
+ self.assertNotIn(routers[0]['id'], agent.router_info)
- self.client_inst.list_routers.return_value = {'routers': [
- {'id': router_id,
+ def testSingleLoopRouterRemoval(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ self.plugin_api.get_external_network_id.return_value = None
+ routers = [
+ {'id': _uuid(),
'admin_state_up': True,
- 'external_gateway_info': {}}]}
- agent.do_single_loop()
-
- self.client_inst.list_routers.return_value = {'routers': []}
- agent.do_single_loop()
- self.external_process.assert_has_calls(
- [mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id),
- mock.call().enable(mock.ANY),
- mock.call(agent.conf, router_id, 'sudo', 'qrouter-' + router_id),
- mock.call().disable()])
+ 'external_gateway_info': {}}]
+ agent._process_routers(routers)
+ agent.router_deleted(None, routers[0]['id'])
# verify that remove is called
self.assertEquals(self.mock_ip.get_devices.call_count, 1)
self.device_exists.assert_has_calls(
[mock.call(self.conf.external_network_bridge)])
- def testDaemonLoop(self):
-
- # just take a pass through the loop, then raise on time.sleep()
- time_sleep_p = mock.patch('time.sleep')
- time_sleep = time_sleep_p.start()
-
- class ExpectedException(Exception):
- pass
-
- time_sleep.side_effect = ExpectedException()
-
- agent = l3_agent.L3NATAgent(self.conf)
- self.assertRaises(ExpectedException, agent.daemon_loop)
-
- time_sleep_p.stop()
-
def testDestroyNamespace(self):
class FakeDev(object):
self.mock_ip.get_devices.return_value = [FakeDev('qr-aaaa'),
FakeDev('qgw-aaaa')]
- agent = l3_agent.L3NATAgent(self.conf)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._destroy_all_router_namespaces()
-
- def testMain(self):
- agent_mock_p = mock.patch('quantum.agent.l3_agent.L3NATAgent')
- agent_mock = agent_mock_p.start()
- agent_mock.daemon_loop.return_value = None
- with mock.patch('quantum.agent.common.config.setup_logging'):
- with mock.patch('quantum.agent.l3_agent.sys') as mock_sys:
- mock_sys.argv = []
- l3_agent.main()
-
- agent_mock_p.stop()
from quantum.api import extensions
from quantum.api.v2 import attributes
from quantum.common import config
+from quantum.common import constants as l3_constants
from quantum.common import exceptions as q_exc
from quantum.common.test_lib import test_config
from quantum import context
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
+from quantum.db import l3_rpc_agent_api
from quantum.db import models_v2
from quantum.extensions import l3
from quantum import manager
def setUp(self):
plugin = 'quantum.extensions.l3.RouterPluginBase'
-
# Ensure 'stale' patched copies of the plugin are never returned
manager.QuantumManager._instance = None
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
-
# Instantiate mock plugin and enable the 'router' extension
manager.QuantumManager.get_plugin().supported_extension_aliases = (
["router"])
instance = self.plugin.return_value
instance.create_router.return_value = return_value
instance.get_routers_count.return_value = 0
-
res = self.api.post_json(_get_path('routers'), data)
-
instance.create_router.assert_called_with(mock.ANY,
router=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
with self.network(router__external=True) as ext_net:
self.assertEqual(ext_net['network'][l3.EXTERNAL],
True)
+
+ def _test_notify_op_agent(self, target_func, *args):
+ l3_rpc_agent_api_str = (
+ 'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI')
+ oldNotify = l3_rpc_agent_api.L3AgentNofity
+ try:
+ with mock.patch(l3_rpc_agent_api_str) as notifyApi:
+ l3_rpc_agent_api.L3AgentNofity = notifyApi
+ kargs = [item for item in args]
+ kargs.append(notifyApi)
+ target_func(*kargs)
+ except:
+ l3_rpc_agent_api.L3AgentNofity = oldNotify
+ raise
+ else:
+ l3_rpc_agent_api.L3AgentNofity = oldNotify
+
+ def _test_router_gateway_op_agent(self, notifyApi):
+ with self.router() as r:
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ self.assertEquals(
+ 2, notifyApi.routers_updated.call_count)
+
+ def test_router_gateway_op_agent(self):
+ self._test_notify_op_agent(self._test_router_gateway_op_agent)
+
+ def _test_interfaces_op_agent(self, r, notifyApi):
+ with self.port(no_delete=True) as p:
+ self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+ self.assertEquals(2, notifyApi.routers_updated.call_count)
+
+ def test_interfaces_op_agent(self):
+ with self.router() as r:
+ self._test_notify_op_agent(
+ self._test_interfaces_op_agent, r)
+
+ def _test_floatingips_op_agent(self, notifyApi):
+ with self.floatingip_with_assoc() as fip:
+ pass
+ # add gateway, add interface, associate, deletion of floatingip,
+ # delete gateway, delete interface
+ self.assertEquals(6, notifyApi.routers_updated.call_count)
+
+ def test_floatingips_op_agent(self):
+ self._test_notify_op_agent(self._test_floatingips_op_agent)
+
+ def test_l3_agent_routers_query_interfaces(self):
+ with self.router() as r:
+ with self.port(no_delete=True) as p:
+ self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
+ plugin = TestL3NatPlugin()
+ routers = plugin.get_sync_data(context.get_admin_context(),
+ None)
+ self.assertEqual(1, len(routers))
+ interfaces = routers[0][l3_constants.INTERFACE_KEY]
+ self.assertEqual(1, len(interfaces))
+ subnet_id = interfaces[0]['subnet']['id']
+ wanted_subnetid = p['port']['fixed_ips'][0]['subnet_id']
+ self.assertEqual(wanted_subnetid, subnet_id)
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
+ def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
+ with self.router() as r:
+ with self.subnet(cidr='9.0.1.0/24') as subnet:
+ with self.port(subnet=subnet,
+ no_delete=True,
+ fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
+ self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+ port = {'port': {'fixed_ips':
+ [{'ip_address': '9.0.1.4',
+ 'subnet_id': subnet['subnet']['id']},
+ {'ip_address': '9.0.1.5',
+ 'subnet_id': subnet['subnet']['id']}]}}
+ plugin = TestL3NatPlugin()
+ ctx = context.get_admin_context()
+ plugin.update_port(ctx, p['port']['id'], port)
+ routers = plugin.get_sync_data(ctx, None)
+ self.assertEqual(1, len(routers))
+ interfaces = routers[0].get(l3_constants.INTERFACE_KEY, [])
+ self.assertEqual(0, len(interfaces))
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
+ def test_l3_agent_routers_query_gateway(self):
+ with self.router() as r:
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ plugin = TestL3NatPlugin()
+ routers = plugin.get_sync_data(context.get_admin_context(),
+ [r['router']['id']])
+ self.assertEqual(1, len(routers))
+ gw_port = routers[0]['gw_port']
+ self.assertEquals(s['subnet']['id'], gw_port['subnet']['id'])
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+
+ def test_l3_agent_routers_query_floatingips(self):
+ with self.floatingip_with_assoc() as fip:
+ plugin = TestL3NatPlugin()
+ routers = plugin.get_sync_data(context.get_admin_context(),
+ [fip['floatingip']['router_id']])
+ self.assertEqual(1, len(routers))
+ floatingips = routers[0][l3_constants.FLOATINGIP_KEY]
+ self.assertEqual(1, len(floatingips))
+ self.assertEquals(floatingips[0]['id'],
+ fip['floatingip']['id'])
+ self.assertEquals(floatingips[0]['port_id'],
+ fip['floatingip']['port_id'])
+ self.assertTrue(floatingips[0]['fixed_ip_address'] is not None)
+ self.assertTrue(floatingips[0]['router_id'] is not None)