"create_qos_queue:": "rule:admin_only",
"get_qos_queue:": "rule:admin_only",
- "get_qos_queues:": "rule:admin_only"
+ "get_qos_queues:": "rule:admin_only",
+
+ "update_agent": "rule:admin_only",
+ "delete_agent": "rule:admin_only",
+ "get_agent": "rule:admin_only",
+ "get_agents": "rule:admin_only"
}
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
+# =========== items for agent management extension =============
+# Seconds to regard the agent as down.
+# agent_down_time = 5
+# =========== end of items for agent management extension =====
+
[DEFAULT_SERVICETYPE]
# Description of the default service type (optional)
# description = "default service type"
# Change to "sudo" to skip the filtering and just run the comand directly
# root_helper = sudo
+# =========== items for agent management extension =============
+# seconds between nodes reporting state to server, should be less than
+# agent_down_time
+# report_interval = 4
+
+# =========== end of items for agent management extension =====
+
[keystone_authtoken]
auth_host = 127.0.0.1
auth_port = 35357
help=_('Root helper application.')),
]
+AGENT_STATE_OPTS = [
+ cfg.IntOpt('report_interval', default=4,
+ help=_('Seconds between nodes reporting state to server')),
+]
+
def register_root_helper(conf):
# The first call is to ensure backward compatibility
conf.register_opts(ROOT_HELPER_OPTS, 'AGENT')
+def register_agent_state_opts_helper(conf):
+ conf.register_opts(AGENT_STATE_OPTS, 'AGENT')
+
+
def get_root_helper(conf):
root_helper = conf.AGENT.root_helper
if root_helper is not 'sudo':
from quantum.agent.linux import ip_lib
from quantum.agent.linux import iptables_manager
from quantum.agent.linux import utils
+from quantum.agent import rpc as agent_rpc
from quantum.common import constants as l3_constants
from quantum.common import topics
from quantum import context
from quantum import manager
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
from quantum.openstack.common import periodic_task
from quantum.openstack.common.rpc import common as rpc_common
from quantum.openstack.common.rpc import proxy
help=_("UUID of external network for routers implemented "
"by the agents.")),
cfg.StrOpt('l3_agent_manager',
- default='quantum.agent.l3_agent.L3NATAgent',
+ default='quantum.agent.l3_agent.L3NATAgentWithStateReport',
help=_("The Quantum L3 Agent manager.")),
]
LOG.exception(_("Error importing interface driver '%s'"),
self.conf.interface_driver)
sys.exit(1)
+ self.context = context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
self.fullsync = True
self.sync_sem = semaphore.Semaphore(1)
if self.conf.gateway_external_network_id:
return self.conf.gateway_external_network_id
try:
- return self.plugin_rpc.get_external_network_id(
- context.get_admin_context())
+ return self.plugin_rpc.get_external_network_id(self.context)
except rpc_common.RemoteError as e:
if e.exc_type == 'TooManyExternalNetworks':
msg = _(
LOG.info(_("L3 agent started"))
+class L3NATAgentWithStateReport(L3NATAgent):
+
+ def __init__(self, host, conf=None):
+ super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
+ self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
+ self.agent_state = {
+ 'binary': 'quantum-l3-agent',
+ 'host': host,
+ 'topic': topics.L3_AGENT,
+ 'configurations': {
+ 'use_namespaces': self.conf.use_namespaces,
+ 'router_id': self.conf.router_id,
+ 'handle_internal_only_routers':
+ self.conf.handle_internal_only_routers,
+ 'gateway_external_network_id':
+ self.conf.gateway_external_network_id,
+ 'interface_driver': self.conf.interface_driver},
+ 'start_flag': True,
+ 'agent_type': l3_constants.AGENT_TYPE_L3}
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.LoopingCall(self._report_state)
+ heartbeat.start(interval=report_interval)
+
+ def _report_state(self):
+ num_ex_gw_ports = 0
+ num_interfaces = 0
+ num_floating_ips = 0
+ router_infos = self.router_info.values()
+ num_routers = len(router_infos)
+ for ri in router_infos:
+ ex_gw_port = self._get_ex_gw_port(ri)
+ if ex_gw_port:
+ num_ex_gw_ports += 1
+ num_interfaces += len(ri.router.get(l3_constants.INTERFACE_KEY,
+ []))
+ num_floating_ips += len(ri.router.get(l3_constants.FLOATINGIP_KEY,
+ []))
+ configurations = self.agent_state['configurations']
+ configurations['routers'] = num_routers
+ configurations['ex_gw_ports'] = num_ex_gw_ports
+ configurations['interfaces'] = num_interfaces
+ configurations['floating_ips'] = num_floating_ips
+ try:
+ self.state_rpc.report_state(self.context,
+ self.agent_state)
+ self.agent_state.pop('start_flag', None)
+ except Exception:
+ LOG.exception(_("Failed reporting state!"))
+
+
def main():
eventlet.monkey_patch()
conf = cfg.CONF
conf.register_opts(L3NATAgent.OPTS)
+ config.register_agent_state_opts_helper(conf)
config.register_root_helper(conf)
conf.register_opts(interface.OPTS)
conf.register_opts(external_process.OPTS)
conf()
config.setup_logging(conf)
- server = quantum_service.Service.create(binary='quantum-l3-agent',
- topic=topics.L3_AGENT)
+ server = quantum_service.Service.create(
+ binary='quantum-l3-agent',
+ topic=topics.L3_AGENT,
+ report_interval=cfg.CONF.AGENT.report_interval)
service.launch(server).wait()
return connection
+class PluginReportStateAPI(proxy.RpcProxy):
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self, topic):
+ super(PluginReportStateAPI, self).__init__(
+ topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def report_state(self, context, agent_state):
+ return self.cast(context,
+ self.make_msg('report_state',
+ agent_state={'agent_state':
+ agent_state}),
+ topic=self.topic)
+
+
class PluginApi(proxy.RpcProxy):
'''Agent side of the rpc API.
TYPE_FLOAT = "float"
TYPE_LIST = "list"
TYPE_DICT = "dict"
+
+AGENT_TYPE_DHCP = 'DHCP agent'
+AGENT_TYPE_OVS = 'Open vSwitch agent'
+AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
+AGENT_TYPE_L3 = 'L3 agent'
+L2_AGENT_TOPIC = 'N/A'
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sqlalchemy as sa
+from sqlalchemy.orm import exc
+
+from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.extensions import agent as ext_agent
+from quantum import manager
+from quantum.openstack.common import cfg
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import timeutils
+
+LOG = logging.getLogger(__name__)
+cfg.CONF.register_opt(
+ cfg.IntOpt('agent_down_time', default=5,
+ help=_("Seconds to regard the agent is down.")))
+
+
+class Agent(model_base.BASEV2, models_v2.HasId):
+ """Represents agents running in quantum deployments"""
+
+ # L3 agent, DHCP agent, OVS agent, LinuxBridge
+ agent_type = sa.Column(sa.String(255), nullable=False)
+ binary = sa.Column(sa.String(255), nullable=False)
+ # TOPIC is a fanout exchange topic
+ topic = sa.Column(sa.String(255), nullable=False)
+ # TOPIC.host is a target topic
+ host = sa.Column(sa.String(255), nullable=False)
+ admin_state_up = sa.Column(sa.Boolean, default=True,
+ nullable=False)
+ # the time when first report came from agents
+ created_at = sa.Column(sa.DateTime, nullable=False)
+ # the time when first report came after agents start
+ started_at = sa.Column(sa.DateTime, nullable=False)
+ # updated when agents report
+ heartbeat_timestamp = sa.Column(sa.DateTime, nullable=False)
+ # description is note for admin user
+ description = sa.Column(sa.String(255))
+ # configurations: a json dict string, I think 4095 is enough
+ configurations = sa.Column(sa.String(4095), nullable=False)
+
+
+class AgentDbMixin(ext_agent.AgentPluginBase):
+ """Mixin class to add agent extension to db_plugin_base_v2."""
+
+ def _get_agent(self, context, id):
+ try:
+ agent = self._get_by_id(context, Agent, id)
+ except exc.NoResultFound:
+ raise ext_agent.AgentNotFound(id=id)
+ return agent
+
+ def _is_agent_down(self, heart_beat_time_str):
+ return timeutils.is_older_than(heart_beat_time_str,
+ cfg.CONF.agent_down_time)
+
+ def _make_agent_dict(self, agent, fields=None):
+ attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get(
+ ext_agent.RESOURCE_NAME + 's')
+ res = dict((k, agent[k]) for k in attr
+ if k not in ['alive', 'configurations'])
+ res['alive'] = not self._is_agent_down(res['heartbeat_timestamp'])
+ try:
+ res['configurations'] = jsonutils.loads(agent['configurations'])
+ except Exception:
+ msg = _('Configurations for agent %(agent_type)s on host %(host)s'
+ ' are invalid.')
+ LOG.warn(msg, {'agent_type': res['agent_type'],
+ 'host': res['host']})
+ res['configurations'] = {}
+ return self._fields(res, fields)
+
+ def delete_agent(self, context, id):
+ with context.session.begin(subtransactions=True):
+ agent = self._get_agent(context, id)
+ context.session.delete(agent)
+
+ def update_agent(self, context, id, agent):
+ agent_data = agent['agent']
+ with context.session.begin(subtransactions=True):
+ agent = self._get_agent(context, id)
+ agent.update(agent_data)
+ return self._make_agent_dict(agent)
+
+ def get_agents(self, context, filters=None, fields=None):
+ return self._get_collection(context, Agent,
+ self._make_agent_dict,
+ filters=filters, fields=fields)
+
+ def _get_agent_by_type_and_host(self, context, agent_type, host):
+ query = self._model_query(context, Agent)
+ try:
+ agent_db = query.filter(Agent.agent_type == agent_type,
+ Agent.host == host).one()
+ return agent_db
+ except exc.NoResultFound:
+ raise ext_agent.AgentNotFoundByTypeHost(agent_type=agent_type,
+ host=host)
+ except exc.MultipleResultsFound:
+ raise ext_agent.MultipleAgentFoundByTypeHost(agent_type=agent_type,
+ host=host)
+
+ def get_agent(self, context, id, fields=None):
+ agent = self._get_agent(context, id)
+ return self._make_agent_dict(agent, fields)
+
+ def create_or_update_agent(self, context, agent):
+ """Create or update agent according to report."""
+ with context.session.begin(subtransactions=True):
+ res_keys = ['agent_type', 'binary', 'host', 'topic']
+ res = dict((k, agent[k]) for k in res_keys)
+
+ configurations_dict = agent.get('configurations', {})
+ res['configurations'] = jsonutils.dumps(configurations_dict)
+ current_time = timeutils.utcnow()
+ try:
+ agent_db = self._get_agent_by_type_and_host(
+ context, agent['agent_type'], agent['host'])
+ res['heartbeat_timestamp'] = current_time
+ if agent.get('start_flag'):
+ res['started_at'] = current_time
+ agent_db.update(res)
+ except ext_agent.AgentNotFoundByTypeHost:
+ res['created_at'] = current_time
+ res['started_at'] = current_time
+ res['heartbeat_timestamp'] = current_time
+ res['admin_state_up'] = True
+ agent_db = Agent(**res)
+ context.session.add(agent_db)
+
+
+class AgentExtRpcCallback(object):
+ """Processes the rpc report in plugin implementations."""
+ RPC_API_VERSION = '1.0'
+
+ def report_state(self, context, **kwargs):
+ """Report state from agent to server. """
+ agent_state = kwargs['agent_state']['agent_state']
+ plugin = manager.QuantumManager.get_plugin()
+ plugin.create_or_update_agent(context, agent_state)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""Add agent management extension model support
+
+Revision ID: 511471cc46b
+Revises: 54c2c487e913
+Create Date: 2013-02-18 05:09:32.523460
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '511471cc46b'
+down_revision = '54c2c487e913'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
+ 'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+
+from quantum.db import migration
+
+
+def upgrade(active_plugin=None, options=None):
+ if not migration.should_run(active_plugin, migration_for_plugins):
+ return
+
+ ### commands auto generated by Alembic - please adjust! ###
+ op.create_table(
+ 'agents',
+ sa.Column('id', sa.String(length=36), nullable=False),
+ sa.Column('agent_type', sa.String(length=255), nullable=False),
+ sa.Column('binary', sa.String(length=255), nullable=False),
+ sa.Column('topic', sa.String(length=255), nullable=False),
+ sa.Column('host', sa.String(length=255), nullable=False),
+ sa.Column('admin_state_up', sa.Boolean(), nullable=False),
+ sa.Column('created_at', sa.DateTime(), nullable=False),
+ sa.Column('started_at', sa.DateTime(), nullable=False),
+ sa.Column('heartbeat_timestamp', sa.DateTime(), nullable=False),
+ sa.Column('description', sa.String(length=255), nullable=True),
+ sa.Column('configurations', sa.String(length=4095), nullable=False),
+ sa.PrimaryKeyConstraint('id')
+ )
+ ### end Alembic commands ###
+
+
+def downgrade(active_plugin=None, options=None):
+ if not migration.should_run(active_plugin, migration_for_plugins):
+ return
+
+ ### commands auto generated by Alembic - please adjust! ###
+ op.drop_table('agents')
+ ### end Alembic commands ###
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import abstractmethod
+
+from quantum.api import extensions
+from quantum.api.v2 import attributes as attr
+from quantum.api.v2 import base
+from quantum.common import exceptions
+from quantum import manager
+
+
+# Attribute Map
+RESOURCE_NAME = 'agent'
+RESOURCE_ATTRIBUTE_MAP = {
+ RESOURCE_NAME + 's': {
+ 'id': {'allow_post': False, 'allow_put': False,
+ 'validate': {'type:uuid': None},
+ 'is_visible': True},
+ 'agent_type': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'binary': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'topic': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'host': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'admin_state_up': {'allow_post': False, 'allow_put': True,
+ 'convert_to': attr.convert_to_boolean,
+ 'is_visible': True},
+ 'created_at': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'started_at': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'heartbeat_timestamp': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'alive': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'configurations': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'description': {'allow_post': False, 'allow_put': True,
+ 'is_visible': True,
+ 'validate': {'type:string': None}},
+ },
+}
+
+
+class AgentNotFound(exceptions.NotFound):
+ message = _("Agent %(id)s could not be found")
+
+
+class AgentNotFoundByTypeHost(exceptions.NotFound):
+ message = _("Agent with agent_type=%(agent_type)s and host=%(host)s "
+ "could not be found")
+
+
+class MultipleAgentFoundByTypeHost(exceptions.Conflict):
+ message = _("Multiple agents with agent_type=%(agent_type)s and "
+ "host=%(host)s found")
+
+
+class Agent(object):
+ """Agent management extension"""
+
+ @classmethod
+ def get_name(cls):
+ return "agent"
+
+ @classmethod
+ def get_alias(cls):
+ return "agent"
+
+ @classmethod
+ def get_description(cls):
+ return "The agent management extension."
+
+ @classmethod
+ def get_namespace(cls):
+ return "http://docs.openstack.org/ext/agent/api/v2.0"
+
+ @classmethod
+ def get_updated(cls):
+ return "2013-02-03T10:00:00-00:00"
+
+ @classmethod
+ def get_resources(cls):
+ """ Returns Ext Resources """
+ my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
+ attr.PLURALS.update(dict(my_plurals))
+ plugin = manager.QuantumManager.get_plugin()
+ params = RESOURCE_ATTRIBUTE_MAP.get(RESOURCE_NAME + 's')
+ controller = base.create_resource(RESOURCE_NAME + 's',
+ RESOURCE_NAME,
+ plugin, params
+ )
+
+ ex = extensions.ResourceExtension(RESOURCE_NAME + 's',
+ controller)
+
+ return [ex]
+
+ def get_extended_resources(self, version):
+ return {}
+
+
+class AgentPluginBase(object):
+ """ REST API to operate the Agent.
+
+ All of method must be in an admin context.
+ """
+
+ def create_agent(self, context, agent):
+ """ Create agent.
+
+ This operation is not allow in REST API.
+ @raise exceptions.BadRequest:
+ """
+ raise exceptions.BadRequest
+
+ @abstractmethod
+ def delete_agent(self, context, id):
+ """Delete agent.
+ Agents register themselves on reporting state.
+ But if a agent does not report its status
+ for a long time (for example, it is dead for ever. ),
+ admin can remove it. Agents must be disabled before
+ being removed.
+ """
+ pass
+
+ @abstractmethod
+ def update_agent(self, context, agent):
+ """Disable or Enable the agent.
+ Discription also can be updated.
+
+ Some agents cannot be disabled,
+ such as plugins, services.
+ An error code should be reported in this case.
+ @raise exceptions.BadRequest:
+ """
+ pass
+
+ @abstractmethod
+ def get_agents(self, context, filters=None, fields=None):
+ pass
+
+ @abstractmethod
+ def get_agent(self, context, id, fields=None):
+ pass
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
+from quantum.common import constants
from quantum.common import topics
from quantum.common import utils as q_utils
from quantum import context
from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
# NOTE (e0ne): this import is needed for config init
from quantum.plugins.linuxbridge.common import config
self.polling_interval = polling_interval
self.root_helper = root_helper
self.setup_linux_bridge(interface_mappings)
+ self.agent_state = {
+ 'binary': 'quantum-linuxbridge-agent',
+ 'host': cfg.CONF.host,
+ 'topic': constants.L2_AGENT_TOPIC,
+ 'configurations': interface_mappings,
+ 'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
+ 'start_flag': True}
+
self.setup_rpc(interface_mappings.values())
self.init_firewall()
+ def _report_state(self):
+ try:
+ devices = len(self.br_mgr.udev_get_tap_devices())
+ self.agent_state.get('configurations')['devices'] = devices
+ self.state_rpc.report_state(self.context,
+ self.agent_state)
+ self.agent_state.pop('start_flag', None)
+ except Exception:
+ LOG.exception("Failed reporting state!")
+
def setup_rpc(self, physical_interfaces):
if physical_interfaces:
mac = utils.get_interface_mac(physical_interfaces[0])
self.topic = topics.AGENT
self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN)
-
+ self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.LoopingCall(self._report_state)
+ heartbeat.start(interval=report_interval)
def setup_linux_bridge(self, interface_mappings):
self.br_mgr = LinuxBridgeManager(interface_mappings, self.root_helper)
cfg.CONF.register_opts(vlan_opts, "VLANS")
cfg.CONF.register_opts(bridge_opts, "LINUX_BRIDGE")
cfg.CONF.register_opts(agent_opts, "AGENT")
+config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.common import utils
+from quantum.db import agents_db
from quantum.db import api as db_api
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
- sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+ sg_db_rpc.SecurityGroupServerRpcCallbackMixin
+ ):
- RPC_API_VERSION = '1.1'
- # Device names start with "tap"
# history
# 1.1 Support Security Group RPC
+ RPC_API_VERSION = '1.1'
+ # Device names start with "tap"
TAP_PREFIX_LEN = 3
def create_rpc_dispatcher(self):
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self])
+ return q_rpc.PluginRpcDispatcher([self,
+ agents_db.AgentExtRpcCallback()])
@classmethod
def get_port_from_device(cls, device):
class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin,
- sg_db_rpc.SecurityGroupServerRpcMixin):
+ sg_db_rpc.SecurityGroupServerRpcMixin,
+ agents_db.AgentDbMixin):
"""Implement the Quantum abstractions using Linux bridging.
A new VLAN is created for each network. An agent is relied upon
__native_bulk_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas",
- "security-group"]
+ "security-group", "agent"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config
+from quantum.common import constants as q_const
from quantum.common import topics
from quantum.common import utils as q_utils
from quantum import context
from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
from quantum.plugins.openvswitch.common import config
from quantum.extensions import securitygroup as ext_sg
self.tunnel_count = 0
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
-
+ self.agent_state = {
+ 'binary': 'quantum-openvswitch-agent',
+ 'host': cfg.CONF.host,
+ 'topic': q_const.L2_AGENT_TOPIC,
+ 'configurations': bridge_mappings,
+ 'agent_type': q_const.AGENT_TYPE_OVS,
+ 'start_flag': True}
self.setup_rpc(integ_br)
# Security group agent supprot
self.plugin_rpc,
root_helper)
+ def _report_state(self):
+ try:
+ # How many devices are likely used by a VM
+ ports = self.int_br.get_vif_port_set()
+ num_devices = len(ports)
+ self.agent_state.get('configurations')['devices'] = num_devices
+ self.state_rpc.report_state(self.context,
+ self.agent_state)
+ self.agent_state.pop('start_flag', None)
+ except Exception:
+ LOG.exception(_("Failed reporting state!"))
+
def setup_rpc(self, integ_br):
mac = utils.get_interface_mac(integ_br)
self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
self.topic = topics.AGENT
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
+ self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
+ report_interval = cfg.CONF.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.LoopingCall(self._report_state)
+ heartbeat.start(interval=report_interval)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
cfg.CONF.register_opts(ovs_opts, "OVS")
cfg.CONF.register_opts(agent_opts, "AGENT")
+config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
+from quantum.db import agents_db
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
- return q_rpc.PluginRpcDispatcher([self])
+ return q_rpc.PluginRpcDispatcher([self,
+ agents_db.AgentExtRpcCallback()])
@classmethod
def get_port_from_device(cls, device):
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin,
- sg_db_rpc.SecurityGroupServerRpcMixin):
+ sg_db_rpc.SecurityGroupServerRpcMixin,
+ agents_db.AgentDbMixin):
+
"""Implement the Quantum abstractions using Open vSwitch.
Depending on whether tunneling is enabled, either a GRE tunnel or
# is qualified by class
__native_bulk_support = True
supported_extension_aliases = ["provider", "router",
- "binding", "quotas", "security-group"]
+ "binding", "quotas", "security-group",
+ "agent"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
LOG = logging.getLogger(__name__)
service_opts = [
- cfg.IntOpt('report_interval',
- default=10,
- help=_('Seconds between nodes reporting state to datastore')),
cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_override('report_interval', 0, 'AGENT')
kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF)
with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.'
'OVSQuantumAgent.setup_integration_br',
'admin_state_up': True}
with mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value='2'):
- with mock.patch.object(self.agent, 'port_bound') as port_bound:
- self.agent.port_update(mock.Mock(), port=port)
- self.assertTrue(port_bound.called)
-
- with mock.patch.object(self.agent, 'port_dead') as port_dead:
- port['admin_state_up'] = False
- self.agent.port_update(mock.Mock(), port=port)
- self.assertTrue(port_dead.called)
+ with mock.patch.object(self.agent.plugin_rpc,
+ 'update_device_up') as device_up:
+ with mock.patch.object(self.agent, 'port_bound') as port_bound:
+ self.agent.port_update(mock.Mock(), port=port)
+ self.assertTrue(port_bound.called)
+ self.assertTrue(device_up.called)
+ with mock.patch.object(self.agent.plugin_rpc,
+ 'update_device_down') as device_down:
+ with mock.patch.object(self.agent, 'port_dead') as port_dead:
+ port['admin_state_up'] = False
+ self.agent.port_update(mock.Mock(), port=port)
+ self.assertTrue(port_dead.called)
+ self.assertTrue(device_down.called)
def test_process_network_ports(self):
reply = {'current': set(['tap0']),
def setUp(self):
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
+ cfg.CONF.set_override('report_interval', 0, 'AGENT')
self.mox = mox.Mox()
self.INT_BRIDGE = 'integration_bridge'
self.TUN_BRIDGE = 'tunnel_bridge'
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import time
+
+from webob import exc
+
+from quantum.common import constants
+from quantum.common import topics
+from quantum.common.test_lib import test_config
+from quantum import context
+from quantum.db import agents_db
+from quantum.db import db_base_plugin_v2
+from quantum.extensions import agent
+from quantum.openstack.common import cfg
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import uuidutils
+from quantum.tests.unit import test_api_v2
+from quantum.tests.unit import test_db_plugin
+
+
+LOG = logging.getLogger(__name__)
+
+_uuid = uuidutils.generate_uuid
+_get_path = test_api_v2._get_path
+L3_HOSTA = 'hosta'
+DHCP_HOSTA = 'hosta'
+L3_HOSTB = 'hostb'
+DHCP_HOSTC = 'hostc'
+
+
+class AgentTestExtensionManager(object):
+
+ def get_resources(self):
+ return agent.Agent.get_resources()
+
+ def get_actions(self):
+ return []
+
+ def get_request_extensions(self):
+ return []
+
+
+# This plugin class is just for testing
+class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2,
+ agents_db.AgentDbMixin):
+ supported_extension_aliases = ["agent"]
+
+
+class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
+ fmt = 'json'
+
+ def setUp(self):
+ self.adminContext = context.get_admin_context()
+ test_config['plugin_name_v2'] = (
+ 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ ext_mgr = AgentTestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ super(AgentDBTestCase, self).setUp()
+
+ def _list_agents(self, expected_res_status=None,
+ quantum_context=None,
+ query_string=None):
+ comp_res = self._list('agents',
+ quantum_context=quantum_context,
+ query_params=query_string)
+ if expected_res_status:
+ self.assertEqual(comp_res.status_int, expected_res_status)
+ return comp_res
+
+ def _register_agent_states(self):
+ """Register two L3 agents and two DHCP agents."""
+ l3_hosta = {
+ 'binary': 'quantum-l3-agent',
+ 'host': L3_HOSTA,
+ 'topic': topics.L3_AGENT,
+ 'configurations': {'use_namespaces': True,
+ 'router_id': None,
+ 'handle_internal_only_routers':
+ True,
+ 'gateway_external_network_id':
+ None,
+ 'interface_driver': 'interface_driver',
+ },
+ 'agent_type': constants.AGENT_TYPE_L3}
+ l3_hostb = copy.deepcopy(l3_hosta)
+ l3_hostb['host'] = L3_HOSTB
+ dhcp_hosta = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': DHCP_HOSTA,
+ 'topic': 'DHCP_AGENT',
+ 'configurations': {'dhcp_driver': 'dhcp_driver',
+ 'use_namespaces': True,
+ },
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ dhcp_hostc = copy.deepcopy(dhcp_hosta)
+ dhcp_hostc['host'] = DHCP_HOSTC
+ callback = agents_db.AgentExtRpcCallback()
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': l3_hosta})
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': l3_hostb})
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': dhcp_hosta})
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': dhcp_hostc})
+ return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+
+ def test_create_agent(self):
+ data = {'agent': {}}
+ _req = self.new_create_request('agents', data, self.fmt)
+ _req.environ['quantum.context'] = context.Context(
+ '', 'tenant_id')
+ res = _req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_list_agent(self):
+ agents = self._register_agent_states()
+ res = self._list('agents')
+ for agent in res['agents']:
+ if (agent['host'] == DHCP_HOSTA and
+ agent['agent_type'] == constants.AGENT_TYPE_DHCP):
+ self.assertEqual(
+ 'dhcp_driver',
+ agent['configurations']['dhcp_driver'])
+ break
+ self.assertEqual(len(agents), len(res['agents']))
+
+ def test_show_agent(self):
+ self._register_agent_states()
+ agents = self._list_agents(
+ query_string='binary=quantum-l3-agent')
+ self.assertEqual(2, len(agents['agents']))
+ agent = self._show('agents', agents['agents'][0]['id'])
+ self.assertEqual('quantum-l3-agent', agent['agent']['binary'])
+
+ def test_update_agent(self):
+ self._register_agent_states()
+ agents = self._list_agents(
+ query_string='binary=quantum-l3-agent&host=' + L3_HOSTB)
+ self.assertEqual(1, len(agents['agents']))
+ com_id = agents['agents'][0]['id']
+ agent = self._show('agents', com_id)
+ new_agent = {}
+ new_agent['agent'] = {}
+ new_agent['agent']['admin_state_up'] = False
+ new_agent['agent']['description'] = 'description'
+ self._update('agents', com_id, new_agent)
+ agent = self._show('agents', com_id)
+ self.assertFalse(agent['agent']['admin_state_up'])
+ self.assertEqual('description', agent['agent']['description'])
+
+ def test_dead_agent(self):
+ cfg.CONF.set_override('agent_down_time', 1)
+ self._register_agent_states()
+ time.sleep(1.5)
+ agents = self._list_agents(
+ query_string='binary=quantum-l3-agent&host=' + L3_HOSTB)
+ self.assertFalse(agents['agents'][0]['alive'])
+
+
+class AgentDBTestCaseXML(AgentDBTestCase):
+ fmt = 'xml'