Blame tests/unittests/test_handler/test_handler_mounts.py

Packit bc9a3a
# This file is part of cloud-init. See LICENSE file for license information.
Packit bc9a3a
Packit bc9a3a
import os.path
Packit bc9a3a
Packit bc9a3a
from cloudinit.config import cc_mounts
Packit bc9a3a
Packit bc9a3a
from cloudinit.tests import helpers as test_helpers
Packit bc9a3a
Packit bc9a3a
try:
Packit bc9a3a
    from unittest import mock
Packit bc9a3a
except ImportError:
Packit bc9a3a
    import mock
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
    '''Tests for cc_mounts.sanitize_devname run against a temporary root.

    Disks and partitions are faked as plain files and directories that the
    helper methods create beneath self.new_root.
    '''

    def setUp(self):
        super(TestSanitizeDevname, self).setUp()
        self.new_root = self.tmp_dir()
        # NOTE(review): patchOS presumably redirects os/os.path calls into
        # new_root -- confirm against cloudinit.tests.helpers.
        self.patchOS(self.new_root)

    def _touch(self, path):
        '''Create an empty file for path beneath new_root.

        Missing parent directories are created first; an existing file is
        opened in append mode and therefore left untouched.
        '''
        target = os.path.join(self.new_root, path.lstrip('/'))
        parent = os.path.dirname(target)
        if not os.path.exists(parent):
            os.makedirs(parent)
        with open(target, 'a'):
            pass

    def _makedirs(self, directory):
        '''Create directory beneath new_root unless it already exists.'''
        target = os.path.join(self.new_root, directory.lstrip('/'))
        if os.path.exists(target):
            return
        os.makedirs(target)

    def _sanitize(self, devname, transformer):
        '''Call sanitize_devname with a fresh Mock as its third argument.'''
        return cc_mounts.sanitize_devname(devname, transformer, mock.Mock())

    def mock_existence_of_disk(self, disk_path):
        '''Fake a disk: its device file plus a /sys/block/<name> directory.'''
        self._touch(disk_path)
        block_name = disk_path.rsplit('/', 1)[-1]
        self._makedirs(os.path.join('/sys/block', block_name))

    def mock_existence_of_partition(self, disk_path, partition_number):
        '''Fake a partition (and its parent disk) of disk_path.

        The partition device file is disk_path followed by
        partition_number; a matching /sys/block/<disk>/<disk><partition>
        directory is created as well.
        '''
        self.mock_existence_of_disk(disk_path)
        suffix = str(partition_number)
        self._touch(disk_path + suffix)
        block_name = disk_path.rsplit('/', 1)[-1]
        self._makedirs(
            os.path.join('/sys/block', block_name, block_name + suffix))

    def test_existent_full_disk_path_is_returned(self):
        device = '/dev/sda'
        self.mock_existence_of_disk(device)
        found = self._sanitize(device, lambda x: None)
        self.assertEqual(device, found)

    def test_existent_disk_name_returns_full_path(self):
        short_name = 'sda'
        device = '/dev/' + short_name
        self.mock_existence_of_disk(device)
        found = self._sanitize(short_name, lambda x: None)
        self.assertEqual(device, found)

    def test_existent_meta_disk_is_returned(self):
        device = '/dev/sda'
        self.mock_existence_of_disk(device)
        found = self._sanitize('ephemeral0', lambda x: device)
        self.assertEqual(device, found)

    def test_existent_meta_partition_is_returned(self):
        disk, part = '/dev/sda', '1'
        expected = disk + part
        self.mock_existence_of_partition(disk, part)
        found = self._sanitize('ephemeral0.1', lambda x: disk)
        self.assertEqual(expected, found)

    def test_existent_meta_partition_with_p_is_returned(self):
        # The partition node carries a 'p' prefix (sdap1) while the request
        # still names partition 1.
        disk, part = '/dev/sda', 'p1'
        expected = disk + part
        self.mock_existence_of_partition(disk, part)
        found = self._sanitize('ephemeral0.1', lambda x: disk)
        self.assertEqual(expected, found)

    def test_first_partition_returned_if_existent_disk_is_partitioned(self):
        disk, part = '/dev/sda', '1'
        expected = disk + part
        self.mock_existence_of_partition(disk, part)
        found = self._sanitize('ephemeral0', lambda x: disk)
        self.assertEqual(expected, found)

    def test_nth_partition_returned_if_requested(self):
        disk, part = '/dev/sda', '3'
        expected = disk + part
        self.mock_existence_of_partition(disk, part)
        found = self._sanitize('ephemeral0.3', lambda x: disk)
        self.assertEqual(expected, found)

    def test_transformer_returning_none_returns_none(self):
        found = self._sanitize('ephemeral0', lambda x: None)
        self.assertIsNone(found)

    def test_missing_device_returns_none(self):
        # Nothing is created under new_root for this test.
        found = self._sanitize('/dev/sda', None)
        self.assertIsNone(found)

    def test_missing_sys_returns_none(self):
        device = '/dev/sda'
        # Only the /dev entry is created (as a directory); there is no
        # /sys/block counterpart.
        self._makedirs(device)
        found = self._sanitize(device, None)
        self.assertIsNone(found)

    def test_existent_disk_but_missing_partition_returns_none(self):
        device = '/dev/sda'
        self.mock_existence_of_disk(device)
        found = self._sanitize('ephemeral0.1', lambda x: device)
        self.assertIsNone(found)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestFstabHandling(test_helpers.FilesystemMockingTestCase):
    '''Tests for what cc_mounts.handle does with /etc/fstab.

    FSTAB_PATH is patched to a file beneath a temporary root, and
    _is_block_device, util.subp and util.mounts are replaced by mocks in
    setUp().
    '''

    swap_path = '/dev/sdb1'

    def setUp(self):
        super(TestFstabHandling, self).setUp()
        self.new_root = self.tmp_dir()
        self.patchOS(self.new_root)

        self.fstab_path = os.path.join(self.new_root, 'etc/fstab')
        self._makedirs('/etc')

        # Point the handler at the fstab inside the temporary root.
        self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH',
                       'mock_fstab_path',
                       self.fstab_path,
                       autospec=False)

        self.add_patch('cloudinit.config.cc_mounts._is_block_device',
                       'mock_is_block_device',
                       return_value=True)

        # The mock is reachable as self.m_util_subp; the mount -a test
        # inspects its recorded calls.
        self.add_patch('cloudinit.config.cc_mounts.util.subp',
                       'm_util_subp')

        self.add_patch('cloudinit.config.cc_mounts.util.mounts',
                       'mock_util_mounts',
                       return_value={
                           '/dev/sda1': {'fstype': 'ext4',
                                         'mountpoint': '/',
                                         'opts': 'rw,relatime,discard'
                                         }})

        self.mock_cloud = mock.Mock()
        self.mock_log = mock.Mock()
        self.mock_cloud.device_name_to_device = self.device_name_to_device

    def _makedirs(self, directory):
        '''Create directory beneath new_root unless it already exists.'''
        directory = os.path.join(self.new_root, directory.lstrip('/'))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def _write_fstab(self, content):
        '''Replace the contents of the patched fstab with content.'''
        with open(cc_mounts.FSTAB_PATH, 'w') as fd:
            fd.write(content)

    def _read_fstab(self):
        '''Return the current contents of the patched fstab.'''
        with open(cc_mounts.FSTAB_PATH, 'r') as fd:
            return fd.read()

    def _run_handle(self, cfg):
        '''Invoke cc_mounts.handle with cfg and the mocks from setUp().'''
        cc_mounts.handle(None, cfg, self.mock_cloud, self.mock_log, [])

    def _cloudconfig_swap_line(self):
        '''Return the fstab line the tests expect for swap_path.'''
        return (
            '%s\tnone\tswap\tsw,comment=cloudconfig\t'
            '0\t0\n' % (self.swap_path,)
        )

    def device_name_to_device(self, path):
        '''Stand-in assigned to mock_cloud.device_name_to_device.

        Only the name 'swap' maps to a device (swap_path); any other name
        yields None.
        '''
        if path == 'swap':
            return self.swap_path
        return None

    def test_fstab_no_swap_device(self):
        '''Ensure that cloud-init adds a discovered swap partition
        to /etc/fstab.'''

        self._write_fstab('')

        self._run_handle({})

        self.assertEqual(self._cloudconfig_swap_line(), self._read_fstab())

    def test_fstab_same_swap_device_already_configured(self):
        '''Ensure that cloud-init will not add a swap device if the same
        device already exists in /etc/fstab.'''

        fstab_original_content = '%s swap swap defaults 0 0\n' % (
            self.swap_path,)
        self._write_fstab(fstab_original_content)

        self._run_handle({})

        self.assertEqual(fstab_original_content, self._read_fstab())

    def test_fstab_alternate_swap_device_already_configured(self):
        '''Ensure that cloud-init will add a discovered swap device to
        /etc/fstab even when there exists a swap definition on another
        device.'''

        fstab_original_content = '/dev/sdc1 swap swap defaults 0 0\n'
        fstab_expected_content = (
            fstab_original_content + self._cloudconfig_swap_line())
        self._write_fstab(fstab_original_content)

        self._run_handle({})

        self.assertEqual(fstab_expected_content, self._read_fstab())

    def test_no_change_fstab_sets_needs_mount_all(self):
        '''Verify that 'mount -a' and 'systemctl daemon-reload' are run
        when the configured mount is already present in /etc/fstab.'''
        fstab_original_content = (
            'LABEL=cloudimg-rootfs / ext4 defaults 0 0\n'
            'LABEL=UEFI /boot/efi vfat defaults 0 0\n'
            '/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n'
        )
        cc = {'mounts': [
                 ['/dev/vdb', '/mnt', 'auto', 'defaults,noexec']]}
        self._write_fstab(fstab_original_content)
        # This comparison runs before the handler, so it only confirms the
        # fixture was written; the fstab contents after handle() are not
        # checked by this test.
        self.assertEqual(fstab_original_content, self._read_fstab())
        self._run_handle(cc)
        self.m_util_subp.assert_has_calls([
            mock.call(['mount', '-a']),
            mock.call(['systemctl', 'daemon-reload'])])
Packit bc9a3a
Packit bc9a3a
# vi: ts=4 expandtab