# Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

from collections import namedtuple
import os
from io import StringIO
from textwrap import dedent

from cloudinit.atomic_helper import write_json
from cloudinit.cmd import status
from cloudinit.util import ensure_file
from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock

# Lightweight stand-ins used by the tests below: ``mypaths`` carries only the
# ``run_dir`` attribute, and ``myargs`` carries the ``long`` and ``wait``
# values handed to status.handle_status_args.
mypaths = namedtuple('MyPaths', ['run_dir'])
myargs = namedtuple('MyArgs', ['long', 'wait'])


class TestStatus(CiTestCase):

    def setUp(self):
        '''Prepare a temporary run_dir, file paths and a stub Init class.'''
        super(TestStatus, self).setUp()
        run_dir = self.tmp_dir()
        self.new_root = run_dir
        self.status_file = self.tmp_path('status.json', run_dir)
        self.disable_file = self.tmp_path('cloudinit-disable', run_dir)
        self.paths = mypaths(run_dir=run_dir)
        stub_paths = self.paths

        class FakeInit(object):
            # Stand-in handed to the patched 'Init' as its side_effect; it
            # only exposes ``paths`` and accepts the calls made on it.
            paths = stub_paths

            def __init__(self, ds_deps):
                pass

            def read_cfg(self):
                pass

        self.init_class = FakeInit

    def test__is_cloudinit_disabled_false_on_sysvinit(self):
        '''Without systemd, cloud-init is reported as enabled.'''
        # The disable file exists, yet it must be ignored off systemd.
        ensure_file(self.disable_file)
        patches = {
            'uses_systemd': False,
            'get_cmdline': "root=/dev/my-root not-important",
        }
        disabled, why = wrap_and_call(
            'cloudinit.cmd.status', patches,
            status._is_cloudinit_disabled, self.disable_file, self.paths)
        self.assertFalse(
            disabled, 'expected enabled cloud-init on sysvinit')
        self.assertEqual('Cloud-init enabled on sysvinit', why)

    def test__is_cloudinit_disabled_true_on_disable_file(self):
        '''On systemd, the disable_file marks cloud-init as disabled.'''
        # This time the disable file is expected to be honoured.
        ensure_file(self.disable_file)
        patches = {
            'uses_systemd': True,
            'get_cmdline': "root=/dev/my-root not-important",
        }
        disabled, why = wrap_and_call(
            'cloudinit.cmd.status', patches,
            status._is_cloudinit_disabled, self.disable_file, self.paths)
        expected_reason = 'Cloud-init disabled by {0}'.format(
            self.disable_file)
        self.assertTrue(disabled, 'expected disabled cloud-init')
        self.assertEqual(expected_reason, why)

    def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
        '''On systemd, cloud-init=enabled on the cmdline reports enabled.'''
        # The disable file is present but the command line takes precedence.
        ensure_file(self.disable_file)
        patches = {
            'uses_systemd': True,
            'get_cmdline': 'something cloud-init=enabled else',
        }
        disabled, why = wrap_and_call(
            'cloudinit.cmd.status', patches,
            status._is_cloudinit_disabled, self.disable_file, self.paths)
        self.assertFalse(disabled, 'expected enabled cloud-init')
        self.assertEqual(
            'Cloud-init enabled by kernel command line cloud-init=enabled',
            why)

    def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
        '''Disabled when using systemd and cloud-init=disabled on cmdline.'''
        # No disable file is created here: the kernel parameter alone must
        # be enough to report cloud-init as disabled.
        (is_disabled, reason) = wrap_and_call(
            'cloudinit.cmd.status',
            {'uses_systemd': True,
             'get_cmdline': 'something cloud-init=disabled else'},
            status._is_cloudinit_disabled, self.disable_file, self.paths)
        self.assertTrue(is_disabled, 'expected disabled cloud-init')
        self.assertEqual(
            'Cloud-init disabled by kernel parameter cloud-init=disabled',
            reason)

    def test__is_cloudinit_disabled_true_when_generator_disables(self):
        '''On systemd, a missing run_dir/enabled file reports disabled.'''
        enabled_marker = os.path.join(self.paths.run_dir, 'enabled')
        # Precondition: nothing has written the 'enabled' marker.
        self.assertFalse(os.path.exists(enabled_marker))
        patches = {'uses_systemd': True, 'get_cmdline': 'something'}
        disabled, why = wrap_and_call(
            'cloudinit.cmd.status', patches,
            status._is_cloudinit_disabled, self.disable_file, self.paths)
        self.assertTrue(disabled, 'expected disabled cloud-init')
        self.assertEqual('Cloud-init disabled by cloud-init-generator', why)

    def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
        '''On systemd, an existing run_dir/enabled file reports enabled.'''
        # Create the marker file that the generator would have written.
        ensure_file(os.path.join(self.paths.run_dir, 'enabled'))
        patches = {
            'uses_systemd': True,
            'get_cmdline': 'something ignored',
        }
        disabled, why = wrap_and_call(
            'cloudinit.cmd.status', patches,
            status._is_cloudinit_disabled, self.disable_file, self.paths)
        self.assertFalse(disabled, 'expected enabled cloud-init')
        self.assertEqual(
            'Cloud-init enabled by systemd cloud-init-generator', why)

    def test_status_returns_not_run(self):
        '''With no status.json present, the reported status is 'not run'.'''
        status_present = os.path.exists(self.status_file)
        self.assertFalse(status_present, 'Unexpected status.json found')
        args = myargs(long=False, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        self.assertEqual('status: not run\n', captured.getvalue())

    def test_status_returns_disabled_long_on_presence_of_disable_file(self):
        '''Long output for a disabled cloud-init includes the reason.'''
        status_path = os.path.join(self.paths.run_dir, 'status.json')
        probed = []

        def fakeexists(filepath):
            # Record every probed path; only status.json is reported absent.
            probed.append(filepath)
            return filepath != status_path

        patches = {
            'os.path.exists': {'side_effect': fakeexists},
            '_is_cloudinit_disabled': (True, 'disabled for some reason'),
            'Init': {'side_effect': self.init_class},
        }
        args = myargs(long=True, wait=False)
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        # status.json must be the one and only path that was checked.
        self.assertEqual([status_path], probed)
        expected = dedent('''\
            status: disabled
            detail:
            disabled for some reason
        ''')
        self.assertEqual(expected, captured.getvalue())

    def test_status_returns_running_on_no_results_json(self):
        '''status.json present without result.json reports 'running'.'''
        result_path = self.tmp_path('result.json', self.new_root)
        write_json(self.status_file, {})
        result_present = os.path.exists(result_path)
        self.assertFalse(result_present, 'Unexpected result.json found')
        args = myargs(long=False, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        self.assertEqual('status: running\n', captured.getvalue())

    def test_status_returns_running(self):
        '''A stage with no 'finished' time yields a 'running' status.'''
        ensure_file(self.tmp_path('result.json', self.new_root))
        unfinished_init = {'start': 1, 'finished': None}
        write_json(self.status_file, {'v1': {'init': unfinished_init}})
        args = myargs(long=False, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        self.assertEqual('status: running\n', captured.getvalue())

    def test_status_returns_done(self):
        '''Report done when result.json exists and no stage is unfinished.'''
        ensure_file(self.tmp_path('result.json', self.new_root))
        datasource = (
            'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
            '[dsmode=net]')
        v1 = {
            'stage': None,  # No current stage running
            'datasource': datasource,
            'blah': {'finished': 123.456},
            'init': {'errors': [], 'start': 124.567, 'finished': 125.678},
            'init-local': {'start': 123.45, 'finished': 123.46},
        }
        write_json(self.status_file, {'v1': v1})
        args = myargs(long=False, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        self.assertEqual('status: done\n', captured.getvalue())

    def test_status_returns_done_long(self):
        '''Long 'done' output adds a time line and the datasource detail.'''
        ensure_file(self.tmp_path('result.json', self.new_root))
        datasource = (
            'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
            '[dsmode=net]')
        v1 = {
            'stage': None,
            'datasource': datasource,
            'init': {'start': 124.567, 'finished': 125.678},
            'init-local': {'start': 123.45, 'finished': 123.46},
        }
        write_json(self.status_file, {'v1': v1})
        args = myargs(long=True, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        expected = dedent('''\
            status: done
            time: Thu, 01 Jan 1970 00:02:05 +0000
            detail:
            DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
        ''')
        self.assertEqual(expected, captured.getvalue())

    def test_status_on_errors(self):
        '''A stage listing errors yields 'error' status and exit code 1.'''
        v1 = {
            'stage': None,
            'blah': {'errors': [], 'finished': 123.456},
            'init': {
                'errors': ['error1'],
                'start': 124.567,
                'finished': 125.678,
            },
            'init-local': {'start': 123.45, 'finished': 123.46},
        }
        write_json(self.status_file, {'v1': v1})
        args = myargs(long=False, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(1, exit_code)
        self.assertEqual('status: error\n', captured.getvalue())

    def test_status_on_errors_long(self):
        '''Long 'error' output lists the error messages from every stage.'''
        datasource = (
            'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
            '[dsmode=net]')
        v1 = {
            'stage': None,
            'datasource': datasource,
            'init': {
                'errors': ['error1'],
                'start': 124.567,
                'finished': 125.678,
            },
            'init-local': {
                'errors': ['error2', 'error3'],
                'start': 123.45,
                'finished': 123.46,
            },
        }
        write_json(self.status_file, {'v1': v1})
        args = myargs(long=True, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(1, exit_code)
        expected = dedent('''\
            status: error
            time: Thu, 01 Jan 1970 00:02:05 +0000
            detail:
            error1
            error2
            error3
        ''')
        self.assertEqual(expected, captured.getvalue())

    def test_status_returns_running_long_format(self):
        '''Long 'running' output names the stage currently in progress.'''
        v1 = {
            'stage': 'init',
            'init': {'start': 124.456, 'finished': None},
            'init-local': {'start': 123.45, 'finished': 123.46},
        }
        write_json(self.status_file, {'v1': v1})
        args = myargs(long=True, wait=False)
        patches = {
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        expected = dedent('''\
            status: running
            time: Thu, 01 Jan 1970 00:02:04 +0000
            detail:
            Running in stage: init
        ''')
        self.assertEqual(expected, captured.getvalue())

    def test_status_wait_blocks_until_done(self):
        '''With wait set, sleep 0.25s between polls until status is done.'''
        init_local = {'start': 123.45, 'finished': 123.46}
        running_json = {
            'v1': {'stage': 'init',
                   'init': {'start': 124.456, 'finished': None},
                   'init-local': init_local}}
        done_json = {
            'v1': {'stage': None,
                   'init': {'start': 124.456, 'finished': 125.678},
                   'init-local': init_local}}

        self.sleep_calls = 0

        def fake_sleep(interval):
            # Advance the on-disk state as the poll loop keeps sleeping:
            # the 2nd sleep writes a running status, the 3rd writes the
            # finished status together with result.json.
            self.assertEqual(0.25, interval)
            self.sleep_calls += 1
            if self.sleep_calls == 3:
                write_json(self.status_file, done_json)
                ensure_file(self.tmp_path('result.json', self.new_root))
            elif self.sleep_calls == 2:
                write_json(self.status_file, running_json)

        patches = {
            'sleep': {'side_effect': fake_sleep},
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        args = myargs(long=False, wait=True)
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(0, exit_code)
        self.assertEqual(4, self.sleep_calls)
        self.assertEqual('....\nstatus: done\n', captured.getvalue())

    def test_status_wait_blocks_until_error(self):
        '''With wait set, sleep 0.25s between polls until status is error.'''
        init_local = {'start': 123.45, 'finished': 123.46}
        running_json = {
            'v1': {'stage': 'init',
                   'init': {'start': 124.456, 'finished': None},
                   'init-local': init_local}}
        error_json = {
            'v1': {'stage': None,
                   'init': {'errors': ['error1'], 'start': 124.456,
                            'finished': 125.678},
                   'init-local': init_local}}

        self.sleep_calls = 0

        def fake_sleep(interval):
            # The 2nd sleep writes a running status and the 3rd replaces it
            # with a finished status that carries an error.
            self.assertEqual(0.25, interval)
            self.sleep_calls += 1
            if self.sleep_calls == 3:
                write_json(self.status_file, error_json)
            elif self.sleep_calls == 2:
                write_json(self.status_file, running_json)

        patches = {
            'sleep': {'side_effect': fake_sleep},
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        args = myargs(long=False, wait=True)
        with mock.patch('sys.stdout', new_callable=StringIO) as captured:
            exit_code = wrap_and_call(
                'cloudinit.cmd.status', patches,
                status.handle_status_args, 'ignored', args)
        self.assertEqual(1, exit_code)
        self.assertEqual(4, self.sleep_calls)
        self.assertEqual('....\nstatus: error\n', captured.getvalue())

    def test_status_main(self):
        '''status.main exits with code 0 when invoked as a script.'''
        unfinished_init = {'start': 1, 'finished': None}
        write_json(self.status_file, {'v1': {'init': unfinished_init}})
        patches = {
            'sys.argv': {'new': ['status']},
            '_is_cloudinit_disabled': (False, ''),
            'Init': {'side_effect': self.init_class},
        }
        # main() is expected to finish by raising SystemExit.
        with self.assertRaises(SystemExit) as raised:
            with mock.patch('sys.stdout', new_callable=StringIO) as captured:
                wrap_and_call('cloudinit.cmd.status', patches, status.main)
        self.assertEqual(0, raised.exception.code)
        self.assertEqual('status: running\n', captured.getvalue())

# vi: ts=4 expandtab syntax=python