|
Packit |
bc9a3a |
# This file is part of cloud-init. See LICENSE file for license information.
|
|
Packit |
bc9a3a |
|
|
Packit |
bc9a3a |
from cloudinit.cmd import clean
|
|
Packit |
bc9a3a |
from cloudinit.util import ensure_dir, sym_link, write_file
|
|
Packit |
bc9a3a |
from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock
|
|
Packit |
bc9a3a |
from collections import namedtuple
|
|
Packit |
bc9a3a |
import os
|
|
Packit |
bc9a3a |
from six import StringIO
|
|
Packit |
bc9a3a |
|
|
Packit |
bc9a3a |
# Minimal stand-in for a paths object: only the cloud_dir attribute is carried.
mypaths = namedtuple('MyPaths', ['cloud_dir'])
|
|
Packit |
bc9a3a |
|
|
Packit |
bc9a3a |
|
|
Packit |
bc9a3a |
class TestClean(CiTestCase):
    """Tests for cloudinit.cmd.clean, run against temporary directories."""

    def setUp(self):
        """Allocate temp paths and build the Init stand-in used by the tests."""
        super(TestClean, self).setUp()
        root = self.tmp_dir()
        self.new_root = root
        self.artifact_dir = self.tmp_path('artifacts', root)
        self.log1 = self.tmp_path('cloud-init.log', root)
        self.log2 = self.tmp_path('cloud-init-output.log', root)

        fake_cfg = {
            'def_log_file': self.log1,
            'output': {'all': '|tee -a {0}'.format(self.log2)},
        }
        # Ensure cloud_dir has a trailing slash, to match real behaviour
        fake_paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir))

        class FakeInit(object):
            # Class-level attributes read by the code under test.
            cfg = fake_cfg
            paths = fake_paths

            def __init__(self, ds_deps):
                pass

            def read_cfg(self):
                pass

        self.init_class = FakeInit

    def _remove_artifacts(self, **kwargs):
        """Run clean.remove_artifacts through wrap_and_call, mocking Init.

        Keyword arguments are forwarded to clean.remove_artifacts; the
        return value of wrap_and_call is handed back to the caller.
        """
        init_mock = {'Init': {'side_effect': self.init_class}}
        return wrap_and_call(
            'cloudinit.cmd.clean', init_mock,
            clean.remove_artifacts, **kwargs)

    def _make_artifact_tree(self):
        """Create artifact_dir with seed, dir1 and dir2 inside it.

        Returns the (seed, dir1, dir2) paths in that order.
        """
        base = self.artifact_dir
        seed_dir = os.path.join(base, 'seed')
        first_dir = os.path.join(base, 'dir1')
        second_dir = os.path.join(base, 'dir2')
        for each in (base, seed_dir, first_dir, second_dir):
            ensure_dir(each)
        return seed_dir, first_dir, second_dir

    def test_remove_artifacts_removes_logs(self):
        """With remove_logs=True both configured log files are deleted."""
        write_file(self.log1, 'cloud-init-log')
        write_file(self.log2, 'cloud-init-output-log')

        artifacts_present = os.path.exists(self.artifact_dir)
        self.assertFalse(artifacts_present, 'Unexpected artifacts dir')

        result = self._remove_artifacts(remove_logs=True)

        self.assertFalse(os.path.exists(self.log1), 'Unexpected file')
        self.assertFalse(os.path.exists(self.log2), 'Unexpected file')
        self.assertEqual(0, result)

    def test_remove_artifacts_preserves_logs(self):
        """With remove_logs=False both log files are left in place."""
        write_file(self.log1, 'cloud-init-log')
        write_file(self.log2, 'cloud-init-output-log')

        result = self._remove_artifacts(remove_logs=False)

        self.assertTrue(os.path.exists(self.log1), 'Missing expected file')
        self.assertTrue(os.path.exists(self.log2), 'Missing expected file')
        self.assertEqual(0, result)

    def test_remove_artifacts_removes_unlinks_symlinks(self):
        """A directory and a symlink to it are both gone after cleaning."""
        target = os.path.join(self.artifact_dir, 'dir1')
        link = os.path.join(self.artifact_dir, 'mylink')
        ensure_dir(target)
        sym_link(target, link)

        result = self._remove_artifacts(remove_logs=False)

        self.assertEqual(0, result)
        for gone in (target, link):
            still_there = os.path.exists(gone)
            self.assertFalse(still_there, 'Unexpected {0} dir'.format(gone))

    def test_remove_artifacts_removes_artifacts_skipping_seed(self):
        """By default the seed dir survives while its siblings are removed."""
        seed_dir, first_dir, second_dir = self._make_artifact_tree()

        result = self._remove_artifacts(remove_logs=False)

        self.assertEqual(0, result)
        for kept in (self.artifact_dir, seed_dir):
            self.assertTrue(
                os.path.exists(kept), 'Missing {0} dir'.format(kept))
        for gone in (first_dir, second_dir):
            self.assertFalse(
                os.path.exists(gone), 'Unexpected {0} dir'.format(gone))

    def test_remove_artifacts_removes_artifacts_removes_seed(self):
        """With remove_seed=True the seed dir is removed as well."""
        removed_dirs = self._make_artifact_tree()

        result = self._remove_artifacts(remove_logs=False, remove_seed=True)

        self.assertEqual(0, result)
        artifacts_present = os.path.exists(self.artifact_dir)
        self.assertTrue(artifacts_present, 'Missing artifact dir')
        for gone in removed_dirs:
            self.assertFalse(
                os.path.exists(gone), 'Unexpected {0} dir'.format(gone))

    def test_remove_artifacts_returns_one_on_errors(self):
        """An OSError from del_dir gives return code 1 and a stderr message."""
        ensure_dir(self.artifact_dir)
        ensure_dir(os.path.join(self.artifact_dir, 'dir1'))

        failing_mocks = {
            'del_dir': {'side_effect': OSError('oops')},
            'Init': {'side_effect': self.init_class},
        }
        with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
            result = wrap_and_call(
                'cloudinit.cmd.clean', failing_mocks,
                clean.remove_artifacts, remove_logs=False)

        self.assertEqual(1, result)
        expected_msg = (
            'ERROR: Could not remove %s/dir1: oops\n' % self.artifact_dir)
        self.assertEqual(expected_msg, m_stderr.getvalue())

    def test_handle_clean_args_reboots(self):
        """handle_clean_args issues a shutdown command when reboot is set."""
        recorded = []

        # Parameter names are kept as-is in case subp is called by keyword.
        def fake_subp(cmd, capture):
            recorded.append((cmd, capture))
            return '', ''

        myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot')
        parsed = myargs(remove_logs=False, remove_seed=False, reboot=True)
        mocks = {
            'subp': {'side_effect': fake_subp},
            'Init': {'side_effect': self.init_class},
        }

        result = wrap_and_call(
            'cloudinit.cmd.clean', mocks,
            clean.handle_clean_args, name='does not matter', args=parsed)

        self.assertEqual(0, result)
        self.assertEqual([(['shutdown', '-r', 'now'], False)], recorded)

    def test_status_main(self):
        """clean.main exits with code 0 and removes the log for '--logs'."""
        write_file(self.log1, 'cloud-init-log')
        mocks = {
            'Init': {'side_effect': self.init_class},
            'sys.exit': {'side_effect': self.sys_exit},
            'sys.argv': {'new': ['clean', '--logs']},
        }
        with self.assertRaises(SystemExit) as context_manager:
            wrap_and_call('cloudinit.cmd.clean', mocks, clean.main)

        self.assertEqual(0, context_manager.exception.code)
        log_present = os.path.exists(self.log1)
        self.assertFalse(log_present, 'Unexpected log {0}'.format(self.log1))
|
|
Packit |
bc9a3a |
|
|
Packit |
bc9a3a |
|
|
Packit |
bc9a3a |
# vi: ts=4 expandtab syntax=python
|