Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.cmd import clean
from cloudinit.util import ensure_dir, sym_link, write_file
from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock
from collections import namedtuple
import os
from io import StringIO

mypaths = namedtuple('MyPaths', 'cloud_dir')


class TestClean(CiTestCase):

    def setUp(self):
        super(TestClean, self).setUp()
        self.new_root = self.tmp_dir()
        self.artifact_dir = self.tmp_path('artifacts', self.new_root)
        self.log1 = self.tmp_path('cloud-init.log', self.new_root)
        self.log2 = self.tmp_path('cloud-init-output.log', self.new_root)

        class FakeInit(object):
            cfg = {'def_log_file': self.log1,
                   'output': {'all': '|tee -a {0}'.format(self.log2)}}
            # Ensure cloud_dir has a trailing slash, to match real behaviour
            paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir))

            def __init__(self, ds_deps):
                pass

            def read_cfg(self):
                pass

        self.init_class = FakeInit

    def test_remove_artifacts_removes_logs(self):
        """remove_artifacts removes logs when remove_logs is True."""
        write_file(self.log1, 'cloud-init-log')
        write_file(self.log2, 'cloud-init-output-log')

        self.assertFalse(
            os.path.exists(self.artifact_dir), 'Unexpected artifacts dir')
        retcode = wrap_and_call(
            'cloudinit.cmd.clean',
            {'Init': {'side_effect': self.init_class}},
            clean.remove_artifacts, remove_logs=True)
        self.assertFalse(os.path.exists(self.log1), 'Unexpected file')
        self.assertFalse(os.path.exists(self.log2), 'Unexpected file')
        self.assertEqual(0, retcode)

    def test_remove_artifacts_preserves_logs(self):
        """remove_artifacts leaves logs when remove_logs is False."""
        write_file(self.log1, 'cloud-init-log')
        write_file(self.log2, 'cloud-init-output-log')

        retcode = wrap_and_call(
            'cloudinit.cmd.clean',
            {'Init': {'side_effect': self.init_class}},
            clean.remove_artifacts, remove_logs=False)
        self.assertTrue(os.path.exists(self.log1), 'Missing expected file')
        self.assertTrue(os.path.exists(self.log2), 'Missing expected file')
        self.assertEqual(0, retcode)

    def test_remove_artifacts_removes_unlinks_symlinks(self):
        """remove_artifacts cleans artifacts dir unlinking any symlinks."""
        dir1 = os.path.join(self.artifact_dir, 'dir1')
        ensure_dir(dir1)
        symlink = os.path.join(self.artifact_dir, 'mylink')
        sym_link(dir1, symlink)

        retcode = wrap_and_call(
            'cloudinit.cmd.clean',
            {'Init': {'side_effect': self.init_class}},
            clean.remove_artifacts, remove_logs=False)
        self.assertEqual(0, retcode)
        for path in (dir1, symlink):
            self.assertFalse(
                os.path.exists(path),
                'Unexpected {0} dir'.format(path))

    def test_remove_artifacts_removes_artifacts_skipping_seed(self):
        """remove_artifacts cleans artifacts dir with exception of seed dir."""
        dirs = [
            self.artifact_dir,
            os.path.join(self.artifact_dir, 'seed'),
            os.path.join(self.artifact_dir, 'dir1'),
            os.path.join(self.artifact_dir, 'dir2')]
        for _dir in dirs:
            ensure_dir(_dir)

        retcode = wrap_and_call(
            'cloudinit.cmd.clean',
            {'Init': {'side_effect': self.init_class}},
            clean.remove_artifacts, remove_logs=False)
        self.assertEqual(0, retcode)
        for expected_dir in dirs[:2]:
            self.assertTrue(
                os.path.exists(expected_dir),
                'Missing {0} dir'.format(expected_dir))
        for deleted_dir in dirs[2:]:
            self.assertFalse(
                os.path.exists(deleted_dir),
                'Unexpected {0} dir'.format(deleted_dir))

    def test_remove_artifacts_removes_artifacts_removes_seed(self):
        """remove_artifacts removes seed dir when remove_seed is True."""
        dirs = [
            self.artifact_dir,
            os.path.join(self.artifact_dir, 'seed'),
            os.path.join(self.artifact_dir, 'dir1'),
            os.path.join(self.artifact_dir, 'dir2')]
        for _dir in dirs:
            ensure_dir(_dir)

        retcode = wrap_and_call(
            'cloudinit.cmd.clean',
            {'Init': {'side_effect': self.init_class}},
            clean.remove_artifacts, remove_logs=False, remove_seed=True)
        self.assertEqual(0, retcode)
        self.assertTrue(
            os.path.exists(self.artifact_dir), 'Missing artifact dir')
        for deleted_dir in dirs[1:]:
            self.assertFalse(
                os.path.exists(deleted_dir),
                'Unexpected {0} dir'.format(deleted_dir))

    def test_remove_artifacts_returns_one_on_errors(self):
        """remove_artifacts returns non-zero on failure and prints an error."""
        ensure_dir(self.artifact_dir)
        ensure_dir(os.path.join(self.artifact_dir, 'dir1'))

        with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
            retcode = wrap_and_call(
                'cloudinit.cmd.clean',
                {'del_dir': {'side_effect': OSError('oops')},
                 'Init': {'side_effect': self.init_class}},
                clean.remove_artifacts, remove_logs=False)
        self.assertEqual(1, retcode)
        self.assertEqual(
            'ERROR: Could not remove %s/dir1: oops\n' % self.artifact_dir,
            m_stderr.getvalue())

    def test_handle_clean_args_reboots(self):
        """handle_clean_args_reboots when reboot arg is provided."""

        called_cmds = []

        def fake_subp(cmd, capture):
            called_cmds.append((cmd, capture))
            return '', ''

        myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot')
        cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True)
        retcode = wrap_and_call(
            'cloudinit.cmd.clean',
            {'subp': {'side_effect': fake_subp},
             'Init': {'side_effect': self.init_class}},
            clean.handle_clean_args, name='does not matter', args=cmdargs)
        self.assertEqual(0, retcode)
        self.assertEqual(
            [(['shutdown', '-r', 'now'], False)], called_cmds)

    def test_status_main(self):
        '''clean.main can be run as a standalone script.'''
        write_file(self.log1, 'cloud-init-log')
        with self.assertRaises(SystemExit) as context_manager:
            wrap_and_call(
                'cloudinit.cmd.clean',
                {'Init': {'side_effect': self.init_class},
                 'sys.argv': {'new': ['clean', '--logs']}},
                clean.main)

        self.assertEqual(0, context_manager.exception.code)
        self.assertFalse(
            os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1))


# vi: ts=4 expandtab syntax=python