# This file is part of cloud-init. See LICENSE file for license information.
from collections import namedtuple
import os
from io import StringIO
from textwrap import dedent
from cloudinit.atomic_helper import write_json
from cloudinit.cmd import status
from cloudinit.util import ensure_file
from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock
# Lightweight stand-ins used by the tests below: ``mypaths`` carries the
# run_dir that status files are looked up in, and ``myargs`` mimics the
# parsed command line arguments (``long`` and ``wait`` flags).
mypaths = namedtuple('MyPaths', ['run_dir'])
myargs = namedtuple('MyArgs', ['long', 'wait'])
class TestStatus(CiTestCase):
    '''Tests for cloudinit.cmd.status.

    Each test works inside a fresh temporary run_dir and patches the
    pieces of cloudinit.cmd.status it depends on through wrap_and_call.
    '''

    def setUp(self):
        '''Create a temporary run_dir and a fake Init class pointing at it.'''
        super(TestStatus, self).setUp()
        self.new_root = self.tmp_dir()
        self.status_file = self.tmp_path('status.json', self.new_root)
        self.disable_file = self.tmp_path('cloudinit-disable', self.new_root)
        self.paths = mypaths(run_dir=self.new_root)

        class FakeInit(object):
            '''Replacement for status.Init exposing only the temp paths.'''
            paths = self.paths

            def __init__(self, ds_deps):
                pass

            def read_cfg(self):
                pass

        self.init_class = FakeInit

    def _is_disabled(self, uses_systemd, cmdline):
        '''Call status._is_cloudinit_disabled in a patched environment.

        uses_systemd and cmdline become the return values of the patched
        cloudinit.cmd.status.uses_systemd and get_cmdline respectively.
        The result of _is_cloudinit_disabled is returned unchanged; the
        tests unpack it as (is_disabled, reason).
        '''
        return wrap_and_call(
            'cloudinit.cmd.status',
            {'uses_systemd': uses_systemd,
             'get_cmdline': cmdline},
            status._is_cloudinit_disabled, self.disable_file, self.paths)

    def _run_status(self, cmdargs, disabled=(False, ''), extra_mocks=None):
        '''Run status.handle_status_args with sys.stdout captured.

        _is_cloudinit_disabled is patched to return ``disabled`` and Init
        is patched to construct self.init_class. ``extra_mocks`` is an
        optional wrap_and_call style dict of additional patches; its
        entries are placed ahead of the two standard ones.

        Returns a (retcode, stdout_text) tuple.
        '''
        mocks = dict(extra_mocks or {})
        mocks['_is_cloudinit_disabled'] = disabled
        mocks['Init'] = {'side_effect': self.init_class}
        with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                'cloudinit.cmd.status', mocks,
                status.handle_status_args, 'ignored', cmdargs)
        return retcode, m_stdout.getvalue()

    def test__is_cloudinit_disabled_false_on_sysvinit(self):
        '''When not in an environment using systemd, return False.'''
        ensure_file(self.disable_file)  # Create the ignored disable file
        (is_disabled, reason) = self._is_disabled(
            False, "root=/dev/my-root not-important")
        self.assertFalse(
            is_disabled, 'expected enabled cloud-init on sysvinit')
        self.assertEqual('Cloud-init enabled on sysvinit', reason)

    def test__is_cloudinit_disabled_true_on_disable_file(self):
        '''When using systemd and disable_file is present return disabled.'''
        ensure_file(self.disable_file)  # Create observed disable file
        (is_disabled, reason) = self._is_disabled(
            True, "root=/dev/my-root not-important")
        self.assertTrue(is_disabled, 'expected disabled cloud-init')
        self.assertEqual(
            'Cloud-init disabled by {0}'.format(self.disable_file), reason)

    def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
        '''Not disabled when using systemd and enabled via commandline.'''
        ensure_file(self.disable_file)  # Create ignored disable file
        (is_disabled, reason) = self._is_disabled(
            True, 'something cloud-init=enabled else')
        self.assertFalse(is_disabled, 'expected enabled cloud-init')
        self.assertEqual(
            'Cloud-init enabled by kernel command line cloud-init=enabled',
            reason)

    def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
        '''Disabled when using systemd and cmdline has cloud-init=disabled.'''
        (is_disabled, reason) = self._is_disabled(
            True, 'something cloud-init=disabled else')
        self.assertTrue(is_disabled, 'expected disabled cloud-init')
        self.assertEqual(
            'Cloud-init disabled by kernel parameter cloud-init=disabled',
            reason)

    def test__is_cloudinit_disabled_true_when_generator_disables(self):
        '''When cloud-init-generator doesn't write enabled file return True.'''
        enabled_file = os.path.join(self.paths.run_dir, 'enabled')
        self.assertFalse(os.path.exists(enabled_file))
        (is_disabled, reason) = self._is_disabled(True, 'something')
        self.assertTrue(is_disabled, 'expected disabled cloud-init')
        self.assertEqual('Cloud-init disabled by cloud-init-generator', reason)

    def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
        '''Report enabled when systemd generator creates the enabled file.'''
        enabled_file = os.path.join(self.paths.run_dir, 'enabled')
        ensure_file(enabled_file)
        (is_disabled, reason) = self._is_disabled(True, 'something ignored')
        self.assertFalse(is_disabled, 'expected enabled cloud-init')
        self.assertEqual(
            'Cloud-init enabled by systemd cloud-init-generator', reason)

    def test_status_returns_not_run(self):
        '''When status.json does not exist yet, return 'not run'.'''
        self.assertFalse(
            os.path.exists(self.status_file), 'Unexpected status.json found')
        cmdargs = myargs(long=False, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(0, retcode)
        self.assertEqual('status: not run\n', out)

    def test_status_returns_disabled_long_on_presence_of_disable_file(self):
        '''When cloudinit is disabled, return disabled reason.'''
        checked_files = []
        status_file = os.path.join(self.paths.run_dir, 'status.json')

        def fakeexists(filepath):
            # Record every probed path; only status.json is reported missing.
            checked_files.append(filepath)
            return filepath != status_file

        cmdargs = myargs(long=True, wait=False)
        retcode, out = self._run_status(
            cmdargs,
            disabled=(True, 'disabled for some reason'),
            extra_mocks={'os.path.exists': {'side_effect': fakeexists}})
        self.assertEqual(0, retcode)
        self.assertEqual(
            [os.path.join(self.paths.run_dir, 'status.json')],
            checked_files)
        expected = dedent('''\
            status: disabled
            detail:
            disabled for some reason
        ''')
        self.assertEqual(expected, out)

    def test_status_returns_running_on_no_results_json(self):
        '''Report running when status.json exists but result.json does not.'''
        result_file = self.tmp_path('result.json', self.new_root)
        write_json(self.status_file, {})
        self.assertFalse(
            os.path.exists(result_file), 'Unexpected result.json found')
        cmdargs = myargs(long=False, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(0, retcode)
        self.assertEqual('status: running\n', out)

    def test_status_returns_running(self):
        '''Report running when status exists with an unfinished stage.'''
        ensure_file(self.tmp_path('result.json', self.new_root))
        write_json(self.status_file,
                   {'v1': {'init': {'start': 1, 'finished': None}}})
        cmdargs = myargs(long=False, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(0, retcode)
        self.assertEqual('status: running\n', out)

    def test_status_returns_done(self):
        '''Report done when result.json exists and no stage is unfinished.'''
        ensure_file(self.tmp_path('result.json', self.new_root))
        write_json(
            self.status_file,
            {'v1': {'stage': None,  # No current stage running
                    'datasource': (
                        'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
                        '[dsmode=net]'),
                    'blah': {'finished': 123.456},
                    'init': {'errors': [], 'start': 124.567,
                             'finished': 125.678},
                    'init-local': {'start': 123.45, 'finished': 123.46}}})
        cmdargs = myargs(long=False, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(0, retcode)
        self.assertEqual('status: done\n', out)

    def test_status_returns_done_long(self):
        '''Long format of done status includes datasource info.'''
        ensure_file(self.tmp_path('result.json', self.new_root))
        write_json(
            self.status_file,
            {'v1': {'stage': None,
                    'datasource': (
                        'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
                        '[dsmode=net]'),
                    'init': {'start': 124.567, 'finished': 125.678},
                    'init-local': {'start': 123.45, 'finished': 123.46}}})
        cmdargs = myargs(long=True, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(0, retcode)
        expected = dedent('''\
            status: done
            time: Thu, 01 Jan 1970 00:02:05 +0000
            detail:
            DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
        ''')
        self.assertEqual(expected, out)

    def test_status_on_errors(self):
        '''Reports error when any stage has errors.'''
        write_json(
            self.status_file,
            {'v1': {'stage': None,
                    'blah': {'errors': [], 'finished': 123.456},
                    'init': {'errors': ['error1'], 'start': 124.567,
                             'finished': 125.678},
                    'init-local': {'start': 123.45, 'finished': 123.46}}})
        cmdargs = myargs(long=False, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(1, retcode)
        self.assertEqual('status: error\n', out)

    def test_status_on_errors_long(self):
        '''Long format of error status includes all error messages.'''
        write_json(
            self.status_file,
            {'v1': {'stage': None,
                    'datasource': (
                        'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
                        '[dsmode=net]'),
                    'init': {'errors': ['error1'], 'start': 124.567,
                             'finished': 125.678},
                    'init-local': {'errors': ['error2', 'error3'],
                                   'start': 123.45, 'finished': 123.46}}})
        cmdargs = myargs(long=True, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(1, retcode)
        expected = dedent('''\
            status: error
            time: Thu, 01 Jan 1970 00:02:05 +0000
            detail:
            error1
            error2
            error3
        ''')
        self.assertEqual(expected, out)

    def test_status_returns_running_long_format(self):
        '''Long format reports the stage in which we are running.'''
        write_json(
            self.status_file,
            {'v1': {'stage': 'init',
                    'init': {'start': 124.456, 'finished': None},
                    'init-local': {'start': 123.45, 'finished': 123.46}}})
        cmdargs = myargs(long=True, wait=False)
        retcode, out = self._run_status(cmdargs)
        self.assertEqual(0, retcode)
        expected = dedent('''\
            status: running
            time: Thu, 01 Jan 1970 00:02:04 +0000
            detail:
            Running in stage: init
        ''')
        self.assertEqual(expected, out)

    def test_status_wait_blocks_until_done(self):
        '''Specifying wait will poll every 1/4 second until done state.'''
        running_json = {
            'v1': {'stage': 'init',
                   'init': {'start': 124.456, 'finished': None},
                   'init-local': {'start': 123.45, 'finished': 123.46}}}
        done_json = {
            'v1': {'stage': None,
                   'init': {'start': 124.456, 'finished': 125.678},
                   'init-local': {'start': 123.45, 'finished': 123.46}}}

        self.sleep_calls = 0

        def fake_sleep(interval):
            # Advance the on-disk state as polling proceeds: the second
            # sleep writes a running status, the third writes the final one.
            self.assertEqual(0.25, interval)
            self.sleep_calls += 1
            if self.sleep_calls == 2:
                write_json(self.status_file, running_json)
            elif self.sleep_calls == 3:
                write_json(self.status_file, done_json)
                result_file = self.tmp_path('result.json', self.new_root)
                ensure_file(result_file)

        cmdargs = myargs(long=False, wait=True)
        retcode, out = self._run_status(
            cmdargs, extra_mocks={'sleep': {'side_effect': fake_sleep}})
        self.assertEqual(0, retcode)
        self.assertEqual(4, self.sleep_calls)
        self.assertEqual('....\nstatus: done\n', out)

    def test_status_wait_blocks_until_error(self):
        '''Specifying wait will poll every 1/4 second until error state.'''
        running_json = {
            'v1': {'stage': 'init',
                   'init': {'start': 124.456, 'finished': None},
                   'init-local': {'start': 123.45, 'finished': 123.46}}}
        error_json = {
            'v1': {'stage': None,
                   'init': {'errors': ['error1'], 'start': 124.456,
                            'finished': 125.678},
                   'init-local': {'start': 123.45, 'finished': 123.46}}}

        self.sleep_calls = 0

        def fake_sleep(interval):
            # Advance the on-disk state as polling proceeds: the second
            # sleep writes a running status, the third writes the errors.
            self.assertEqual(0.25, interval)
            self.sleep_calls += 1
            if self.sleep_calls == 2:
                write_json(self.status_file, running_json)
            elif self.sleep_calls == 3:
                write_json(self.status_file, error_json)

        cmdargs = myargs(long=False, wait=True)
        retcode, out = self._run_status(
            cmdargs, extra_mocks={'sleep': {'side_effect': fake_sleep}})
        self.assertEqual(1, retcode)
        self.assertEqual(4, self.sleep_calls)
        self.assertEqual('....\nstatus: error\n', out)

    def test_status_main(self):
        '''status.main can be run as a standalone script.'''
        write_json(self.status_file,
                   {'v1': {'init': {'start': 1, 'finished': None}}})
        # status.main is expected to exit via SystemExit, so the stdout
        # patch lives inside the assertRaises context.
        with self.assertRaises(SystemExit) as context_manager:
            with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
                wrap_and_call(
                    'cloudinit.cmd.status',
                    {'sys.argv': {'new': ['status']},
                     '_is_cloudinit_disabled': (False, ''),
                     'Init': {'side_effect': self.init_class}},
                    status.main)
        self.assertEqual(0, context_manager.exception.code)
        self.assertEqual('status: running\n', m_stdout.getvalue())
# vi: ts=4 expandtab syntax=python