]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Agent management extension
authorgongysh <gongysh@linux.vnet.ibm.com>
Fri, 1 Feb 2013 12:30:12 +0000 (20:30 +0800)
committergongysh <gongysh@linux.vnet.ibm.com>
Mon, 18 Feb 2013 03:43:45 +0000 (11:43 +0800)
1/3 part of blueprint quantum-scheduler

This patch adds agent management support
to l3-agent and plugin-agent (ovs and linuxbridge).

Change-Id: Iebc272f45c7530c995f32ef3729b11cd76779385

19 files changed:
etc/policy.json
etc/quantum.conf
quantum/agent/common/config.py
quantum/agent/l3_agent.py
quantum/agent/rpc.py
quantum/common/constants.py
quantum/db/agents_db.py [new file with mode: 0644]
quantum/db/migration/alembic_migrations/versions/511471cc46b_agent_ext_model_supp.py [new file with mode: 0644]
quantum/extensions/agent.py [new file with mode: 0644]
quantum/plugins/linuxbridge/agent/linuxbridge_quantum_agent.py
quantum/plugins/linuxbridge/common/config.py
quantum/plugins/linuxbridge/lb_quantum_plugin.py
quantum/plugins/openvswitch/agent/ovs_quantum_agent.py
quantum/plugins/openvswitch/common/config.py
quantum/plugins/openvswitch/ovs_quantum_plugin.py
quantum/service.py
quantum/tests/unit/openvswitch/test_ovs_quantum_agent.py
quantum/tests/unit/openvswitch/test_ovs_tunnel.py
quantum/tests/unit/test_agent_ext_plugin.py [new file with mode: 0644]

index cdaad0d17ade2b99341ced790e0f8b1ca4fddeaf..a427507f5bf2ef3a7e26106ff1cf812917d8f384 100644 (file)
 
     "create_qos_queue:": "rule:admin_only",
     "get_qos_queue:": "rule:admin_only",
-    "get_qos_queues:": "rule:admin_only"
+    "get_qos_queues:": "rule:admin_only",
+
+    "update_agent": "rule:admin_only",
+    "delete_agent": "rule:admin_only",
+    "get_agent": "rule:admin_only",
+    "get_agents": "rule:admin_only"
 }
index f78da1deaec6d2cf46845bd36d32f99bb9587495..ebf2343d864a59791087eb6756fc6db6627040c4 100644 (file)
@@ -190,6 +190,11 @@ notification_topics = notifications
 # default driver to use for quota checks
 # quota_driver = quantum.quota.ConfDriver
 
+# =========== items for agent management extension =============
+# Seconds to regard the agent as down.
+# agent_down_time = 5
+# ===========  end of items for agent management extension =====
+
 [DEFAULT_SERVICETYPE]
 # Description of the default service type (optional)
 # description = "default service type"
@@ -208,6 +213,13 @@ notification_topics = notifications
 # Change to "sudo" to skip the filtering and just run the comand directly
 # root_helper = sudo
 
+# =========== items for agent management extension =============
+# seconds between nodes reporting state to server, should be less than
+# agent_down_time
+# report_interval = 4
+
+# ===========  end of items for agent management extension =====
+
 [keystone_authtoken]
 auth_host = 127.0.0.1
 auth_port = 35357
index 1b117802663565411d8cf81b16e3c48102004114..fe7b19d35561d08a00a6ca5af19d325920302956 100644 (file)
@@ -29,6 +29,11 @@ ROOT_HELPER_OPTS = [
                help=_('Root helper application.')),
 ]
 
+AGENT_STATE_OPTS = [
+    cfg.IntOpt('report_interval', default=4,
+               help=_('Seconds between nodes reporting state to server')),
+]
+
 
 def register_root_helper(conf):
     # The first call is to ensure backward compatibility
@@ -36,6 +41,10 @@ def register_root_helper(conf):
     conf.register_opts(ROOT_HELPER_OPTS, 'AGENT')
 
 
+def register_agent_state_opts_helper(conf):
+    conf.register_opts(AGENT_STATE_OPTS, 'AGENT')
+
+
 def get_root_helper(conf):
     root_helper = conf.AGENT.root_helper
     if root_helper is not 'sudo':
index f5730fde4ee5c210fce7c7b373e916c8e7a4f075..d8f3eaf2f4050cdc10571f2cb5c698d97c61bc27 100644 (file)
@@ -32,12 +32,14 @@ from quantum.agent.linux import interface
 from quantum.agent.linux import ip_lib
 from quantum.agent.linux import iptables_manager
 from quantum.agent.linux import utils
+from quantum.agent import rpc as agent_rpc
 from quantum.common import constants as l3_constants
 from quantum.common import topics
 from quantum import context
 from quantum import manager
 from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
 from quantum.openstack.common import periodic_task
 from quantum.openstack.common.rpc import common as rpc_common
 from quantum.openstack.common.rpc import proxy
@@ -139,7 +141,7 @@ class L3NATAgent(manager.Manager):
                    help=_("UUID of external network for routers implemented "
                           "by the agents.")),
         cfg.StrOpt('l3_agent_manager',
-                   default='quantum.agent.l3_agent.L3NATAgent',
+                   default='quantum.agent.l3_agent.L3NATAgentWithStateReport',
                    help=_("The Quantum L3 Agent manager.")),
     ]
 
