# Blob Blame History Raw  (navigation text left over from the web source viewer)
from __future__ import division

import unittest
import os
import six
import re
import time

from distutils.version import LooseVersion
from distutils.spawn import find_executable

import overrides_hack
from utils import create_sparse_tempfile, create_lio_device, delete_lio_device, fake_utils, fake_path, mount, umount, run_command, TestTags, tag_test
from gi.repository import GLib, BlockDev

TEST_MNT = "/tmp/libblockdev_test_mnt"

def wipefs(device):
    """Run ``wipefs -a`` on *device* through the shell.

    The command's standard output is redirected to /dev/null and its exit
    status is not checked.
    """
    command = "wipefs -a %s > /dev/null" % device
    os.system(command)


class BtrfsTestCase(unittest.TestCase):
    """Common fixture shared by the btrfs plugin tests.

    The whole class is skipped when the btrfs kernel module or the ``btrfs``
    executable is missing.  Every test gets two sparse backing files
    (``dev_file``, ``dev_file2``) exported as LIO devices (``loop_dev``,
    ``loop_dev2``); ``_clean_up`` tears all of it down again.
    """

    requested_plugins = BlockDev.plugin_specs_from_names(("btrfs",))

    @classmethod
    def setUpClass(cls):
        """Check the btrfs prerequisites and (re)initialize BlockDev.

        :raises unittest.SkipTest: if the kernel module or the executable
                                   is not available
        """
        if not BlockDev.utils_have_kernel_module("btrfs"):
            raise unittest.SkipTest('Btrfs kernel module not available, skipping.')

        if not find_executable("btrfs"):
            raise unittest.SkipTest("btrfs executable not found in $PATH, skipping.")

        if not BlockDev.is_initialized():
            BlockDev.init(cls.requested_plugins, None)
        else:
            BlockDev.reinit(cls.requested_plugins, True, None)

    def setUp(self):
        """Create the two backing files and the LIO devices on top of them.

        :raises RuntimeError: if one of the LIO devices cannot be created
        """
        # _clean_up is registered before anything gets created so that it also
        # runs when setUp fails part way through; starting from None lets it
        # tell which resources really exist instead of crashing on a missing
        # attribute and leaving the backing files behind
        self.dev_file = None
        self.dev_file2 = None
        self.loop_dev = None
        self.loop_dev2 = None
        self.addCleanup(self._clean_up)

        self.dev_file = create_sparse_tempfile("btrfs_test", 1024**3)
        self.dev_file2 = create_sparse_tempfile("btrfs_test", 1024**3)
        try:
            self.loop_dev = create_lio_device(self.dev_file)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)
        try:
            self.loop_dev2 = create_lio_device(self.dev_file2)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)

    def _clean_up(self):
        """Unmount TEST_MNT and remove whatever setUp managed to create."""
        umount(TEST_MNT)

        created = ((self.loop_dev, self.dev_file),
                   (self.loop_dev2, self.dev_file2))
        for lio_dev, backing_file in created:
            if lio_dev is not None:
                try:
                    delete_lio_device(lio_dev)
                except RuntimeError:
                    # just move on, we can do no better here
                    pass
            if backing_file is not None:
                os.unlink(backing_file)

    def _get_btrfs_version(self):
        """Return the version printed by ``btrfs --version`` as a LooseVersion.

        :raises RuntimeError: if no version can be found in the output
        """
        _ret, out, _err = run_command("btrfs --version")
        m = re.search(r"[Bb]trfs.* v([\d\.]+)", out)
        # the pattern has exactly one group, so a match always carries the version
        if not m:
            raise RuntimeError("Failed to determine btrfs version from: %s" % out)
        return LooseVersion(m.group(1))

class BtrfsTestCreateQuerySimple(BtrfsTestCase):
    """Volume creation on a single device, including the error cases."""

    @tag_test(TestTags.CORE)
    def test_create_and_query_volume(self):
        """Verify that btrfs volume creation and querying works"""

        # (devices, data level, metadata level) combinations that have to be
        # rejected: no device, a missing device and unknown RAID levels
        rejected = (
            ([], None, None),
            (["/non/existing/device"], None, None),
            ([self.loop_dev], "RaID7", None),
            ([self.loop_dev], None, "RaID7"),
        )
        for devices, data_level, md_level in rejected:
            with self.assertRaises(GLib.GError):
                BlockDev.btrfs_create_volume(devices, None, data_level, md_level, None)

        # one device, no label
        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], None, None, None, None))

        # already created, a second attempt has to fail
        with self.assertRaises(GLib.GError):
            BlockDev.btrfs_create_volume([self.loop_dev], None, None, None, None)

        members = BlockDev.btrfs_list_devices(self.loop_dev)
        self.assertEqual(len(members), 1)

