--- /dev/null
+# Copyright (c) 2014 Cisco Systems Inc.
+# All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Mandeep Dhami (dhami@noironetworks.com), Cisco Systems Inc.
+import re
+import sys
+import eventlet
+from oslo.config import cfg
+from neutron.agent.common import config
+from neutron.agent.linux import ip_lib
+from neutron.agent.linux import utils
+from neutron.common import config as common_cfg
+from neutron.common import rpc
+from neutron.common import utils as neutron_utils
+from neutron.db import agents_db
+from neutron import manager
+from neutron.openstack.common.gettextutils import _LE, _LI
+from neutron.openstack.common import lockutils
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import periodic_task
+from neutron.openstack.common import service as svc
+from neutron.plugins.ml2.drivers.cisco.apic import mechanism_apic as ma
+from neutron.plugins.ml2.drivers import type_vlan # noqa
+from neutron import service
+ 'topology/pod-1/node-(\d+)/sys/conng/path-\[eth(\d+)/(\d+)\]',
+ 'topology/pod-1/paths-(\d+)/pathep-\[eth(\d+)/(\d+)\]',
+BINARY_APIC_SERVICE_AGENT = 'neutron-cisco-apic-service-agent'
+BINARY_APIC_HOST_AGENT = 'neutron-cisco-apic-host-agent'
+TOPIC_APIC_SERVICE = 'apic-service'
+TYPE_APIC_SERVICE_AGENT = 'cisco-apic-service-agent'
+TYPE_APIC_HOST_AGENT = 'cisco-apic-host-agent'
+LOG = logging.getLogger(__name__)
+class ApicTopologyService(manager.Manager):
+ def __init__(self, host=None):
+ if host is None:
+ host = neutron_utils.get_hostname()
+ super(ApicTopologyService, self).__init__(host=host)
+ self.conf = cfg.CONF.ml2_cisco_apic
+ self.conn = None
+ self.peers = {}
+ self.invalid_peers = []
+ self.dispatcher = None
+ self.state = None
+ self.state_agent = None
+ self.topic = TOPIC_APIC_SERVICE
+ self.apic_manager = ma.APICMechanismDriver.get_apic_manager(False)
+ def init_host(self):
+ LOG.info(_LI("APIC service agent starting ..."))
+ self.state = {
+ 'host': self.host,
+ 'topic': self.topic,
+ 'configurations': {},
+ 'start_flag': True,
+ 'agent_type': TYPE_APIC_SERVICE_AGENT,
+ }
+ self.conn = rpc.create_connection(new=True)
+ self.dispatcher = [self, agents_db.AgentExtRpcCallback()]
+ self.conn.create_consumer(
+ self.topic, self.dispatcher, fanout=True)
+ self.conn.consume_in_threads()
+ def after_start(self):
+ LOG.info(_LI("APIC service agent started"))
+ def report_send(self, context):
+ if not self.state_agent:
+ return
+ LOG.debug("APIC service agent: sending report state")
+ try:
+ self.state_agent.report_state(context, self.state)
+ self.state.pop('start_flag', None)
+ except AttributeError:
+ # This means the server does not support report_state
+ # ignore it
+ return
+ except Exception:
+ LOG.exception(_LE("APIC service agent: failed in reporting state"))
+ @lockutils.synchronized('apic_service')
+ def update_link(self, context,
+ host, interface, mac,
+ switch, module, port):
+ LOG.debug("APIC service agent: received update_link: %s",
+ ", ".join(map(str,
+ [host, interface, mac, switch, module, port])))
+ nlink = (host, interface, mac, switch, module, port)
+ clink = self.peers.get((host, interface), None)
+ if switch == 0:
+ # this is a link delete, remove it
+ if clink is not None:
+ self.apic_manager.remove_hostlink(*clink)
+ self.peers.pop((host, interface))
+ else:
+ if clink is None:
+ # add new link to database
+ self.apic_manager.add_hostlink(*nlink)
+ self.peers[(host, interface)] = nlink
+ elif clink != nlink:
+ # delete old link and add new one (don't update in place)
+ self.apic_manager.remove_hostlink(*clink)
+ self.peers.pop((host, interface))
+ self.apic_manager.add_hostlink(*nlink)
+ self.peers[(host, interface)] = nlink
+class ApicTopologyServiceNotifierApi(rpc.RpcProxy):
+ def __init__(self):
+ super(ApicTopologyServiceNotifierApi, self).__init__(
+ default_version=self.RPC_API_VERSION)
+ def update_link(self, context, host, interface, mac, switch, module, port):
+ self.fanout_cast(
+ context, self.make_msg(
+ 'update_link',
+ host=host, interface=interface, mac=mac,
+ switch=switch, module=module, port=port),
+ def delete_link(self, context, host, interface):
+ self.fanout_cast(
+ context, self.make_msg(
+ 'delete_link',
+ host=host, interface=interface, mac=None,
+ switch=0, module=0, port=0),
+class ApicTopologyAgent(manager.Manager):
+ def __init__(self, host=None):
+ if host is None:
+ host = neutron_utils.get_hostname()
+ super(ApicTopologyAgent, self).__init__(host=host)
+ self.conf = cfg.CONF.ml2_cisco_apic
+ self.count_current = 0
+ self.count_force_send = AGENT_FORCE_UPDATE_COUNT
+ self.interfaces = {}
+ self.lldpcmd = None
+ self.peers = {}
+ self.port_desc_re = map(re.compile, ACI_PORT_DESCR_FORMATS)
+ self.root_helper = self.conf.root_helper
+ self.service_agent = ApicTopologyServiceNotifierApi()
+ self.state = None
+ self.state_agent = None
+ self.topic = TOPIC_APIC_SERVICE
+ self.uplink_ports = []
+ self.invalid_peers = []
+ def init_host(self):
+ LOG.info(_LI("APIC host agent: agent starting on %s"), self.host)
+ self.state = {
+ 'host': self.host,
+ 'topic': self.topic,
+ 'configurations': {},
+ 'start_flag': True,
+ 'agent_type': TYPE_APIC_HOST_AGENT,
+ }
+ self.uplink_ports = []
+ for inf in self.conf.apic_host_uplink_ports:
+ if ip_lib.device_exists(inf):
+ self.uplink_ports.append(inf)
+ else:
+ # ignore unknown interfaces
+ LOG.error(_LE("No such interface (ignored): %s"), inf)
+ self.lldpcmd = ['lldpctl', '-f', 'keyvalue'] + self.uplink_ports
+ def after_start(self):
+ LOG.info(_LI("APIC host agent: started on %s"), self.host)
+ @periodic_task.periodic_task
+ def _check_for_new_peers(self, context):
+ LOG.debug("APIC host agent: _check_for_new_peers")
+ if not self.lldpcmd:
+ return
+ try:
+ # Check if we must send update even if there is no change
+ force_send = False
+ self.count_current += 1
+ if self.count_current >= self.count_force_send:
+ force_send = True
+ self.count_current = 0
+ # Check for new peers
+ new_peers = self._get_peers()
+ new_peers = self._valid_peers(new_peers)
+ # Make a copy of current interfaces
+ curr_peers = {}
+ for interface in self.peers:
+ curr_peers[interface] = self.peers[interface]
+ # Based curr -> new updates, add the new interfaces
+ self.peers = {}
+ for interface in new_peers:
+ peer = new_peers[interface]
+ self.peers[interface] = peer
+ if (interface in curr_peers and
+ curr_peers[interface] != peer):
+ self.service_agent.update_link(
+ context, peer[0], peer[1], None, 0, 0, 0)
+ if (interface not in curr_peers or
+ curr_peers[interface] != peer or
+ force_send):
+ self.service_agent.update_link(context, *peer)
+ if interface in curr_peers:
+ curr_peers.pop(interface)
+ # Any interface still in curr_peers need to be deleted
+ for peer in curr_peers.values():
+ self.service_agent.update_link(
+ context, peer[0], peer[1], None, 0, 0, 0)
+ except Exception:
+ LOG.exception(_LE("APIC service agent: exception in LLDP parsing"))
+ def _get_peers(self):
+ peers = {}
+ lldpkeys = utils.execute(self.lldpcmd, self.root_helper)
+ for line in lldpkeys.splitlines():
+ if '=' not in line:
+ continue
+ fqkey, value = line.split('=', 1)
+ lldp, interface, key = fqkey.split('.', 2)
+ if key == 'port.descr':
+ for regexp in self.port_desc_re:
+ match = regexp.match(value)
+ if match:
+ mac = self._get_mac(interface)
+ switch, module, port = match.group(1, 2, 3)
+ peer = (self.host, interface, mac,
+ switch, module, port)
+ if interface not in peers:
+ peers[interface] = []
+ peers[interface].append(peer)
+ return peers
+ def _valid_peers(self, peers):
+ # Reduce the peers array to one valid peer per interface
+ # NOTE:
+ # There is a bug in lldpd daemon that it keeps reporting
+ # old peers even after their updates have stopped
+ # we keep track of that report remove them from peers
+ valid_peers = {}
+ invalid_peers = []
+ for interface in peers:
+ curr_peer = None
+ for peer in peers[interface]:
+ if peer in self.invalid_peers or curr_peer:
+ invalid_peers.append(peer)
+ else:
+ curr_peer = peer
+ if curr_peer is not None:
+ valid_peers[interface] = curr_peer
+ self.invalid_peers = invalid_peers
+ return valid_peers
+ def _get_mac(self, interface):
+ if interface in self.interfaces:
+ return self.interfaces[interface]
+ try:
+ mac = ip_lib.IPDevice(interface).link.address
+ self.interfaces[interface] = mac
+ return mac
+ except Exception:
+ # we can safely ignore it, it is only needed for debugging
+ LOG.exception(
+ _LE("APIC service agent: can not get MACaddr for %s"),
+ interface)
+ def report_send(self, context):
+ if not self.state_agent:
+ return
+ LOG.debug("APIC host agent: sending report state")
+ try:
+ self.state_agent.report_state(context, self.state)
+ self.state.pop('start_flag', None)
+ except AttributeError:
+ # This means the server does not support report_state
+ # ignore it
+ return
+ except Exception:
+ LOG.exception(_LE("APIC host agent: failed in reporting state"))
+def launch(binary, manager, topic=None):
+ cfg.CONF(project='neutron')
+ common_cfg.init(sys.argv[1:])
+ config.setup_logging(cfg.CONF)
+ report_period = cfg.CONF.ml2_cisco_apic.apic_agent_report_interval
+ poll_period = cfg.CONF.ml2_cisco_apic.apic_agent_poll_interval
+ server = service.Service.create(
+ binary=binary, manager=manager, topic=topic,
+ report_interval=report_period, periodic_interval=poll_period)
+ svc.launch(server).wait()
+def service_main():
+ launch(
+ 'neutron.plugins.ml2.drivers.' +
+ 'cisco.apic.apic_topology.ApicTopologyService',
+def agent_main():
+ launch(
+ 'neutron.plugins.ml2.drivers.' +
+ 'cisco.apic.apic_topology.ApicTopologyAgent')
--- /dev/null
+# Copyright (c) 2014 Cisco Systems
+# All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Ivar Lazzaro (ivarlazzaro@gmail.com), Cisco Systems, Inc.
+import sys
+import mock
+sys.modules["apicapi"] = mock.Mock()
+from neutron.plugins.ml2.drivers.cisco.apic import apic_topology
+from neutron.tests import base
+from neutron.tests.unit.ml2.drivers.cisco.apic import (
+ test_cisco_apic_common as mocked)
+NOTIFIER = ('neutron.plugins.ml2.drivers.cisco.apic.'
+ 'apic_topology.ApicTopologyServiceNotifierApi')
+RPC_CONNECTION = 'neutron.common.rpc.Connection'
+AGENTS_DB = 'neutron.db.agents_db'
+PERIODIC_TASK = 'neutron.openstack.common.periodic_task'
+DEV_EXISTS = 'neutron.agent.linux.ip_lib.device_exists'
+IP_DEVICE = 'neutron.agent.linux.ip_lib.IPDevice'
+EXECUTE = 'neutron.agent.linux.utils.execute'
+LLDP_CMD = ['lldpctl', '-f', 'keyvalue']
+ 'lldp.' + ETH0 + '.via=LLDP\n'
+ 'lldp.' + ETH0 + '.rid=1\n'
+ 'lldp.' + ETH0 + '.age=0 day, 20:55:54\n'
+ 'lldp.' + ETH0 + '.chassis.mac=' + mocked.SERVICE_HOST_MAC + '\n'
+ 'lldp.' + ETH0 + '.chassis.name=' + mocked.SERVICE_PEER_CHASSIS_NAME + '\n'
+ 'lldp.' + ETH0 + '.chassis.descr=' + mocked.SERVICE_PEER_CHASSIS + '\n'
+ 'lldp.' + ETH0 + '.chassis.Bridge.enabled=on\n'
+ 'lldp.' + ETH0 + '.chassis.Router.enabled=on\n'
+ 'lldp.' + ETH0 + '.port.local=' + mocked.SERVICE_PEER_PORT_LOCAL + '\n'
+ 'lldp.' + ETH0 + '.port.descr=' + mocked.SERVICE_PEER_PORT_DESC)
+class TestCiscoApicTopologyService(base.BaseTestCase,
+ mocked.ControllerMixin,
+ mocked.ConfigMixin):
+ def setUp(self):
+ super(TestCiscoApicTopologyService, self).setUp()
+ mocked.ControllerMixin.set_up_mocks(self)
+ mocked.ConfigMixin.set_up_mocks(self)
+ # Patch notifier
+ notifier_c = mock.patch(NOTIFIER).start()
+ self.notifier = mock.Mock()
+ notifier_c.return_value = self.notifier
+ # Patch Connection
+ connection_c = mock.patch(RPC_CONNECTION).start()
+ self.connection = mock.Mock()
+ connection_c.return_value = self.connection
+ # Patch agents db
+ self.agents_db = mock.patch(AGENTS_DB).start()
+ self.service = apic_topology.ApicTopologyService()
+ self.service.apic_manager = mock.Mock()
+ def test_init_host(self):
+ self.service.init_host()
+ self.connection.create_consumer.ensure_called_once()
+ self.connection.consume_in_threads.ensure_called_once()
+ def test_update_link_add_nopeers(self):
+ self.service.peers = {}
+ args = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ self.service.update_link(None, *args)
+ self.service.apic_manager.add_hostlink.assert_called_once_with(*args)
+ self.assertEqual(args,
+ self.service.peers[(mocked.SERVICE_HOST,
+ def test_update_link_add_with_peers_diff(self):
+ args = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ args_prime = args[:2] + tuple(x + '1' for x in args[2:])
+ self.service.peers = {args_prime[:2]: args_prime}
+ self.service.update_link(None, *args)
+ self.service.apic_manager.remove_hostlink.assert_called_once_with(
+ *args_prime)
+ self.service.apic_manager.add_hostlink.assert_called_once_with(*args)
+ self.assertEqual(
+ args, self.service.peers[
+ def test_update_link_add_with_peers_eq(self):
+ args = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ self.service.peers = {args[:2]: args}
+ self.service.update_link(None, *args)
+ def test_update_link_rem_with_peers(self):
+ args = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ mocked.SERVICE_HOST_MAC, 0,
+ self.service.peers = {args[:2]: args}
+ self.service.update_link(None, *args)
+ self.service.apic_manager.remove_hostlink.assert_called_once_with(
+ *args)
+ self.assertFalse(bool(self.service.peers))
+ def test_update_link_rem_no_peers(self):
+ args = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ mocked.SERVICE_HOST_MAC, 0,
+ self.service.update_link(None, *args)
+class TestCiscoApicTopologyAgent(base.BaseTestCase,
+ mocked.ControllerMixin,
+ mocked.ConfigMixin):
+ def setUp(self):
+ super(TestCiscoApicTopologyAgent, self).setUp()
+ mocked.ControllerMixin.set_up_mocks(self)
+ mocked.ConfigMixin.set_up_mocks(self)
+ # Patch notifier
+ notifier_c = mock.patch(NOTIFIER).start()
+ self.notifier = mock.Mock()
+ notifier_c.return_value = self.notifier
+ # Patch device_exists
+ self.dev_exists = mock.patch(DEV_EXISTS).start()
+ # Patch IPDevice
+ ipdev_c = mock.patch(IP_DEVICE).start()
+ self.ipdev = mock.Mock()
+ ipdev_c.return_value = self.ipdev
+ self.ipdev.link.address = mocked.SERVICE_HOST_MAC
+ # Patch execute
+ self.execute = mock.patch(EXECUTE).start()
+ self.execute.return_value = LLDPCTL_RES
+ # Patch tasks
+ self.periodic_task = mock.patch(PERIODIC_TASK).start()
+ self.agent = apic_topology.ApicTopologyAgent()
+ self.agent.host = mocked.SERVICE_HOST
+ self.agent.service_agent = mock.Mock()
+ self.agent.lldpcmd = LLDP_CMD
+ def test_init_host_device_exists(self):
+ self.agent.lldpcmd = None
+ self.dev_exists.return_value = True
+ self.agent.init_host()
+ self.assertEqual(LLDP_CMD + mocked.APIC_UPLINK_PORTS,
+ self.agent.lldpcmd)
+ def test_init_host_device_not_exist(self):
+ self.agent.lldpcmd = None
+ self.dev_exists.return_value = False
+ self.agent.init_host()
+ self.assertEqual(LLDP_CMD, self.agent.lldpcmd)
+ def test_get_peers(self):
+ self.agent.peers = {}
+ peers = self.agent._get_peers()
+ expected = [(mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ mocked.APIC_EXT_MODULE, mocked.APIC_EXT_PORT)]
+ self.assertEqual(expected,
+ peers[mocked.SERVICE_HOST_IFACE])
+ def test_check_for_new_peers_no_peers(self):
+ self.agent.peers = {}
+ expected = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ peers = {mocked.SERVICE_HOST_IFACE: [expected]}
+ context = mock.Mock()
+ with mock.patch.object(self.agent, '_get_peers',
+ return_value=peers):
+ self.agent._check_for_new_peers(context)
+ self.assertEqual(expected,
+ self.agent.peers[mocked.SERVICE_HOST_IFACE])
+ self.agent.service_agent.update_link.assert_called_once_with(
+ context, *expected)
+ def test_check_for_new_peers_with_peers(self):
+ expected = (mocked.SERVICE_HOST, mocked.SERVICE_HOST_IFACE,
+ peers = {mocked.SERVICE_HOST_IFACE: [expected]}
+ self.agent.peers = {mocked.SERVICE_HOST_IFACE:
+ [tuple(x + '1' for x in expected)]}
+ context = mock.Mock()
+ with mock.patch.object(self.agent, '_get_peers',
+ return_value=peers):
+ self.agent._check_for_new_peers(context)
+ self.agent.service_agent.update_link.assert_called_with(
+ context, *expected)
\ No newline at end of file