# @author: Salvatore Orlando, Citrix
import Cheetah.Template as cheetah_template
-import logging as LOG
+import logging
+import os
import sys
from client import Client
FORMAT = "json"
CLI_TEMPLATE = "../quantum/cli_output.template"
-#TODO(salvatore-orlando): do proper logging!
+LOG = logging.getLogger('cli')
+
def _handle_exception(ex):
status_code = None
message = None
# Retrieve dict at 1st element of tuple at last argument
- if ex.args and isinstance(ex.args[-1][0],dict):
+ if ex.args and isinstance(ex.args[-1][0], dict):
status_code = ex.args[-1][0].get('status_code', None)
message = ex.args[-1][0].get('message', None)
- print "Failed to create network: %s" % status_code or '<missing>'
- print "Error message:%s" % message or '<missing>'
-
+ msg_1 = "Command failed with error code: %s" % status_code or '<missing>'
+ msg_2 = "Error message:%s" % message or '<missing>'
+ LOG.exception(msg_1 + "-" + msg_2)
+ print msg_1
+ print msg_2
+
def prepare_output(cmd, tenant_id, response):
""" Fills a cheetah template with the response """
template_file = open(CLI_TEMPLATE).read()
output = str(cheetah_template.Template(template_file,
searchList=response))
- return output
+ LOG.debug("Finished preparing output for command:%s", cmd)
+ return output
+
def list_nets(client, *args):
tenant_id = args[0]
res = client.list_networks()
+ LOG.debug("Operation 'list_networks' executed.")
output = prepare_output("list_nets", tenant_id, res)
print output
try:
res = client.create_network(data)
new_net_id = res["networks"]["network"]["id"]
+ LOG.debug("Operation 'create_network' executed.")
output = prepare_output("create_net", tenant_id,
dict(network_id=new_net_id))
print output
except Exception as ex:
_handle_exception(ex)
+
def delete_net(client, *args):
tenant_id, network_id = args
try:
client.delete_network(network_id)
+ LOG.debug("Operation 'delete_network' executed.")
output = prepare_output("delete_net", tenant_id,
dict(network_id=network_id))
print output
tenant_id, network_id = args
try:
res = client.list_network_details(network_id)["networks"]["network"]
+ LOG.debug("Operation 'list_network_details' executed.")
ports = client.list_ports(network_id)
+ LOG.debug("Operation 'list_ports' executed.")
res['ports'] = ports
for port in ports["ports"]:
att_data = client.list_port_attachments(network_id, port['id'])
+ LOG.debug("Operation 'list_attachments' executed.")
port['attachment'] = att_data["attachment"]
output = prepare_output("detail_net", tenant_id, dict(network=res))
data = {'network': {'net-name': '%s' % name}}
try:
client.update_network(network_id, data)
+ LOG.debug("Operation 'update_network' executed.")
# Response has no body. Use data for populating output
data['id'] = network_id
output = prepare_output("rename_net", tenant_id, dict(network=data))
tenant_id, network_id = args
try:
ports = client.list_ports(network_id)
+ LOG.debug("Operation 'list_ports' executed.")
output = prepare_output("list_ports", tenant_id, dict(ports=ports))
print output
except Exception as ex:
tenant_id, network_id = args
try:
res = client.create_port(network_id)
+ LOG.debug("Operation 'create_port' executed.")
new_port_id = res["ports"]["port"]["id"]
output = prepare_output("create_port", tenant_id,
dict(network_id=network_id,
tenant_id, network_id, port_id = args
try:
client.delete_port(network_id, port_id)
+ LOG.debug("Operation 'delete_port' executed.")
output = prepare_output("delete_port", tenant_id,
dict(network_id=network_id,
port_id=port_id))
tenant_id, network_id, port_id = args
try:
port = client.list_port_details(network_id, port_id)["ports"]["port"]
- #NOTE(salvatore-orland): current API implementation does not
- #return attachment with GET operation on port. Once API alignment
+ LOG.debug("Operation 'list_port_details' executed.")
+ #NOTE(salvatore-orland): current API implementation does not
+ #return attachment with GET operation on port. Once API alignment
#branch is merged, update client to use the detail action
port['attachment'] = '<unavailable>'
output = prepare_output("detail_port", tenant_id,
data = {'port': {'port-state': '%s' % new_state}}
try:
client.set_port_state(network_id, port_id, data)
+ LOG.debug("Operation 'set_port_state' executed.")
# Response has no body. Use data for populating output
data['id'] = port_id
output = prepare_output("set_port_state", tenant_id,
try:
data = {'port': {'attachment': '%s' % attachment}}
client.attach_resource(network_id, port_id, data)
+ LOG.debug("Operation 'attach_resource' executed.")
output = prepare_output("plug_interface", tenant_id,
- dict(network_id = network_id,
- port_id = port_id,
- attachment = attachment))
+ dict(network_id=network_id,
+ port_id=port_id,
+ attachment=attachment))
print output
except Exception as ex:
_handle_exception(ex)
tenant_id, network_id, port_id = args
try:
client.detach_resource(network_id, port_id)
+ LOG.debug("Operation 'detach_resource' executed.")
output = prepare_output("unplug_interface", tenant_id,
- dict(network_id = network_id,
- port_id = port_id))
+ dict(network_id=network_id,
+ port_id=port_id))
print output
except Exception as ex:
_handle_exception(ex)
"args": ["tenant-id", "net-id", "port-id"]},
"set_port_state": {
"func": set_port_state,
- "args": ["tenant-id", "net-id", "port-id","new_state"]},
+ "args": ["tenant-id", "net-id", "port-id", "new_state"]},
"detail_port": {
"func": detail_port,
"args": ["tenant-id", "net-id", "port-id"]},
action="store_true", default=False, help="use ssl")
parser.add_option("-v", "--verbose", dest="verbose",
action="store_true", default=False, help="turn on verbose logging")
-
+ parser.add_option("-lf", "--logfile", dest="logfile",
+ type="string", default="syslog", help="log file path")
options, args = parser.parse_args()
if options.verbose:
- LOG.basicConfig(level=LOG.DEBUG)
+ LOG.setLevel(logging.DEBUG)
else:
- LOG.basicConfig(level=LOG.WARN)
+ LOG.setLevel(logging.WARN)
+ #logging.handlers.WatchedFileHandler
+
+ if options.logfile == "syslog":
+ LOG.addHandler(logging.handlers.SysLogHandler(address='/dev/log'))
+ else:
+ LOG.addHandler(logging.handlers.WatchedFileHandler(options.logfile))
+ # Set permissions on log file
+ os.chmod(options.logfile, 0644)
if len(args) < 1:
parser.print_help()
args = build_args(cmd, commands[cmd]["args"], args[1:])
if not args:
sys.exit(1)
- LOG.debug("Executing command \"%s\" with args: %s" % (cmd, args))
+ LOG.info("Executing command \"%s\" with args: %s" % (cmd, args))
client = Client(options.host, options.port, options.ssl,
args[0], FORMAT)
commands[cmd]["func"](client, *args)
-
- LOG.debug("Command execution completed")
+
+ LOG.info("Command execution completed")
sys.exit(0)
Ports on Virtual Network: $network_id
#for $port in $ports
\tLogical Port: $port.id
+#end for
#elif $cmd == 'create_port'
Created new Logical Port with ID: $port_id
on Virtual Network: $network_id
# facilitate stubout for testing
conn.request(method, action, body, headers)
return conn.getresponse()
-
-
+
def do_request(self, method, action, body=None,
headers=None, params=None):
"""
else:
# Create exception with HTTP status code and message
ex = Exception("Server returned error: %s" % status_code)
- ex.args = ([dict(status_code=status_code, message=res.read())],)
+ ex.args = ([dict(status_code=status_code,
+ message=res.read())],)
raise ex
except (socket.error, IOError), e:
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
-# under the License.
\ No newline at end of file
+# under the License.
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
""" Stubs for client tools unit tests """
+
from quantum import api as server
-from quantum import client
from tests.unit import testlib_api
-def stubout_send_request(stubs, api):
- """Simulates a failure in fetch image_glance_disk."""
+class FakeStdout:
- def fake_send_request(self, conn, method, action, body, headers):
- # ignore headers and connection
- req = testlib_api.create_request(action, body,
- "application/json", method)
- res = req.get_response(api)
- return res
-
- stubs.Set(client.Client, '_send_request', fake_send_request)
-
-
-class FakeStdout:
-
def __init__(self):
self.content = []
-
+
def write(self, text):
self.content.append(text)
-
+
def make_string(self):
result = ''
for line in self.content:
class FakeHTTPConnection:
- """ stub HTTP connection class for CLI testing """
+ """ stub HTTP connection class for CLI testing """
def __init__(self, _1, _2):
# Ignore host and port parameters
self._req = None
- options = dict(plugin_provider = \
- 'quantum.plugins.SamplePlugin.FakePlugin')
+ options = \
+ dict(plugin_provider='quantum.plugins.SamplePlugin.FakePlugin')
self._api = server.APIRouterV01(options)
-
+
def request(self, method, action, body, headers):
# TODO: remove version prefix from action!
parts = action.split('/', 2)
path = '/' + parts[2]
self._req = testlib_api.create_request(path, body, "application/json",
method)
-
+
def getresponse(self):
res = self._req.get_response(self._api)
def _fake_read():
- """ Trick for macking a webob.Response look like a
+ """ Trick for macking a webob.Response look like a
httplib.Response
-
+
"""
return res.body
setattr(res, 'read', _fake_read)
- return res
-
+ return res
# under the License.
# @author: Salvatore Orlando, Citrix Systems
-""" Module containing unit tests for Quantum
+""" Module containing unit tests for Quantum
command line interface
-
+
"""
from quantum import cli
from quantum.client import Client
from quantum.db import api as db
-from quantum.manager import QuantumManager
from tests.unit.client_tools import stubs as client_stubs
LOG = logging.getLogger('quantum.tests.test_cli')
FORMAT = 'json'
+
class CLITest(unittest.TestCase):
def setUp(self):
self.network_name_1 = "test_network_1"
self.network_name_2 = "test_network_2"
# Prepare client and plugin manager
- self.client = Client(tenant = self.tenant_id, format = FORMAT,
- testingStub = client_stubs.FakeHTTPConnection)
- self.manager = QuantumManager(options).get_plugin()
+ self.client = Client(tenant=self.tenant_id, format=FORMAT,
+ testingStub=client_stubs.FakeHTTPConnection)
# Redirect stdout
self.fake_stdout = client_stubs.FakeStdout()
sys.stdout = self.fake_stdout
-
+
def tearDown(self):
"""Clear the test environment"""
db.clear_db()
sys.stdout = sys.__stdout__
-
-
+
def _verify_list_networks(self):
# Verification - get raw result from db
nw_list = db.network_list(self.tenant_id)
- networks=[dict(id=nw.uuid, name=nw.name) for nw in nw_list]
+ networks = [dict(id=nw.uuid, name=nw.name) for nw in nw_list]
# Fill CLI template
output = cli.prepare_output('list_nets', self.tenant_id,
dict(networks=networks))
# Verify!
# Must add newline at the end to match effect of print call
self.assertEquals(self.fake_stdout.make_string(), output + '\n')
-
- def test_list_networks(self):
- try:
- # Pre-populate data for testing using db api
- db.network_create(self.tenant_id, self.network_name_1)
- db.network_create(self.tenant_id, self.network_name_2)
-
- cli.list_nets(self.manager, self.tenant_id)
- LOG.debug("Operation completed. Verifying result")
- LOG.debug(self.fake_stdout.content)
- self._verify_list_networks()
- except:
- LOG.exception("Exception caught: %s", sys.exc_info())
- self.fail("test_list_network failed due to an exception")
-
def test_list_networks_api(self):
- try:
+ try:
# Pre-populate data for testing using db api
db.network_create(self.tenant_id, self.network_name_1)
db.network_create(self.tenant_id, self.network_name_2)
-
- cli.api_list_nets(self.client, self.tenant_id)
+
+ cli.list_nets(self.client, self.tenant_id)
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
- self._verify_list_networks()
- except:
+ self._verify_list_networks()
+ except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_list_network_api failed due to an exception")
- def test_create_network(self):
- try:
- cli.create_net(self.manager, self.tenant_id, "test")
- LOG.debug("Operation completed. Verifying result")
- LOG.debug(self.fake_stdout.content)
- self._verify_create_network()
- except:
- LOG.exception("Exception caught: %s", sys.exc_info())
- self.fail("test_create_network failed due to an exception")
-
def test_create_network_api(self):
- try:
- cli.api_create_net(self.client, self.tenant_id, "test")
+ try:
+ cli.create_net(self.client, self.tenant_id, "test")
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
- self._verify_create_network()
- except:
+ self._verify_create_network()
+ except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_create_network_api failed due to an exception")
-
+
def _prepare_test_delete_network(self):
# Pre-populate data for testing using db api
db.network_create(self.tenant_id, self.network_name_1)
net_id = db.network_list(self.tenant_id)[0]['uuid']
return net_id
- def test_delete_network(self):
- try:
- network_id = self._prepare_test_delete_network()
- cli.delete_net(self.manager, self.tenant_id, network_id)
- LOG.debug("Operation completed. Verifying result")
- LOG.debug(self.fake_stdout.content)
- self._verify_delete_network(network_id)
- except:
- LOG.exception("Exception caught: %s", sys.exc_info())
- self.fail("test_delete_network failed due to an exception")
-
-
def test_delete_network_api(self):
- try:
+ try:
network_id = self._prepare_test_delete_network()
- cli.api_delete_net(self.client, self.tenant_id, network_id)
+ cli.delete_net(self.client, self.tenant_id, network_id)
LOG.debug("Operation completed. Verifying result")
LOG.debug(self.fake_stdout.content)
- self._verify_delete_network(network_id)
- except:
+ self._verify_delete_network(network_id)
+ except:
LOG.exception("Exception caught: %s", sys.exc_info())
self.fail("test_delete_network_api failed due to an exception")
-
+
def test_detail_network_api(self):
# Load some data into the datbase
net = db.network_create(self.tenant_id, self.network_name_1)
- db.port_create(net['uuid'])
+ db.port_create(net['uuid'])
port = db.port_create(net['uuid'])
- cli.api_detail_net(self.client, self.tenant_id, net['uuid'])
- db.port_set_attachment(port['uuid'], net['uuid'], "test_iface_id")
\ No newline at end of file
+ cli.detail_net(self.client, self.tenant_id, net['uuid'])
+ db.port_set_attachment(port['uuid'], net['uuid'], "test_iface_id")