This agent client plugs itself into each network.
Then tries to ping each fixed_ips.
Implements blueprint test-agent
Change-Id: I3908e37401272e9e091ceae66e72cfcdd13b7898
--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Openstack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from quantum.debug.shell import main
+main()
--- /dev/null
+Debug Helper Script for Quantum
+
+- Configure
+export TEST_CONFIG_FILE=/etc/quantum/dhcp_agent.ini
+or
+export TEST_CONFIG_FILE=/etc/quantum/l3_agent.ini
+
+you can also specify config file by --config-file option
+
+- Usage
+quantum-debug commands
+
+probe-create <net-id> Create probe port - create port and interface, then plug it in.
+ This commands returns a port id of a probe port. A probe port is a port which is used to test.
+ The port id is probe id.
+ We can have multiple probe probes in a network, in order to check connectivity between ports.
+
+ quantum-debug probe-exec probe_id_1 'nc -l 192.168.100.3 22'
+ quantum-debug probe-exec probe_id_2 'nc -vz 192.168.100.4 22'
+
+probe-delete <port-id> Delete probe - delete port then uplug
+probe-exec <port-id> 'command' Exec commands on the namespace of the probe
+`probe-exec <port-id>` 'interactive command' Exec interactive command (eg, ssh)
+
+probe-list List probes
+probe-clear Clear All probes
+
+ping-all --id <network_id> --timeout 1 (optional)
+ ping-all is all-in-one command to ping all fixed ip's in all network or a specified network.
+ In the command probe is automatically created if needed.
+
+quantum-debug extends the shell of quantumclient, so you can use all the commands of quantum
+
--- /dev/null
+#!/bin/python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+#!/bin/python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from cliff import lister
+
+from quantumclient.common import utils
+from quantumclient.quantum.v2_0 import QuantumCommand
+from quantumclient.quantum.v2_0.port import _format_fixed_ips
+
+
+class ProbeCommand(QuantumCommand):
+ log = logging.getLogger(__name__ + '.ProbeCommand')
+
+ def get_debug_agent(self):
+ return self.app.debug_agent
+
+ def run(self, parsed_args):
+ self.log.debug('run(%s)' % parsed_args)
+ self.app.stdout.write(_('Unimplemented commands') + '\n')
+
+
+class CreateProbe(ProbeCommand):
+ """Create probe port and interface, then plug it in."""
+
+ log = logging.getLogger(__name__ + '.CreateProbe')
+
+ def get_parser(self, prog_name):
+ parser = super(CreateProbe, self).get_parser(prog_name)
+ parser.add_argument(
+ 'id', metavar='network_id',
+ help='ID of network to probe')
+ return parser
+
+ def run(self, parsed_args):
+ self.log.debug('run(%s)' % parsed_args)
+ debug_agent = self.get_debug_agent()
+ port = debug_agent.create_probe(parsed_args.id)
+ self.app.stdout.write(_('Probe created : %s ') % port.id + '\n')
+
+
+class DeleteProbe(ProbeCommand):
+ """Delete probe - delete port then uplug """
+
+ log = logging.getLogger(__name__ + '.DeleteProbe')
+
+ def get_parser(self, prog_name):
+ parser = super(DeleteProbe, self).get_parser(prog_name)
+ parser.add_argument(
+ 'id', metavar='port_id',
+ help='ID of probe port to delete')
+ return parser
+
+ def run(self, parsed_args):
+ self.log.debug('run(%s)' % parsed_args)
+ debug_agent = self.get_debug_agent()
+ debug_agent.delete_probe(parsed_args.id)
+ self.app.stdout.write(_('Probe %s deleted') % parsed_args.id + '\n')
+
+
+class ListProbe(QuantumCommand, lister.Lister):
+ """ List probes """
+
+ log = logging.getLogger(__name__ + '.ListProbe')
+ _formatters = {'fixed_ips': _format_fixed_ips, }
+
+ def get_debug_agent(self):
+ return self.app.debug_agent
+
+ def get_data(self, parsed_args):
+
+ debug_agent = self.get_debug_agent()
+ info = debug_agent.list_probes()
+ columns = len(info) > 0 and sorted(info[0].keys()) or []
+ return (columns, (utils.get_item_properties(
+ s, columns, formatters=self._formatters, )
+ for s in info), )
+
+
+class ClearProbe(ProbeCommand):
+ """Clear All probes """
+
+ log = logging.getLogger(__name__ + '.ClearProbe')
+
+ def run(self, parsed_args):
+ self.log.debug('run(%s)' % parsed_args)
+ debug_agent = self.get_debug_agent()
+ debug_agent.clear_probe()
+ self.app.stdout.write(_('All Probes deleted ') + '\n')
+
+
+class ExecProbe(ProbeCommand):
+ """Exec commands on the namespace of the probe
+ """
+
+ log = logging.getLogger(__name__ + '.ExecProbe')
+
+ def get_parser(self, prog_name):
+ parser = super(ExecProbe, self).get_parser(prog_name)
+ parser.add_argument(
+ 'id', metavar='port_id',
+ help='ID of probe port to execute command')
+ parser.add_argument(
+ 'command', metavar='command',
+ nargs='?',
+ default=None,
+ help='Command to execute')
+ return parser
+
+ def run(self, parsed_args):
+ self.log.debug('run(%s)' % parsed_args)
+ debug_agent = self.get_debug_agent()
+ result = debug_agent.exec_command(parsed_args.id, parsed_args.command)
+ self.app.stdout.write(result + '\n')
+
+
+class PingAll(ProbeCommand):
+ """Ping all fixed_ip
+ """
+
+ log = logging.getLogger(__name__ + '.ExecProbe')
+
+ def get_parser(self, prog_name):
+ parser = super(PingAll, self).get_parser(prog_name)
+ parser.add_argument(
+ '--timeout', metavar='<timeout>',
+ default=10,
+ help='Ping timeout')
+ parser.add_argument(
+ '--id', metavar='network_id',
+ default=None,
+ help='ID of network')
+ return parser
+
+ def run(self, parsed_args):
+ self.log.debug('run(%s)' % parsed_args)
+ debug_agent = self.get_debug_agent()
+ result = debug_agent.ping_all(parsed_args.id,
+ timeout=parsed_args.timeout)
+ self.app.stdout.write(result + '\n')
--- /dev/null
+#!/bin/python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import errno
+import logging
+import os
+import shlex
+import socket
+import sys
+
+import netaddr
+
+from quantum.agent.common import config
+from quantum.agent.dhcp_agent import DictModel
+from quantum.agent.linux import interface
+from quantum.agent.linux import ip_lib
+from quantum.agent.linux import utils
+from quantum.openstack.common import cfg
+from quantum.openstack.common import importutils
+from quantumclient.v2_0 import client
+
+LOG = logging.getLogger('test-agent')
+
+DEVICE_OWNER_PROBE = 'network:probe'
+
+
+class QuantumDebugAgent():
+
+ OPTS = [
+ cfg.StrOpt('root_helper', default='sudo'),
+ # Needed for drivers
+ cfg.StrOpt('admin_user'),
+ cfg.StrOpt('admin_password'),
+ cfg.StrOpt('admin_tenant_name'),
+ cfg.StrOpt('auth_url'),
+ cfg.StrOpt('auth_strategy', default='keystone'),
+ cfg.StrOpt('auth_region'),
+ cfg.BoolOpt('use_namespaces', default=True),
+ cfg.StrOpt('interface_driver',
+ help="The driver used to manage the virtual interface.")
+ ]
+
+ def __init__(self, conf, client, driver):
+ self.conf = conf
+ self.client = client
+ self.driver = driver
+
+ def _get_namespace(self, port):
+ return "qprobe-%s" % port.id
+
+ def create_probe(self, network_id):
+ network = self._get_network(network_id)
+ port = self._create_port(network)
+ port.network = network
+ interface_name = self.driver.get_device_name(port)
+ namespace = None
+ if self.conf.use_namespaces:
+ namespace = self._get_namespace(port)
+
+ if ip_lib.device_exists(interface_name,
+ self.conf.root_helper, namespace):
+ LOG.debug(_('Reusing existing device: %s.') % interface_name)
+ else:
+ self.driver.plug(network.id,
+ port.id,
+ interface_name,
+ port.mac_address,
+ namespace=namespace)
+ ip_cidrs = []
+ for fixed_ip in port.fixed_ips:
+ subnet = fixed_ip.subnet
+ net = netaddr.IPNetwork(subnet.cidr)
+ ip_cidr = '%s/%s' % (fixed_ip.ip_address, net.prefixlen)
+ ip_cidrs.append(ip_cidr)
+ self.driver.init_l3(interface_name, ip_cidrs, namespace=namespace)
+ return port
+
+ def _get_subnet(self, subnet_id):
+ subnet_dict = self.client.show_subnet(subnet_id)['subnet']
+ return DictModel(subnet_dict)
+
+ def _get_network(self, network_id):
+ network_dict = self.client.show_network(network_id)['network']
+ network = DictModel(network_dict)
+ obj_subnet = [self._get_subnet(s_id) for s_id in network.subnets]
+ network.subnets = obj_subnet
+ return network
+
+ def clear_probe(self):
+ ports = self.client.list_ports(device_id=socket.gethostname(),
+ device_owner=DEVICE_OWNER_PROBE)
+ info = ports['ports']
+ for port in info:
+ self.delete_probe(port['id'])
+
+ def delete_probe(self, port_id):
+ port = DictModel(self.client.show_port(port_id)['port'])
+ ip = ip_lib.IPWrapper(self.conf.root_helper)
+ namespace = self._get_namespace(port)
+ if self.conf.use_namespaces and ip.netns.exists(namespace):
+ self.driver.unplug(self.driver.get_device_name(port),
+ namespace=namespace)
+ ip.netns.delete(namespace)
+ else:
+ self.driver.unplug(self.driver.get_device_name(port))
+ self.client.delete_port(port.id)
+
+ def list_probes(self):
+ ports = self.client.list_ports(device_owner=DEVICE_OWNER_PROBE)
+ info = ports['ports']
+ for port in info:
+ port['device_name'] = self.driver.get_device_name(DictModel(port))
+ return info
+
+ def exec_command(self, port_id, command=None):
+ port = DictModel(self.client.show_port(port_id)['port'])
+ ip = ip_lib.IPWrapper(self.conf.root_helper)
+ namespace = self._get_namespace(port)
+ if self.conf.use_namespaces:
+ if not command:
+ return "sudo ip netns exec %s" % self._get_namespace(port)
+ namespace = ip.ensure_namespace(namespace)
+ return namespace.netns.execute(shlex.split(command))
+ else:
+ return utils.execute(shlex.split(command))
+
+ def ensure_probe(self, network_id):
+ ports = self.client.list_ports(network_id=network_id,
+ device_id=socket.gethostname(),
+ device_owner=DEVICE_OWNER_PROBE)
+ info = ports.get('ports', [])
+ if info:
+ return DictModel(info[0])
+ else:
+ return self.create_probe(network_id)
+
+ def ping_all(self, network_id=None, timeout=1):
+ if network_id:
+ ports = self.client.list_ports(network_id=network_id)['ports']
+ else:
+ ports = self.client.list_ports()['ports']
+ result = ""
+ for port in ports:
+ probe = self.ensure_probe(port['network_id'])
+ if port['device_owner'] == DEVICE_OWNER_PROBE:
+ continue
+ for fixed_ip in port['fixed_ips']:
+ address = fixed_ip['ip_address']
+ subnet = self._get_subnet(fixed_ip['subnet_id'])
+ if subnet.ip_version == 4:
+ ping_command = 'ping'
+ else:
+ ping_command = 'ping6'
+ result += self.exec_command(probe.id,
+ '%s -c 1 -w %s %s' % (ping_command,
+ timeout,
+ address))
+ return result
+
+ def _create_port(self, network):
+ body = dict(port=dict(
+ admin_state_up=True,
+ network_id=network.id,
+ device_id='%s' % socket.gethostname(),
+ device_owner=DEVICE_OWNER_PROBE,
+ tenant_id=network.tenant_id,
+ fixed_ips=[dict(subnet_id=s.id) for s in network.subnets]))
+ port_dict = self.client.create_port(body)['port']
+ port = DictModel(port_dict)
+ port.network = network
+ for fixed_ip in port.fixed_ips:
+ fixed_ip.subnet = self._get_subnet(fixed_ip.subnet_id)
+ return port
--- /dev/null
+#!/bin/python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import itertools
+import sys
+
+from quantum.agent.common import config
+from quantum.agent.linux import interface
+import quantum.debug.commands
+from quantum.debug.debug_agent import QuantumDebugAgent
+from quantum.openstack.common import cfg
+from quantum.openstack.common import importutils
+from quantumclient.common import exceptions as exc
+from quantumclient.common import utils
+from quantumclient.shell import env, QuantumShell, QUANTUM_API_VERSION
+
+COMMAND_V2 = {
+ 'probe-create': utils.import_class(
+ 'quantum.debug.commands.CreateProbe'),
+ 'probe-delete': utils.import_class(
+ 'quantum.debug.commands.DeleteProbe'),
+ 'probe-list': utils.import_class(
+ 'quantum.debug.commands.ListProbe'),
+ 'probe-clear': utils.import_class(
+ 'quantum.debug.commands.ClearProbe'),
+ 'probe-exec': utils.import_class(
+ 'quantum.debug.commands.ExecProbe'),
+ 'ping-all': utils.import_class(
+ 'quantum.debug.commands.PingAll'),
+#TODO(nati) ping, netcat , nmap, bench
+}
+COMMANDS = {'2.0': COMMAND_V2}
+
+
+class QuantumDebugShell(QuantumShell):
+ def __init__(self, api_version):
+ super(QuantumDebugShell, self).__init__(api_version)
+ for k, v in COMMANDS[api_version].items():
+ self.command_manager.add_command(k, v)
+
+ def build_option_parser(self, description, version):
+ parser = super(QuantumDebugShell, self).build_option_parser(
+ description, version)
+ parser.add_argument(
+ '--config-file',
+ default=env('TEST_CONFIG_FILE'),
+ help='Config file for interface driver '
+ '(You may also use either the '
+ 'l3_agent.ini or the dhcp_agent.ini)')
+ return parser
+
+ def initialize_app(self, argv):
+ super(QuantumDebugShell, self).initialize_app(argv)
+ if not self.options.config_file:
+ raise exc.CommandError(
+ "You must provide a config file for bridge -"
+ " either --config-file or env[TEST_CONFIG_FILE]")
+ client = self.client_manager.quantum
+ cfg.CONF.register_opts(interface.OPTS)
+ cfg.CONF.register_opts(QuantumDebugAgent.OPTS)
+ cfg.CONF(['--config-file', self.options.config_file])
+ config.setup_logging(cfg.CONF)
+ driver = importutils.import_object(cfg.CONF.interface_driver, cfg.CONF)
+ self.debug_agent = QuantumDebugAgent(cfg.CONF, client, driver)
+
+
+def main(argv=None):
+ return QuantumDebugShell(QUANTUM_API_VERSION).run(argv or sys.argv[1:])
+
+if __name__ == "__main__":
+ sys.exit(main())
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import socket
+import sys
+import uuid
+
+import mock
+import unittest2 as unittest
+
+from quantum.agent.common import config
+from quantum.agent.linux import interface
+from quantum.agent.linux import utils
+from quantum.common import exceptions
+from quantum.debug import commands
+from quantum.debug.debug_agent import DEVICE_OWNER_PROBE, QuantumDebugAgent
+from quantum.openstack.common import cfg
+
+
+class MyApp(object):
+ def __init__(self, _stdout):
+ self.stdout = _stdout
+
+
+class TestDebugCommands(unittest.TestCase):
+ def setUp(self):
+ cfg.CONF.register_opts(interface.OPTS)
+ cfg.CONF.register_opts(QuantumDebugAgent.OPTS)
+ cfg.CONF(args=sys.argv, project='quantum')
+ cfg.CONF.set_override('use_namespaces', True)
+ cfg.CONF.root_helper = 'sudo'
+
+ self.addCleanup(mock.patch.stopall)
+ device_exists_p = mock.patch(
+ 'quantum.agent.linux.ip_lib.device_exists')
+ device_exists_p.start()
+ namespace_p = mock.patch(
+ 'quantum.agent.linux.ip_lib.IpNetnsCommand')
+ namespace_p.start()
+ ensure_namespace_p = mock.patch(
+ 'quantum.agent.linux.ip_lib.IPWrapper.ensure_namespace')
+ ensure_namespace_p.start()
+ dvr_cls_p = mock.patch('quantum.agent.linux.interface.NullDriver')
+ driver_cls = dvr_cls_p.start()
+ mock_driver = mock.MagicMock()
+ mock_driver.DEV_NAME_LEN = (
+ interface.LinuxInterfaceDriver.DEV_NAME_LEN)
+ mock_driver.get_device_name.return_value = 'tap12345678-12'
+ driver_cls.return_value = mock_driver
+ self.driver = mock_driver
+
+ client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
+ client_cls = client_cls_p.start()
+ client_inst = mock.Mock()
+ client_cls.return_value = client_inst
+
+ fake_network = {'network': {'id': 'fake_net',
+ 'tenant_id': 'fake_tenant',
+ 'subnets': ['fake_subnet']}}
+ fake_port = {'port':
+ {'id': 'fake_port',
+ 'device_owner': 'fake_device',
+ 'mac_address': 'aa:bb:cc:dd:ee:ffa',
+ 'network_id': 'fake_net',
+ 'fixed_ips':
+ [{'subnet_id': 'fake_subnet', 'ip_address':'10.0.0.3'}]
+ }}
+ fake_ports = {'ports': [fake_port['port']]}
+ self.fake_ports = fake_ports
+ allocation_pools = [{'start': '10.0.0.2',
+ 'end': '10.0.0.254'}]
+ fake_subnet_v4 = {'subnet': {'name': 'fake_subnet_v4',
+ 'id': 'fake_subnet',
+ 'network_id': 'fake_net',
+ 'gateway_ip': '10.0.0.1',
+ 'dns_nameservers': ['10.0.0.2'],
+ 'host_routes': [],
+ 'cidr': '10.0.0.0/24',
+ 'allocation_pools': allocation_pools,
+ 'enable_dhcp': True,
+ 'ip_version': 4}}
+
+ client_inst.list_ports.return_value = fake_ports
+ client_inst.create_port.return_value = fake_port
+ client_inst.show_port.return_value = fake_port
+ client_inst.show_network.return_value = fake_network
+ client_inst.show_subnet.return_value = fake_subnet_v4
+ self.client = client_inst
+ mock_std = mock.Mock()
+ self.app = MyApp(mock_std)
+ self.app.debug_agent = QuantumDebugAgent(cfg.CONF,
+ client_inst,
+ mock_driver)
+
+ def test_create_probe(self):
+ cmd = commands.CreateProbe(self.app, None)
+ cmd_parser = cmd.get_parser('create_probe')
+ args = ['fake_net']
+ parsed_args = cmd_parser.parse_args(args)
+ cmd.run(parsed_args)
+ fake_port = {'port':
+ {'device_owner': DEVICE_OWNER_PROBE,
+ 'admin_state_up': True,
+ 'network_id': 'fake_net',
+ 'tenant_id': 'fake_tenant',
+ 'fixed_ips': [{'subnet_id': 'fake_subnet'}],
+ 'device_id': socket.gethostname()}}
+ namespace = 'qprobe-fake_port'
+ self.client.assert_has_calls([mock.call.show_network('fake_net'),
+ mock.call.show_subnet('fake_subnet'),
+ mock.call.create_port(fake_port),
+ mock.call.show_subnet('fake_subnet')])
+ self.driver.assert_has_calls([mock.call.init_l3('tap12345678-12',
+ ['10.0.0.3/24'],
+ namespace=namespace
+ )])
+
+ def test_delete_probe(self):
+ cmd = commands.DeleteProbe(self.app, None)
+ cmd_parser = cmd.get_parser('delete_probe')
+ args = ['fake_port']
+ parsed_args = cmd_parser.parse_args(args)
+ cmd.run(parsed_args)
+ namespace = 'qprobe-fake_port'
+ self.client.assert_has_calls([mock.call.show_port('fake_port'),
+ mock.call.delete_port('fake_port')])
+ self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),
+ mock.call.unplug('tap12345678-12',
+ namespace=namespace)])
+
+ def test_delete_probe_without_namespace(self):
+ cfg.CONF.set_override('use_namespaces', False)
+ cmd = commands.DeleteProbe(self.app, None)
+ cmd_parser = cmd.get_parser('delete_probe')
+ args = ['fake_port']
+ parsed_args = cmd_parser.parse_args(args)
+ cmd.run(parsed_args)
+ self.client.assert_has_calls([mock.call.show_port('fake_port'),
+ mock.call.delete_port('fake_port')])
+ self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),
+ mock.call.unplug('tap12345678-12')])
+
+ def test_list_probe(self):
+ cmd = commands.ListProbe(self.app, None)
+ cmd_parser = cmd.get_parser('list_probe')
+ args = []
+ parsed_args = cmd_parser.parse_args(args)
+ cmd.run(parsed_args)
+ self.client.assert_has_calls(
+ [mock.call.list_ports(device_owner=DEVICE_OWNER_PROBE)])
+
+ def test_exec_command(self):
+ cmd = commands.ExecProbe(self.app, None)
+ cmd_parser = cmd.get_parser('exec_command')
+ args = ['fake_port', 'fake_command']
+ parsed_args = cmd_parser.parse_args(args)
+ with mock.patch('quantum.agent.linux.ip_lib.IpNetnsCommand') as ns:
+ cmd.run(parsed_args)
+ ns.assert_has_calls([mock.call.execute(mock.ANY)])
+ self.client.assert_has_calls([mock.call.show_port('fake_port')])
+
+ def test_exec_command_without_namespace(self):
+ cfg.CONF.set_override('use_namespaces', False)
+ cmd = commands.ExecProbe(self.app, None)
+ cmd_parser = cmd.get_parser('exec_command')
+ args = ['fake_port', 'fake_command']
+ parsed_args = cmd_parser.parse_args(args)
+ with mock.patch('quantum.agent.linux.utils.execute') as exe:
+ cmd.run(parsed_args)
+ exe.assert_has_calls([mock.call.execute(mock.ANY)])
+ self.client.assert_has_calls([mock.call.show_port('fake_port')])
+
+ def test_clear_probe(self):
+ cmd = commands.ClearProbe(self.app, None)
+ cmd_parser = cmd.get_parser('clear_probe')
+ args = []
+ parsed_args = cmd_parser.parse_args(args)
+ cmd.run(parsed_args)
+ namespace = 'qprobe-fake_port'
+ self.client.assert_has_calls([mock.call.list_ports(
+ device_id=socket.gethostname(),
+ device_owner=DEVICE_OWNER_PROBE),
+ mock.call.show_port('fake_port'),
+ mock.call.delete_port('fake_port')])
+ self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),
+ mock.call.unplug('tap12345678-12',
+ namespace=namespace)])
+
+ def test_ping_all_with_ensure_port(self):
+ fake_ports = self.fake_ports
+
+ def fake_port_list(network_id=None, device_owner=None, device_id=None):
+ if network_id:
+ # In order to test ensure_port, return []
+ return {'ports': []}
+ return fake_ports
+ self.client.list_ports.side_effect = fake_port_list
+ cmd = commands.PingAll(self.app, None)
+ cmd_parser = cmd.get_parser('ping_all')
+ args = []
+ parsed_args = cmd_parser.parse_args(args)
+ namespace = 'qprobe-fake_port'
+ with mock.patch('quantum.agent.linux.ip_lib.IpNetnsCommand') as ns:
+ cmd.run(parsed_args)
+ ns.assert_has_calls([mock.call.execute(mock.ANY)])
+ fake_port = {'port':
+ {'device_owner': DEVICE_OWNER_PROBE,
+ 'admin_state_up': True,
+ 'network_id': 'fake_net',
+ 'tenant_id': 'fake_tenant',
+ 'fixed_ips': [{'subnet_id': 'fake_subnet'}],
+ 'device_id': socket.gethostname()}}
+ expected = [mock.call.show_network('fake_net'),
+ mock.call.show_subnet('fake_subnet'),
+ mock.call.create_port(fake_port),
+ mock.call.show_subnet('fake_subnet')]
+ self.client.assert_has_calls(expected)
+ self.driver.assert_has_calls([mock.call.init_l3('tap12345678-12',
+ ['10.0.0.3/24'],
+ namespace=namespace
+ )])
+
+ def test_ping_all(self):
+ cmd = commands.PingAll(self.app, None)
+ cmd_parser = cmd.get_parser('ping_all')
+ args = []
+ parsed_args = cmd_parser.parse_args(args)
+ with mock.patch('quantum.agent.linux.ip_lib.IpNetnsCommand') as ns:
+ cmd.run(parsed_args)
+ ns.assert_has_calls([mock.call.execute(mock.ANY)])
+ fake_port = {'port':
+ {'device_owner': DEVICE_OWNER_PROBE,
+ 'admin_state_up': True,
+ 'network_id': 'fake_net',
+ 'tenant_id': 'fake_tenant',
+ 'fixed_ips': [{'subnet_id': 'fake_subnet'}],
+ 'device_id': socket.gethostname()}}
+ expected = [mock.call.list_ports(),
+ mock.call.list_ports(network_id='fake_net',
+ device_owner=DEVICE_OWNER_PROBE,
+ device_id=socket.gethostname()),
+ mock.call.show_subnet('fake_subnet'),
+ mock.call.show_port('fake_port')]
+ self.client.assert_has_calls(expected)
+
+ def test_ping_all_v6(self):
+ fake_subnet_v6 = {'subnet': {'name': 'fake_v6',
+ 'ip_version': 6}}
+ self.client.show_subnet.return_value = fake_subnet_v6
+ cmd = commands.PingAll(self.app, None)
+ cmd_parser = cmd.get_parser('ping_all')
+ args = []
+ parsed_args = cmd_parser.parse_args(args)
+ with mock.patch('quantum.agent.linux.ip_lib.IpNetnsCommand') as ns:
+ cmd.run(parsed_args)
+ ns.assert_has_calls([mock.call.execute(mock.ANY)])
+ self.client.assert_has_calls([mock.call.list_ports()])
'quantum-nec-agent = '
'quantum.plugins.nec.agent.nec_quantum_agent:main',
'quantum-server = quantum.server:main',
+ 'quantum-debug = quantum.debug.shell:main',
]
},
)
-distribute>=0.6.24
+cliff
coverage
-mock>=0.8
+distribute>=0.6.24
+mock>=1.0b1
mox==0.5.3
nose
+nosehtmloutput
nosexcover
openstack.nose_plugin
-nosehtmloutput
pep8
sphinx>=1.1.2
unittest2