Blame cloudinit/tests/test_subp.py

Packit Service a04d08
# This file is part of cloud-init. See LICENSE file for license information.
Packit Service a04d08
Packit Service a04d08
"""Tests for cloudinit.subp utility functions"""
Packit Service a04d08
Packit Service 751c4a
import json
Packit Service 751c4a
import os
Packit Service 751c4a
import sys
Packit Service 751c4a
import stat
Packit Service 751c4a
Packit Service 751c4a
from unittest import mock
Packit Service 751c4a
Packit Service 751c4a
from cloudinit import subp, util
Packit Service a04d08
from cloudinit.tests.helpers import CiTestCase
Packit Service a04d08
Packit Service a04d08
Packit Service 751c4a
# Result of looking up 'bash' with subp.which; the tests below use it as the
# interpreter for their shell-based helper commands.
# NOTE(review): presumably None when bash is not installed -- confirm.
BASH = subp.which('bash')
Packit Service 751c4a
# A program name chosen so that it should not resolve to any real executable;
# tests pass it to subp.subp and expect a ProcessExecutionError.
BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
Packit Service 751c4a
Packit Service 751c4a
Packit Service a04d08
class TestPrependBaseCommands(CiTestCase):
    """Tests for subp.prepend_base_command."""

    # The tests read captured log output back through self.logs.
    with_logs = True

    def test_prepend_base_command_errors_on_neither_string_nor_list(self):
        """A TypeError lists every command that is neither str nor list."""
        commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']]
        expected_message = (
            "Invalid basecmd config. These commands are not a string or"
            " list:\n1\n{'not': 'gonna work'}")
        with self.assertRaises(TypeError) as raised:
            subp.prepend_base_command(
                base_command='basecmd', commands=commands)
        self.assertEqual(expected_message, str(raised.exception))

    def test_prepend_base_command_warns_on_non_base_string_commands(self):
        """String commands without the base command are logged, not changed."""
        commands = [
            'ls', 'basecmd list', 'touch /blah', 'basecmd install x']
        expected_log = (
            'WARNING: Non-basecmd commands in basecmd config:\n'
            'ls\ntouch /blah\n')
        result = subp.prepend_base_command(
            base_command='basecmd', commands=commands)
        self.assertEqual(expected_log, self.logs.getvalue())
        self.assertEqual(commands, result)

    def test_prepend_base_command_prepends_on_non_base_list_commands(self):
        """List commands not starting with 'basecmd' get it prepended."""
        commands = [
            ['ls'],
            ['basecmd', 'list'],
            ['basecmda', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        expected = [
            ['basecmd', 'ls'],
            ['basecmd', 'list'],
            ['basecmd', 'basecmda', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        result = subp.prepend_base_command(
            base_command='basecmd', commands=commands)
        # Nothing should be logged for list-type commands.
        self.assertEqual('', self.logs.getvalue())
        self.assertEqual(expected, result)

    def test_prepend_base_command_removes_first_item_when_none(self):
        """A leading None in a list command is dropped; nothing is added."""
        commands = [
            [None, 'ls'],
            ['basecmd', 'list'],
            [None, 'touch', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        expected = [
            ['ls'],
            ['basecmd', 'list'],
            ['touch', '/blah'],
            ['basecmd', 'install', 'x'],
        ]
        result = subp.prepend_base_command(
            base_command='basecmd', commands=commands)
        self.assertEqual('', self.logs.getvalue())
        self.assertEqual(expected, result)
Packit Service a04d08
Packit Service 751c4a
Packit Service 751c4a
class TestSubp(CiTestCase):
    """Tests for subp.subp and subp.target_path using real subprocesses."""

    # Commands these tests execute through subp.
    # NOTE(review): presumably an allow-list enforced by CiTestCase --
    # confirm against cloudinit.tests.helpers.
    allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE,
                    BOGUS_COMMAND, sys.executable]

    # Copies stdin to stderr.
    stdin2err = [BASH, '-c', 'cat >&2']
    # Copies stdin to stdout.
    stdin2out = ['cat']
    # 0xaa is a lone continuation byte, so this is not decodable as utf-8.
    utf8_invalid = b'ab\xaadef'
    utf8_valid = b'start \xc3\xa9 end'
    utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
    # For each variable name appended to this command, prints "NAME=value"
    # on its own line (bash ${!n} indirect expansion).
    printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']

    def printf_cmd(self, *args):
        """Return a bash command list that runs printf with the given args."""
        # bash's printf supports \xaa.  So does /usr/bin/printf
        # but by using bash, we remove dependency on another program.
        return [BASH, '-c', 'printf "$@"', 'printf'] + list(args)

    def test_subp_handles_bytestrings(self):
        """subp can run a bytestring command if shell is True."""
        tmp_file = self.tmp_path('test.out')
        cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
        (out, _err) = subp.subp(cmd.encode('utf-8'), shell=True)
        self.assertEqual('', out)
        self.assertEqual('', _err)
        self.assertEqual('HI MOM\n', util.load_file(tmp_file))

    def test_subp_handles_strings(self):
        """subp can run a string command if shell is True."""
        tmp_file = self.tmp_path('test.out')
        cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
        (out, _err) = subp.subp(cmd, shell=True)
        self.assertEqual('', out)
        self.assertEqual('', _err)
        self.assertEqual('HI MOM\n', util.load_file(tmp_file))

    def test_subp_handles_utf8(self):
        """Valid utf-8 bytes on stdout are returned as the decoded string."""
        # The given bytes contain utf-8 accented characters as seen in e.g.
        # the "deja dup" package in Ubuntu.
        cmd = self.printf_cmd(self.utf8_valid_2)
        (out, _err) = subp.subp(cmd, capture=True)
        self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))

    def test_subp_respects_decode_false(self):
        """With decode=False, captured stdout and stderr are bytes."""
        (out, err) = subp.subp(self.stdin2out, capture=True, decode=False,
                               data=self.utf8_valid)
        self.assertIsInstance(out, bytes)
        self.assertIsInstance(err, bytes)
        self.assertEqual(out, self.utf8_valid)

    def test_subp_decode_ignore(self):
        """With decode='ignore', undecodable bytes are dropped from stdout."""
        # this executes a string that writes invalid utf-8 to stdout
        (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'),
                                capture=True, decode='ignore')
        self.assertEqual(out, 'abcdef')

    def test_subp_decode_strict_valid_utf8(self):
        """With decode='strict', valid utf-8 output decodes normally."""
        (out, _err) = subp.subp(self.stdin2out, capture=True,
                                decode='strict', data=self.utf8_valid)
        self.assertEqual(out, self.utf8_valid.decode('utf-8'))

    def test_subp_decode_invalid_utf8_replaces(self):
        """Without a decode argument, invalid utf-8 bytes are replaced."""
        (out, _err) = subp.subp(self.stdin2out, capture=True,
                                data=self.utf8_invalid)
        expected = self.utf8_invalid.decode('utf-8', 'replace')
        self.assertEqual(out, expected)

    def test_subp_decode_strict_raises(self):
        """With decode='strict', invalid utf-8 output raises on decode."""
        with self.assertRaises(UnicodeDecodeError):
            subp.subp(self.stdin2out, capture=True, decode='strict',
                      data=self.utf8_invalid)

    def test_subp_capture_stderr(self):
        """Data written to stderr is returned as err; out stays empty."""
        data = b'hello world'
        (out, err) = subp.subp(self.stdin2err, capture=True,
                               decode=False, data=data,
                               update_env={'LC_ALL': 'C'})
        self.assertEqual(err, data)
        self.assertEqual(out, b'')

    def test_subp_reads_env(self):
        """Variables set in os.environ are visible to the child process."""
        with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
            out, _err = subp.subp(self.printenv + ['FOO'], capture=True)
        self.assertEqual('FOO=BAR', out.splitlines()[0])

    def test_subp_env_and_update_env(self):
        """update_env is applied on top of an explicitly passed env."""
        out, _err = subp.subp(
            self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
            env={'FOO': 'BAR'},
            update_env={'HOME': '/myhome', 'K2': 'V2'})
        # K1 is in neither mapping, so it prints with an empty value.
        self.assertEqual(
            ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())

    def test_subp_update_env(self):
        """update_env overrides and extends values taken from os.environ."""
        extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
        with mock.patch.dict("os.environ", values=extra):
            out, _err = subp.subp(
                self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
                update_env={'HOME': '/myhome', 'K2': 'V2'})

        self.assertEqual(
            ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())

    def test_subp_warn_missing_shebang(self):
        """Warn on no #! in script"""
        noshebang = self.tmp_path('noshebang')
        util.write_file(noshebang, 'true\n')

        # Mark the script executable so the failure comes from the missing
        # interpreter line rather than from file permissions.
        os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
        with self.allow_subp([noshebang]):
            with self.assertRaisesRegex(subp.ProcessExecutionError,
                                        r'Missing #! in script\?'):
                subp.subp((noshebang,))

    def test_subp_combined_stderr_stdout(self):
        """Providing combine_capture as True redirects stderr to stdout."""
        data = b'hello world'
        (out, err) = subp.subp(self.stdin2err, capture=True,
                               combine_capture=True, decode=False, data=data)
        self.assertEqual(b'', err)
        self.assertEqual(data, out)

    def test_returns_none_if_no_capture(self):
        """With capture=False, both out and err are returned as None."""
        (out, err) = subp.subp(self.stdin2out, data=b'', capture=False)
        self.assertIsNone(err)
        self.assertIsNone(out)

    def test_exception_has_out_err_are_bytes_if_decode_false(self):
        """Raised exc should have stderr, stdout as bytes if no decode."""
        with self.assertRaises(subp.ProcessExecutionError) as cm:
            subp.subp([BOGUS_COMMAND], decode=False)
        self.assertIsInstance(cm.exception.stdout, bytes)
        self.assertIsInstance(cm.exception.stderr, bytes)

    def test_exception_has_out_err_are_bytes_if_decode_true(self):
        """Raised exc should have stderr, stdout as string if decode."""
        with self.assertRaises(subp.ProcessExecutionError) as cm:
            subp.subp([BOGUS_COMMAND], decode=True)
        self.assertIsInstance(cm.exception.stdout, str)
        self.assertIsInstance(cm.exception.stderr, str)

    def test_bunch_of_slashes_in_path(self):
        """target_path collapses repeated leading slashes in the path."""
        self.assertEqual("/target/my/path/",
                         subp.target_path("/target/", "//my/path/"))
        self.assertEqual("/target/my/path/",
                         subp.target_path("/target/", "///my/path/"))

    def test_c_lang_can_take_utf8_args(self):
        """Independent of system LC_CTYPE, args can contain utf-8 strings.

        When python starts up, its default encoding gets set based on
        the value of LC_CTYPE.  If no system locale is set, the default
        encoding for both python2 and python3 in some paths will end up
        being ascii.

        Attempts to use setlocale or patching (or changing) os.environ
        in the current environment seem to not be effective.

        This test starts up a python with LC_CTYPE set to C so that
        the default encoding will be set to ascii.  In such an environment
        Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
        """
        # Program run by the child interpreter: read a JSON command list
        # from stdin and execute it through subp without capturing, so the
        # command's output lands on the child's own stdout.
        python_prog = '\n'.join([
            'import json, sys',
            'from cloudinit.subp import subp',
            'data = sys.stdin.read()',
            'cmd = json.loads(data)',
            'subp(cmd, capture=False)',
            ''])
        cmd = [BASH, '-c', 'echo -n "$@"', '--',
               self.utf8_valid.decode("utf-8")]
        python_subp = [sys.executable, '-c', python_prog]

        out, _err = subp.subp(
            python_subp, update_env={'LC_CTYPE': 'C'},
            data=json.dumps(cmd).encode("utf-8"),
            decode=False)
        self.assertEqual(self.utf8_valid, out)

    def test_bogus_command_logs_status_messages(self):
        """status_cb gets status messages logs on bogus commands provided."""
        logs = []

        def status_cb(log):
            logs.append(log)

        with self.assertRaises(subp.ProcessExecutionError):
            subp.subp([BOGUS_COMMAND], status_cb=status_cb)

        expected = [
            'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
            'ERROR: End run command: invalid command provided\n']
        self.assertEqual(expected, logs)

    def test_command_logs_exit_codes_to_status_cb(self):
        """status_cb gets status messages containing command exit code."""
        logs = []

        def status_cb(log):
            logs.append(log)

        with self.assertRaises(subp.ProcessExecutionError):
            subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
        subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)

        expected = [
            'Begin run command: %s -c exit 2\n' % BASH,
            'ERROR: End run command: exit(2)\n',
            'Begin run command: %s -c exit 0\n' % BASH,
            'End run command: exit(0)\n']
        self.assertEqual(expected, logs)
Packit Service 751c4a
Packit Service 751c4a
Packit Service a04d08
# vi: ts=4 expandtab