class BtrfsTestCreateQueryLabel(BtrfsTestCase):
    """Volume creation on a single device with a label given."""

    def test_create_and_query_volume_label(self):
        """Verify that btrfs volume creation with label works"""

        # one device, with label
        created = BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None)
        self.assertTrue(created)

        members = BlockDev.btrfs_list_devices(self.loop_dev)
        self.assertEqual(len(members), 1)


class BtrfsTestCreateQueryTwoDevs(BtrfsTestCase):
    """Volume creation spanning both test devices."""

    def test_create_and_query_volume_two_devs(self):
        """Verify that btrfs volume creation with two devices works"""

        # two devices, no specific data/metadata layout
        both_devs = [self.loop_dev, self.loop_dev2]
        created = BlockDev.btrfs_create_volume(both_devs, "myShinyBtrfs", None, None, None)
        self.assertTrue(created)

        members = BlockDev.btrfs_list_devices(self.loop_dev)
        self.assertEqual(len(members), 2)

class BtrfsTestCreateQueryTwoDevsRaids(BtrfsTestCase):
    """Volume creation on two devices with explicit RAID levels."""

    def test_create_and_query_volume_two_devs(self):
        """Verify that btrfs volume creation with two devices and raid (meta)data works"""

        # arguments following the label, one tuple per round
        layouts = (
            ("raid1", None, None),   # two devices, raid1 data
            (None, "raid1", None),   # two devices, raid1 metadata
            ("raid1", "raid1"),      # two devices, raid1 data and metadata
        )

        for round_no, layout in enumerate(layouts):
            if round_no > 0:
                # get rid of the volume created in the previous round
                wipefs(self.loop_dev)
                wipefs(self.loop_dev2)

            created = BlockDev.btrfs_create_volume([self.loop_dev, self.loop_dev2],
                                                   "myShinyBtrfs", *layout)
            self.assertTrue(created)

            members = BlockDev.btrfs_list_devices(self.loop_dev)
            self.assertEqual(len(members), 2)

class BtrfsTestAddRemoveDevice(BtrfsTestCase):
    """Growing and shrinking a mounted volume by one device."""

    def test_add_remove_device(self):
        """Verify that it is possible to add/remove device to a btrfs volume"""

        def member_count():
            # number of devices reported for the volume living on loop_dev
            return len(BlockDev.btrfs_list_devices(self.loop_dev))

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))
        self.assertEqual(member_count(), 1)

        mount(self.loop_dev, TEST_MNT)

        # the second device joins the mounted volume...
        self.assertTrue(BlockDev.btrfs_add_device(TEST_MNT, self.loop_dev2, None))
        self.assertEqual(member_count(), 2)

        # ...and leaves it again
        self.assertTrue(BlockDev.btrfs_remove_device(TEST_MNT, self.loop_dev2, None))
        self.assertEqual(member_count(), 1)

class BtrfsTestCreateDeleteSubvolume(BtrfsTestCase):
    """Creating, deleting and nesting subvolumes on a mounted volume."""

    @tag_test(TestTags.CORE)
    def test_create_delete_subvolume(self):
        """Verify that it is possible to create/delete subvolume"""

        if self._get_btrfs_version() >= LooseVersion('4.13.2'):
            self.skipTest('subvolumes list is broken with btrfs-progs v4.13.2')

        def list_subvols():
            # all subvolumes of the mounted volume, not just snapshots
            return BlockDev.btrfs_list_subvolumes(TEST_MNT, False)

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))

        mount(self.loop_dev, TEST_MNT)
        self.assertEqual(len(list_subvols()), 0)

        self.assertTrue(BlockDev.btrfs_create_subvolume(TEST_MNT, "subvol1", None))
        self.assertEqual(len(list_subvols()), 1)

        # already there
        with self.assertRaises(GLib.GError):
            BlockDev.btrfs_create_subvolume(TEST_MNT, "subvol1", None)

        self.assertTrue(BlockDev.btrfs_delete_subvolume(TEST_MNT, "subvol1", None))
        self.assertEqual(len(list_subvols()), 0)

        # already removed
        with self.assertRaises(GLib.GError):
            BlockDev.btrfs_delete_subvolume(TEST_MNT, "subvol1", None)

        # add it back
        self.assertTrue(BlockDev.btrfs_create_subvolume(TEST_MNT, "subvol1", None))
        self.assertEqual(len(list_subvols()), 1)

        # and create another subvolume in it
        nested_in = os.path.join(TEST_MNT, "subvol1")
        self.assertTrue(BlockDev.btrfs_create_subvolume(nested_in, "subvol1.1", None))

        listed = list_subvols()
        self.assertEqual(len(listed), 2)

        # make sure subvolumes are sorted properly (parents before children):
        # a parent has to be either the main volume or an entry listed earlier
        known = set()
        for entry in listed:
            known.add(entry)
            parent_is_known = (entry.parent_id == BlockDev.BTRFS_MAIN_VOLUME_ID or
                               any(entry.parent_id == other.id for other in known))
            self.assertTrue(parent_is_known)