@@ -161,6 +163,7 @@ class L3NATAgent(manager.Manager):
             LOG.exception(_("Error importing interface driver '%s'"),
                           self.conf.interface_driver)
             sys.exit(1)
+        self.context = context.get_admin_context_without_session()
         self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
         self.fullsync = True
         self.sync_sem = semaphore.Semaphore(1)
@@ -211,8 +214,7 @@ class L3NATAgent(manager.Manager):
         if self.conf.gateway_external_network_id:
             return self.conf.gateway_external_network_id
         try:
-            return self.plugin_rpc.get_external_network_id(
-                context.get_admin_context())
+            return self.plugin_rpc.get_external_network_id(self.context)
         except rpc_common.RemoteError as e:
             if e.exc_type == 'TooManyExternalNetworks':
                 msg = _(
@@ -617,15 +619,69 @@ class L3NATAgent(manager.Manager):
         LOG.info(_("L3 agent started"))
 
 
+class L3NATAgentWithStateReport(L3NATAgent):
+
+    def __init__(self, host, conf=None):
+        super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
+        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
+        self.agent_state = {
+            'binary': 'quantum-l3-agent',
+            'host': host,
+            'topic': topics.L3_AGENT,
+            'configurations': {
+                'use_namespaces': self.conf.use_namespaces,
+                'router_id': self.conf.router_id,
+                'handle_internal_only_routers':
+                self.conf.handle_internal_only_routers,
+                'gateway_external_network_id':
+                self.conf.gateway_external_network_id,
+                'interface_driver': self.conf.interface_driver},
+            'start_flag': True,
+            'agent_type': l3_constants.AGENT_TYPE_L3}
+        report_interval = cfg.CONF.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.LoopingCall(self._report_state)
+            heartbeat.start(interval=report_interval)
+
+    def _report_state(self):
+        num_ex_gw_ports = 0
+        num_interfaces = 0
+        num_floating_ips = 0
+        router_infos = self.router_info.values()
+        num_routers = len(router_infos)
+        for ri in router_infos:
+            ex_gw_port = self._get_ex_gw_port(ri)
+            if ex_gw_port:
+                num_ex_gw_ports += 1
+            num_interfaces += len(ri.router.get(l3_constants.INTERFACE_KEY,
+                                                []))
+            num_floating_ips += len(ri.router.get(l3_constants.FLOATINGIP_KEY,
+                                                  []))
+        configurations = self.agent_state['configurations']
+        configurations['routers'] = num_routers
+        configurations['ex_gw_ports'] = num_ex_gw_ports
+        configurations['interfaces'] = num_interfaces
+        configurations['floating_ips'] = num_floating_ips
+        try:
+            self.state_rpc.report_state(self.context,
+                                        self.agent_state)
+            self.agent_state.pop('start_flag', None)
+        except Exception:
+            LOG.exception(_("Failed reporting state!"))
+
+
 def main():
     eventlet.monkey_patch()
     conf = cfg.CONF
     conf.register_opts(L3NATAgent.OPTS)
+    config.register_agent_state_opts_helper(conf)
     config.register_root_helper(conf)
     conf.register_opts(interface.OPTS)
     conf.register_opts(external_process.OPTS)
     conf()
     config.setup_logging(conf)
-    server = quantum_service.Service.create(binary='quantum-l3-agent',
-                                            topic=topics.L3_AGENT)
+    server = quantum_service.Service.create(
+        binary='quantum-l3-agent',
+        topic=topics.L3_AGENT,
+        report_interval=cfg.CONF.AGENT.report_interval)
     service.launch(server).wait()
index 4fb92567295177bedc85e2af4ad27311f44dff18..97d9c800f4b7f860135389eb9f26235f9840041b 100644 (file)
@@ -49,6 +49,21 @@ def create_consumers(dispatcher, prefix, topic_details):
     return connection
 
 
+class PluginReportStateAPI(proxy.RpcProxy):
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self, topic):
+        super(PluginReportStateAPI, self).__init__(
+            topic=topic, default_version=self.BASE_RPC_API_VERSION)
+
+    def report_state(self, context, agent_state):
+        return self.cast(context,
+                         self.make_msg('report_state',
+                                       agent_state={'agent_state':
+                                                    agent_state}),
+                         topic=self.topic)
+
+
 class PluginApi(proxy.RpcProxy):
     '''Agent side of the rpc API.
 
index 8f661f0f4b115f62ba9cd6e512b37f8c48e04c41..2ae4e7d74101be4bf917b284d94d2bc5bff7b938 100644 (file)
@@ -52,3 +52,9 @@ TYPE_LONG = "long"
 TYPE_FLOAT = "float"
 TYPE_LIST = "list"
 TYPE_DICT = "dict"
+
+AGENT_TYPE_DHCP = 'DHCP agent'
+AGENT_TYPE_OVS = 'Open vSwitch agent'
+AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
+AGENT_TYPE_L3 = 'L3 agent'
+L2_AGENT_TOPIC = 'N/A'
diff --git a/quantum/db/agents_db.py b/quantum/db/agents_db.py
new file mode 100644 (file)
index 0000000..70c5678
--- /dev/null
@@ -0,0 +1,157 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import sqlalchemy as sa
+from sqlalchemy.orm import exc
+
+from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.extensions import agent as ext_agent
+from quantum import manager
+from quantum.openstack.common import cfg
+from quantum.openstack.common import jsonutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import timeutils
+
+LOG = logging.getLogger(__name__)
+cfg.CONF.register_opt(
+    cfg.IntOpt('agent_down_time', default=5,
+               help=_("Seconds to regard the agent is down.")))
+
+
+class Agent(model_base.BASEV2, models_v2.HasId):
+    """Represents agents running in quantum deployments"""
+
+    # L3 agent, DHCP agent, OVS agent, LinuxBridge
+    agent_type = sa.Column(sa.String(255), nullable=False)
+    binary = sa.Column(sa.String(255), nullable=False)
+    # TOPIC is a fanout exchange topic
+    topic = sa.Column(sa.String(255), nullable=False)
+    # TOPIC.host is a target topic
+    host = sa.Column(sa.String(255), nullable=False)
+    admin_state_up = sa.Column(sa.Boolean, default=True,
+                               nullable=False)
+    # the time when first report came from agents
+    created_at = sa.Column(sa.DateTime, nullable=False)
+    # the time when first report came after agents start
+    started_at = sa.Column(sa.DateTime, nullable=False)
+    # updated when agents report
+    heartbeat_timestamp = sa.Column(sa.DateTime, nullable=False)
+    # description is note for admin user
+    description = sa.Column(sa.String(255))
+    # configurations: a json dict string, I think 4095 is enough
+    configurations = sa.Column(sa.String(4095), nullable=False)
+
+
+class AgentDbMixin(ext_agent.AgentPluginBase):
+    """Mixin class to add agent extension to db_plugin_base_v2."""
+
+    def _get_agent(self, context, id):
+        try:
+            agent = self._get_by_id(context, Agent, id)
+        except exc.NoResultFound:
+            raise ext_agent.AgentNotFound(id=id)
+        return agent
+
+    def _is_agent_down(self, heart_beat_time_str):
+        return timeutils.is_older_than(heart_beat_time_str,
+                                       cfg.CONF.agent_down_time)
+
+    def _make_agent_dict(self, agent, fields=None):
+        attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get(
+            ext_agent.RESOURCE_NAME + 's')
+        res = dict((k, agent[k]) for k in attr
+                   if k not in ['alive', 'configurations'])
+        res['alive'] = not self._is_agent_down(res['heartbeat_timestamp'])
+        try:
+            res['configurations'] = jsonutils.loads(agent['configurations'])
+        except Exception:
+            msg = _('Configurations for agent %(agent_type)s on host %(host)s'
+                    ' are invalid.')
+            LOG.warn(msg, {'agent_type': res['agent_type'],
+                           'host': res['host']})
+            res['configurations'] = {}
+        return self._fields(res, fields)
+
+    def delete_agent(self, context, id):
+        with context.session.begin(subtransactions=True):
+            agent = self._get_agent(context, id)
+            context.session.delete(agent)
+
+    def update_agent(self, context, id, agent):
+        agent_data = agent['agent']
+        with context.session.begin(subtransactions=True):
+            agent = self._get_agent(context, id)
+            agent.update(agent_data)
+        return self._make_agent_dict(agent)
+
+    def get_agents(self, context, filters=None, fields=None):
+        return self._get_collection(context, Agent,
+                                    self._make_agent_dict,
+                                    filters=filters, fields=fields)
+
+    def _get_agent_by_type_and_host(self, context, agent_type, host):
+        query = self._model_query(context, Agent)
+        try:
+            agent_db = query.filter(Agent.agent_type == agent_type,
+                                    Agent.host == host).one()
+            return agent_db
+        except exc.NoResultFound:
+            raise ext_agent.AgentNotFoundByTypeHost(agent_type=agent_type,
+                                                    host=host)
+        except exc.MultipleResultsFound:
+            raise ext_agent.MultipleAgentFoundByTypeHost(agent_type=agent_type,
+                                                         host=host)
+
+    def get_agent(self, context, id, fields=None):
+        agent = self._get_agent(context, id)
+        return self._make_agent_dict(agent, fields)
+
+    def create_or_update_agent(self, context, agent):
+        """Create or update agent according to report."""
+        with context.session.begin(subtransactions=True):
+            res_keys = ['agent_type', 'binary', 'host', 'topic']
+            res = dict((k, agent[k]) for k in res_keys)
+
+            configurations_dict = agent.get('configurations', {})
+            res['configurations'] = jsonutils.dumps(configurations_dict)
+            current_time = timeutils.utcnow()
+            try:
+                agent_db = self._get_agent_by_type_and_host(
+                    context, agent['agent_type'], agent['host'])
+                res['heartbeat_timestamp'] = current_time
+                if agent.get('start_flag'):
+                    res['started_at'] = current_time
+                agent_db.update(res)
+            except ext_agent.AgentNotFoundByTypeHost:
+                res['created_at'] = current_time
+                res['started_at'] = current_time
+                res['heartbeat_timestamp'] = current_time
+                res['admin_state_up'] = True
+                agent_db = Agent(**res)
+                context.session.add(agent_db)
+
+
+class AgentExtRpcCallback(object):
+    """Processes the rpc report in plugin implementations."""
+    RPC_API_VERSION = '1.0'
+
+    def report_state(self, context, **kwargs):
+        """Report state from agent to server. """
+        agent_state = kwargs['agent_state']['agent_state']
+        plugin = manager.QuantumManager.get_plugin()
+        plugin.create_or_update_agent(context, agent_state)
diff --git a/quantum/db/migration/alembic_migrations/versions/511471cc46b_agent_ext_model_supp.py b/quantum/db/migration/alembic_migrations/versions/511471cc46b_agent_ext_model_supp.py
new file mode 100644 (file)
index 0000000..7151164
--- /dev/null
@@ -0,0 +1,73 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack LLC
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""Add agent management extension model support
+
+Revision ID: 511471cc46b
+Revises: 54c2c487e913
+Create Date: 2013-02-18 05:09:32.523460
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '511471cc46b'
+down_revision = '54c2c487e913'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+    'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
+    'quantum.plugins.linuxbridge.lb_quantum_plugin.LinuxBridgePluginV2',
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+
+from quantum.db import migration
+
+
+def upgrade(active_plugin=None, options=None):
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        'agents',
+        sa.Column('id', sa.String(length=36), nullable=False),
+        sa.Column('agent_type', sa.String(length=255), nullable=False),
+        sa.Column('binary', sa.String(length=255), nullable=False),
+        sa.Column('topic', sa.String(length=255), nullable=False),
+        sa.Column('host', sa.String(length=255), nullable=False),
+        sa.Column('admin_state_up', sa.Boolean(), nullable=False),
+        sa.Column('created_at', sa.DateTime(), nullable=False),
+        sa.Column('started_at', sa.DateTime(), nullable=False),
+        sa.Column('heartbeat_timestamp', sa.DateTime(), nullable=False),
+        sa.Column('description', sa.String(length=255), nullable=True),
+        sa.Column('configurations', sa.String(length=4095), nullable=False),
+        sa.PrimaryKeyConstraint('id')
+    )
+    ### end Alembic commands ###
+
+
+def downgrade(active_plugin=None, options=None):
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('agents')
+    ### end Alembic commands ###
diff --git a/quantum/extensions/agent.py b/quantum/extensions/agent.py
new file mode 100644 (file)
index 0000000..c833255
--- /dev/null
@@ -0,0 +1,161 @@
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import abstractmethod
+
+from quantum.api import extensions
+from quantum.api.v2 import attributes as attr
+from quantum.api.v2 import base
+from quantum.common import exceptions
+from quantum import manager
+
+
+# Attribute Map
+RESOURCE_NAME = 'agent'
+RESOURCE_ATTRIBUTE_MAP = {
+    RESOURCE_NAME + 's': {
+        'id': {'allow_post': False, 'allow_put': False,
+               'validate': {'type:uuid': None},
+               'is_visible': True},
+        'agent_type': {'allow_post': False, 'allow_put': False,
+                       'is_visible': True},
+        'binary': {'allow_post': False, 'allow_put': False,
+                   'is_visible': True},
+        'topic': {'allow_post': False, 'allow_put': False,
+                  'is_visible': True},
+        'host': {'allow_post': False, 'allow_put': False,
+                 'is_visible': True},
+        'admin_state_up': {'allow_post': False, 'allow_put': True,
+                           'convert_to': attr.convert_to_boolean,
+                           'is_visible': True},
+        'created_at': {'allow_post': False, 'allow_put': False,
+                       'is_visible': True},
+        'started_at': {'allow_post': False, 'allow_put': False,
+                       'is_visible': True},
+        'heartbeat_timestamp': {'allow_post': False, 'allow_put': False,
+                                'is_visible': True},
+        'alive': {'allow_post': False, 'allow_put': False,
+                  'is_visible': True},
+        'configurations': {'allow_post': False, 'allow_put': False,
+                           'is_visible': True},
+        'description': {'allow_post': False, 'allow_put': True,
+                        'is_visible': True,
+                        'validate': {'type:string': None}},
+    },
+}
+
+
+class AgentNotFound(exceptions.NotFound):
+    message = _("Agent %(id)s could not be found")
+
+
+class AgentNotFoundByTypeHost(exceptions.NotFound):
+    message = _("Agent with agent_type=%(agent_type)s and host=%(host)s "
+                "could not be found")
+
+
+class MultipleAgentFoundByTypeHost(exceptions.Conflict):
+    message = _("Multiple agents with agent_type=%(agent_type)s and "
+                "host=%(host)s found")
+
+
+class Agent(object):
+    """Agent management extension"""
+
+    @classmethod
+    def get_name(cls):
+        return "agent"
+
+    @classmethod
+    def get_alias(cls):
+        return "agent"
+
+    @classmethod
+    def get_description(cls):
+        return "The agent management extension."
+
+    @classmethod
+    def get_namespace(cls):
+        return "http://docs.openstack.org/ext/agent/api/v2.0"
+
+    @classmethod
+    def get_updated(cls):
+        return "2013-02-03T10:00:00-00:00"
+
+    @classmethod
+    def get_resources(cls):
+        """ Returns Ext Resources """
+        my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
+        attr.PLURALS.update(dict(my_plurals))
+        plugin = manager.QuantumManager.get_plugin()
+        params = RESOURCE_ATTRIBUTE_MAP.get(RESOURCE_NAME + 's')
+        controller = base.create_resource(RESOURCE_NAME + 's',
+                                          RESOURCE_NAME,
+                                          plugin, params
+                                          )
+
+        ex = extensions.ResourceExtension(RESOURCE_NAME + 's',
+                                          controller)
+
+        return [ex]
+
+    def get_extended_resources(self, version):
+        return {}
+
+
+class AgentPluginBase(object):
+    """ REST API to operate the Agent.
+
+    All of method must be in an admin context.
+    """
+
+    def create_agent(self, context, agent):
+        """ Create agent.
+
+        This operation is not allow in REST API.
+        @raise exceptions.BadRequest:
+        """
+        raise exceptions.BadRequest
+
+    @abstractmethod
+    def delete_agent(self, context, id):
+        """Delete agent.
+        Agents register themselves on reporting state.
+        But if a agent does not report its status
+        for a long time (for example, it is dead for ever. ),
+        admin can remove it. Agents must be disabled before
+        being removed.
+        """
+        pass
+
+    @abstractmethod
+    def update_agent(self, context, agent):
+        """Disable or Enable the agent.
+        Discription also can be updated.
+
+        Some agents cannot be disabled,
+        such as plugins, services.
+        An error code should be reported in this case.
+        @raise exceptions.BadRequest:
+        """
+        pass
+
+    @abstractmethod
+    def get_agents(self, context, filters=None, fields=None):
+        pass
+
+    @abstractmethod
+    def get_agent(self, context, id, fields=None):
+        pass
index 96c9ac32c4a93163623b9531bf68fb1c7471b3de..ea28ee2c802577b686a3434add752b765c00d9bd 100755 (executable)
@@ -35,10 +35,12 @@ from quantum.agent.linux import utils
 from quantum.agent import rpc as agent_rpc
 from quantum.agent import securitygroups_rpc as sg_rpc
 from quantum.common import config as logging_config
+from quantum.common import constants
 from quantum.common import topics
 from quantum.common import utils as q_utils
 from quantum import context
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
 from quantum.openstack.common.rpc import dispatcher
 # NOTE (e0ne): this import is needed for config init
 from quantum.plugins.linuxbridge.common import config
@@ -463,9 +465,27 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
         self.polling_interval = polling_interval
         self.root_helper = root_helper
         self.setup_linux_bridge(interface_mappings)
+        self.agent_state = {
+            'binary': 'quantum-linuxbridge-agent',
+            'host': cfg.CONF.host,
+            'topic': constants.L2_AGENT_TOPIC,
+            'configurations': interface_mappings,
+            'agent_type': constants.AGENT_TYPE_LINUXBRIDGE,
+            'start_flag': True}
+
         self.setup_rpc(interface_mappings.values())
         self.init_firewall()
 
+    def _report_state(self):
+        try:
+            devices = len(self.br_mgr.udev_get_tap_devices())
+            self.agent_state.get('configurations')['devices'] = devices
+            self.state_rpc.report_state(self.context,
+                                        self.agent_state)
+            self.agent_state.pop('start_flag', None)
+        except Exception:
+            LOG.exception("Failed reporting state!")
+
     def setup_rpc(self, physical_interfaces):
         if physical_interfaces:
             mac = utils.get_interface_mac(physical_interfaces[0])
@@ -482,7 +502,7 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
 
         self.topic = topics.AGENT
         self.plugin_rpc = LinuxBridgePluginApi(topics.PLUGIN)
-
+        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
         # RPC network init
         self.context = context.get_admin_context_without_session()
         # Handle updates from service
@@ -496,6 +516,10 @@ class LinuxBridgeQuantumAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin):
         self.connection = agent_rpc.create_consumers(self.dispatcher,
                                                      self.topic,
                                                      consumers)
+        report_interval = cfg.CONF.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.LoopingCall(self._report_state)
+            heartbeat.start(interval=report_interval)
 
     def setup_linux_bridge(self, interface_mappings):
         self.br_mgr = LinuxBridgeManager(interface_mappings, self.root_helper)
index 43b1e68a313e3f7e233fc49d9117b35a316de1d0..327bad3855fcf842033b402c97c0c05808da95b5 100644 (file)
@@ -51,4 +51,5 @@ agent_opts = [
 cfg.CONF.register_opts(vlan_opts, "VLANS")
 cfg.CONF.register_opts(bridge_opts, "LINUX_BRIDGE")
 cfg.CONF.register_opts(agent_opts, "AGENT")
+config.register_agent_state_opts_helper(cfg.CONF)
 config.register_root_helper(cfg.CONF)
index 668e319e2a0539138548a9439c281aa8efcfbed2..db6948e4906839eb6c369c7999196224a46de4cd 100644 (file)
@@ -24,6 +24,7 @@ from quantum.common import exceptions as q_exc
 from quantum.common import rpc as q_rpc
 from quantum.common import topics
 from quantum.common import utils
+from quantum.db import agents_db
 from quantum.db import api as db_api
 from quantum.db import db_base_plugin_v2
 from quantum.db import dhcp_rpc_base
@@ -48,12 +49,13 @@ LOG = logging.getLogger(__name__)
 
 class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                               l3_rpc_base.L3RpcCallbackMixin,
-                              sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
+                              sg_db_rpc.SecurityGroupServerRpcCallbackMixin
+                              ):
 
-    RPC_API_VERSION = '1.1'
-    # Device names start with "tap"
     # history
     #   1.1 Support Security Group RPC
+    RPC_API_VERSION = '1.1'
+    # Device names start with "tap"
     TAP_PREFIX_LEN = 3
 
     def create_rpc_dispatcher(self):
@@ -62,7 +64,8 @@ class LinuxBridgeRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self])
+        return q_rpc.PluginRpcDispatcher([self,
+                                          agents_db.AgentExtRpcCallback()])
 
     @classmethod
     def get_port_from_device(cls, device):
@@ -170,7 +173,8 @@ class AgentNotifierApi(proxy.RpcProxy,
 
 class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                           l3_db.L3_NAT_db_mixin,
-                          sg_db_rpc.SecurityGroupServerRpcMixin):
+                          sg_db_rpc.SecurityGroupServerRpcMixin,
+                          agents_db.AgentDbMixin):
     """Implement the Quantum abstractions using Linux bridging.
 
     A new VLAN is created for each network.  An agent is relied upon
@@ -193,7 +197,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2,
     __native_bulk_support = True
 
     supported_extension_aliases = ["provider", "router", "binding", "quotas",
-                                   "security-group"]
+                                   "security-group", "agent"]
 
     network_view = "extension:provider_network:view"
     network_set = "extension:provider_network:set"
index be34a86cbd2c4802856053aaef6179db81c2c7e2..070e2209b7e3bf847a6e2f95c6fb5c10106a71fc 100644 (file)
@@ -32,10 +32,12 @@ from quantum.agent.linux import utils
 from quantum.agent import rpc as agent_rpc
 from quantum.agent import securitygroups_rpc as sg_rpc
 from quantum.common import config as logging_config
+from quantum.common import constants as q_const
 from quantum.common import topics
 from quantum.common import utils as q_utils
 from quantum import context
 from quantum.openstack.common import log as logging
+from quantum.openstack.common import loopingcall
 from quantum.openstack.common.rpc import dispatcher
 from quantum.plugins.openvswitch.common import config
 from quantum.extensions import securitygroup as ext_sg
@@ -176,7 +178,13 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
         self.tunnel_count = 0
         if self.enable_tunneling:
             self.setup_tunnel_br(tun_br)
-
+        self.agent_state = {
+            'binary': 'quantum-openvswitch-agent',
+            'host': cfg.CONF.host,
+            'topic': q_const.L2_AGENT_TOPIC,
+            'configurations': bridge_mappings,
+            'agent_type': q_const.AGENT_TYPE_OVS,
+            'start_flag': True}
         self.setup_rpc(integ_br)
 
         # Security group agent supprot
@@ -184,11 +192,24 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
                                               self.plugin_rpc,
                                               root_helper)
 
+    def _report_state(self):
+        try:
+            # How many devices are likely used by a VM
+            ports = self.int_br.get_vif_port_set()
+            num_devices = len(ports)
+            self.agent_state.get('configurations')['devices'] = num_devices
+            self.state_rpc.report_state(self.context,
+                                        self.agent_state)
+            self.agent_state.pop('start_flag', None)
+        except Exception:
+            LOG.exception(_("Failed reporting state!"))
+
     def setup_rpc(self, integ_br):
         mac = utils.get_interface_mac(integ_br)
         self.agent_id = '%s%s' % ('ovs', (mac.replace(":", "")))
         self.topic = topics.AGENT
         self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
+        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
 
         # RPC network init
         self.context = context.get_admin_context_without_session()
@@ -202,6 +223,10 @@ class OVSQuantumAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
         self.connection = agent_rpc.create_consumers(self.dispatcher,
                                                      self.topic,
                                                      consumers)
+        report_interval = cfg.CONF.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.LoopingCall(self._report_state)
+            heartbeat.start(interval=report_interval)
 
     def get_net_uuid(self, vif_id):
         for network_id, vlan_mapping in self.local_vlan_map.iteritems():
index 82f9c0b14315a857747dbdbfef698939b3487140..6f16e3cdbb8b59b7be2a9dacc9043ca8502483c9 100644 (file)
@@ -62,4 +62,5 @@ agent_opts = [
 
 cfg.CONF.register_opts(ovs_opts, "OVS")
 cfg.CONF.register_opts(agent_opts, "AGENT")
+config.register_agent_state_opts_helper(cfg.CONF)
 config.register_root_helper(cfg.CONF)
index 76754e7430d50f87511183dbebf6ea8f98788d90..05a0a898154388699e712d0a521b59c537ab406f 100644 (file)
@@ -30,6 +30,7 @@ from quantum.common import constants as q_const
 from quantum.common import exceptions as q_exc
 from quantum.common import rpc as q_rpc
 from quantum.common import topics
+from quantum.db import agents_db
 from quantum.db import db_base_plugin_v2
 from quantum.db import dhcp_rpc_base
 from quantum.db import l3_db
@@ -71,7 +72,8 @@ class OVSRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
         If a manager would like to set an rpc API version, or support more than
         one class as the target of rpc messages, override this method.
         '''
