Blob Blame History Raw
import os
import unittest
import time
import overrides_hack

from utils import create_sparse_tempfile, TestTags, tag_test
from gi.repository import BlockDev, GLib


class LoopTestCase(unittest.TestCase):

    requested_plugins = BlockDev.plugin_specs_from_names(("loop",))

    @classmethod
    def setUpClass(cls):
        if not BlockDev.is_initialized():
            BlockDev.init(cls.requested_plugins, None)
        else:
            BlockDev.reinit(cls.requested_plugins, True, None)

    def setUp(self):
        self.addCleanup(self._clean_up)
        self.dev_file = create_sparse_tempfile("loop_test", 1024**3)
        self.loop = None

    def _clean_up(self):
        try:
            BlockDev.loop_teardown(self.loop)
        except:
            pass
        os.unlink(self.dev_file)

class LoopTestSetupBasic(LoopTestCase):
    @tag_test(TestTags.CORE)
    def testLoop_setup_teardown_basic(self):
        """Verify that basic loop_setup and loop_teardown work as expected"""

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        succ = BlockDev.loop_teardown(self.loop)
        self.assertTrue(succ)

        # give kernel+udev time to update stuff under /sys and try to get the
        # backing file -- there should be none after the teardown
        time.sleep(1)
        b_file = BlockDev.loop_get_backing_file(self.loop)
        self.assertIsNone(b_file)

class LoopTestSetupOffset(LoopTestCase):
    def testLoop_setup_with_offset(self):
        """Verify that loop_setup with offset specified works as expected"""

        # now test with the offset
        succ, self.loop = BlockDev.loop_setup(self.dev_file, 10 * 1024**2)
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        # should have smaller size due to the offset
        with open("/sys/block/%s/size" % self.loop, "r") as f:
            size = int(f.read()) * 512
        self.assertEqual(size, 1024**3 - 10 * 1024 **2)

        succ = BlockDev.loop_teardown(self.loop)
        self.assertTrue(succ)


class LoopTestSetupOffsetSize(LoopTestCase):
    def testLoop_setup_with_offset_and_size(self):
        """Verify that loop_setup with offset and size specified works as expected"""

        # now test with the offset and size
        succ, self.loop = BlockDev.loop_setup(self.dev_file, 10 * 1024**2, 50 * 1024**2)
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        # should have size as specified
        with open("/sys/block/%s/size" % self.loop, "r") as f:
            size = int(f.read()) * 512
        self.assertEqual(size, 50 * 1024**2)

        succ = BlockDev.loop_teardown(self.loop)
        self.assertTrue(succ)

class LoopTestSetupReadOnly(LoopTestCase):
    def testLoop_setup_read_only(self):
        """Verify that loop_setup with read_only specified works as expected"""
        # test read-only
        succ, self.loop = BlockDev.loop_setup(self.dev_file, 0, 0, True)
        self.assertTrue(succ)
        self.assertTrue(self.loop)

        # should be read-only
        with open("/sys/block/%s/ro" % self.loop, "r") as f:
            self.assertEqual(f.read().strip(), "1")

# XXX: any sane way how to test part_probe=True/False?

class LoopTestGetLoopName(LoopTestCase):
    @tag_test(TestTags.CORE)
    def testLoop_get_loop_name(self):
        """Verify that loop_get_loop_name works as expected"""

        self.assertIs(BlockDev.loop_get_loop_name("/non/existing"), None)

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        ret_loop = BlockDev.loop_get_loop_name(self.dev_file)
        self.assertEqual(ret_loop, self.loop)

class LoopTestGetBackingFile(LoopTestCase):
    @tag_test(TestTags.CORE)
    def testLoop_get_backing_file(self):
        """Verify that loop_get_backing_file works as expected"""

        self.assertIs(BlockDev.loop_get_backing_file("/non/existing"), None)

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        f_name = BlockDev.loop_get_backing_file(self.loop)
        self.assertEqual(f_name, self.dev_file)

class LoopTestGetSetAutoclear(LoopTestCase):
    def testLoop_get_set_autoclear(self):
        """Verify that getting and setting the autoclear flag works as expected"""

        with self.assertRaises(GLib.Error):
            BlockDev.loop_get_autoclear("/non/existing")

        with self.assertRaises(GLib.Error):
            BlockDev.loop_set_autoclear("/non/existing", True)

        succ, self.loop = BlockDev.loop_setup(self.dev_file)
        self.assertFalse(BlockDev.loop_get_autoclear(self.loop))

        # open the loop device so that it doesn't disappear once we set
        # autoclear to True (it's otherwise not being used so it may get cleared
        # automatically)
        fd = os.open("/dev/" + self.loop, os.O_RDONLY)
        self.addCleanup(os.close, fd)

        self.assertTrue(BlockDev.loop_set_autoclear(self.loop, True))
        self.assertTrue(BlockDev.loop_get_autoclear(self.loop))

        self.assertTrue(BlockDev.loop_set_autoclear(self.loop, False))
        self.assertFalse(BlockDev.loop_get_autoclear(self.loop))

        # now the same, but with the "/dev/" prefix
        loop = "/dev/" + self.loop
        self.assertTrue(BlockDev.loop_set_autoclear(loop, True))
        self.assertTrue(BlockDev.loop_get_autoclear(loop))

        self.assertTrue(BlockDev.loop_set_autoclear(loop, False))
        self.assertFalse(BlockDev.loop_get_autoclear(loop))