From: gongysh Date: Fri, 1 Feb 2013 12:30:12 +0000 (+0800) Subject: Agent management extension X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=881884844d9e4a6abea600a723c8b521eb201d7c;p=openstack-build%2Fneutron-build.git Agent management extension 1/3 part of blueprint quantum-scheduler This patch adds agent management support to l3-agent and plugin-agent (ovs and linuxbridge). Change-Id: Iebc272f45c7530c995f32ef3729b11cd76779385 --- diff --git a/etc/policy.json b/etc/policy.json index cdaad0d17..a427507f5 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -53,5 +53,10 @@ "create_qos_queue:": "rule:admin_only", "get_qos_queue:": "rule:admin_only", - "get_qos_queues:": "rule:admin_only" + "get_qos_queues:": "rule:admin_only", + + "update_agent": "rule:admin_only", + "delete_agent": "rule:admin_only", + "get_agent": "rule:admin_only", + "get_agents": "rule:admin_only" } diff --git a/etc/quantum.conf b/etc/quantum.conf index f78da1dea..ebf2343d8 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -190,6 +190,11 @@ notification_topics = notifications # default driver to use for quota checks # quota_driver = quantum.quota.ConfDriver +# =========== items for agent management extension ============= +# Seconds to regard the agent as down. +# agent_down_time = 5 +# =========== end of items for agent management extension ===== + [DEFAULT_SERVICETYPE] # Description of the default service type (optional) # description = "default service type" @@ -208,6 +213,13 @@ notification_topics = notifications # Change to "sudo" to skip the filtering and just run the comand directly # root_helper = sudo +# =========== items for agent management extension ============= +# seconds between nodes reporting state to server, should be less than +# agent_down_time +# report_interval = 4 + +# =========== end of items for agent management extension ===== + [keystone_authtoken] auth_host = 127.0.0.1 auth_port = 35357 diff --git a/quantum/agent/common/config.py b/quantum/agent/common/config.py index 1b1178026..fe7b19d35 100644 --- a/quantum/agent/common/config.py +++ b/quantum/agent/common/config.py @@ -29,6 +29,11 @@ ROOT_HELPER_OPTS = [ help=_('Root helper application.')), ] +AGENT_STATE_OPTS = [ + cfg.IntOpt('report_interval', default=4, + help=_('Seconds between nodes reporting state to server')), +] + def register_root_helper(conf): # The first call is to ensure backward compatibility @@ -36,6 +41,10 @@ def register_root_helper(conf): conf.register_opts(ROOT_HELPER_OPTS, 'AGENT') +def register_agent_state_opts_helper(conf): + conf.register_opts(AGENT_STATE_OPTS, 'AGENT') + + def get_root_helper(conf): root_helper = conf.AGENT.root_helper if root_helper is not 'sudo': diff --git a/quantum/agent/l3_agent.py b/quantum/agent/l3_agent.py index f5730fde4..d8f3eaf2f 100644 --- a/quantum/agent/l3_agent.py +++ b/quantum/agent/l3_agent.py @@ -32,12 +32,14 @@ from quantum.agent.linux import interface from quantum.agent.linux import ip_lib from quantum.agent.linux import iptables_manager from quantum.agent.linux import utils +from quantum.agent import rpc as agent_rpc from quantum.common import constants as l3_constants from quantum.common import topics from quantum import context from quantum import manager from quantum.openstack.common import importutils from quantum.openstack.common import log as logging +from quantum.openstack.common import loopingcall from quantum.openstack.common import periodic_task from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import proxy @@ -139,7 +141,7 @@ class L3NATAgent(manager.Manager): help=_("UUID of external network for routers implemented " "by the agents.")), cfg.StrOpt('l3_agent_manager', - default='quantum.agent.l3_agent.L3NATAgent', + default='quantum.agent.l3_agent.L3NATAgentWithStateReport', help=_("The Quantum L3 Agent manager.")), ] @@ -161,6 +163,7 @@ class L3NATAgent(manager.Manager): LOG.exception(_("Error importing interface driver '%s'"), self.conf.interface_driver) sys.exit(1) + self.context = context.get_admin_context_without_session() self.plugin_rpc = L3PluginApi(topics.PLUGIN, host) self.fullsync = True self.sync_sem = semaphore.Semaphore(1) @@ -211,8 +214,7 @@ class L3NATAgent(manager.Manager): if self.conf.gateway_external_network_id: return self.conf.gateway_external_network_id try: - return self.plugin_rpc.get_external_network_id( - context.get_admin_context()) + return self.plugin_rpc.get_external_network_id(self.context) except rpc_common.RemoteError as e: if e.exc_type == 'TooManyExternalNetworks': msg = _( @@ -617,15 +619,69 @@ class L3NATAgent(manager.Manager): LOG.info(_("L3 agent started")) +class L3NATAgentWithStateReport(L3NATAgent): + + def __init__(self, host, conf=None): + super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf) + self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) + self.agent_state = { + 'binary': 'quantum-l3-agent', + 'host': host, + 'topic': topics.L3_AGENT, + 'configurations': { + 'use_namespaces': self.conf.use_namespaces, + 'router_id': self.conf.router_id, + 'handle_internal_only_routers': + self.conf.handle_internal_only_routers, + 'gateway_external_network_id': + self.conf.gateway_external_network_id, + 'interface_driver': self.conf.interface_driver}, + 'start_flag': True, + 'agent_type': l3_constants.AGENT_TYPE_L3} + report_interval = cfg.CONF.AGENT.report_interval + if report_interval: + heartbeat = loopingcall.LoopingCall(self._report_state) + heartbeat.start(interval=report_interval) + + def _report_state(self): + num_ex_gw_ports = 0 + num_interfaces = 0 + num_floating_ips = 0 + router_infos = self.router_info.values() + num_routers = len(router_infos) + for ri in router_infos: + ex_gw_port = self._get_ex_gw_port(ri) + if ex_gw_port: + num_ex_gw_ports += 1 + num_interfaces += len(ri.router.get(l3_constants.INTERFACE_KEY, + [])) + num_floating_ips += len(ri.router.get(l3_constants.FLOATINGIP_KEY, + [])) + configurations = self.agent_state['configurations'] + configurations['routers'] = num_routers + configurations['ex_gw_ports'] = num_ex_gw_ports + configurations['interfaces'] = num_interfaces + configurations['floating_ips'] = num_floating_ips + try: + self.state_rpc.report_state(self.context, + self.agent_state) + self.agent_state.pop('start_flag', None) + except Exception: + LOG.exception(_("Failed reporting state!")) + + def main(): eventlet.monkey_patch() conf = cfg.CONF conf.register_opts(L3NATAgent.OPTS) + config.register_agent_state_opts_helper(conf) config.register_root_helper(conf) conf.register_opts(interface.OPTS) conf.register_opts(external_process.OPTS) conf() config.setup_logging(conf) - server = quantum_service.Service.create(binary='quantum-l3-agent', - topic=topics.L3_AGENT) + server = quantum_service.Service.create( + binary='quantum-l3-agent', + topic=topics.L3_AGENT, + report_interval=cfg.CONF.AGENT.report_interval) service.launch(server).wait() diff --git a/quantum/agent/rpc.py b/quantum/agent/rpc.py index 4fb925672..97d9c800f 100644 --- a/quantum/agent/rpc.py +++ b/quantum/agent/rpc.py @@ -49,6 +49,21 @@ def create_consumers(dispatcher, prefix, topic_details): return connection +class PluginReportStateAPI(proxy.RpcProxy): + BASE_RPC_API_VERSION = '1.0' + + def __init__(self, topic): + super(PluginReportStateAPI, self).__init__( + topic=topic, default_version=self.BASE_RPC_API_VERSION) + + def report_state(self, context, agent_state): + return self.cast(context, + self.make_msg('report_state', + agent_state={'agent_state': + agent_state}), + topic=self.topic) + + class PluginApi(proxy.RpcProxy): '''Agent side of the rpc API. diff --git a/quantum/common/constants.py b/quantum/common/constants.py index 8f661f0f4..2ae4e7d74 100644 --- a/quantum/common/constants.py +++ b/quantum/common/constants.py @@ -52,3 +52,9 @@ TYPE_LONG = "long" TYPE_FLOAT = "float" TYPE_LIST = "list" TYPE_DICT = "dict" + +AGENT_TYPE_DHCP = 'DHCP agent' +AGENT_TYPE_OVS = 'Open vSwitch agent' +AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent' +AGENT_TYPE_L3 = 'L3 agent' +L2_AGENT_TOPIC = 'N/A' diff --git a/quantum/db/agents_db.py b/quantum/db/agents_db.py new file mode 100644 index 000000000..70c56780e --- /dev/null +++ b/quantum/db/agents_db.py @@ -0,0 +1,157 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy as sa +from sqlalchemy.orm import exc + +from quantum.db import model_base +from quantum.db import models_v2 +from quantum.extensions import agent as ext_agent +from quantum import manager +from quantum.openstack.common import cfg +from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging +from quantum.openstack.common import timeutils + +LOG = logging.getLogger(__name__) +cfg.CONF.register_opt( + cfg.IntOpt('agent_down_time', default=5, + help=_("Seconds to regard the agent is down."))) + + +class Agent(model_base.BASEV2, models_v2.HasId): + """Represents agents running in quantum deployments""" + + # L3 agent, DHCP agent, OVS agent, LinuxBridge + agent_type = sa.Column(sa.String(255), nullable=False) + binary = sa.Column(sa.String(255), nullable=False) + # TOPIC is a fanout exchange topic + topic = sa.Column(sa.String(255), nullable=False) + # TOPIC.host is a target topic + host = sa.Column(sa.String(255), nullable=False) + admin_state_up = sa.Column(sa.Boolean, default=True, + nullable=False) + # the time when first report came from agents + created_at = sa.Column(sa.DateTime, nullable=False) + # the time when first report came after agents start + started_at = sa.Column(sa.DateTime, nullable=False) + # updated when agents report + heartbeat_timestamp = sa.Column(sa.DateTime, nullable=False) + # description is note for admin user + description = sa.Column(sa.String(255)) + # configurations: a json dict string, I think 4095 is enough + configurations = sa.Column(sa.String(4095), nullable=False) + + +class AgentDbMixin(ext_agent.AgentPluginBase): + """Mixin class to add agent extension to db_plugin_base_v2.""" + + def _get_agent(self, context, id): + try: + agent = self._get_by_id(context, Agent, id) + except exc.NoResultFound: + raise ext_agent.AgentNotFound(id=id) + return agent + + def _is_agent_down(self, heart_beat_time_str): + return timeutils.is_older_than(heart_beat_time_str, + cfg.CONF.agent_down_time) + + def _make_agent_dict(self, agent, fields=None): + attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get( + ext_agent.RESOURCE_NAME + 's') + res = dict((k, agent[k]) for k in attr + if k not in ['alive', 'configurations']) + res['alive'] = not self._is_agent_down(res['heartbeat_timestamp']) + try: + res['configurations'] = jsonutils.loads(agent['configurations']) + except Exception: + msg = _('Configurations for agent %(agent_type)s on host %(host)s' + ' are invalid.') + LOG.warn(msg, {'agent_type': res['agent_type'], + 'host': res['host']}) + res['configurations'] = {} + return self._fields(res, fields) + + def delete_agent(self, context, id): + with context.session.begin(subtransactions=True): + agent = self._get_agent(context, id) + context.session.delete(agent) + + def update_agent(self, context, id, agent): + agent_data = agent['agent'] + with context.session.begin(subtransactions=True): + agent = self._get_agent(context, id) + agent.update(agent_data) + return self._make_agent_dict(agent) + + def get_agents(self, context, filters=None, fields=None): + return self._get_collection(context, Agent, + self._make_agent_dict, + filters=filters, fields=fields) + + def _get_agent_by_type_and_host(self, context, agent_type, host): + query = self._model_query(context, Agent) + try: + agent_db = query.filter(Agent.agent_type == agent_type, + Agent.host == host).one() + return agent_db + except exc.NoResultFound: + raise ext_agent.AgentNotFoundByTypeHost(agent_type=agent_type, + host=host) + except exc.MultipleResultsFound: + raise ext_agent.MultipleAgentFoundByTypeHost(agent_type=agent_type, + host=host) + + def get_agent(self, context, id, fields=None): + agent = self._get_agent(context, id) + return self._make_agent_dict(agent, fields) + + def create_or_update_agent(self, context, agent): + """Create or update agent according to report.""" + with context.session.begin(subtransactions=True): + res_keys = ['agent_type', 'binary', 'host', 'topic'] + res = dict((k, agent[k]) for k in res_keys) + + configurations_dict = agent.get('configurations', {}) + res['configurations'] = jsonutils.dumps(configurations_dict) + current_time = timeutils.utcnow() + try: + agent_db = self._get_agent_by_type_and_host( + context, agent['agent_type'], agent['host']) + res['heartbeat_timestamp'] = current_time + if agent.get('start_flag'): + res['started_at'] = current_time + agent_db.update(res) + except ext_agent.AgentNotFoundByTypeHost: + res['created_at'] = current_time + res['started_at'] = current_time + res['heartbeat_timestamp'] = current_time + res['admin_state_up'] = True + agent_db = Agent(**res) + context.session.add(agent_db) + + +class AgentExtRpcCallback(object): + """Processes the rpc report in plugin implementations.""" + RPC_API_VERSION = '1.0' + + def report_state(self, context, **kwargs): + """Report state from agent to server. """ + agent_state = kwargs['agent_state']['agent_state'] + plugin = manager.QuantumManager.get_plugin() + plugin.create_or_update_agent(context, agent_state) diff --git a/quantum/db/migration/alembic_migrations/versions/511471cc46b_agent_ext_model_supp.py b/quantum/db/migration/alembic_migrations/versions/511471cc46b_agent_ext_model_supp.py new file mode 100644 index 000000000..7151164eb --- /dev/null +++ b/quantum/db/migration/alembic_migrations/versions/511471cc46b_agent_ext_model_supp.py @@ -0,0 +1,73 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Add agent management extension model support + +Revision ID: 511471cc46b +Revises: 54c2c487e913 +Create Date: 2013-02-18 05:09:32.523460 + +""" + +# revision identifiers, used by Alembic. +revision = '511471cc46b' +down_revision = '54c2c487e913' + +# Change to ['*'] if this migration applies to all plugins + +migration_for_plugins = [ + 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2', + 'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2', +] + +from alembic import op +import sqlalchemy as sa + + +from quantum.db import migration + + +def upgrade(active_plugin=None, options=None): + if not migration.should_run(active_plugin, migration_for_plugins): + return + + ### commands auto generated by Alembic - please adjust! ### + op.create_table( + 'agents', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('agent_type', sa.String(length=255), nullable=False), + sa.Column('binary', sa.String(length=255), nullable=False), + sa.Column('topic', sa.String(length=255), nullable=False), + sa.Column('host', sa.String(length=255), nullable=False), + sa.Column('admin_state_up', sa.Boolean(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=False), + sa.Column('heartbeat_timestamp', sa.DateTime(), nullable=False), + sa.Column('description', sa.String(length=255), nullable=True), + sa.Column('configurations', sa.String(length=4095), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + ### end Alembic commands ### + + +def downgrade(active_plugin=None, options=None): + if not migration.should_run(active_plugin, migration_for_plugins): + return + + ### commands auto generated by Alembic - please adjust! ### + op.drop_table('agents') + ### end Alembic commands ### diff --git a/quantum/extensions/agent.py b/quantum/extensions/agent.py new file mode 100644 index 000000000..c83325519 --- /dev/null +++ b/quantum/extensions/agent.py @@ -0,0 +1,161 @@ +# Copyright (c) 2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import abstractmethod + +from quantum.api import extensions +from quantum.api.v2 import attributes as attr +from quantum.api.v2 import base +from quantum.common import exceptions +from quantum import manager + + +# Attribute Map +RESOURCE_NAME = 'agent' +RESOURCE_ATTRIBUTE_MAP = { + RESOURCE_NAME + 's': { + 'id': {'allow_post': False, 'allow_put': False, + 'validate': {'type:uuid': None}, + 'is_visible': True}, + 'agent_type': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'binary': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'topic': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'host': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'admin_state_up': {'allow_post': False, 'allow_put': True, + 'convert_to': attr.convert_to_boolean, + 'is_visible': True}, + 'created_at': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'started_at': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'heartbeat_timestamp': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'alive': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'configurations': {'allow_post': False, 'allow_put': False, + 'is_visible': True}, + 'description': {'allow_post': False, 'allow_put': True, + 'is_visible': True, + 'validate': {'type:string': None}}, + }, +} + + +class AgentNotFound(exceptions.NotFound): + message = _("Agent %(id)s could not be found") + + +class AgentNotFoundByTypeHost(exceptions.NotFound): + message = _("Agent with agent_type=%(agent_type)s and host=%(host)s " + "could not be found") + + +class MultipleAgentFoundByTypeHost(exceptions.Conflict): + message = _("Multiple agents with agent_type=%(agent_type)s and " + "host=%(host)s found") + + +class Agent(object): + """Agent management extension""" + + @classmethod + def get_name(cls): + return "agent" + + @classmethod + def get_alias(cls): + return "agent" + + @classmethod + def get_description(cls): + return "The agent management extension." + + @classmethod + def get_namespace(cls): + return "http://docs.openstack.org/ext/agent/api/v2.0" + + @classmethod + def get_updated(cls): + return "2013-02-03T10:00:00-00:00" + + @classmethod + def get_resources(cls): + """ Returns Ext Resources """ + my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()] + attr.PLURALS.update(dict(my_plurals)) + plugin = manager.QuantumManager.get_plugin() + params = RESOURCE_ATTRIBUTE_MAP.get(RESOURCE_NAME + 's') + controller = base.create_resource(RESOURCE_NAME + 's', + RESOURCE_NAME, + plugin, params + ) + + ex = extensions.ResourceExtension(RESOURCE_NAME + 's', + controller) + + return [ex] + + def get_extended_resources(self, version): + return {} + + +class AgentPluginBase(object): + """ REST API to operate the Agent. + + All of method must be in an admin context. + """ + + def create_agent(self, context, agent): + """ Create agent. + + This operation is not allow in REST API. + @raise exceptions.BadRequest: + """ + raise exceptions.BadRequest + + @abstractmethod + def delete_agent(self, context, id): + """Delete agent. + Agents register themselves on reporting state. + But if a agent does not report its status + for a long time (for example, it is dead for ever. ), + admin can remove it. Agents must be disabled before + being removed. + """ + pass + + @abstractmethod + def update_agent(self, context, agent): + """Disable or Enable the agent. + Discription also can be updated. + + Some agents cannot be disabled, + such as plugins, services. + An error code should be reported in this case. + @raise exceptions.BadRequest: + """ + pass + + @abstractmethod + def get_agents(self, context, filters=None, fields=None): + pass + + @abstractmethod + def get_agent(self, context, id, fields=None): + pass diff --git a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py index 96c9ac32c..ea28ee2c8 100755 --- a/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py +++ b/quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py @@ -35,10 +35,12 @@ from quantum.agent.linux import utils from quantum.agent import rpc as agent_rpc from quantum.agent import securitygroups_rpc as sg_rpc from quantum.common import config as logging_config +from quantum.common import constants from quantum.common import topics from quantum.common import utils as q_utils from quantum import context from quantum.openstack.common import log as logging +from quantum.openstack.common import loopingcall from quantum.openstack.common.rpc import dispatcher # NOTE (e0ne): this import is needed for config init from quantum.plugins.linuxbridge.common import config @@ -463,9 +465,27 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): self.polling_interval = polling_interval self.root_helper = root_helper self.setup_linux_bridge(interface_mappings) + self.agent_state = { + 'binary': 'quantum-linuxbridge-agent', + 'host': cfg.CONF.host, + 'topic': constants.L2_AGENT_TOPIC, + 'configurations': interface_mappings, + 'agent_type': constants.AGENT_TYPE_LINUXBRIDGE, + 'start_flag': True} + self.setup_rpc(interface_mappings.values()) self.init_firewall() + def _report_state(self): + try: + devices = len(self.br_mgr.udev_get_tap_devices()) + self.agent_state.get('configurations')['devices'] = devices + self.state_rpc.report_state(self.context, + self.agent_state) + self.agent_state.pop('start_flag', None) + except Exception: + LOG.exception("Failed reporting state!") + def setup_rpc(self, physical_interfaces): if physical_interfaces: mac = utils.get_interface_mac(physical_interfaces[0]) @@ -482,7 +502,7 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): self.topic = topics.AGENT self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN) - + self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) # RPC network init self.context = context.get_admin_context_without_session() # Handle updates from service @@ -496,6 +516,10 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): self.connection = agent_rpc.create_consumers(self.dispatcher, self.topic, consumers) + report_interval = cfg.CONF.AGENT.report_interval + if report_interval: + heartbeat = loopingcall.LoopingCall(self._report_state) + heartbeat.start(interval=report_interval) def setup_linux_bridge(self, interface_mappings): self.br_mgr = LinuxBridgeManager(interface_mappings, self.root_helper) diff --git a/quantum/plugins/linuxbridge/common/config.py b/quantum/plugins/linuxbridge/common/config.py index 43b1e68a3..327bad385 100644 --- a/quantum/plugins/linuxbridge/common/config.py +++ b/quantum/plugins/linuxbridge/common/config.py @@ -51,4 +51,5 @@ agent_opts = [ cfg.CONF.register_opts(vlan_opts, "VLANS") cfg.CONF.register_opts(bridge_opts, "LINUX_BRIDGE") cfg.CONF.register_opts(agent_opts, "AGENT") +config.register_agent_state_opts_helper(cfg.CONF) config.register_root_helper(cfg.CONF) diff --git a/quantum/plugins/linuxbridge/lb_quantum_plugin.py b/quantum/plugins/linuxbridge/lb_quantum_plugin.py index 668e319e2..db6948e49 100644 --- a/quantum/plugins/linuxbridge/lb_quantum_plugin.py +++ b/quantum/plugins/linuxbridge/lb_quantum_plugin.py @@ -24,6 +24,7 @@ from quantum.common import exceptions as q_exc from quantum.common import rpc as q_rpc from quantum.common import topics from quantum.common import utils +from quantum.db import agents_db from quantum.db import api as db_api from quantum.db import db_base_plugin_v2 from quantum.db import dhcp_rpc_base @@ -48,12 +49,13 @@ LOG = logging.getLogger(__name__) class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, l3_rpc_base.L3RpcCallbackMixin, - sg_db_rpc.SecurityGroupServerRpcCallbackMixin): + sg_db_rpc.SecurityGroupServerRpcCallbackMixin + ): - RPC_API_VERSION = '1.1' - # Device names start with "tap" # history # 1.1 Support Security Group RPC + RPC_API_VERSION = '1.1' + # Device names start with "tap" TAP_PREFIX_LEN = 3 def create_rpc_dispatcher(self): @@ -62,7 +64,8 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self]) + return q_rpc.PluginRpcDispatcher([self, + agents_db.AgentExtRpcCallback()]) @classmethod def get_port_from_device(cls, device): @@ -170,7 +173,8 @@ class AgentNotifierApi(proxy.RpcProxy, class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2, l3_db.L3_NAT_db_mixin, - sg_db_rpc.SecurityGroupServerRpcMixin): + sg_db_rpc.SecurityGroupServerRpcMixin, + agents_db.AgentDbMixin): """Implement the Quantum abstractions using Linux bridging. A new VLAN is created for each network. An agent is relied upon @@ -193,7 +197,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2, __native_bulk_support = True supported_extension_aliases = ["provider", "router", "binding", "quotas", - "security-group"] + "security-group", "agent"] network_view = "extension:provider_network:view" network_set = "extension:provider_network:set" diff --git a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py index be34a86cb..070e2209b 100644 --- a/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py +++ b/quantum/plugins/openvswitch/agent/ovs_quantum_agent.py @@ -32,10 +32,12 @@ from quantum.agent.linux import utils from quantum.agent import rpc as agent_rpc from quantum.agent import securitygroups_rpc as sg_rpc from quantum.common import config as logging_config +from quantum.common import constants as q_const from quantum.common import topics from quantum.common import utils as q_utils from quantum import context from quantum.openstack.common import log as logging +from quantum.openstack.common import loopingcall from quantum.openstack.common.rpc import dispatcher from quantum.plugins.openvswitch.common import config from quantum.extensions import securitygroup as ext_sg @@ -176,7 +178,13 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): self.tunnel_count = 0 if self.enable_tunneling: self.setup_tunnel_br(tun_br) - + self.agent_state = { + 'binary': 'quantum-openvswitch-agent', + 'host': cfg.CONF.host, + 'topic': q_const.L2_AGENT_TOPIC, + 'configurations': bridge_mappings, + 'agent_type': q_const.AGENT_TYPE_OVS, + 'start_flag': True} self.setup_rpc(integ_br) # Security group agent supprot @@ -184,11 +192,24 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): self.plugin_rpc, root_helper) + def _report_state(self): + try: + # How many devices are likely used by a VM + ports = self.int_br.get_vif_port_set() + num_devices = len(ports) + self.agent_state.get('configurations')['devices'] = num_devices + self.state_rpc.report_state(self.context, + self.agent_state) + self.agent_state.pop('start_flag', None) + except Exception: + LOG.exception(_("Failed reporting state!")) + def setup_rpc(self, integ_br): mac = utils.get_interface_mac(integ_br) self.agent_id = '%s%s' % ('ovs', (mac.replace(":", ""))) self.topic = topics.AGENT self.plugin_rpc = OVSPluginApi(topics.PLUGIN) + self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) # RPC network init self.context = context.get_admin_context_without_session() @@ -202,6 +223,10 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin): self.connection = agent_rpc.create_consumers(self.dispatcher, self.topic, consumers) + report_interval = cfg.CONF.AGENT.report_interval + if report_interval: + heartbeat = loopingcall.LoopingCall(self._report_state) + heartbeat.start(interval=report_interval) def get_net_uuid(self, vif_id): for network_id, vlan_mapping in self.local_vlan_map.iteritems(): diff --git a/quantum/plugins/openvswitch/common/config.py b/quantum/plugins/openvswitch/common/config.py index 82f9c0b14..6f16e3cdb 100644 --- a/quantum/plugins/openvswitch/common/config.py +++ b/quantum/plugins/openvswitch/common/config.py @@ -62,4 +62,5 @@ agent_opts = [ cfg.CONF.register_opts(ovs_opts, "OVS") cfg.CONF.register_opts(agent_opts, "AGENT") +config.register_agent_state_opts_helper(cfg.CONF) config.register_root_helper(cfg.CONF) diff --git a/quantum/plugins/openvswitch/ovs_quantum_plugin.py b/quantum/plugins/openvswitch/ovs_quantum_plugin.py index 76754e743..05a0a8981 100644 --- a/quantum/plugins/openvswitch/ovs_quantum_plugin.py +++ b/quantum/plugins/openvswitch/ovs_quantum_plugin.py @@ -30,6 +30,7 @@ from quantum.common import constants as q_const from quantum.common import exceptions as q_exc from quantum.common import rpc as q_rpc from quantum.common import topics +from quantum.db import agents_db from quantum.db import db_base_plugin_v2 from quantum.db import dhcp_rpc_base from quantum.db import l3_db @@ -71,7 +72,8 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self]) + return q_rpc.PluginRpcDispatcher([self, + agents_db.AgentExtRpcCallback()]) @classmethod def get_port_from_device(cls, device): @@ -208,7 +210,9 @@ class AgentNotifierApi(proxy.RpcProxy, class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, l3_db.L3_NAT_db_mixin, - sg_db_rpc.SecurityGroupServerRpcMixin): + sg_db_rpc.SecurityGroupServerRpcMixin, + agents_db.AgentDbMixin): + """Implement the Quantum abstractions using Open vSwitch. Depending on whether tunneling is enabled, either a GRE tunnel or @@ -231,7 +235,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, # is qualified by class __native_bulk_support = True supported_extension_aliases = ["provider", "router", - "binding", "quotas", "security-group"] + "binding", "quotas", "security-group", + "agent"] network_view = "extension:provider_network:view" network_set = "extension:provider_network:set" diff --git a/quantum/service.py b/quantum/service.py index fa88c7bc6..33529288a 100644 --- a/quantum/service.py +++ b/quantum/service.py @@ -34,9 +34,6 @@ from quantum import wsgi LOG = logging.getLogger(__name__) service_opts = [ - cfg.IntOpt('report_interval', - default=10, - help=_('Seconds between nodes reporting state to datastore')), cfg.IntOpt('periodic_interval', default=40, help=_('Seconds between running periodic tasks')), diff --git a/quantum/tests/unit/openvswitch/test_ovs_quantum_agent.py b/quantum/tests/unit/openvswitch/test_ovs_quantum_agent.py index b11b4ce7e..17f0c8c4f 100644 --- a/quantum/tests/unit/openvswitch/test_ovs_quantum_agent.py +++ b/quantum/tests/unit/openvswitch/test_ovs_quantum_agent.py @@ -50,6 +50,7 @@ class TestOvsQuantumAgent(unittest.TestCase): # Avoid rpc initialization for unit tests cfg.CONF.set_override('rpc_backend', 'quantum.openstack.common.rpc.impl_fake') + cfg.CONF.set_override('report_interval', 0, 'AGENT') kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF) with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.' 'OVSQuantumAgent.setup_integration_br', @@ -160,14 +161,19 @@ class TestOvsQuantumAgent(unittest.TestCase): 'admin_state_up': True} with mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value='2'): - with mock.patch.object(self.agent, 'port_bound') as port_bound: - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(port_bound.called) - - with mock.patch.object(self.agent, 'port_dead') as port_dead: - port['admin_state_up'] = False - self.agent.port_update(mock.Mock(), port=port) - self.assertTrue(port_dead.called) + with mock.patch.object(self.agent.plugin_rpc, + 'update_device_up') as device_up: + with mock.patch.object(self.agent, 'port_bound') as port_bound: + self.agent.port_update(mock.Mock(), port=port) + self.assertTrue(port_bound.called) + self.assertTrue(device_up.called) + with mock.patch.object(self.agent.plugin_rpc, + 'update_device_down') as device_down: + with mock.patch.object(self.agent, 'port_dead') as port_dead: + port['admin_state_up'] = False + self.agent.port_update(mock.Mock(), port=port) + self.assertTrue(port_dead.called) + self.assertTrue(device_down.called) def test_process_network_ports(self): reply = {'current': set(['tap0']), diff --git a/quantum/tests/unit/openvswitch/test_ovs_tunnel.py b/quantum/tests/unit/openvswitch/test_ovs_tunnel.py index 5571f8186..8f5e578f5 100644 --- a/quantum/tests/unit/openvswitch/test_ovs_tunnel.py +++ b/quantum/tests/unit/openvswitch/test_ovs_tunnel.py @@ -64,6 +64,7 @@ class TunnelTest(unittest.TestCase): def setUp(self): cfg.CONF.set_override('rpc_backend', 'quantum.openstack.common.rpc.impl_fake') + cfg.CONF.set_override('report_interval', 0, 'AGENT') self.mox = mox.Mox() self.INT_BRIDGE = 'integration_bridge' self.TUN_BRIDGE = 'tunnel_bridge' diff --git a/quantum/tests/unit/test_agent_ext_plugin.py b/quantum/tests/unit/test_agent_ext_plugin.py new file mode 100644 index 000000000..60f65d97f --- /dev/null +++ b/quantum/tests/unit/test_agent_ext_plugin.py @@ -0,0 +1,180 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import time + +from webob import exc + +from quantum.common import constants +from quantum.common import topics +from quantum.common.test_lib import test_config +from quantum import context +from quantum.db import agents_db +from quantum.db import db_base_plugin_v2 +from quantum.extensions import agent +from quantum.openstack.common import cfg +from quantum.openstack.common import log as logging +from quantum.openstack.common import uuidutils +from quantum.tests.unit import test_api_v2 +from quantum.tests.unit import test_db_plugin + + +LOG = logging.getLogger(__name__) + +_uuid = uuidutils.generate_uuid +_get_path = test_api_v2._get_path +L3_HOSTA = 'hosta' +DHCP_HOSTA = 'hosta' +L3_HOSTB = 'hostb' +DHCP_HOSTC = 'hostc' + + +class AgentTestExtensionManager(object): + + def get_resources(self): + return agent.Agent.get_resources() + + def get_actions(self): + return [] + + def get_request_extensions(self): + return [] + + +# This plugin class is just for testing +class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2, + agents_db.AgentDbMixin): + supported_extension_aliases = ["agent"] + + +class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase): + fmt = 'json' + + def setUp(self): + self.adminContext = context.get_admin_context() + test_config['plugin_name_v2'] = ( + 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin') + # for these tests we need to enable overlapping ips + cfg.CONF.set_default('allow_overlapping_ips', True) + ext_mgr = AgentTestExtensionManager() + test_config['extension_manager'] = ext_mgr + super(AgentDBTestCase, self).setUp() + + def _list_agents(self, expected_res_status=None, + quantum_context=None, + query_string=None): + comp_res = self._list('agents', + quantum_context=quantum_context, + query_params=query_string) + if expected_res_status: + self.assertEqual(comp_res.status_int, expected_res_status) + return comp_res + + def _register_agent_states(self): + """Register two L3 agents and two DHCP agents.""" + l3_hosta = { + 'binary': 'quantum-l3-agent', + 'host': L3_HOSTA, + 'topic': topics.L3_AGENT, + 'configurations': {'use_namespaces': True, + 'router_id': None, + 'handle_internal_only_routers': + True, + 'gateway_external_network_id': + None, + 'interface_driver': 'interface_driver', + }, + 'agent_type': constants.AGENT_TYPE_L3} + l3_hostb = copy.deepcopy(l3_hosta) + l3_hostb['host'] = L3_HOSTB + dhcp_hosta = { + 'binary': 'quantum-dhcp-agent', + 'host': DHCP_HOSTA, + 'topic': 'DHCP_AGENT', + 'configurations': {'dhcp_driver': 'dhcp_driver', + 'use_namespaces': True, + }, + 'agent_type': constants.AGENT_TYPE_DHCP} + dhcp_hostc = copy.deepcopy(dhcp_hosta) + dhcp_hostc['host'] = DHCP_HOSTC + callback = agents_db.AgentExtRpcCallback() + callback.report_state(self.adminContext, + agent_state={'agent_state': l3_hosta}) + callback.report_state(self.adminContext, + agent_state={'agent_state': l3_hostb}) + callback.report_state(self.adminContext, + agent_state={'agent_state': dhcp_hosta}) + callback.report_state(self.adminContext, + agent_state={'agent_state': dhcp_hostc}) + return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc] + + def test_create_agent(self): + data = {'agent': {}} + _req = self.new_create_request('agents', data, self.fmt) + _req.environ['quantum.context'] = context.Context( + '', 'tenant_id') + res = _req.get_response(self.ext_api) + self.assertEqual(res.status_int, exc.HTTPBadRequest.code) + + def test_list_agent(self): + agents = self._register_agent_states() + res = self._list('agents') + for agent in res['agents']: + if (agent['host'] == DHCP_HOSTA and + agent['agent_type'] == constants.AGENT_TYPE_DHCP): + self.assertEqual( + 'dhcp_driver', + agent['configurations']['dhcp_driver']) + break + self.assertEqual(len(agents), len(res['agents'])) + + def test_show_agent(self): + self._register_agent_states() + agents = self._list_agents( + query_string='binary=quantum-l3-agent') + self.assertEqual(2, len(agents['agents'])) + agent = self._show('agents', agents['agents'][0]['id']) + self.assertEqual('quantum-l3-agent', agent['agent']['binary']) + + def test_update_agent(self): + self._register_agent_states() + agents = self._list_agents( + query_string='binary=quantum-l3-agent&host=' + L3_HOSTB) + self.assertEqual(1, len(agents['agents'])) + com_id = agents['agents'][0]['id'] + agent = self._show('agents', com_id) + new_agent = {} + new_agent['agent'] = {} + new_agent['agent']['admin_state_up'] = False + new_agent['agent']['description'] = 'description' + self._update('agents', com_id, new_agent) + agent = self._show('agents', com_id) + self.assertFalse(agent['agent']['admin_state_up']) + self.assertEqual('description', agent['agent']['description']) + + def test_dead_agent(self): + cfg.CONF.set_override('agent_down_time', 1) + self._register_agent_states() + time.sleep(1.5) + agents = self._list_agents( + query_string='binary=quantum-l3-agent&host=' + L3_HOSTB) + self.assertFalse(agents['agents'][0]['alive']) + + +class AgentDBTestCaseXML(AgentDBTestCase): + fmt = 'xml'