-        return q_rpc.PluginRpcDispatcher([self])
+        return q_rpc.PluginRpcDispatcher([self,
+                                          agents_db.AgentExtRpcCallback()])
 
     @classmethod
     def get_port_from_device(cls, device):
@@ -208,7 +210,9 @@ class AgentNotifierApi(proxy.RpcProxy,
 
 class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                          l3_db.L3_NAT_db_mixin,
-                         sg_db_rpc.SecurityGroupServerRpcMixin):
+                         sg_db_rpc.SecurityGroupServerRpcMixin,
+                         agents_db.AgentDbMixin):
+
     """Implement the Quantum abstractions using Open vSwitch.
 
     Depending on whether tunneling is enabled, either a GRE tunnel or
@@ -231,7 +235,8 @@ class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
     # is qualified by class
     __native_bulk_support = True
     supported_extension_aliases = ["provider", "router",
-                                   "binding", "quotas", "security-group"]
+                                   "binding", "quotas", "security-group",
+                                   "agent"]
 
     network_view = "extension:provider_network:view"
     network_set = "extension:provider_network:set"
index fa88c7bc657ac7d9f0b82c861396abe74f158320..33529288a3be30b85237445140d2f91c9e3dcf6a 100644 (file)
@@ -34,9 +34,6 @@ from quantum import wsgi
 LOG = logging.getLogger(__name__)
 
 service_opts = [
-    cfg.IntOpt('report_interval',
-               default=10,
-               help=_('Seconds between nodes reporting state to datastore')),
     cfg.IntOpt('periodic_interval',
                default=40,
                help=_('Seconds between running periodic tasks')),
index b11b4ce7e129ef7b6b77f3c7f1ffd50d24c9e920..17f0c8c4fd8db9a6363c5f07491ec608425816cd 100644 (file)
@@ -50,6 +50,7 @@ class TestOvsQuantumAgent(unittest.TestCase):
         # Avoid rpc initialization for unit tests
         cfg.CONF.set_override('rpc_backend',
                               'quantum.openstack.common.rpc.impl_fake')
+        cfg.CONF.set_override('report_interval', 0, 'AGENT')
         kwargs = ovs_quantum_agent.create_agent_config_map(cfg.CONF)
         with mock.patch('quantum.plugins.openvswitch.agent.ovs_quantum_agent.'
                         'OVSQuantumAgent.setup_integration_br',
@@ -160,14 +161,19 @@ class TestOvsQuantumAgent(unittest.TestCase):
                 'admin_state_up': True}
         with mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
                                return_value='2'):
-            with mock.patch.object(self.agent, 'port_bound') as port_bound:
-                self.agent.port_update(mock.Mock(), port=port)
-                self.assertTrue(port_bound.called)
-
-            with mock.patch.object(self.agent, 'port_dead') as port_dead:
-                port['admin_state_up'] = False
-                self.agent.port_update(mock.Mock(), port=port)
-                self.assertTrue(port_dead.called)
+            with mock.patch.object(self.agent.plugin_rpc,
+                                   'update_device_up') as device_up:
+                with mock.patch.object(self.agent, 'port_bound') as port_bound:
+                    self.agent.port_update(mock.Mock(), port=port)
+                    self.assertTrue(port_bound.called)
+                    self.assertTrue(device_up.called)
+            with mock.patch.object(self.agent.plugin_rpc,
+                                   'update_device_down') as device_down:
+                with mock.patch.object(self.agent, 'port_dead') as port_dead:
+                    port['admin_state_up'] = False
+                    self.agent.port_update(mock.Mock(), port=port)
+                    self.assertTrue(port_dead.called)
+                    self.assertTrue(device_down.called)
 
     def test_process_network_ports(self):
         reply = {'current': set(['tap0']),
index 5571f8186593038e7aa02c8d041a92ade5f16a74..8f5e578f55d91ea16be07143fe607e75ad2fe63a 100644 (file)
@@ -64,6 +64,7 @@ class TunnelTest(unittest.TestCase):
     def setUp(self):
         cfg.CONF.set_override('rpc_backend',
                               'quantum.openstack.common.rpc.impl_fake')
+        cfg.CONF.set_override('report_interval', 0, 'AGENT')
         self.mox = mox.Mox()
         self.INT_BRIDGE = 'integration_bridge'
         self.TUN_BRIDGE = 'tunnel_bridge'
diff --git a/quantum/tests/unit/test_agent_ext_plugin.py b/quantum/tests/unit/test_agent_ext_plugin.py
new file mode 100644 (file)
index 0000000..60f65d9
--- /dev/null
@@ -0,0 +1,180 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+import time
+
+from webob import exc
+
+from quantum.common import constants
+from quantum.common import topics
+from quantum.common.test_lib import test_config
+from quantum import context
+from quantum.db import agents_db
+from quantum.db import db_base_plugin_v2
+from quantum.extensions import agent
+from quantum.openstack.common import cfg
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import uuidutils
+from quantum.tests.unit import test_api_v2
+from quantum.tests.unit import test_db_plugin
+
+
+LOG = logging.getLogger(__name__)
+
+_uuid = uuidutils.generate_uuid
+_get_path = test_api_v2._get_path
+L3_HOSTA = 'hosta'
+DHCP_HOSTA = 'hosta'
+L3_HOSTB = 'hostb'
+DHCP_HOSTC = 'hostc'
+
+
+class AgentTestExtensionManager(object):
+
+    def get_resources(self):
+        return agent.Agent.get_resources()
+
+    def get_actions(self):
+        return []
+
+    def get_request_extensions(self):
+        return []
+
+
+# This plugin class is just for testing
+class TestAgentPlugin(db_base_plugin_v2.QuantumDbPluginV2,
+                      agents_db.AgentDbMixin):
+    supported_extension_aliases = ["agent"]
+
+
+class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
+    fmt = 'json'
+
+    def setUp(self):
+        self.adminContext = context.get_admin_context()
+        test_config['plugin_name_v2'] = (
+            'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
+        # for these tests we need to enable overlapping ips
+        cfg.CONF.set_default('allow_overlapping_ips', True)
+        ext_mgr = AgentTestExtensionManager()
+        test_config['extension_manager'] = ext_mgr
+        super(AgentDBTestCase, self).setUp()
+
+    def _list_agents(self, expected_res_status=None,
+                     quantum_context=None,
+                     query_string=None):
+        comp_res = self._list('agents',
+                              quantum_context=quantum_context,
+                              query_params=query_string)
+        if expected_res_status:
+            self.assertEqual(comp_res.status_int, expected_res_status)
+        return comp_res
+
+    def _register_agent_states(self):
+        """Register two L3 agents and two DHCP agents."""
+        l3_hosta = {
+            'binary': 'quantum-l3-agent',
+            'host': L3_HOSTA,
+            'topic': topics.L3_AGENT,
+            'configurations': {'use_namespaces': True,
+                               'router_id': None,
+                               'handle_internal_only_routers':
+                               True,
+                               'gateway_external_network_id':
+                               None,
+                               'interface_driver': 'interface_driver',
+                               },
+            'agent_type': constants.AGENT_TYPE_L3}
+        l3_hostb = copy.deepcopy(l3_hosta)
+        l3_hostb['host'] = L3_HOSTB
+        dhcp_hosta = {
+            'binary': 'quantum-dhcp-agent',
+            'host': DHCP_HOSTA,
+            'topic': 'DHCP_AGENT',
+            'configurations': {'dhcp_driver': 'dhcp_driver',
+                               'use_namespaces': True,
+                               },
+            'agent_type': constants.AGENT_TYPE_DHCP}
+        dhcp_hostc = copy.deepcopy(dhcp_hosta)
+        dhcp_hostc['host'] = DHCP_HOSTC
+        callback = agents_db.AgentExtRpcCallback()
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': l3_hosta})
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': l3_hostb})
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': dhcp_hosta})
+        callback.report_state(self.adminContext,
+                              agent_state={'agent_state': dhcp_hostc})
+        return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+
+    def test_create_agent(self):
+        data = {'agent': {}}
+        _req = self.new_create_request('agents', data, self.fmt)
+        _req.environ['quantum.context'] = context.Context(
+            '', 'tenant_id')
+        res = _req.get_response(self.ext_api)
+        self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+    def test_list_agent(self):
+        agents = self._register_agent_states()
+        res = self._list('agents')
+        for agent in res['agents']:
+            if (agent['host'] == DHCP_HOSTA and
+                agent['agent_type'] == constants.AGENT_TYPE_DHCP):
+                self.assertEqual(
+                    'dhcp_driver',
+                    agent['configurations']['dhcp_driver'])
+                break
+        self.assertEqual(len(agents), len(res['agents']))
+
+    def test_show_agent(self):
+        self._register_agent_states()
+        agents = self._list_agents(
+            query_string='binary=quantum-l3-agent')
+        self.assertEqual(2, len(agents['agents']))
+        agent = self._show('agents', agents['agents'][0]['id'])
+        self.assertEqual('quantum-l3-agent', agent['agent']['binary'])
+
+    def test_update_agent(self):
+        self._register_agent_states()
+        agents = self._list_agents(
+            query_string='binary=quantum-l3-agent&host=' + L3_HOSTB)
+        self.assertEqual(1, len(agents['agents']))
+        com_id = agents['agents'][0]['id']
+        agent = self._show('agents', com_id)
+        new_agent = {}
+        new_agent['agent'] = {}
+        new_agent['agent']['admin_state_up'] = False
+        new_agent['agent']['description'] = 'description'
+        self._update('agents', com_id, new_agent)
+        agent = self._show('agents', com_id)
+        self.assertFalse(agent['agent']['admin_state_up'])
+        self.assertEqual('description', agent['agent']['description'])
+
+    def test_dead_agent(self):
+        cfg.CONF.set_override('agent_down_time', 1)
+        self._register_agent_states()
+        time.sleep(1.5)
+        agents = self._list_agents(
+            query_string='binary=quantum-l3-agent&host=' + L3_HOSTB)
+        self.assertFalse(agents['agents'][0]['alive'])
+
+
+class AgentDBTestCaseXML(AgentDBTestCase):
+    fmt = 'xml'