class BtrfsTestCreateSnapshot(BtrfsTestCase):
    """Snapshotting the mounted volume."""

    def test_create_snapshot(self):
        """Verify that it is possible to create R/W and RO snapshots"""

        def snapshot_count():
            # True as the second argument: list snapshots only
            return len(BlockDev.btrfs_list_subvolumes(TEST_MNT, True))

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))

        mount(self.loop_dev, TEST_MNT)
        self.assertEqual(snapshot_count(), 0)

        # first an R/W snapshot, then an RO one
        requests = (("/snap1", False), ("/snap2", True))
        for expected, (suffix, read_only) in enumerate(requests, 1):
            taken = BlockDev.btrfs_create_snapshot(TEST_MNT, TEST_MNT + suffix, read_only, None)
            self.assertTrue(taken)
            self.assertEqual(snapshot_count(), expected)

class BtrfsTestGetDefaultSubvolumeID(BtrfsTestCase):
    """Querying the default subvolume of a fresh volume."""

    def test_get_default_subvolume_id(self):
        """Verify that getting default subvolume ID works as expected"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))

        # not mounted yet, should fail
        with six.assertRaisesRegex(self, GLib.GError, r".*(can't|cannot) access.*"):
            BlockDev.btrfs_get_default_subvolume_id(TEST_MNT)

        mount(self.loop_dev, TEST_MNT)

        default_id = BlockDev.btrfs_get_default_subvolume_id(TEST_MNT)
        self.assertEqual(default_id, 5)

class BtrfsTestSetDefaultSubvolumeID(BtrfsTestCase):
    """Switching the default subvolume back and forth."""

    def test_set_default_subvolume(self):
        """Verify that setting default subvolume works as expected"""

        def current_default():
            return BlockDev.btrfs_get_default_subvolume_id(TEST_MNT)

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))

        mount(self.loop_dev, TEST_MNT)
        self.assertEqual(current_default(), 5)

        self.assertTrue(BlockDev.btrfs_create_subvolume(TEST_MNT, "subvol1", None))

        listed = BlockDev.btrfs_list_subvolumes(TEST_MNT, False)
        self.assertEqual(len(listed), 1)

        # make the freshly created subvolume the default one
        new_id = next((entry.id for entry in listed), None)
        self.assertIsNot(new_id, None)
        self.assertTrue(BlockDev.btrfs_set_default_subvolume(TEST_MNT, new_id, None))
        self.assertEqual(current_default(), new_id)

        # and switch back to ID 5, the default seen right after mounting
        self.assertTrue(BlockDev.btrfs_set_default_subvolume(TEST_MNT, 5, None))
        self.assertEqual(current_default(), 5)

class BtrfsTestListDevices(BtrfsTestCase):
    """Details reported for the member devices of a volume."""

    @tag_test(TestTags.CORE)
    def test_list_devices(self):
        """Verify that it is possible to get info about devices"""

        both_devs = [self.loop_dev, self.loop_dev2]
        self.assertTrue(BlockDev.btrfs_create_volume(both_devs, "myShinyBtrfs", None, None, None))

        members = BlockDev.btrfs_list_devices(self.loop_dev)
        self.assertEqual(len(members), 2)

        # devices are expected in creation order with IDs counted from 1
        for expected_id, (member, path) in enumerate(zip(members, both_devs), 1):
            self.assertEqual(member.id, expected_id)
            self.assertEqual(member.path, path)
            self.assertGreaterEqual(member.size, 0)
            self.assertGreaterEqual(member.used, 0)

class BtrfsTestListSubvolumes(BtrfsTestCase):
    """Details reported for a subvolume."""

    @tag_test(TestTags.CORE)
    def test_list_subvolumes(self):
        """Verify that it is possible to get info about subvolumes"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))

        mount(self.loop_dev, TEST_MNT)

        # nothing to list on a fresh volume (snapshots-only listing)
        self.assertEqual(len(BlockDev.btrfs_list_subvolumes(TEST_MNT, True)), 0)

        self.assertTrue(BlockDev.btrfs_create_subvolume(TEST_MNT, "subvol1", None))

        listed = BlockDev.btrfs_list_subvolumes(TEST_MNT, False)
        self.assertEqual(len(listed), 1)

        only_subvol = listed[0]
        self.assertEqual(only_subvol.parent_id, 5)
        self.assertEqual(only_subvol.path, "subvol1")

