Neutron Specific Commandments
-----------------------------
+- [N319] Validate that debug level logs are not translated
- [N320] Validate that LOG messages, except debug ones, have translations
- [N321] Validate that jsonutils module is used instead of json
- [N322] We do not use @authors tags in source files. We have git to track
  authorship.
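As an illustration only (the logger setup mirrors the hunks below; network_id is a hypothetical variable), agent code that satisfies N319 and N320 leaves debug messages untranslated and wraps every other level in the matching marker function::

    from neutron.openstack.common.gettextutils import _LE, _LI, _LW
    from neutron.openstack.common import log as logging

    LOG = logging.getLogger(__name__)

    network_id = 'some-network-uuid'  # hypothetical value for the example

    # N319: debug messages are never wrapped in a translation call
    LOG.debug("Calling driver for network: %s", network_id)

    # N320: info/warning/error/exception messages carry a translation hint
    LOG.info(_LI("DHCP agent started"))
    LOG.warn(_LW("Network %s has been deleted."), network_id)
    LOG.error(_LE("Failed reporting state!"))
    LOG.exception(_LE("Unable to sync network state."))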
from neutron.common import utils
from neutron import context
from neutron import manager
+from neutron.openstack.common.gettextutils import _LE, _LI, _LW
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
self.cache.put(net)
except NotImplementedError:
# just go ahead with an empty networks cache
- LOG.debug(
- _("The '%s' DHCP-driver does not support retrieving of a "
- "list of existing networks"),
- self.conf.dhcp_driver
- )
+            LOG.debug("The '%s' DHCP-driver does not support retrieval of a "
+                      "list of existing networks",
+                      self.conf.dhcp_driver)
def after_start(self):
self.run()
- LOG.info(_("DHCP agent started"))
+ LOG.info(_LI("DHCP agent started"))
def run(self):
"""Activate the DHCP agent."""
def call_driver(self, action, network, **action_kwargs):
"""Invoke an action on a DHCP driver instance."""
- LOG.debug(_('Calling driver for network: %(net)s action: %(action)s'),
+ LOG.debug('Calling driver for network: %(net)s action: %(action)s',
{'net': network.id, 'action': action})
try:
# the Driver expects something that is duck typed similar to
except exceptions.Conflict:
# No need to resync here, the agent will receive the event related
# to a status update for the network
- LOG.warning(_('Unable to %(action)s dhcp for %(net_id)s: there is '
- 'a conflict with its current state; please check '
- 'that the network and/or its subnet(s) still exist.')
- % {'net_id': network.id, 'action': action})
+ LOG.warning(_LW('Unable to %(action)s dhcp for %(net_id)s: there '
+ 'is a conflict with its current state; please '
+ 'check that the network and/or its subnet(s) '
+ 'still exist.'),
+ {'net_id': network.id, 'action': action})
except Exception as e:
self.schedule_resync(e, network.id)
if (isinstance(e, messaging.RemoteError)
and e.exc_type == 'NetworkNotFound'
or isinstance(e, exceptions.NetworkNotFound)):
- LOG.warning(_("Network %s has been deleted."), network.id)
+ LOG.warning(_LW("Network %s has been deleted."), network.id)
else:
- LOG.exception(_('Unable to %(action)s dhcp for %(net_id)s.')
+ LOG.exception(_LE('Unable to %(action)s dhcp for %(net_id)s.')
% {'net_id': network.id, 'action': action})
def schedule_resync(self, reason, network=None):
or 'None' is one of the networks, sync all of the networks.
"""
only_nets = set([] if (not networks or None in networks) else networks)
- LOG.info(_('Synchronizing state'))
+ LOG.info(_LI('Synchronizing state'))
pool = eventlet.GreenPool(cfg.CONF.num_sync_threads)
known_network_ids = set(self.cache.get_network_ids())
self.disable_dhcp_helper(deleted_id)
except Exception as e:
self.schedule_resync(e, deleted_id)
- LOG.exception(_('Unable to sync network state on deleted '
- 'network %s'), deleted_id)
+ LOG.exception(_LE('Unable to sync network state on '
+ 'deleted network %s'), deleted_id)
for network in active_networks:
if (not only_nets or # specifically resync all
network.id in only_nets): # specific network to sync
pool.spawn(self.safe_configure_dhcp_for_network, network)
pool.waitall()
- LOG.info(_('Synchronizing state complete'))
+ LOG.info(_LI('Synchronizing state complete'))
except Exception as e:
self.schedule_resync(e)
- LOG.exception(_('Unable to sync network state.'))
+ LOG.exception(_LE('Unable to sync network state.'))
@utils.exception_logger()
def _periodic_resync_helper(self):
for net, r in reasons.items():
if not net:
net = "*"
- LOG.debug(_("resync (%(network)s): %(reason)s"),
+ LOG.debug("resync (%(network)s): %(reason)s",
{"reason": r, "network": net})
self.sync_state(reasons.keys())
try:
network = self.plugin_rpc.get_network_info(network_id)
if not network:
- LOG.warn(_('Network %s has been deleted.'), network_id)
+ LOG.warn(_LW('Network %s has been deleted.'), network_id)
return network
except Exception as e:
self.schedule_resync(e, network_id)
- LOG.exception(_('Network %s info call failed.'), network_id)
+ LOG.exception(_LE('Network %s info call failed.'), network_id)
def enable_dhcp_helper(self, network_id):
"""Enable DHCP for a network that meets enabling criteria."""
try:
self.configure_dhcp_for_network(network)
except (exceptions.NetworkNotFound, RuntimeError):
- LOG.warn(_('Network %s may have been deleted and its resources '
- 'may have already been disposed.'), network.id)
+ LOG.warn(_LW('Network %s may have been deleted and its resources '
+ 'may have already been disposed.'), network.id)
def configure_dhcp_for_network(self, network):
if not network.admin_state_up:
if router_ports:
# Multiple router ports should not be allowed
if len(router_ports) > 1:
- LOG.warning(_("%(port_num)d router ports found on the "
- "metadata access network. Only the port "
- "%(port_id)s, for router %(router_id)s "
- "will be considered"),
+ LOG.warning(_LW("%(port_num)d router ports found on the "
+ "metadata access network. Only the port "
+ "%(port_id)s, for router %(router_id)s "
+ "will be considered"),
{'port_num': len(router_ports),
'port_id': router_ports[0].id,
'router_id': router_ports[0].device_id})
self.use_call = False
except AttributeError:
# This means the server does not support report_state
- LOG.warn(_("Neutron server does not support state report."
- " State report for this agent will be disabled."))
+ LOG.warn(_LW("Neutron server does not support state report."
+ " State report for this agent will be disabled."))
self.heartbeat.stop()
self.run()
return
except Exception:
- LOG.exception(_("Failed reporting state!"))
+ LOG.exception(_LE("Failed reporting state!"))
return
if self.agent_state.pop('start_flag', None):
self.run()
"""Handle the agent_updated notification event."""
self.schedule_resync(_("Agent updated: %(payload)s") %
{"payload": payload})
- LOG.info(_("agent_updated by server side %s!"), payload)
+ LOG.info(_LI("agent_updated by server side %s!"), payload)
def after_start(self):
- LOG.info(_("DHCP agent started"))
+ LOG.info(_LI("DHCP agent started"))
def register_options():
from neutron import context as n_context
from neutron import manager
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _LE, _LW
+from neutron.openstack.common.gettextutils import _LE, _LI, _LW
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
self.conf
)
except Exception:
- msg = _("Error importing interface driver "
- "'%s'") % self.conf.interface_driver
- LOG.error(msg)
+ LOG.error(_LE("Error importing interface driver "
+ "'%s'"), self.conf.interface_driver)
raise SystemExit(1)
self.context = n_context.get_admin_context_without_session()
The actual values are not verified for correctness.
"""
if not self.conf.interface_driver:
- msg = _('An interface driver must be specified')
+ msg = _LE('An interface driver must be specified')
LOG.error(msg)
raise SystemExit(1)
if not self.conf.use_namespaces and not self.conf.router_id:
- msg = _('Router id is required if not using namespaces.')
+ msg = _LE('Router id is required if not using namespaces.')
LOG.error(msg)
raise SystemExit(1)
if (ns.startswith(NS_PREFIX)
or ns.startswith(SNAT_NS_PREFIX)))
except RuntimeError:
- LOG.exception(_('RuntimeError in obtaining router list '
+ LOG.exception(_LE('RuntimeError in obtaining router list '
'for namespace cleanup.'))
return set()
try:
self._destroy_namespace(ns)
except RuntimeError:
- LOG.exception(_('Failed to destroy stale router namespace '
- '%s'), ns)
+ LOG.exception(_LE('Failed to destroy stale router namespace '
+ '%s'), ns)
self._clean_stale_namespaces = False
def _destroy_namespace(self, ns):
try:
ns_ip.netns.delete(ns)
except RuntimeError:
- msg = _('Failed trying to delete namespace: %s') % ns
- LOG.exception(msg)
+ LOG.exception(_LE('Failed trying to delete namespace: %s'), ns)
def _destroy_snat_namespace(self, ns):
ns_ip = ip_lib.IPWrapper(self.root_helper, namespace=ns)
def _router_removed(self, router_id):
ri = self.router_info.get(router_id)
if ri is None:
- LOG.warn(_("Info for router %s were not found. "
- "Skipping router removal"), router_id)
+            LOG.warn(_LW("Info for router %s was not found. "
+                         "Skipping router removal"), router_id)
return
if ri.is_ha:
if not ips:
raise Exception(_("Router port %s has no IP address") % port['id'])
if len(ips) > 1:
- LOG.error(_("Ignoring multiple IPs on router port %s"),
+ LOG.error(_LE("Ignoring multiple IPs on router port %s"),
port['id'])
prefixlen = netaddr.IPNetwork(port['subnet']['cidr']).prefixlen
port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen)
id in current_port_ids])
stale_devs = current_internal_devs - current_port_devs
for stale_dev in stale_devs:
- LOG.debug(_('Deleting stale internal router device: %s'),
+ LOG.debug('Deleting stale internal router device: %s',
stale_dev)
self.driver.unplug(stale_dev,
namespace=ri.ns_name,
if dev.startswith(EXTERNAL_DEV_PREFIX)
and dev != interface_name]
for stale_dev in stale_devs:
- LOG.debug(_('Deleting stale external router device: %s'),
+ LOG.debug('Deleting stale external router device: %s',
stale_dev)
self.driver.unplug(stale_dev,
bridge=self.conf.external_network_bridge,
processutils.ProcessExecutionError):
# any exception occurred here should cause the floating IP
# to be set in error state
- LOG.warn(_("Unable to configure IP address for "
- "floating IP: %s"), fip['id'])
+ LOG.warn(_LW("Unable to configure IP address for "
+ "floating IP: %s"), fip['id'])
return l3_constants.FLOATINGIP_STATUS_ERROR
if ri.router['distributed']:
# Special Handling for DVR - update FIP namespace
namespace=ns_name)
ip_wrapper.netns.execute(arping_cmd, check_exit_code=True)
except Exception as e:
- LOG.error(_("Failed sending gratuitous ARP: %s"), str(e))
+ LOG.error(_LE("Failed sending gratuitous ARP: %s"), str(e))
if distributed:
device.addr.delete(net.version, ip_cidr)
if match_port:
return match_port[0]
else:
- LOG.error(_('DVR: no map match_port found!'))
+ LOG.error(_LE('DVR: no map match_port found!'))
def _create_dvr_gateway(self, ri, ex_gw_port, gw_interface_name,
snat_ports):
ns_ipr.netns.execute(['sysctl', '-w', 'net.ipv4.conf.%s.'
'send_redirects=0' % sn_int])
except Exception:
- LOG.exception(_('DVR: error adding redirection logic'))
+ LOG.exception(_LE('DVR: error adding redirection logic'))
def _snat_redirect_remove(self, ri, sn_port, sn_int):
"""Removes rules and routes for SNAT redirection."""
ns_ipd.route.delete_gateway(table=snat_idx)
ns_ipr.delete_rule_priority(snat_idx)
except Exception:
- LOG.exception(_('DVR: removed snat failed'))
+ LOG.exception(_LE('DVR: removed snat failed'))
def _internal_network_added(self, ns_name, network_id, port_id,
internal_cidr, mac_address,
self.plugin_rpc.get_agent_gateway_port(
self.context, network_id))
if 'subnet' not in self.agent_gateway_port:
- LOG.error(_('Missing subnet/agent_gateway_port'))
+ LOG.error(_LE('Missing subnet/agent_gateway_port'))
return
self._set_subnet_info(self.agent_gateway_port)
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
- LOG.debug(_('Got router deleted notification for %s'), router_id)
+ LOG.debug('Got router deleted notification for %s', router_id)
update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
self._queue.add(update)
elif operation == 'delete':
device.neigh.delete(net.version, ip, mac)
except Exception:
- LOG.exception(_("DVR: Failed updating arp entry"))
+ LOG.exception(_LE("DVR: Failed updating arp entry"))
self.fullsync = True
def add_arp_entry(self, context, payload):
def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
- LOG.debug(_('Got routers updated notification :%s'), routers)
+ LOG.debug('Got routers updated notification :%s', routers)
if routers:
# This is needed for backward compatibility
if isinstance(routers[0], dict):
self._queue.add(update)
def router_removed_from_agent(self, context, payload):
- LOG.debug(_('Got router removed from agent :%r'), payload)
+ LOG.debug('Got router removed from agent :%r', payload)
router_id = payload['router_id']
update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
self._queue.add(update)
def router_added_to_agent(self, context, payload):
- LOG.debug(_('Got router added to agent :%r'), payload)
+ LOG.debug('Got router added to agent :%r', payload)
self.routers_updated(context, payload)
def _process_router_if_compatible(self, router):
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
- LOG.error(_("The external network bridge '%s' does not exist"),
+ LOG.error(_LE("The external network bridge '%s' does not exist"),
self.conf.external_network_bridge)
return
routers = self.plugin_rpc.get_routers(self.context,
[update.id])
except Exception:
- msg = _("Failed to fetch router information for '%s'")
+ msg = _LE("Failed to fetch router information for '%s'")
LOG.exception(msg, update.id)
self.fullsync = True
continue
def _sync_routers_task(self, context):
if self.services_sync:
super(L3NATAgent, self).process_services_sync(context)
- LOG.debug(_("Starting _sync_routers_task - fullsync:%s"),
+ LOG.debug("Starting _sync_routers_task - fullsync:%s",
self.fullsync)
if not self.fullsync:
return
routers = self.plugin_rpc.get_routers(
context, router_ids)
- LOG.debug(_('Processing :%r'), routers)
+ LOG.debug('Processing :%r', routers)
for r in routers:
update = RouterUpdate(r['id'],
PRIORITY_SYNC_ROUTERS_TASK,
timestamp=timestamp)
self._queue.add(update)
self.fullsync = False
- LOG.debug(_("_sync_routers_task successfully completed"))
+ LOG.debug("_sync_routers_task successfully completed")
except messaging.MessagingException:
- LOG.exception(_("Failed synchronizing routers due to RPC error"))
+ LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
self.fullsync = True
except Exception:
- LOG.exception(_("Failed synchronizing routers"))
+ LOG.exception(_LE("Failed synchronizing routers"))
self.fullsync = True
else:
# Resync is not necessary for the cleanup of stale namespaces
def after_start(self):
eventlet.spawn_n(self._process_routers_loop)
- LOG.info(_("L3 agent started"))
+ LOG.info(_LI("L3 agent started"))
def _update_routing_table(self, ri, operation, route):
cmd = ['ip', 'route', operation, 'to', route['destination'],
adds, removes = common_utils.diff_list_of_dict(old_routes,
new_routes)
for route in adds:
- LOG.debug(_("Added route entry is '%s'"), route)
+ LOG.debug("Added route entry is '%s'", route)
# remove replaced route from deleted route
for del_route in removes:
if route['destination'] == del_route['destination']:
#replace success even if there is no existing route
self._update_routing_table(ri, 'replace', route)
for route in removes:
- LOG.debug(_("Removed route entry is '%s'"), route)
+ LOG.debug("Removed route entry is '%s'", route)
self._update_routing_table(ri, 'delete', route)
ri.routes = new_routes
self.heartbeat.start(interval=report_interval)
def _report_state(self):
- LOG.debug(_("Report state task started"))
+ LOG.debug("Report state task started")
num_ex_gw_ports = 0
num_interfaces = 0
num_floating_ips = 0
self.use_call)
self.agent_state.pop('start_flag', None)
self.use_call = False
- LOG.debug(_("Report state task successfully completed"))
+ LOG.debug("Report state task successfully completed")
except AttributeError:
# This means the server does not support report_state
- LOG.warn(_("Neutron server does not support state report."
- " State report for this agent will be disabled."))
+ LOG.warn(_LW("Neutron server does not support state report."
+ " State report for this agent will be disabled."))
self.heartbeat.stop()
return
except Exception:
- LOG.exception(_("Failed reporting state!"))
+ LOG.exception(_LE("Failed reporting state!"))
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
self.fullsync = True
- LOG.info(_("agent_updated by server side %s!"), payload)
+ LOG.info(_LI("agent_updated by server side %s!"), payload)
def _register_opts(conf):
import eventlet.queue
from neutron.agent.linux import utils
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import log as logging
if self._kill_event:
raise AsyncProcessException(_('Process is already started'))
else:
- LOG.debug(_('Launching async process [%s].'), self.cmd)
+ LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
def stop(self):
"""Halt the process and watcher threads."""
if self._kill_event:
- LOG.debug(_('Halting async process [%s].'), self.cmd)
+ LOG.debug('Halting async process [%s].', self.cmd)
self._kill()
else:
raise AsyncProcessException(_('Process is not running.'))
stale_pid = (isinstance(ex, RuntimeError) and
'No such process' in str(ex))
if not stale_pid:
- LOG.exception(_('An error occurred while killing [%s].'),
+ LOG.exception(_LE('An error occurred while killing [%s].'),
self.cmd)
return False
return True
def _handle_process_error(self):
"""Kill the async process and respawn if necessary."""
- LOG.debug(_('Halting async process [%s] in response to an error.'),
+ LOG.debug('Halting async process [%s] in response to an error.',
self.cmd)
respawning = self.respawn_interval >= 0
self._kill(respawning=respawning)
if respawning:
eventlet.sleep(self.respawn_interval)
- LOG.debug(_('Respawning async process [%s].'), self.cmd)
+ LOG.debug('Respawning async process [%s].', self.cmd)
self._spawn()
def _watch_process(self, callback, kill_event):
if not callback():
break
except Exception:
- LOG.exception(_('An error occurred while communicating '
- 'with async process [%s].'), self.cmd)
+ LOG.exception(_LE('An error occurred while communicating '
+ 'with async process [%s].'), self.cmd)
break
# Ensure that watching a process with lots of output does
# not block execution of other greenthreads.
import signal
import sys
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
self.fd = os.open(pidfile, os.O_CREAT | os.O_RDWR)
fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- LOG.exception(_("Error while handling pidfile: %s"), pidfile)
+ LOG.exception(_LE("Error while handling pidfile: %s"), pidfile)
sys.exit(1)
def __str__(self):
if pid > 0:
sys.exit(0)
except OSError:
- LOG.exception(_('Fork failed'))
+ LOG.exception(_LE('Fork failed'))
sys.exit(1)
def daemonize(self):
if self.pidfile.is_running():
self.pidfile.unlock()
- message = _('Pidfile %s already exist. Daemon already running?')
- LOG.error(message, self.pidfile)
+            LOG.error(_LE('Pidfile %s already exists. Daemon already '
+                          'running?'), self.pidfile)
sys.exit(1)
# Start the daemon
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import utils as commonutils
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
cmd = ['kill', '-9', pid]
utils.execute(cmd, self.root_helper)
else:
- LOG.debug(_('DHCP for %(net_id)s is stale, pid %(pid)d '
- 'does not exist, performing cleanup'),
+ LOG.debug('DHCP for %(net_id)s is stale, pid %(pid)d '
+ 'does not exist, performing cleanup',
{'net_id': self.network.id, 'pid': pid})
if not retain_port:
self.device_manager.destroy(self.network,
self.interface_name)
else:
- LOG.debug(_('No DHCP started for %s'), self.network.id)
+ LOG.debug('No DHCP started for %s', self.network.id)
self._remove_config_files()
try:
ns_ip.netns.delete(self.network.namespace)
except RuntimeError:
- msg = _('Failed trying to delete namespace: %s')
- LOG.exception(msg, self.network.namespace)
+ LOG.exception(_LE('Failed trying to delete namespace: %s'),
+ self.network.namespace)
def _remove_config_files(self):
confs_dir = os.path.abspath(os.path.normpath(self.conf.dhcp_confs))
ver = re.findall("\d+.\d+", out)[0]
is_valid_version = float(ver) >= cls.MINIMUM_VERSION
if not is_valid_version:
- LOG.error(_('FAILED VERSION REQUIREMENT FOR DNSMASQ. '
- 'DHCP AGENT MAY NOT RUN CORRECTLY! '
- 'Please ensure that its version is %s '
- 'or above!'), cls.MINIMUM_VERSION)
+ LOG.error(_LE('FAILED VERSION REQUIREMENT FOR DNSMASQ. '
+ 'DHCP AGENT MAY NOT RUN CORRECTLY! '
+ 'Please ensure that its version is %s '
+ 'or above!'), cls.MINIMUM_VERSION)
raise SystemExit(1)
except (OSError, RuntimeError, IndexError, ValueError):
- LOG.error(_('Unable to determine dnsmasq version. '
- 'Please ensure that its version is %s '
- 'or above!'), cls.MINIMUM_VERSION)
+ LOG.error(_LE('Unable to determine dnsmasq version. '
+ 'Please ensure that its version is %s '
+ 'or above!'), cls.MINIMUM_VERSION)
raise SystemExit(1)
return float(ver)
# If all subnets turn off dhcp, kill the process.
if not self._enable_dhcp():
self.disable()
- LOG.debug(_('Killing dhcpmasq for network since all subnets have '
- 'turned off DHCP: %s'), self.network.id)
+            LOG.debug('Killing dnsmasq for network since all subnets have '
+                      'turned off DHCP: %s', self.network.id)
return
self._release_unused_leases()
cmd = ['kill', '-HUP', self.pid]
utils.execute(cmd, self.root_helper)
else:
- LOG.debug(_('Pid %d is stale, relaunching dnsmasq'), self.pid)
- LOG.debug(_('Reloading allocations for network: %s'), self.network.id)
+ LOG.debug('Pid %d is stale, relaunching dnsmasq', self.pid)
+ LOG.debug('Reloading allocations for network: %s', self.network.id)
self.device_manager.update(self.network, self.interface_name)
def _iter_hosts(self):
buf = six.StringIO()
filename = self.get_conf_file_name('host')
- LOG.debug(_('Building host file: %s'), filename)
+ LOG.debug('Building host file: %s', filename)
for (port, alloc, hostname, name) in self._iter_hosts():
# (dzyu) Check if it is legal ipv6 address, if so, need wrap
# it with '[]' to let dnsmasq to distinguish MAC address from
if netaddr.valid_ipv6(ip_address):
ip_address = '[%s]' % ip_address
- LOG.debug(_('Adding %(mac)s : %(name)s : %(ip)s'),
+ LOG.debug('Adding %(mac)s : %(name)s : %(ip)s',
{"mac": port.mac_address, "name": name,
"ip": ip_address})
(port.mac_address, name, ip_address))
utils.replace_file(filename, buf.getvalue())
- LOG.debug(_('Done building host file %s'), filename)
+ LOG.debug('Done building host file %s', filename)
return filename
def _read_hosts_file_leases(self, filename):
self.root_helper = root_helper
self.plugin = plugin
if not conf.interface_driver:
- msg = _('An interface driver must be specified')
- LOG.error(msg)
+ LOG.error(_LE('An interface driver must be specified'))
raise SystemExit(1)
try:
self.driver = importutils.import_object(
conf.interface_driver, conf)
except Exception as e:
- msg = (_("Error importing interface driver '%(driver)s': "
- "%(inner)s") % {'driver': conf.interface_driver,
- 'inner': e})
- LOG.error(msg)
+ LOG.error(_LE("Error importing interface driver '%(driver)s': "
+ "%(inner)s"),
+ {'driver': conf.interface_driver,
+ 'inner': e})
raise SystemExit(1)
def get_interface_name(self, network, port):
continue
if gateway != subnet.gateway_ip:
- m = _('Setting gateway for dhcp netns on net %(n)s to %(ip)s')
- LOG.debug(m, {'n': network.id, 'ip': subnet.gateway_ip})
+ LOG.debug('Setting gateway for dhcp netns on net %(n)s to '
+ '%(ip)s',
+ {'n': network.id, 'ip': subnet.gateway_ip})
device.route.add_gateway(subnet.gateway_ip)
# No subnets on the network have a valid gateway. Clean it up to avoid
# confusion from seeing an invalid gateway here.
if gateway is not None:
- msg = _('Removing gateway for dhcp netns on net %s')
- LOG.debug(msg, network.id)
+ LOG.debug('Removing gateway for dhcp netns on net %s', network.id)
device.route.delete_gateway(gateway)
# check for a reserved DHCP port
if dhcp_port is None:
- LOG.debug(_('DHCP port %(device_id)s on network %(network_id)s'
- ' does not yet exist. Checking for a reserved port.'),
+ LOG.debug('DHCP port %(device_id)s on network %(network_id)s'
+ ' does not yet exist. Checking for a reserved port.',
{'device_id': device_id, 'network_id': network.id})
for port in network.ports:
port_device_id = getattr(port, 'device_id', None)
# DHCP port has not yet been created.
if dhcp_port is None:
- LOG.debug(_('DHCP port %(device_id)s on network %(network_id)s'
- ' does not yet exist.'), {'device_id': device_id,
- 'network_id': network.id})
+ LOG.debug('DHCP port %(device_id)s on network %(network_id)s'
+ ' does not yet exist.', {'device_id': device_id,
+ 'network_id': network.id})
port_dict = dict(
name='',
admin_state_up=True,
if ip_lib.ensure_device_is_ready(interface_name,
self.root_helper,
network.namespace):
- LOG.debug(_('Reusing existing device: %s.'), interface_name)
+ LOG.debug('Reusing existing device: %s.', interface_name)
else:
self.driver.plug(network.id,
port.id,
from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.extensions import flavor
-from neutron.openstack.common.gettextutils import _LE
+from neutron.openstack.common.gettextutils import _LE, _LI
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
if self.conf.ovs_use_veth:
root_dev.link.set_up()
else:
- LOG.info(_("Device %s already exists"), device_name)
+ LOG.info(_LI("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
self.root_helper,
namespace)
device.link.delete()
- LOG.debug(_("Unplugged interface '%s'"), device_name)
+ LOG.debug("Unplugged interface '%s'", device_name)
except RuntimeError:
- LOG.error(_("Failed unplugging interface '%s'"),
+ LOG.error(_LE("Failed unplugging interface '%s'"),
device_name)
utils.execute(cmd, self.root_helper)
else:
- LOG.info(_("Device %s already exists"), device_name)
+ LOG.info(_LI("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
# the port will be deleted by the dhcp agent that will call the plugin
try:
device.link.delete()
except RuntimeError:
- LOG.error(_("Failed unplugging interface '%s'"), device_name)
- LOG.debug(_("Unplugged interface '%s'"), device_name)
+ LOG.error(_LE("Failed unplugging interface '%s'"), device_name)
+ LOG.debug("Unplugged interface '%s'", device_name)
ip_lib.IPWrapper(
self.root_helper, namespace).garbage_collect_namespace()
ns_dev.link.set_up()
root_dev.link.set_up()
else:
- LOG.info(_("Device %s already exists"), device_name)
+ LOG.info(_LI("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
self.root_helper,
namespace)
device.link.delete()
- LOG.debug(_("Unplugged interface '%s'"), device_name)
+ LOG.debug("Unplugged interface '%s'", device_name)
except RuntimeError:
- LOG.error(_("Failed unplugging interface '%s'"),
+ LOG.error(_LE("Failed unplugging interface '%s'"),
device_name)
ns_veth.link.set_up()
else:
- LOG.info(_("Device %s already exists"), device_name)
+ LOG.info(_LI("Device %s already exists"), device_name)
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
"""Unplug the interface."""
device = ip_lib.IPDevice(device_name, self.root_helper, namespace)
try:
device.link.delete()
- LOG.debug(_("Unplugged interface '%s'"), device_name)
+ LOG.debug("Unplugged interface '%s'", device_name)
except RuntimeError:
- LOG.error(_("Failed unplugging interface '%s'"),
+ LOG.error(_LE("Failed unplugging interface '%s'"),
device_name)
return driver.unplug(device_name, bridge, namespace, prefix)
def _load_driver(self, driver_provider):
- LOG.debug(_("Driver location: %s"), driver_provider)
+ LOG.debug("Driver location: %s", driver_provider)
plugin_klass = importutils.import_class(driver_provider)
return plugin_klass(self.conf)
from neutron.agent.linux import iptables_manager
from neutron.common import constants
from neutron.common import ipv6_utils
+from neutron.openstack.common.gettextutils import _LI
from neutron.openstack.common import log as logging
self.sg_members[sg_id] = sg_members
def prepare_port_filter(self, port):
- LOG.debug(_("Preparing device (%s) filter"), port['device'])
+ LOG.debug("Preparing device (%s) filter", port['device'])
self._remove_chains()
self.filtered_ports[port['device']] = port
# each security group has it own chains
self.iptables.apply()
def update_port_filter(self, port):
- LOG.debug(_("Updating device (%s) filter"), port['device'])
+ LOG.debug("Updating device (%s) filter", port['device'])
if port['device'] not in self.filtered_ports:
- LOG.info(_('Attempted to update port filter which is not '
- 'filtered %s'), port['device'])
+ LOG.info(_LI('Attempted to update port filter which is not '
+ 'filtered %s'), port['device'])
return
self._remove_chains()
self.filtered_ports[port['device']] = port
self.iptables.apply()
def remove_port_filter(self, port):
- LOG.debug(_("Removing device (%s) filter"), port['device'])
+ LOG.debug("Removing device (%s) filter", port['device'])
if not self.filtered_ports.get(port['device']):
- LOG.info(_('Attempted to remove port filter which is not '
- 'filtered %r'), port)
+ LOG.info(_LI('Attempted to remove port filter which is not '
+ 'filtered %r'), port)
return
self._remove_chains()
self.filtered_ports.pop(port['device'], None)
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
from neutron.openstack.common import excutils
+from neutron.openstack.common.gettextutils import _LE, _LW
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
chain_set = self._select_chain_set(wrap)
if name not in chain_set:
- LOG.warn(_('Attempted to remove chain %s which does not exist'),
+ LOG.warn(_LW('Attempted to remove chain %s which does not exist'),
name)
return
self.wrap_name,
comment=comment))
except ValueError:
- LOG.warn(_('Tried to remove rule that was not there:'
- ' %(chain)r %(rule)r %(wrap)r %(top)r'),
+ LOG.warn(_LW('Tried to remove rule that was not there:'
+ ' %(chain)r %(rule)r %(wrap)r %(top)r'),
{'chain': chain, 'rule': rule,
'top': top, 'wrap': wrap})
try:
with lockutils.lock(lock_name, utils.SYNCHRONIZED_PREFIX, True):
- LOG.debug(_('Got semaphore / lock "%s"'), lock_name)
+ LOG.debug('Got semaphore / lock "%s"', lock_name)
return self._apply_synchronized()
finally:
- LOG.debug(_('Semaphore / lock released "%s"'), lock_name)
+ LOG.debug('Semaphore / lock released "%s"', lock_name)
def _apply_synchronized(self):
"""Apply the current in-memory set of iptables rules.
all_lines[log_start:log_end],
log_start + 1)
)
- LOG.error(_("IPTablesManager.apply failed to apply the "
- "following set of iptables rules:\n%s"),
+ LOG.error(_LE("IPTablesManager.apply failed to apply the "
+ "following set of iptables rules:\n%s"),
'\n'.join(log_lines))
- LOG.debug(_("IPTablesManager.apply completed with success"))
+ LOG.debug("IPTablesManager.apply completed with success")
def _find_table(self, lines, table_name):
if len(lines) < 3:
start = lines.index('*%s' % table_name) - 1
except ValueError:
# Couldn't find table_name
- LOG.debug(_('Unable to find table %s'), table_name)
+ LOG.debug('Unable to find table %s', table_name)
return (0, 0)
end = lines[start:].index('COMMIT') + start + 2
return (start, end)
"""Return the sum of the traffic counters of all rules of a chain."""
cmd_tables = self._get_traffic_counters_cmd_tables(chain, wrap)
if not cmd_tables:
- LOG.warn(_('Attempted to get traffic counters of chain %s which '
- 'does not exist'), chain)
+ LOG.warn(_LW('Attempted to get traffic counters of chain %s which '
+ 'does not exist'), chain)
return
name = get_chain_name(chain, wrap)
from neutron.agent.linux import utils
from neutron.common import exceptions
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _LI, _LW
+from neutron.openstack.common.gettextutils import _LE, _LI, _LW
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
return utils.execute(full_args, root_helper=self.root_helper)
except Exception as e:
with excutils.save_and_reraise_exception() as ctxt:
- LOG.error(_("Unable to execute %(cmd)s. "
- "Exception: %(exception)s"),
+ LOG.error(_LE("Unable to execute %(cmd)s. "
+ "Exception: %(exception)s"),
{'cmd': full_args, 'exception': e})
if not check_error:
ctxt.reraise = False
return utils.execute(full_args, root_helper=self.root_helper,
process_input=process_input)
except Exception as e:
- LOG.error(_("Unable to execute %(cmd)s. Exception: %(exception)s"),
+ LOG.error(_LE("Unable to execute %(cmd)s. Exception: "
+ "%(exception)s"),
{'cmd': full_args, 'exception': e})
def count_flows(self):
ofport = self.get_port_ofport(port_name)
if (tunnel_type == constants.TYPE_VXLAN and
ofport == INVALID_OFPORT):
- LOG.error(_('Unable to create VXLAN tunnel port. Please ensure '
- 'that an openvswitch version that supports VXLAN is '
- 'installed.'))
+ LOG.error(_LE('Unable to create VXLAN tunnel port. Please ensure '
+ 'that an openvswitch version that supports VXLAN is '
+ 'installed.'))
return ofport
def add_patch_port(self, local_name, remote_name):
return utils.execute(args, root_helper=self.root_helper).strip()
except Exception as e:
with excutils.save_and_reraise_exception():
- LOG.error(_("Unable to execute %(cmd)s. "
- "Exception: %(exception)s"),
+ LOG.error(_LE("Unable to execute %(cmd)s. "
+ "Exception: %(exception)s"),
{'cmd': args, 'exception': e})
# returns a VIF object for each VIF port
try:
int_ofport = int(ofport)
except (ValueError, TypeError):
- LOG.warn(_("Found not yet ready openvswitch port: %s"), row)
+ LOG.warn(_LW("Found not yet ready openvswitch port: %s"), row)
else:
if int_ofport > 0:
if ("iface-id" in external_ids and
external_ids["xs-vif-uuid"])
edge_ports.add(iface_id)
else:
- LOG.warn(_("Found failed openvswitch port: %s"), row)
+ LOG.warn(_LW("Found failed openvswitch port: %s"), row)
return edge_ports
def get_port_tag_dict(self):
if exc_type is None:
self.apply_flows()
else:
- LOG.exception(_("OVS flows could not be applied on bridge %s"),
+ LOG.exception(_LE("OVS flows could not be applied on bridge %s"),
self.br.br_name)
try:
return utils.execute(args, root_helper=root_helper).strip()
except Exception:
- LOG.exception(_("Interface %s not found."), iface)
+ LOG.exception(_LE("Interface %s not found."), iface)
return None
return utils.execute(args, root_helper=root_helper).strip().split("\n")
except Exception as e:
with excutils.save_and_reraise_exception():
- LOG.exception(_("Unable to retrieve bridges. Exception: %s"), e)
+ LOG.exception(_LE("Unable to retrieve bridges. Exception: %s"), e)
def get_bridge_external_bridge_id(root_helper, bridge):
try:
return utils.execute(args, root_helper=root_helper).strip()
except Exception:
- LOG.exception(_("Bridge %s not found."), bridge)
+ LOG.exception(_LE("Bridge %s not found."), bridge)
return None
import eventlet
from neutron.agent.linux import async_process
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import log as logging
if not data:
return
self._stdout_lines.put(data)
- LOG.debug(_('Output received from ovsdb monitor: %s') % data)
+ LOG.debug('Output received from ovsdb monitor: %s', data)
return data
def _read_stderr(self):
data = super(OvsdbMonitor, self)._read_stderr()
if data:
- LOG.error(_('Error received from ovsdb monitor: %s') % data)
+ LOG.error(_LE('Error received from ovsdb monitor: %s'), data)
# Do not return value to ensure that stderr output will
# stop the monitor.
cmd = shlex.split(root_helper) + cmd
cmd = map(str, cmd)
- LOG.debug(_("Running command: %s"), cmd)
+ LOG.debug("Running command: %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
from neutron import context
from neutron.openstack.common.cache import cache
from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _LW
+from neutron.openstack.common.gettextutils import _LE, _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron import wsgi
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
try:
- LOG.debug(_("Request: %s"), req)
+ LOG.debug("Request: %s", req)
instance_id, tenant_id = self._get_instance_and_tenant_id(req)
if instance_id:
return webob.exc.HTTPNotFound()
except Exception:
- LOG.exception(_("Unexpected error."))
+ LOG.exception(_LE("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')
return webob.exc.HTTPInternalServerError(explanation=unicode(msg))
req.response.body = content
return req.response
elif resp.status == 403:
- msg = _(
+ LOG.warn(_LW(
'The remote metadata server responded with Forbidden. This '
'response usually occurs when shared secrets do not match.'
- )
- LOG.warn(msg)
+ ))
return webob.exc.HTTPForbidden()
elif resp.status == 400:
return webob.exc.HTTPBadRequest()
use_call=self.agent_state.get('start_flag'))
except AttributeError:
# This means the server does not support report_state
- LOG.warn(_('Neutron server does not support state report.'
- ' State report for this agent will be disabled.'))
+ LOG.warn(_LW('Neutron server does not support state report.'
+ ' State report for this agent will be disabled.'))
self.heartbeat.stop()
return
except Exception:
- LOG.exception(_("Failed reporting state!"))
+ LOG.exception(_LE("Failed reporting state!"))
return
self.agent_state.pop('start_flag', None)
from neutron.agent.linux import daemon
from neutron.common import config
from neutron.common import utils
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import log as logging
from neutron import wsgi
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
- LOG.debug(_("Request: %s"), req)
+ LOG.debug("Request: %s", req)
try:
return self._proxy_request(req.remote_addr,
req.method,
req.query_string,
req.body)
except Exception:
- LOG.exception(_("Unexpected error."))
+ LOG.exception(_LE("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')
return webob.exc.HTTPInternalServerError(explanation=unicode(msg))
from neutron.agent.linux import ovs_lib
from neutron.api.v2 import attributes
from neutron.common import config
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
bridge = ovs_lib.OVSBridge(bridge_name, root_helper)
bridge.delete_port(device.name)
else:
- LOG.debug(_('Unable to find bridge for device: %s'), device.name)
+ LOG.debug('Unable to find bridge for device: %s', device.name)
def destroy_namespace(conf, namespace, force=False):
ip.garbage_collect_namespace()
except Exception:
- LOG.exception(_('Error unable to destroy namespace: %s'), namespace)
+        LOG.exception(_LE('Unable to destroy namespace: %s'), namespace)
def main():
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.common import config
+from neutron.openstack.common.gettextutils import _LI
from neutron.openstack.common import log as logging
if ip_lib.device_exists(port):
device = ip_lib.IPDevice(port, root_helper)
device.link.delete()
- LOG.info(_("Delete %s"), port)
+ LOG.info(_LI("Deleting port: %s"), port)
def main():
conf.AGENT.root_helper)
for bridge in bridges:
- LOG.info(_("Cleaning %s"), bridge)
+ LOG.info(_LI("Cleaning bridge: %s"), bridge)
ovs = ovs_lib.OVSBridge(bridge, conf.AGENT.root_helper)
ovs.delete_ports(all_ports=conf.ovs_all_ports)
# Remove remaining ports created by Neutron (usually veth pair)
delete_neutron_ports(ports, conf.AGENT.root_helper)
- LOG.info(_("OVS cleanup completed successfully"))
+ LOG.info(_LI("OVS cleanup completed successfully"))
from neutron.common import rpc as n_rpc
from neutron.common import topics
-
+from neutron.openstack.common.gettextutils import _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import timeutils
# may not work correctly, however it can function in 'degraded'
# mode, in that DVR routers may not be in the system yet, and
# it might be not necessary to retrieve info about the host.
- LOG.warn(_('DVR functionality requires a server upgrade.'))
+ LOG.warn(_LW('DVR functionality requires a server upgrade.'))
res = [
self.call(context,
self.make_msg('get_device_details', device=device,
def is_firewall_enabled():
if not _is_valid_driver_combination():
- LOG.warn(_("Driver configuration doesn't match with "
- "enable_security_group"))
+ LOG.warn(_LW("Driver configuration doesn't match with "
+ "enable_security_group"))
return cfg.CONF.SECURITYGROUP.enable_security_group
"""A mix-in that enable SecurityGroup support in plugin rpc."""
def security_group_rules_for_devices(self, context, devices):
- LOG.debug(_("Get security group rules "
- "for devices via rpc %r"), devices)
+ LOG.debug("Get security group rules "
+ "for devices via rpc %r", devices)
return self.call(context,
self.make_msg('security_group_rules_for_devices',
devices=devices),
sg_agent = None
def _security_groups_agent_not_set(self):
- LOG.warning(_("Security group agent binding currently not set. "
- "This should be set by the end of the init "
- "process."))
+ LOG.warning(_LW("Security group agent binding currently not set. "
+ "This should be set by the end of the init "
+ "process."))
def security_groups_rule_updated(self, context, **kwargs):
"""Callback for security group rule update.
:param security_groups: list of updated security_groups
"""
security_groups = kwargs.get('security_groups', [])
- LOG.debug(
- _("Security group rule updated on remote: %s"), security_groups)
+ LOG.debug("Security group rule updated on remote: %s",
+ security_groups)
if not self.sg_agent:
return self._security_groups_agent_not_set()
self.sg_agent.security_groups_rule_updated(security_groups)
:param security_groups: list of updated security_groups
"""
security_groups = kwargs.get('security_groups', [])
- LOG.debug(
- _("Security group member updated on remote: %s"), security_groups)
+ LOG.debug("Security group member updated on remote: %s",
+ security_groups)
if not self.sg_agent:
return self._security_groups_agent_not_set()
self.sg_agent.security_groups_member_updated(security_groups)
def security_groups_provider_updated(self, context, **kwargs):
"""Callback for security group provider update."""
- LOG.debug(_("Provider rule updated"))
+ LOG.debug("Provider rule updated")
if not self.sg_agent:
return self._security_groups_agent_not_set()
self.sg_agent.security_groups_provider_updated()
def init_firewall(self, defer_refresh_firewall=False):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver
- LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver)
+ LOG.debug("Init firewall settings (driver=%s)", firewall_driver)
if not _is_valid_driver_combination():
- LOG.warn(_("Driver configuration doesn't match "
- "with enable_security_group"))
+ LOG.warn(_LW("Driver configuration doesn't match "
+ "with enable_security_group"))
if not firewall_driver:
firewall_driver = 'neutron.agent.firewall.NoopFirewallDriver'
self.firewall = importutils.import_object(firewall_driver)
devices.append(device['device'])
if devices:
if self.defer_refresh_firewall:
- LOG.debug(_("Adding %s devices to the list of devices "
- "for which firewall needs to be refreshed"),
+ LOG.debug("Adding %s devices to the list of devices "
+ "for which firewall needs to be refreshed",
devices)
self.devices_to_refilter |= set(devices)
else:
with self.firewall.defer_apply():
for device in devices.values():
- LOG.debug(_("Update port filter for %s"), device['device'])
+ LOG.debug("Update port filter for %s", device['device'])
self.firewall.update_port_filter(device)
if self.use_enhanced_rpc:
LOG.debug("Update security group information for ports %s",
updated devices
"""
if new_devices:
- LOG.debug(_("Preparing device filters for %d new devices"),
+ LOG.debug("Preparing device filters for %d new devices",
len(new_devices))
self.prepare_devices_filter(new_devices)
# These data structures are cleared here in order to avoid
# refresh providing a precise list of devices for which firewall
# should be refreshed
if global_refresh_firewall:
- LOG.debug(_("Refreshing firewall for all filtered devices"))
+ LOG.debug("Refreshing firewall for all filtered devices")
self.refresh_firewall()
else:
# If a device is both in new and updated devices
updated_devices = ((updated_devices | devices_to_refilter) -
new_devices)
if updated_devices:
- LOG.debug(_("Refreshing firewall for %d devices"),
+ LOG.debug("Refreshing firewall for %d devices",
len(updated_devices))
self.refresh_firewall(updated_devices)
r"(.)*LOG\.(audit|error|info|warn|warning|critical|exception)\(\s*('|\")")
author_tag_re = (re.compile("^\s*#\s*@?(a|A)uthor"),
re.compile("^\.\.\s+moduleauthor::"))
+log_translation_hint = re.compile(
+ r"(.)*LOG\.(audit|error|info|warn|warning|critical|exception)"
+ "\(\s*(_\(|'|\")")
+
+
+def _directory_to_check_translation(filename):
+ # In order to try and speed up the integration of this we will
+ # do it on a directory by directory basis. The last patch of the
+ # series will remove this and the entire code base will be validated.
+ dirs = ["neutron/agent"]
+ return any([dir in filename for dir in dirs])
def validate_log_translations(logical_line, physical_line, filename):
if log_translation.match(logical_line):
yield (0, msg)
+ if _directory_to_check_translation(filename):
+ msg = "N320: Log messages require translation hints!"
+ if log_translation_hint.match(logical_line):
+ yield (0, msg)
+
def use_jsonutils(logical_line, filename):
msg = "N321: jsonutils.%(fun)s must be used instead of json.%(fun)s"
return pos, "N322: Don't use author tags"
+def no_translate_debug_logs(logical_line, filename):
+ """Check for 'LOG.debug(_('
+
+ As per our translation policy,
+ https://wiki.openstack.org/wiki/LoggingStandards#Log_Translation
+ we shouldn't translate debug level logs.
+
+ * This check assumes that 'LOG' is a logger.
+ N319
+ """
+ if _directory_to_check_translation(filename):
+ if logical_line.startswith("LOG.debug(_("):
+ yield(0, "N319 Don't translate debug level logs")
+
+
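For clarity, a purely illustrative pair of lines (not part of the patch); during the phased rollout the check only fires for files under neutron/agent::

    # in neutron/agent/dhcp_agent.py
    LOG.debug(_('Synchronizing state'))    # flagged by N319
    LOG.debug('Synchronizing state')       # acceptable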
def check_assert_called_once(logical_line, filename):
msg = ("N323: assert_called_once is a no-op. please use "
"assert_called_once_with to test with explicit parameters or an "
register(use_jsonutils)
register(no_author_tags)
register(check_assert_called_once)
+ register(no_translate_debug_logs)
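As a quick sanity check (purely illustrative; the file name and log lines are arbitrary), the new checks can be exercised directly, much like the unit tests below::

    from neutron.hacking import checks

    filename = 'neutron/agent/dhcp_agent.py'

    # N319: a translated debug call under neutron/agent is reported
    bad_debug = "LOG.debug(_('Synchronizing state'))"
    print(list(checks.no_translate_debug_logs(bad_debug, filename)))

    # N320: a non-debug call without a _LE/_LI/_LW hint is reported
    bad_info = "LOG.info('Synchronizing state')"
    print(list(checks.validate_log_translations(bad_info, bad_info, filename)))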
self.assertEqual(
0, len(list(checks.validate_log_translations(ok,
ok, 'f'))))
+ filename = 'neutron/agent/f'
+ bad = "LOG.%s(_('BAD - by directory'))" % log
+ self.assertEqual(
+ 1, len(list(checks.validate_log_translations(bad,
+ bad,
+ filename))))
def test_use_jsonutils(self):
def __get_msg(fun):
from neutron.common import config as base_config
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import processutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as p_const
with mock.patch.object(l3_agent, 'LOG') as log:
self.assertRaises(SystemExit, l3_agent.L3NATAgent,
HOSTNAME, self.conf)
- msg = "Error importing interface driver 'wrong_driver'"
- log.error.assert_called_once_with(msg)
+ msg = _LE("Error importing interface driver '%s'")
+ log.error.assert_called_once_with(msg, 'wrong_driver')
def test_metadata_filter_rules(self):
self.conf.set_override('enable_metadata_proxy', False)