responsible determining whether a command is safe to execute.
"""
+from __future__ import print_function
import ConfigParser
import json
exec_name = sys.argv.pop(0)
# argv[0] required; path to conf file
if len(sys.argv) < 2:
- print "%s: No command specified" % exec_name
+ print("%s: No command specified" % exec_name)
sys.exit(RC_NOCOMMAND)
config_file = sys.argv.pop(0)
if len(sections) == 1:
return sections[0]
- print "Multiple [xenapi] sections or no [xenapi] section found!"
+ print("Multiple [xenapi] sections or no [xenapi] section found!")
sys.exit(RC_BADCONFIG)
username = config.get(section, "xenapi_connection_username")
password = config.get(section, "xenapi_connection_password")
except ConfigParser.Error:
- print "%s: Incorrect configuration file: %s" % (exec_name, config_file)
+ print("%s: Incorrect configuration file: %s" % (exec_name, config_file))
sys.exit(RC_BADCONFIG)
if not url or not password:
msg = ("%s: Must specify xenapi_connection_url, "
"xenapi_connection_username (optionally), and "
"xenapi_connection_password in %s") % (exec_name, config_file)
- print msg
+ print(msg)
sys.exit(RC_BADCONFIG)
return dict(
filters_path=filters_path,
filter_match = wrapper.match_filter(
filters, user_args, exec_dirs=exec_dirs)
if not filter_match:
- print "Unauthorized command: %s" % ' '.join(user_args)
+ print("Unauthorized command: %s" % ' '.join(user_args))
sys.exit(RC_UNAUTHORIZED)
if __name__ == '__main__':
- print main()
+ print(main())
responsible determining whether a command is safe to execute.
"""
+from __future__ import print_function
import ConfigParser
import json
exec_name = sys.argv.pop(0)
# argv[0] required; path to conf file
if len(sys.argv) < 2:
- print "%s: No command specified" % exec_name
+ print("%s: No command specified" % exec_name)
sys.exit(RC_NOCOMMAND)
config_file = sys.argv.pop(0)
if len(sections) == 1:
return sections[0]
- print "Multiple [xenapi] sections or no [xenapi] section found!"
+ print("Multiple [xenapi] sections or no [xenapi] section found!")
sys.exit(RC_BADCONFIG)
username = config.get(section, "xenapi_connection_username")
password = config.get(section, "xenapi_connection_password")
except ConfigParser.Error:
- print "%s: Incorrect configuration file: %s" % (exec_name, config_file)
+ print("%s: Incorrect configuration file: %s" % (exec_name, config_file))
sys.exit(RC_BADCONFIG)
if not url or not password:
msg = ("%s: Must specify xenapi_connection_url, "
"xenapi_connection_username (optionally), and "
"xenapi_connection_password in %s") % (exec_name, config_file)
- print msg
+ print(msg)
sys.exit(RC_BADCONFIG)
return dict(
filters_path=filters_path,
filter_match = wrapper.match_filter(
filters, user_args, exec_dirs=exec_dirs)
if not filter_match:
- print "Unauthorized command: %s" % ' '.join(user_args)
+ print("Unauthorized command: %s" % ' '.join(user_args))
sys.exit(RC_UNAUTHORIZED)
if __name__ == '__main__':
- print main()
+ print(main())
Used for NeutronRestProxy tests
"""
+from __future__ import print_function
import json
import re
pass
if self.debug:
- print '\n'
+ print('\n')
if self.debug_env:
- print '%s:' % 'environ:'
+ print('environ:')
for (key, value) in sorted(environ.iteritems()):
- print ' %16s : %s' % (key, value)
+ print(' %16s : %s' % (key, value))
- print '%s %s' % (method, uri)
+ print('%s %s' % (method, uri))
if request_data:
- print '%s' % (
+ print('%s' %
json.dumps(request_data, sort_keys=True, indent=4))
status, body = self.request_handler(method, uri, None)
start_response(status, headers)
if self.debug:
if self.debug_env:
- print '%s: %s' % ('Response',
- json.dumps(body_data, sort_keys=True, indent=4))
+ print('%s: %s' % ('Response',
+ json.dumps(body_data, sort_keys=True, indent=4)))
return body
return make_server(self.host, self.port, app)
def run(self):
- print "Serving on port %d ..." % self.port
+ print("Serving on port %d ..." % self.port)
try:
self.server().serve_forever()
except KeyboardInterrupt:
# @author: Mandeep Dhami, Big Switch Networks, Inc.
"""Determine version of NeutronRestProxy plugin"""
+from __future__ import print_function
# if vcsversion exists, use it. Else, use LOCALBRANCH:LOCALREVISION
try:
if __name__ == "__main__":
- print version_string_with_vcs()
+ print(version_string_with_vcs())
"""Brocade NOS Driver CLI."""
+from __future__ import print_function
import argparse
elif args.cmd == 'dissociate' and numargs == 2:
self._dissociate(args.otherargs[0], args.otherargs[1])
else:
- print usage_desc
+ print(usage_desc)
exit(0)
def _create(self, net_id):
"""Brocade NOS Driver Test."""
+from __future__ import print_function
+
import sys
from neutron.plugins.brocade.nos import nosdriver as nos
# AMPP enumeration
with driver.connect(host, username, password) as mgr:
- print driver.get_port_profiles(mgr)
- print driver.get_port_profile(mgr, 'default')
+ print(driver.get_port_profiles(mgr))
+ print(driver.get_port_profile(mgr, 'default'))
if __name__ == '__main__':
#
# @author: Aaron Rosen, VMware
+from __future__ import print_function
+
import sys
from oslo.config import cfg
def help(name):
- print "Usage: %s path/to/nvp.ini" % name
+ print("Usage: %s path/to/nvp.ini" % name)
sys.exit(1)
args = ['--config-file']
args.append(argv[1])
config.parse(args)
- print "------------------------ Database Options ------------------------"
- print "\tconnection: %s" % cfg.CONF.database.connection
- print "\tretry_interval: %d" % cfg.CONF.database.retry_interval
- print "\tmax_retries: %d" % cfg.CONF.database.max_retries
- print "------------------------ NVP Options ------------------------"
- print "\tNVP Generation Timeout %d" % cfg.CONF.NVP.nvp_gen_timeout
- print ("\tNumber of concurrent connections to each controller %d" %
- cfg.CONF.NVP.concurrent_connections)
- print "\tmax_lp_per_bridged_ls: %s" % cfg.CONF.NVP.max_lp_per_bridged_ls
- print "\tmax_lp_per_overlay_ls: %s" % cfg.CONF.NVP.max_lp_per_overlay_ls
- print "------------------------ Cluster Options ------------------------"
- print "\trequested_timeout: %s" % cfg.CONF.req_timeout
- print "\tretries: %s" % cfg.CONF.retries
- print "\tredirects: %s" % cfg.CONF.redirects
- print "\thttp_timeout: %s" % cfg.CONF.http_timeout
+ print("----------------------- Database Options -----------------------")
+ print("\tconnection: %s" % cfg.CONF.database.connection)
+ print("\tretry_interval: %d" % cfg.CONF.database.retry_interval)
+ print("\tmax_retries: %d" % cfg.CONF.database.max_retries)
+ print("----------------------- NVP Options -----------------------")
+ print("\tNVP Generation Timeout %d" % cfg.CONF.NVP.nvp_gen_timeout)
+ print("\tNumber of concurrent connections to each controller %d" %
+ cfg.CONF.NVP.concurrent_connections)
+ print("\tmax_lp_per_bridged_ls: %s" % cfg.CONF.NVP.max_lp_per_bridged_ls)
+ print("\tmax_lp_per_overlay_ls: %s" % cfg.CONF.NVP.max_lp_per_overlay_ls)
+ print("----------------------- Cluster Options -----------------------")
+ print("\trequested_timeout: %s" % cfg.CONF.req_timeout)
+ print("\tretries: %s" % cfg.CONF.retries)
+ print("\tredirects: %s" % cfg.CONF.redirects)
+ print("\thttp_timeout: %s" % cfg.CONF.http_timeout)
cluster = NeutronPlugin.create_nvp_cluster(
cfg.CONF,
cfg.CONF.NVP.concurrent_connections,
cfg.CONF.NVP.nvp_gen_timeout)
num_controllers = len(cluster.nvp_controllers)
- print "Number of controllers found: %s" % num_controllers
+ print("Number of controllers found: %s" % num_controllers)
if num_controllers == 0:
- print "You must specify at least one controller!"
+ print("You must specify at least one controller!")
sys.exit(1)
for controller in cluster.nvp_controllers:
- print "\tController endpoint: %s" % controller
+ print("\tController endpoint: %s" % controller)
nvplib.check_cluster_connectivity(cluster)
gateway_services = get_gateway_services(cluster)
default_gateways = {
errors = 0
for svc_type in default_gateways.keys():
for uuid in gateway_services[svc_type]:
- print "\t\tGateway(%s) uuid: %s" % (svc_type, uuid)
+ print("\t\tGateway(%s) uuid: %s" % (svc_type, uuid))
if (default_gateways[svc_type] and
default_gateways[svc_type] not in gateway_services):
- print ("\t\t\tError: specified default %s gateway (%s) is "
- "missing from NVP Gateway Services!" % (svc_type,
- default_gateways[svc_type]))
+ print("\t\t\tError: specified default %s gateway (%s) is "
+ "missing from NVP Gateway Services!" % (svc_type,
+ default_gateways[svc_type]))
errors += 1
transport_zones = get_transport_zones(cluster)
- print "\tTransport zones: %s" % transport_zones
+ print("\tTransport zones: %s" % transport_zones)
if cfg.CONF.default_tz_uuid not in transport_zones:
- print ("\t\tError: specified default transport zone "
- "(%s) is missing from NVP transport zones!"
- % cfg.CONF.default_tz_uuid)
+ print("\t\tError: specified default transport zone "
+ "(%s) is missing from NVP transport zones!"
+ % cfg.CONF.default_tz_uuid)
errors += 1
if errors:
- print ("\nThere are %d errors with your configuration. "
- " Please, revise!" % errors)
+ print("\nThere are %d errors with your configuration. "
+ " Please, revise!" % errors)
sys.exit(1)
else:
- print "Done."
+ print("Done.")
# License for the specific language governing permissions and limitations
# under the License.
+from __future__ import print_function
+
import collections
import uuid
def show_pending_tasks(self):
for task in self._tasks_queue:
- print str(task)
+ print(str(task))
for resource, tasks in self._tasks.iteritems():
for task in tasks:
- print str(task)
+ print(str(task))
def count(self):
count = 0
def update_network(self, tenant_id, net_id, param_data):
"""Rename a network."""
try:
- print param_data
net = db.network_update(net_id, tenant_id, **param_data)
LOG.debug("Updated network: %s", net.uuid)
net_dict = {}
self._check_report_state([dummy_vif] * 5, 5, fail_mode)
self._check_report_state([], 0, fail_mode)
- print 'record_state', self.record_calls
- print 'num_ports_hist', self.num_ports_hist
-
# Since loopingcall start is mocked, call_count is same as
# the call count of check_report_state.
self.assertEqual(state_rpc.report_state.call_count, 4)
def test_convert_tenant_id(self):
ofc_t_id = self.generate_random_ids(1)
- print ofc_t_id
ret = self.driver.convert_ofc_tenant_id(self.ctx, ofc_t_id)
self.assertEqual(ret, '/tenants/%s' % ofc_t_id)
'status': "ACTIVE"}
if filter_dict:
f.update(filter_dict)
- print 'filter=%s' % f
net_path = "/networks/%s" % n
ret = self.driver.convert_ofc_port_id(self.ctx, ofc_p_id, t_id, n_id)
exp = port_path_template % {'network': ofc_n_id, 'port': ofc_p_id}
- print 'exp=', exp
- print 'ret=', ret
+
self.assertEqual(ret, exp)
self.mox.VerifyAll()
task.wait(TaskState.RESULT)
- if 'error' in userdata:
- print userdata['error']
-
self.assertTrue(userdata['result'])
def test_task_manager_task_sync_exec_process_state(self):
last_task.wait(TaskState.RESULT)
- for task in tasks:
- if 'error' in task.userdata:
- print "Task %s failed: " % (
- tasks.name, tasks.userdata['error'])
-
for task in tasks:
self.assertTrue(task.userdata['result'])
port_security_enabled=True,
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
- print port
update_port = {'port': {psec.PORTSECURITY: False}}
# If plugin implements security groups we also need to remove
# the security group on port.
"""
Utility methods for working with WSGI servers
"""
+from __future__ import print_function
+
import errno
import os
import socket
@webob.dec.wsgify
def __call__(self, req):
- print ("*" * 40) + " REQUEST ENVIRON"
+ print(("*" * 40) + " REQUEST ENVIRON")
for key, value in req.environ.items():
- print key, "=", value
+ print(key, "=", value)
print
resp = req.get_response(self.application)
- print ("*" * 40) + " RESPONSE HEADERS"
+ print(("*" * 40) + " RESPONSE HEADERS")
for (key, value) in resp.headers.iteritems():
- print key, "=", value
+ print(key, "=", value)
print
resp.app_iter = self.print_generator(resp.app_iter)
@staticmethod
def print_generator(app_iter):
"""Print contents of a wrapper string iterator when iterated."""
- print ("*" * 40) + " BODY"
+ print(("*" * 40) + " BODY")
for part in app_iter:
sys.stdout.write(part)
sys.stdout.flush()
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+from __future__ import print_function
import compiler
import imp
if is_localized(node):
for (checker, msg) in self.msg_format_checkers:
if checker(node):
- print >> sys.stderr, (
- '%s:%d %s: %s' %
- (self.filename, node.lineno,
- self.lines[node.lineno - 1][:-1],
- "Error: %s" % msg))
+ print('%s:%d %s: %s Error: %s' %
+ (self.filename, node.lineno,
+ self.lines[node.lineno - 1][:-1], msg),
+ file=sys.stderr)
self.error = 1
return
if debug:
- print ('%s:%d %s: %s' %
- (self.filename, node.lineno,
- self.lines[node.lineno - 1][:-1],
- "Pass"))
+ print('%s:%d %s: %s' %
+ (self.filename, node.lineno,
+ self.lines[node.lineno - 1][:-1],
+ "Pass"))
else:
for (predicate, action, msg) in self.i18n_msg_predicates:
if predicate(node):
if action == 'skip':
if debug:
- print ('%s:%d %s: %s' %
- (self.filename, node.lineno,
- self.lines[node.lineno - 1][:-1],
- "Pass"))
+ print('%s:%d %s: %s' %
+ (self.filename, node.lineno,
+ self.lines[node.lineno - 1][:-1],
+ "Pass"))
return
elif action == 'error':
- print >> sys.stderr, (
- '%s:%d %s: %s' %
- (self.filename, node.lineno,
- self.lines[node.lineno - 1][:-1],
- "Error: %s" % msg))
+ print('%s:%d %s: %s Error: %s' %
+ (self.filename, node.lineno,
+ self.lines[node.lineno - 1][:-1], msg),
+ file=sys.stderr)
self.error = 1
return
elif action == 'warn':
- print ('%s:%d %s: %s' %
- (self.filename, node.lineno,
- self.lines[node.lineno - 1][:-1],
- "Warn: %s" % msg))
+ print('%s:%d %s: %s' %
+ (self.filename, node.lineno,
+ self.lines[node.lineno - 1][:-1],
+ "Warn: %s" % msg))
return
- print >> sys.stderr, 'Predicate with wrong action!'
+ print('Predicate with wrong action!', file=sys.stderr)
def is_file_in_black_list(black_list, f):
try:
cfg_mod = imp.load_source('', cfg_path)
except Exception:
- print >> sys.stderr, "Load cfg module failed"
+ print("Load cfg module failed", file=sys.stderr)
sys.exit(1)
i18n_msg_predicates = cfg_mod.i18n_msg_predicates
"""
Installation script for Neutron's development virtualenv
"""
+from __future__ import print_function
import os
import subprocess
Also, make test will automatically use the virtualenv.
"""
- print help
+ print(help)
def main(argv):