# This file is part of cloud-init. See LICENSE file for license information.
import os.path
from unittest import mock
from cloudinit.config import cc_mounts
from cloudinit.tests import helpers as test_helpers
class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestSanitizeDevname, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
def _touch(self, path):
path = os.path.join(self.new_root, path.lstrip('/'))
basedir = os.path.dirname(path)
if not os.path.exists(basedir):
os.makedirs(basedir)
open(path, 'a').close()
def _makedirs(self, directory):
directory = os.path.join(self.new_root, directory.lstrip('/'))
if not os.path.exists(directory):
os.makedirs(directory)
def mock_existence_of_disk(self, disk_path):
self._touch(disk_path)
self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1]))
def mock_existence_of_partition(self, disk_path, partition_number):
self.mock_existence_of_disk(disk_path)
self._touch(disk_path + str(partition_number))
disk_name = disk_path.split('/')[-1]
self._makedirs(os.path.join('/sys/block',
disk_name,
disk_name + str(partition_number)))
def test_existent_full_disk_path_is_returned(self):
disk_path = '/dev/sda'
self.mock_existence_of_disk(disk_path)
self.assertEqual(disk_path,
cc_mounts.sanitize_devname(disk_path,
lambda x: None,
mock.Mock()))
def test_existent_disk_name_returns_full_path(self):
disk_name = 'sda'
disk_path = '/dev/' + disk_name
self.mock_existence_of_disk(disk_path)
self.assertEqual(disk_path,
cc_mounts.sanitize_devname(disk_name,
lambda x: None,
mock.Mock()))
def test_existent_meta_disk_is_returned(self):
actual_disk_path = '/dev/sda'
self.mock_existence_of_disk(actual_disk_path)
self.assertEqual(
actual_disk_path,
cc_mounts.sanitize_devname('ephemeral0',
lambda x: actual_disk_path,
mock.Mock()))
def test_existent_meta_partition_is_returned(self):
disk_name, partition_part = '/dev/sda', '1'
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
cc_mounts.sanitize_devname('ephemeral0.1',
lambda x: disk_name,
mock.Mock()))
def test_existent_meta_partition_with_p_is_returned(self):
disk_name, partition_part = '/dev/sda', 'p1'
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
cc_mounts.sanitize_devname('ephemeral0.1',
lambda x: disk_name,
mock.Mock()))
def test_first_partition_returned_if_existent_disk_is_partitioned(self):
disk_name, partition_part = '/dev/sda', '1'
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
cc_mounts.sanitize_devname('ephemeral0',
lambda x: disk_name,
mock.Mock()))
def test_nth_partition_returned_if_requested(self):
disk_name, partition_part = '/dev/sda', '3'
actual_partition_path = disk_name + partition_part
self.mock_existence_of_partition(disk_name, partition_part)
self.assertEqual(
actual_partition_path,
cc_mounts.sanitize_devname('ephemeral0.3',
lambda x: disk_name,
mock.Mock()))
def test_transformer_returning_none_returns_none(self):
self.assertIsNone(
cc_mounts.sanitize_devname(
'ephemeral0', lambda x: None, mock.Mock()))
def test_missing_device_returns_none(self):
self.assertIsNone(
cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock()))
def test_missing_sys_returns_none(self):
disk_path = '/dev/sda'
self._makedirs(disk_path)
self.assertIsNone(
cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
def test_existent_disk_but_missing_partition_returns_none(self):
disk_path = '/dev/sda'
self.mock_existence_of_disk(disk_path)
self.assertIsNone(
cc_mounts.sanitize_devname(
'ephemeral0.1', lambda x: disk_path, mock.Mock()))
def test_network_device_returns_network_device(self):
disk_path = 'netdevice:/path'
self.assertEqual(
disk_path,
cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestSwapFileCreation, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
self.fstab_path = os.path.join(self.new_root, 'etc/fstab')
self.swap_path = os.path.join(self.new_root, 'swap.img')
self._makedirs('/etc')
self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH',
'mock_fstab_path',
self.fstab_path,
autospec=False)
self.add_patch('cloudinit.config.cc_mounts.subp.subp',
'm_subp_subp')
self.add_patch('cloudinit.config.cc_mounts.util.mounts',
'mock_util_mounts',
return_value={
'/dev/sda1': {'fstype': 'ext4',
'mountpoint': '/',
'opts': 'rw,relatime,discard'
}})
self.mock_cloud = mock.Mock()
self.mock_log = mock.Mock()
self.mock_cloud.device_name_to_device = self.device_name_to_device
self.cc = {
'swap': {
'filename': self.swap_path,
'size': '512',
'maxsize': '512'}}
def _makedirs(self, directory):
directory = os.path.join(self.new_root, directory.lstrip('/'))
if not os.path.exists(directory):
os.makedirs(directory)
def device_name_to_device(self, path):
if path == 'swap':
return self.swap_path
else:
dev = None
return dev
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.kernel_version')
def test_swap_creation_method_fallocate_on_xfs(self, m_kernel_version,
m_get_mount_info):
m_kernel_version.return_value = (4, 20)
m_get_mount_info.return_value = ["", "xfs"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
self.m_subp_subp.assert_has_calls([
mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True),
mock.call(['mkswap', self.swap_path]),
mock.call(['swapon', '-a'])])
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.kernel_version')
def test_swap_creation_method_xfs(self, m_kernel_version,
m_get_mount_info):
m_kernel_version.return_value = (3, 18)
m_get_mount_info.return_value = ["", "xfs"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
self.m_subp_subp.assert_has_calls([
mock.call(['dd', 'if=/dev/zero',
'of=' + self.swap_path,
'bs=1M', 'count=0'], capture=True),
mock.call(['mkswap', self.swap_path]),
mock.call(['swapon', '-a'])])
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.kernel_version')
def test_swap_creation_method_btrfs(self, m_kernel_version,
m_get_mount_info):
m_kernel_version.return_value = (4, 20)
m_get_mount_info.return_value = ["", "btrfs"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
self.m_subp_subp.assert_has_calls([
mock.call(['dd', 'if=/dev/zero',
'of=' + self.swap_path,
'bs=1M', 'count=0'], capture=True),
mock.call(['mkswap', self.swap_path]),
mock.call(['swapon', '-a'])])
@mock.patch('cloudinit.util.get_mount_info')
@mock.patch('cloudinit.util.kernel_version')
def test_swap_creation_method_ext4(self, m_kernel_version,
m_get_mount_info):
m_kernel_version.return_value = (5, 14)
m_get_mount_info.return_value = ["", "ext4"]
cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
self.m_subp_subp.assert_has_calls([
mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True),
mock.call(['mkswap', self.swap_path]),
mock.call(['swapon', '-a'])])
class TestFstabHandling(test_helpers.FilesystemMockingTestCase):
swap_path = '/dev/sdb1'
def setUp(self):
super(TestFstabHandling, self).setUp()
self.new_root = self.tmp_dir()
self.patchOS(self.new_root)
self.fstab_path = os.path.join(self.new_root, 'etc/fstab')
self._makedirs('/etc')
self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH',
'mock_fstab_path',
self.fstab_path,
autospec=False)
self.add_patch('cloudinit.config.cc_mounts._is_block_device',
'mock_is_block_device',
return_value=True)
self.add_patch('cloudinit.config.cc_mounts.subp.subp',
'm_subp_subp')
self.add_patch('cloudinit.config.cc_mounts.util.mounts',
'mock_util_mounts',
return_value={
'/dev/sda1': {'fstype': 'ext4',
'mountpoint': '/',
'opts': 'rw,relatime,discard'
}})
self.mock_cloud = mock.Mock()
self.mock_log = mock.Mock()
self.mock_cloud.device_name_to_device = self.device_name_to_device
def _makedirs(self, directory):
directory = os.path.join(self.new_root, directory.lstrip('/'))
if not os.path.exists(directory):
os.makedirs(directory)
def device_name_to_device(self, path):
if path == 'swap':
return self.swap_path
else:
dev = None
return dev
def test_no_fstab(self):
""" Handle images which do not include an fstab. """
self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH))
fstab_expected_content = (
'%s\tnone\tswap\tsw,comment=cloudconfig\t'
'0\t0\n' % (self.swap_path,)
)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
with open(cc_mounts.FSTAB_PATH, 'r') as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_swap_integrity(self):
'''Ensure that the swap file is correctly created and can
swapon successfully. Fixing the corner case of:
kernel: swapon: swapfile has holes'''
fstab = '/swap.img swap swap defaults 0 0\n'
with open(cc_mounts.FSTAB_PATH, 'w') as fd:
fd.write(fstab)
cc = {'swap': ['filename: /swap.img', 'size: 512', 'maxsize: 512']}
cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, [])
def test_fstab_no_swap_device(self):
'''Ensure that cloud-init adds a discovered swap partition
to /etc/fstab.'''
fstab_original_content = ''
fstab_expected_content = (
'%s\tnone\tswap\tsw,comment=cloudconfig\t'
'0\t0\n' % (self.swap_path,)
)
with open(cc_mounts.FSTAB_PATH, 'w') as fd:
fd.write(fstab_original_content)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
with open(cc_mounts.FSTAB_PATH, 'r') as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_fstab_same_swap_device_already_configured(self):
'''Ensure that cloud-init will not add a swap device if the same
device already exists in /etc/fstab.'''
fstab_original_content = '%s swap swap defaults 0 0\n' % (
self.swap_path,)
fstab_expected_content = fstab_original_content
with open(cc_mounts.FSTAB_PATH, 'w') as fd:
fd.write(fstab_original_content)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
with open(cc_mounts.FSTAB_PATH, 'r') as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_fstab_alternate_swap_device_already_configured(self):
'''Ensure that cloud-init will add a discovered swap device to
/etc/fstab even when there exists a swap definition on another
device.'''
fstab_original_content = '/dev/sdc1 swap swap defaults 0 0\n'
fstab_expected_content = (
fstab_original_content +
'%s\tnone\tswap\tsw,comment=cloudconfig\t'
'0\t0\n' % (self.swap_path,)
)
with open(cc_mounts.FSTAB_PATH, 'w') as fd:
fd.write(fstab_original_content)
cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
with open(cc_mounts.FSTAB_PATH, 'r') as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
def test_no_change_fstab_sets_needs_mount_all(self):
'''verify unchanged fstab entries are mounted if not call mount -a'''
fstab_original_content = (
'LABEL=cloudimg-rootfs / ext4 defaults 0 0\n'
'LABEL=UEFI /boot/efi vfat defaults 0 0\n'
'/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n'
)
fstab_expected_content = fstab_original_content
cc = {
'mounts': [
['/dev/vdb', '/mnt', 'auto', 'defaults,noexec']
]
}
with open(cc_mounts.FSTAB_PATH, 'w') as fd:
fd.write(fstab_original_content)
with open(cc_mounts.FSTAB_PATH, 'r') as fd:
fstab_new_content = fd.read()
self.assertEqual(fstab_expected_content, fstab_new_content)
cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, [])
self.m_subp_subp.assert_has_calls([
mock.call(['mount', '-a']),
mock.call(['systemctl', 'daemon-reload'])])
# vi: ts=4 expandtab