class BtrfsTestFilesystemInfo(BtrfsTestCase):
    """Filesystem info of a labelled volume."""

    @tag_test(TestTags.CORE)
    def test_filesystem_info(self):
        """Verify that it is possible to get filesystem info"""

        # spaces and quotes in the label have to make it through unchanged
        label = "My 'Shiny' Btrfs"
        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], label, None, None, None))

        mount(self.loop_dev, TEST_MNT)

        fs_info = BlockDev.btrfs_filesystem_info(TEST_MNT)
        self.assertTrue(fs_info)
        self.assertEqual(fs_info.label, label)
        self.assertTrue(fs_info.uuid)
        self.assertEqual(fs_info.num_devices, 1)
        self.assertGreaterEqual(fs_info.used, 0)

class BtrfsTestFilesystemInfoNoLabel(BtrfsTestCase):
    """Filesystem info of a volume created without a label."""

    def test_filesystem_info(self):
        """Verify that it is possible to get filesystem info for a volume with no label"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], None, None, None, None))

        mount(self.loop_dev, TEST_MNT)

        fs_info = BlockDev.btrfs_filesystem_info(TEST_MNT)
        # no label is reported as an empty string
        self.assertEqual(fs_info.label, "")
        self.assertTrue(fs_info.uuid)
        self.assertEqual(fs_info.num_devices, 1)
        self.assertGreaterEqual(fs_info.used, 0)

class BtrfsTestMkfs(BtrfsTestCase):
    """Filesystem creation through btrfs_mkfs, including the error cases."""

    @tag_test(TestTags.CORE)
    def test_mkfs(self):
        """Verify that it is possible to create a btrfs filesystem"""

        # (devices, data level, metadata level) combinations that have to be
        # rejected: no device, a missing device and unknown RAID levels
        rejected = (
            ([], None, None),
            (["/non/existing/device"], None, None),
            ([self.loop_dev], "RaID7", None),
            ([self.loop_dev], None, "RaID7"),
        )
        for devices, data_level, md_level in rejected:
            with self.assertRaises(GLib.GError):
                BlockDev.btrfs_mkfs(devices, None, data_level, md_level, None)

        # one device, no label
        self.assertTrue(BlockDev.btrfs_mkfs([self.loop_dev], None, None, None, None))

        # already created, a second attempt has to fail
        with self.assertRaises(GLib.GError):
            BlockDev.btrfs_mkfs([self.loop_dev], None, None, None, None)

        members = BlockDev.btrfs_list_devices(self.loop_dev)
        self.assertEqual(len(members), 1)

class BtrfsTestMkfsLabel(BtrfsTestCase):
    """Filesystem creation through btrfs_mkfs with a label given."""

    def test_mkfs_label(self):
        """Verify that it is possible to create a btrfs filesystem with a label"""

        created = BlockDev.btrfs_mkfs([self.loop_dev], "myShinyBtrfs", None, None, None)
        self.assertTrue(created)

        members = BlockDev.btrfs_list_devices(self.loop_dev)
        self.assertEqual(len(members), 1)

class BtrfsTestResize(BtrfsTestCase):
    """Resizing a mounted volume."""

    def test_resize(self):
        """Verify that it is possible to resize a btrfs filesystem"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], None, None, None, None))

        mount(self.loop_dev, TEST_MNT)

        new_size = 500 * 1024**2
        self.assertTrue(BlockDev.btrfs_resize(TEST_MNT, new_size, None))

