import datetime
import functools
-import hashlib
import os
import time
import uuid
root_helper=mock_helper)
-class GetFromPathTestCase(test.TestCase):
- def test_tolerates_nones(self):
- f = utils.get_from_path
-
- input = []
- self.assertEqual([], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [None]
- self.assertEqual([], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': None}]
- self.assertEqual([], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': None}}]
- self.assertEqual([{'b': None}], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': None}}}]
- self.assertEqual([{'b': {'c': None}}], f(input, "a"))
- self.assertEqual([{'c': None}], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': None}}}, {'a': None}]
- self.assertEqual([{'b': {'c': None}}], f(input, "a"))
- self.assertEqual([{'c': None}], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
- self.assertEqual([{'b': {'c': None}}, {'b': None}], f(input, "a"))
- self.assertEqual([{'c': None}], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- def test_does_select(self):
- f = utils.get_from_path
-
- input = [{'a': 'a_1'}]
- self.assertEqual(['a_1'], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': 'b_1'}}]
- self.assertEqual([{'b': 'b_1'}], f(input, "a"))
- self.assertEqual(['b_1'], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': 'c_1'}}}]
- self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
- self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
- self.assertEqual(['c_1'], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
- self.assertEqual([{'b': {'c': 'c_1'}}], f(input, "a"))
- self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
- self.assertEqual(['c_1'], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': 'c_1'}}},
- {'a': {'b': None}}]
- self.assertEqual([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
- self.assertEqual([{'c': 'c_1'}], f(input, "a/b"))
- self.assertEqual(['c_1'], f(input, "a/b/c"))
-
- input = [{'a': {'b': {'c': 'c_1'}}},
- {'a': {'b': {'c': 'c_2'}}}]
- self.assertEqual([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
- f(input, "a"))
- self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
- self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c"))
-
- self.assertEqual([], f(input, "a/b/c/d"))
- self.assertEqual([], f(input, "c/a/b/d"))
- self.assertEqual([], f(input, "i/r/t"))
-
- def test_flattens_lists(self):
- f = utils.get_from_path
-
- input = [{'a': [1, 2, 3]}]
- self.assertEqual([1, 2, 3], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': [1, 2, 3]}}]
- self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
- self.assertEqual([1, 2, 3], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
- self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
- self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = [{'a': [1, 2, {'b': 'b_1'}]}]
- self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
- self.assertEqual(['b_1'], f(input, "a/b"))
-
- def test_bad_xpath(self):
- f = utils.get_from_path
-
- self.assertRaises(exception.Error, f, [], None)
- self.assertRaises(exception.Error, f, [], "")
- self.assertRaises(exception.Error, f, [], "/")
- self.assertRaises(exception.Error, f, [], "/a")
- self.assertRaises(exception.Error, f, [], "/a/")
- self.assertRaises(exception.Error, f, [], "//")
- self.assertRaises(exception.Error, f, [], "//a")
- self.assertRaises(exception.Error, f, [], "a//a")
- self.assertRaises(exception.Error, f, [], "a//a/")
- self.assertRaises(exception.Error, f, [], "a/a/")
-
- def test_real_failure1(self):
- # Real world failure case...
- # We weren't coping when the input was a Dictionary instead of a List
- # This led to test_accepts_dictionaries
- f = utils.get_from_path
-
- inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}],
- 'address': '192.168.0.3'},
- 'hostname': ''}
-
- private_ips = f(inst, 'fixed_ip/address')
- public_ips = f(inst, 'fixed_ip/floating_ips/address')
- self.assertEqual(['192.168.0.3'], private_ips)
- self.assertEqual(['1.2.3.4'], public_ips)
-
- def test_accepts_dictionaries(self):
- f = utils.get_from_path
-
- input = {'a': [1, 2, 3]}
- self.assertEqual([1, 2, 3], f(input, "a"))
- self.assertEqual([], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = {'a': {'b': [1, 2, 3]}}
- self.assertEqual([{'b': [1, 2, 3]}], f(input, "a"))
- self.assertEqual([1, 2, 3], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
- self.assertEqual([1, 2, 3, 4, 5, 6], f(input, "a/b"))
- self.assertEqual([], f(input, "a/b/c"))
-
- input = {'a': [1, 2, {'b': 'b_1'}]}
- self.assertEqual([1, 2, {'b': 'b_1'}], f(input, "a"))
- self.assertEqual(['b_1'], f(input, "a/b"))
-
-
class GenericUtilsTestCase(test.TestCase):
@mock.patch('os.path.exists', return_value=True)
utils.safe_minidom_parse_string,
killer_body())
- def test_xhtml_escape(self):
- self.assertEqual('"foo"', utils.xhtml_escape('"foo"'))
- self.assertEqual(''foo'', utils.xhtml_escape("'foo'"))
-
- def test_hash_file(self):
- data = b'Mary had a little lamb, its fleece as white as snow'
- flo = six.BytesIO(data)
- h1 = utils.hash_file(flo)
- h2 = hashlib.sha1(data).hexdigest()
- self.assertEqual(h1, h2)
-
def test_check_ssh_injection(self):
cmd_list = ['ssh', '-D', 'my_name@name_of_remote_computer']
self.assertIsNone(utils.check_ssh_injection(cmd_list))
utils.check_ssh_injection,
with_multiple_quotes)
- @mock.patch('paramiko.SSHClient')
- def test_create_channel(self, mock_client):
- test_width = 600
- test_height = 800
- mock_channel = mock.Mock()
- mock_client.invoke_shell.return_value = mock_channel
- utils.create_channel(mock_client, test_width, test_height)
- mock_client.invoke_shell.assert_called_once_with()
- mock_channel.resize_pty.assert_called_once_with(test_width,
- test_height)
-
@mock.patch('os.stat')
def test_get_file_mode(self, mock_stat):
class stat_result(object):
import contextlib
import datetime
import functools
-import hashlib
import inspect
import logging as py_logging
import os
from xml.parsers import expat
from xml import sax
from xml.sax import expatreader
-from xml.sax import saxutils
from os_brick.initiator import connector
from oslo_concurrency import lockutils
raise exception.SSHInjectionThreat(command=cmd_list)
-def create_channel(client, width, height):
- """Invoke an interactive shell session on server."""
- channel = client.invoke_shell()
- channel.resize_pty(width, height)
- return channel
-
-
def cinderdir():
import cinder
return os.path.abspath(cinder.__file__).split('cinder/__init__.py')[0]
raise expat.ExpatError()
-def xhtml_escape(value):
- """Escapes a string so it is valid within XML or XHTML."""
- return saxutils.escape(value, {'"': '"', "'": '''})
-
-
-def get_from_path(items, path):
- """Returns a list of items matching the specified path.
-
- Takes an XPath-like expression e.g. prop1/prop2/prop3, and for each item
- in items, looks up items[prop1][prop2][prop3]. Like XPath, if any of the
- intermediate results are lists it will treat each list item individually.
- A 'None' in items or any child expressions will be ignored, this function
- will not throw because of None (anywhere) in items. The returned list
- will contain no None values.
-
- """
- if path is None:
- raise exception.Error('Invalid mini_xpath')
-
- (first_token, sep, remainder) = path.partition('/')
-
- if first_token == '':
- raise exception.Error('Invalid mini_xpath')
-
- results = []
-
- if items is None:
- return results
-
- if not isinstance(items, list):
- # Wrap single objects in a list
- items = [items]
-
- for item in items:
- if item is None:
- continue
- get_method = getattr(item, 'get', None)
- if get_method is None:
- continue
- child = get_method(first_token)
- if child is None:
- continue
- if isinstance(child, list):
- # Flatten intermediate lists
- for x in child:
- results.append(x)
- else:
- results.append(child)
-
- if not sep:
- # No more tokens
- return results
- else:
- return get_from_path(results, remainder)
-
-
def is_valid_boolstr(val):
"""Check if the provided string is a valid bool string or not."""
val = str(val).lower()
return hostname
-def hash_file(file_like_object):
- """Generate a hash for the contents of a file."""
- checksum = hashlib.sha1()
- any(map(checksum.update, iter(lambda: file_like_object.read(32768), b'')))
- return checksum.hexdigest()
-
-
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""
last_heartbeat = service['updated_at'] or service['created_at']