|
Packit Service |
a04d08 |
# This file is part of cloud-init. See LICENSE file for license information.
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
"""Tests for cloudinit.subp utility functions"""
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
751c4a |
import json
|
|
Packit Service |
751c4a |
import os
|
|
Packit Service |
751c4a |
import sys
|
|
Packit Service |
751c4a |
import stat
|
|
Packit Service |
751c4a |
|
|
Packit Service |
751c4a |
from unittest import mock
|
|
Packit Service |
751c4a |
|
|
Packit Service |
751c4a |
from cloudinit import subp, util
|
|
Packit Service |
a04d08 |
from cloudinit.tests.helpers import CiTestCase
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
751c4a |
# Whatever subp.which() reports for 'bash'; the TestSubp fixtures use it as
# the interpreter for their inline shell snippets.
BASH = subp.which("bash")
# A name chosen so that it should not match any real executable.
BOGUS_COMMAND = "this-is-not-expected-to-be-a-program-name"
|
|
Packit Service |
751c4a |
|
|
Packit Service |
751c4a |
|
|
Packit Service |
a04d08 |
class TestPrependBaseCommands(CiTestCase):
    """Checks for subp.prepend_base_command on string and list commands."""

    # The tests below read self.logs; presumably this flag is what makes
    # CiTestCase provide it -- confirm against the helper class.
    with_logs = True

    def test_prepend_base_command_errors_on_neither_string_nor_list(self):
        """A TypeError names every command that is neither str nor list."""
        mixed = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']]
        wanted_msg = (
            "Invalid basecmd config. These commands are not a string or"
            " list:\n1\n{'not': 'gonna work'}")
        with self.assertRaises(TypeError) as raised:
            subp.prepend_base_command(base_command='basecmd', commands=mixed)
        self.assertEqual(wanted_msg, str(raised.exception))

    def test_prepend_base_command_warns_on_non_base_string_commands(self):
        """String commands lacking the base command are logged, not altered."""
        given = ['ls', 'basecmd list', 'touch /blah', 'basecmd install x']
        wanted_log = (
            'WARNING: Non-basecmd commands in basecmd config:\n'
            'ls\ntouch /blah\n')
        result = subp.prepend_base_command(
            base_command='basecmd', commands=given)
        self.assertEqual(wanted_log, self.logs.getvalue())
        # The string commands themselves come back unchanged.
        self.assertEqual(given, result)

    def test_prepend_base_command_prepends_on_non_base_list_commands(self):
        """List commands not starting with 'basecmd' get it prepended."""
        given = [
            ['ls'],
            ['basecmd', 'list'],
            ['basecmda', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        # 'basecmda' only shares a prefix with 'basecmd', so it is still
        # expected to be treated as a non-base command.
        wanted = [
            ['basecmd', 'ls'],
            ['basecmd', 'list'],
            ['basecmd', 'basecmda', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        result = subp.prepend_base_command(
            base_command='basecmd', commands=given)
        self.assertEqual('', self.logs.getvalue())
        self.assertEqual(wanted, result)

    def test_prepend_base_command_removes_first_item_when_none(self):
        """A leading None in a list command is dropped, nothing prepended."""
        given = [
            [None, 'ls'],
            ['basecmd', 'list'],
            [None, 'touch', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        wanted = [
            ['ls'],
            ['basecmd', 'list'],
            ['touch', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        result = subp.prepend_base_command(
            base_command='basecmd', commands=given)
        self.assertEqual('', self.logs.getvalue())
        self.assertEqual(wanted, result)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
751c4a |
|
|
Packit Service |
751c4a |
class TestSubp(CiTestCase):
    """Tests for subp.subp and subp.target_path that run real commands."""

    # Commands listed here are the ones these tests execute for real.
    # NOTE(review): presumably CiTestCase rejects anything not listed --
    # confirm against the helper class.
    allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE,
                    BOGUS_COMMAND, sys.executable]

    # Shell helpers: copy stdin to stderr / stdout respectively.
    stdin2err = [BASH, '-c', 'cat >&2']
    stdin2out = ['cat']
    # Byte fixtures: \xaa on its own is not valid utf-8; the others are.
    utf8_invalid = b'ab\xaadef'
    utf8_valid = b'start \xc3\xa9 end'
    utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
    # Prints NAME=value for each variable name given as an argument.
    printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']

    def printf_cmd(self, *args):
        """Return a command list that runs bash's printf with *args*."""
        # bash's printf supports \xaa.  So does /usr/bin/printf
        # but by using bash, we remove dependency on another program.
        return [BASH, '-c', 'printf "$@"', 'printf'] + list(args)

    def test_subp_handles_bytestrings(self):
        """subp can run a bytestring command if shell is True."""
        tmp_file = self.tmp_path('test.out')
        cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
        (out, err) = subp.subp(cmd.encode('utf-8'), shell=True)
        self.assertEqual(u'', out)
        self.assertEqual(u'', err)
        self.assertEqual('HI MOM\n', util.load_file(tmp_file))

    def test_subp_handles_strings(self):
        """subp can run a string command if shell is True."""
        tmp_file = self.tmp_path('test.out')
        cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
        (out, err) = subp.subp(cmd, shell=True)
        self.assertEqual(u'', out)
        self.assertEqual(u'', err)
        self.assertEqual('HI MOM\n', util.load_file(tmp_file))

    def test_subp_handles_utf8(self):
        """Valid utf-8 bytes in a command come back decoded on stdout."""
        # The given bytes contain utf-8 accented characters as seen in e.g.
        # the "deja dup" package in Ubuntu.
        cmd = self.printf_cmd(self.utf8_valid_2)
        (out, _err) = subp.subp(cmd, capture=True)
        self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))

    def test_subp_respects_decode_false(self):
        """With decode=False both stdout and stderr are returned as bytes."""
        (out, err) = subp.subp(self.stdin2out, capture=True, decode=False,
                               data=self.utf8_valid)
        self.assertIsInstance(out, bytes)
        self.assertIsInstance(err, bytes)
        self.assertEqual(out, self.utf8_valid)

    def test_subp_decode_ignore(self):
        """decode='ignore' drops bytes that are not valid utf-8."""
        # this executes a string that writes invalid utf-8 to stdout
        (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'),
                                capture=True, decode='ignore')
        self.assertEqual(out, 'abcdef')

    def test_subp_decode_strict_valid_utf8(self):
        """decode='strict' decodes valid utf-8 output normally."""
        (out, _err) = subp.subp(self.stdin2out, capture=True,
                                decode='strict', data=self.utf8_valid)
        self.assertEqual(out, self.utf8_valid.decode('utf-8'))

    def test_subp_decode_invalid_utf8_replaces(self):
        """Without a decode argument, invalid utf-8 is decoded as 'replace'."""
        (out, _err) = subp.subp(self.stdin2out, capture=True,
                                data=self.utf8_invalid)
        expected = self.utf8_invalid.decode('utf-8', 'replace')
        self.assertEqual(out, expected)

    def test_subp_decode_strict_raises(self):
        """decode='strict' raises UnicodeDecodeError on invalid utf-8."""
        with self.assertRaises(UnicodeDecodeError):
            # The command is deliberately passed by keyword as 'args'.
            subp.subp(args=self.stdin2out, capture=True,
                      decode='strict', data=self.utf8_invalid)

    def test_subp_capture_stderr(self):
        """Data the command writes to stderr is returned as err."""
        data = b'hello world'
        (out, err) = subp.subp(self.stdin2err, capture=True,
                               decode=False, data=data,
                               update_env={'LC_ALL': 'C'})
        self.assertEqual(err, data)
        self.assertEqual(out, b'')

    def test_subp_reads_env(self):
        """The child process sees values patched into os.environ."""
        with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
            out, _err = subp.subp(self.printenv + ['FOO'], capture=True)
        self.assertEqual('FOO=BAR', out.splitlines()[0])

    def test_subp_env_and_update_env(self):
        """update_env entries are layered on top of an explicit env."""
        out, _err = subp.subp(
            self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
            env={'FOO': 'BAR'},
            update_env={'HOME': '/myhome', 'K2': 'V2'})
        # K1 is in neither mapping, so it prints as empty.
        self.assertEqual(
            ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())

    def test_subp_update_env(self):
        """update_env overrides and extends values from os.environ."""
        extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
        with mock.patch.dict("os.environ", values=extra):
            out, _err = subp.subp(
                self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
                update_env={'HOME': '/myhome', 'K2': 'V2'})

        self.assertEqual(
            ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())

    def test_subp_warn_missing_shebang(self):
        """Warn on no #! in script"""
        noshebang = self.tmp_path('noshebang')
        util.write_file(noshebang, 'true\n')

        # Add the owner-execute bit so the script can be executed directly.
        os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
        with self.allow_subp([noshebang]):
            self.assertRaisesRegex(subp.ProcessExecutionError,
                                   r'Missing #! in script\?',
                                   subp.subp, (noshebang,))

    def test_subp_combined_stderr_stdout(self):
        """Providing combine_capture as True redirects stderr to stdout."""
        data = b'hello world'
        (out, err) = subp.subp(self.stdin2err, capture=True,
                               combine_capture=True, decode=False, data=data)
        self.assertEqual(b'', err)
        self.assertEqual(data, out)

    def test_returns_none_if_no_capture(self):
        """With capture=False both returned streams are None."""
        (out, err) = subp.subp(self.stdin2out, data=b'', capture=False)
        self.assertIsNone(err)
        self.assertIsNone(out)

    def test_exception_has_out_err_are_bytes_if_decode_false(self):
        """Raised exc should have stderr, stdout as bytes if no decode."""
        with self.assertRaises(subp.ProcessExecutionError) as cm:
            subp.subp([BOGUS_COMMAND], decode=False)
        self.assertIsInstance(cm.exception.stdout, bytes)
        self.assertIsInstance(cm.exception.stderr, bytes)

    def test_exception_has_out_err_are_bytes_if_decode_true(self):
        """Raised exc should have stderr, stdout as str if decode is True."""
        with self.assertRaises(subp.ProcessExecutionError) as cm:
            subp.subp([BOGUS_COMMAND], decode=True)
        self.assertIsInstance(cm.exception.stdout, str)
        self.assertIsInstance(cm.exception.stderr, str)

    def test_bunch_of_slashes_in_path(self):
        """Repeated leading slashes in the path are collapsed."""
        self.assertEqual("/target/my/path/",
                         subp.target_path("/target/", "//my/path/"))
        self.assertEqual("/target/my/path/",
                         subp.target_path("/target/", "///my/path/"))

    def test_c_lang_can_take_utf8_args(self):
        """Independent of system LC_CTYPE, args can contain utf-8 strings.

        When python starts up, its default encoding gets set based on
        the value of LC_CTYPE.  If no system locale is set, the default
        encoding for both python2 and python3 in some paths will end up
        being ascii.

        Attempts to use setlocale or patching (or changing) os.environ
        in the current environment seem to not be effective.

        This test starts up a python with LC_CTYPE set to C so that
        the default encoding will be set to ascii.  In such an environment
        Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
        """
        # Program for the child interpreter: read a JSON command list from
        # stdin and run it through subp without capturing its output.
        python_prog = '\n'.join([
            'import json, sys',
            'from cloudinit.subp import subp',
            'data = sys.stdin.read()',
            'cmd = json.loads(data)',
            'subp(cmd, capture=False)',
            ''])
        cmd = [BASH, '-c', 'echo -n "$@"', '--',
               self.utf8_valid.decode("utf-8")]
        python_subp = [sys.executable, '-c', python_prog]

        out, _err = subp.subp(
            python_subp, update_env={'LC_CTYPE': 'C'},
            data=json.dumps(cmd).encode("utf-8"),
            decode=False)
        self.assertEqual(self.utf8_valid, out)

    def test_bogus_command_logs_status_messages(self):
        """status_cb gets status messages logs on bogus commands provided."""
        logs = []

        def status_cb(log):
            logs.append(log)

        with self.assertRaises(subp.ProcessExecutionError):
            subp.subp([BOGUS_COMMAND], status_cb=status_cb)

        expected = [
            'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
            'ERROR: End run command: invalid command provided\n']
        self.assertEqual(expected, logs)

    def test_command_logs_exit_codes_to_status_cb(self):
        """status_cb gets status messages containing command exit code."""
        logs = []

        def status_cb(log):
            logs.append(log)

        with self.assertRaises(subp.ProcessExecutionError):
            subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
        subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)

        expected = [
            'Begin run command: %s -c exit 2\n' % BASH,
            'ERROR: End run command: exit(2)\n',
            'Begin run command: %s -c exit 0\n' % BASH,
            'End run command: exit(0)\n']
        self.assertEqual(expected, logs)
|
|
Packit Service |
751c4a |
|
|
Packit Service |
751c4a |
|
|
Packit Service |
a04d08 |
# vi: ts=4 expandtab
|