class BtrfsTestCheck(BtrfsTestCase):
    """Checking a freshly created, unmounted volume."""

    def test_check(self):
        """Verify that it's possible to check the btrfs filesystem"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], None, None, None, None))

        # the check runs on the device itself, nothing is mounted
        self.assertTrue(BlockDev.btrfs_check(self.loop_dev, None))

class BtrfsTestRepair(BtrfsTestCase):
    """Repairing a freshly created, unmounted volume."""

    def test_repair(self):
        """Verify that it's possible to repair the btrfs filesystem"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], None, None, None, None))
        # NOTE(review): presumably gives the device time to settle after the
        # volume creation -- the reason for the pause is not recorded here
        time.sleep(1)

        self.assertTrue(BlockDev.btrfs_repair(self.loop_dev, None))

class BtrfsTestChangeLabel(BtrfsTestCase):
    """Relabelling a mounted volume."""

    def test_change_label(self):
        """Verify that it's possible to change btrfs filesystem's label"""

        self.assertTrue(BlockDev.btrfs_create_volume([self.loop_dev], "myShinyBtrfs", None, None, None))

        mount(self.loop_dev, TEST_MNT)

        self.assertTrue(BlockDev.btrfs_change_label(TEST_MNT, "newLabel"))

        # the new label has to show up in the filesystem info
        fs_info = BlockDev.btrfs_filesystem_info(TEST_MNT)
        self.assertEqual(fs_info.label, "newLabel")

class BtrfsTooSmallTestCase(BtrfsTestCase):
    """Volume creation with one of the two devices below the minimum size."""

    def setUp(self):
        """Create one device of BTRFS_MIN_MEMBER_SIZE and one of half that size.

        :raises RuntimeError: if one of the LIO devices cannot be created
        """
        # _clean_up is registered before anything gets created so that it also
        # runs when setUp fails part way through; starting from None lets it
        # tell which resources really exist instead of crashing on a missing
        # attribute and leaving the backing files behind
        self.dev_file = None
        self.dev_file2 = None
        self.loop_dev = None
        self.loop_dev2 = None
        self.addCleanup(self._clean_up)

        self.dev_file = create_sparse_tempfile("btrfs_test", BlockDev.BTRFS_MIN_MEMBER_SIZE)
        self.dev_file2 = create_sparse_tempfile("btrfs_test", BlockDev.BTRFS_MIN_MEMBER_SIZE//2)
        try:
            self.loop_dev = create_lio_device(self.dev_file)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)
        try:
            self.loop_dev2 = create_lio_device(self.dev_file2)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)

    def _clean_up(self):
        """Remove whatever setUp managed to create (this test mounts nothing)."""
        created = ((self.loop_dev, self.dev_file),
                   (self.loop_dev2, self.dev_file2))
        for lio_dev, backing_file in created:
            if lio_dev is not None:
                try:
                    delete_lio_device(lio_dev)
                except RuntimeError:
                    # just move on, we can do no better here
                    pass
            if backing_file is not None:
                os.unlink(backing_file)

    def test_create_too_small(self):
        """Verify that an attempt to create BTRFS on a too small device fails"""

        # even a single too small device is enough to make the creation fail
        with self.assertRaises(GLib.GError):
            BlockDev.btrfs_create_volume([self.loop_dev, self.loop_dev2],
                                         None, None, None)

class BtrfsJustBigEnoughTestCase(BtrfsTestCase):
    """Volume creation on two devices of exactly the minimum member size."""

    def setUp(self):
        """Create two devices, both of BTRFS_MIN_MEMBER_SIZE.

        :raises RuntimeError: if one of the LIO devices cannot be created
        """
        # _clean_up is registered before anything gets created so that it also
        # runs when setUp fails part way through; starting from None lets it
        # tell which resources really exist instead of crashing on a missing
        # attribute and leaving the backing files behind
        self.dev_file = None
        self.dev_file2 = None
        self.loop_dev = None
        self.loop_dev2 = None
        self.addCleanup(self._clean_up)

        self.dev_file = create_sparse_tempfile("btrfs_test", BlockDev.BTRFS_MIN_MEMBER_SIZE)
        self.dev_file2 = create_sparse_tempfile("btrfs_test", BlockDev.BTRFS_MIN_MEMBER_SIZE)
        try:
            self.loop_dev = create_lio_device(self.dev_file)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)
        try:
            self.loop_dev2 = create_lio_device(self.dev_file2)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)

    def _clean_up(self):
        """Remove whatever setUp managed to create (this test mounts nothing)."""
        created = ((self.loop_dev, self.dev_file),
                   (self.loop_dev2, self.dev_file2))
        for lio_dev, backing_file in created:
            if lio_dev is not None:
                try:
                    delete_lio_device(lio_dev)
                except RuntimeError:
                    # just move on, we can do no better here
                    pass
            if backing_file is not None:
                os.unlink(backing_file)

    def test_create_just_enough(self):
        """Verify that creating BTRFS on just big enough devices works"""

        succ = BlockDev.btrfs_create_volume([self.loop_dev, self.loop_dev2],
                                            None, None, None)
        self.assertTrue(succ)


