from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
-from neutron.agent.linux import utils as linux_utils
from neutron.agent.metadata import driver as metadata_driver
from neutron.agent import rpc as agent_rpc
from neutron.common import constants
ctx, self.conf.use_namespaces)
# create dhcp dir to store dhcp info
dhcp_dir = os.path.dirname("/%s/dhcp/" % self.conf.state_path)
- linux_utils.ensure_dir(dhcp_dir)
+ utils.ensure_dir(dhcp_dir)
self.dhcp_version = self.dhcp_driver_cls.check_version()
self._populate_networks_cache()
self._process_monitor = external_process.ProcessMonitor(
from neutron.agent.linux import keepalived
from neutron.agent.linux import utils as agent_utils
+from neutron.common import utils as common_utils
from neutron.i18n import _LI
from neutron.notifiers import batch_notifier
def _init_ha_conf_path(self):
ha_full_path = os.path.dirname("/%s/" % self.conf.ha_confs_path)
- agent_utils.ensure_dir(ha_full_path)
+ common_utils.ensure_dir(ha_full_path)
version, plugin)
self.confs_dir = self.get_confs_dir(conf)
self.network_conf_dir = os.path.join(self.confs_dir, network.id)
- utils.ensure_dir(self.network_conf_dir)
+ commonutils.ensure_dir(self.network_conf_dir)
@staticmethod
def get_confs_dir(conf):
if self.active:
self.restart()
elif self._enable_dhcp():
- utils.ensure_dir(self.network_conf_dir)
+ commonutils.ensure_dir(self.network_conf_dir)
interface_name = self.device_manager.setup(self.network)
self.interface_name = interface_name
self.spawn_process()
from neutron.agent.common import config as agent_cfg
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
+from neutron.common import utils as common_utils
from neutron.i18n import _LE
LOG = logging.getLogger(__name__)
self.service_pid_fname = 'pid'
self.service = 'default-service'
- utils.ensure_dir(os.path.dirname(self.get_pid_file_name()))
+ common_utils.ensure_dir(os.path.dirname(self.get_pid_file_name()))
def enable(self, cmd_callback=None, reload_cfg=False):
if not self.active:
from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import exceptions
+from neutron.common import utils as common_utils
VALID_STATES = ['MASTER', 'BACKUP']
VALID_AUTH_TYPES = ['AH', 'PASS']
def get_full_config_file_path(self, filename, ensure_conf_dir=True):
conf_dir = self.get_conf_dir()
if ensure_conf_dir:
- utils.ensure_dir(conf_dir)
+ common_utils.ensure_dir(conf_dir)
return os.path.join(conf_dir, filename)
def _output_config_file(self):
# License for the specific language governing permissions and limitations
# under the License.
-import errno
import fcntl
import glob
import grp
import tempfile
import threading
+from debtcollector import removals
import eventlet
from eventlet.green import subprocess
from eventlet import greenthread
return [x.strip() for x in raw_pids.split('\n') if x.strip()]
-def ensure_dir(dir_path):
- """Ensure a directory with 755 permissions mode."""
- try:
- os.makedirs(dir_path, 0o755)
- except OSError as e:
- # If the directory already existed, don't raise the error.
- if e.errno != errno.EEXIST:
- raise
+@removals.remove(message='Use neutron.common.utils.ensure_dir instead.')
+def ensure_dir(*args, **kwargs):
+ return utils.ensure_dir(*args, **kwargs)
def _get_conf_base(cfg_root, uuid, ensure_conf_dir):
conf_dir = os.path.abspath(os.path.normpath(cfg_root))
conf_base = os.path.join(conf_dir, uuid)
if ensure_conf_dir:
- ensure_dir(conf_dir)
+ utils.ensure_dir(conf_dir)
return conf_base
if not os.path.exists(path):
ctxt.reraise = False
else:
- ensure_dir(dirname)
+ utils.ensure_dir(dirname)
def is_effective_user(user_id_or_name):
"""Utilities and helper functions."""
import datetime
+import errno
import functools
import hashlib
import logging as std_logging
return cfg_file
+def ensure_dir(dir_path):
+ """Ensure a directory with 755 permissions mode."""
+ try:
+ os.makedirs(dir_path, 0o755)
+ except OSError as e:
+ # If the directory already existed, don't raise the error.
+ if e.errno != errno.EEXIST:
+ raise
+
+
def _subprocess_setup():
# Python installs a SIGPIPE handler by default. This is usually not what
# non-Python subprocesses expect.
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
+from neutron.common import utils as common_utils
from neutron.tests import base
from neutron.tests.common import net_helpers
from neutron.tests.fullstack import config_fixtures
def start(self):
fmt = self.process_name + "--%Y-%m-%d--%H%M%S.log"
log_dir = os.path.join(DEFAULT_LOG_DIR, self.test_name)
- utils.ensure_dir(log_dir)
+ common_utils.ensure_dir(log_dir)
cmd = [spawn.find_executable(self.exec_name),
'--log-dir', log_dir,
'neutron.agent.linux.ip_lib.device_exists')
self.device_exists = self.device_exists_p.start()
- self.ensure_dir = mock.patch('neutron.agent.linux.utils'
- '.ensure_dir').start()
+ self.ensure_dir = mock.patch('neutron.common.utils.ensure_dir').start()
mock.patch('neutron.agent.linux.keepalived.KeepalivedManager'
'.get_full_config_file_path').start()
'neutron.agent.linux.ip_lib.device_exists')
self.device_exists = self.device_exists_p.start()
- self.ensure_dir = mock.patch('neutron.agent.linux.utils'
- '.ensure_dir').start()
+ self.ensure_dir = mock.patch('neutron.common.utils.ensure_dir').start()
mock.patch('neutron.agent.linux.keepalived.KeepalivedManager'
'.get_full_config_file_path').start()
from neutron.agent.dhcp import config as dhcp_config
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
-from neutron.agent.linux import utils
from neutron.common import config as base_config
from neutron.common import constants
+from neutron.common import utils
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.tests import base
import os.path
from neutron.agent.linux import external_process as ep
-from neutron.agent.linux import utils
+from neutron.common import utils as common_utils
from neutron.tests import base
self.delete_if_exists = mock.patch(
'oslo_utils.fileutils.delete_if_exists').start()
self.ensure_dir = mock.patch.object(
- utils, 'ensure_dir').start()
+ common_utils, 'ensure_dir').start()
self.conf = mock.Mock()
self.conf.external_pids = '/var/path'
# License for the specific language governing permissions and limitations
# under the License.
-import errno
-import mock
import socket
+
+import mock
import testtools
from neutron.agent.linux import utils
getegid.assert_called_once_with()
getgrgid.assert_called_once_with(self.EGID)
- @mock.patch('os.makedirs')
- def test_ensure_dir_no_fail_if_exists(self, makedirs):
- error = OSError()
- error.errno = errno.EEXIST
- makedirs.side_effect = error
- utils.ensure_dir("/etc/create/concurrently")
-
- @mock.patch('os.makedirs')
- def test_ensure_dir_calls_makedirs(self, makedirs):
- utils.ensure_dir("/etc/create/directory")
- makedirs.assert_called_once_with("/etc/create/directory", 0o755)
-
class TestUnixDomainHttpConnection(base.BaseTestCase):
def test_connect(self):
self.cfg.CONF.metadata_backlog = 128
self.cfg.CONF.metadata_proxy_socket_mode = config.USER_MODE
- @mock.patch.object(agent_utils, 'ensure_dir')
+ @mock.patch.object(utils, 'ensure_dir')
def test_init_doesnot_exists(self, ensure_dir):
agent.UnixDomainMetadataProxy(mock.Mock())
ensure_dir.assert_called_once_with('/the')
@mock.patch.object(agent, 'MetadataProxyHandler')
@mock.patch.object(agent_utils, 'UnixDomainWSGIServer')
- @mock.patch.object(agent_utils, 'ensure_dir')
+ @mock.patch.object(utils, 'ensure_dir')
def test_run(self, ensure_dir, server, handler):
p = agent.UnixDomainMetadataProxy(self.cfg.CONF)
p.run()
# License for the specific language governing permissions and limitations
# under the License.
+import errno
+
import eventlet
import mock
import netaddr
LOG.logger.setLevel(logging.logging.DEBUG)
LOG.debug("Hello %s", delayed)
self.assertTrue(my_func.called)
+
+
+class TestEnsureDir(base.BaseTestCase):
+ @mock.patch('os.makedirs')
+ def test_ensure_dir_no_fail_if_exists(self, makedirs):
+ error = OSError()
+ error.errno = errno.EEXIST
+ makedirs.side_effect = error
+ utils.ensure_dir("/etc/create/concurrently")
+
+ @mock.patch('os.makedirs')
+ def test_ensure_dir_calls_makedirs(self, makedirs):
+ utils.ensure_dir("/etc/create/directory")
+ makedirs.assert_called_once_with("/etc/create/directory", 0o755)