class FakeBtrfsUtilsTestCase(BtrfsTestCase):
    """Tests running against fake btrfs utilities instead of real devices."""

    def setUp(self):
        """Do nothing: no devices are needed, we are gonna use fake utils."""

    @tag_test(TestTags.NOSTORAGE)
    def test_list_subvols_weird_docker_data(self):
        """Verify that list_subvolumes works as expected on weird data from one Docker use case"""

        with fake_utils("tests/btrfs_subvols_docker"):
            listed = BlockDev.btrfs_list_subvolumes("fake_dev", False)

        # make sure subvolumes are sorted properly (parents before children):
        # a parent has to be either the main volume or an entry listed earlier
        known = set()
        for entry in listed:
            known.add(entry)
            parent_is_known = (entry.parent_id == BlockDev.BTRFS_MAIN_VOLUME_ID or
                               any(entry.parent_id == other.id for other in known))
            self.assertTrue(parent_is_known)

        # check that one of the weird subvols is in the list of subvolumes
        weird_path = "docker/btrfs/subvolumes/f2062b736fbabbe4da752632ac4deae87fcb916add6d7d8f5cecee4cbdc41fd9"
        self.assertIn(weird_path, [entry.path for entry in listed])

class BTRFSUnloadTest(BtrfsTestCase):
    """Loading of the btrfs plugin with fake or missing btrfs tooling."""

    def setUp(self):
        """Schedule a reload of the requested plugins after every test."""
        # make sure the library is initialized with all plugins loaded for other
        # tests
        self.addCleanup(BlockDev.reinit, self.requested_plugins, True, None)

    @tag_test(TestTags.NOSTORAGE)
    def test_check_low_version(self):
        """Verify that checking the minimum BTRFS version works as expected"""

        # unload all plugins first
        unloaded = BlockDev.reinit([], True, None)
        self.assertTrue(unloaded)

        with fake_utils("tests/btrfs_low_version/"):
            # too low version of BTRFS available, the BTRFS plugin should fail to load
            with self.assertRaises(GLib.GError):
                BlockDev.reinit(self.requested_plugins, True, None)

            available = BlockDev.get_available_plugin_names()
            self.assertNotIn("btrfs", available)

        # load the plugins back
        reloaded = BlockDev.reinit(self.requested_plugins, True, None)
        self.assertTrue(reloaded)
        self.assertIn("btrfs", BlockDev.get_available_plugin_names())

    @tag_test(TestTags.NOSTORAGE)
    def test_check_new_version_format(self):
        """Verify that checking the minimum BTRFS version works as expected with the new format"""

        # unload all plugins first
        unloaded = BlockDev.reinit([], True, None)
        self.assertTrue(unloaded)

        # check that new version format is correctly parsed
        with fake_utils("tests/btrfs_new_version_format/"):
            BlockDev.reinit(self.requested_plugins, True, None)

        available = BlockDev.get_available_plugin_names()
        self.assertIn("btrfs", available)

        # loading again with the real utilities has to keep the plugin available
        BlockDev.reinit(self.requested_plugins, True, None)
        self.assertIn("btrfs", BlockDev.get_available_plugin_names())

    @tag_test(TestTags.NOSTORAGE)
    def test_check_no_btrfs(self):
        """Verify that checking btrfs tool availability works as expected"""

        # unload all plugins first
        unloaded = BlockDev.reinit([], True, None)
        self.assertTrue(unloaded)

        with fake_path(all_but="btrfs"):
            # no btrfs tool available, the BTRFS plugin should fail to load
            with self.assertRaises(GLib.GError):
                BlockDev.reinit(self.requested_plugins, True, None)

            available = BlockDev.get_available_plugin_names()
            self.assertNotIn("btrfs", available)

        # load the plugins back
        reloaded = BlockDev.reinit(self.requested_plugins, True, None)
        self.assertTrue(reloaded)
        self.assertIn("btrfs", BlockDev.get_available_plugin_names())