Blob Blame History Raw
from __future__ import division
import unittest
import os
import math
import overrides_hack
import six
import re
import shutil
import subprocess
from distutils.version import LooseVersion
from itertools import chain

from utils import create_sparse_tempfile, create_lio_device, delete_lio_device, run_command, TestTags, tag_test
from gi.repository import BlockDev, GLib

import dbus
sb = dbus.SystemBus()
lvm_dbus_running = any("lvmdbus" in name for name in chain(sb.list_names(), sb.list_activatable_names()))


class LVMTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        if lvm_dbus_running:
            # force the new plugin to be used
            cls.ps = BlockDev.PluginSpec()
            cls.ps.name = BlockDev.Plugin.LVM
            cls.ps.so_name = "libbd_lvm-dbus.so.2"
            cls.ps2 = BlockDev.PluginSpec()
            cls.ps2.name = BlockDev.Plugin.LOOP
            if not BlockDev.is_initialized():
                BlockDev.init([cls.ps, cls.ps2], None)
            else:
                BlockDev.reinit([cls.ps, cls.ps2], True, None)

    @classmethod
    def _get_lvm_version(cls):
        _ret, out, _err = run_command("lvm version")
        m = re.search(r"LVM version:\s+([\d\.]+)", out)
        if not m or len(m.groups()) != 1:
            raise RuntimeError("Failed to determine LVM version from: %s" % out)
        return LooseVersion(m.groups()[0])

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmNoDevTestCase(LVMTestCase):

    def __init__(self, *args, **kwargs):
        super(LvmNoDevTestCase, self).__init__(*args, **kwargs)
        self._log = ""

    @tag_test(TestTags.NOSTORAGE)
    def test_is_supported_pe_size(self):
        """Verify that lvm_is_supported_pe_size works as expected"""

        self.assertTrue(BlockDev.lvm_is_supported_pe_size(4 * 1024))
        self.assertTrue(BlockDev.lvm_is_supported_pe_size(4 * 1024**2))
        self.assertTrue(BlockDev.lvm_is_supported_pe_size(6 * 1024**2))
        self.assertTrue(BlockDev.lvm_is_supported_pe_size(12 * 1024**2))
        self.assertTrue(BlockDev.lvm_is_supported_pe_size(15 * 1024**2))
        self.assertTrue(BlockDev.lvm_is_supported_pe_size(4 * 1024**3))

        self.assertFalse(BlockDev.lvm_is_supported_pe_size(512))
        self.assertFalse(BlockDev.lvm_is_supported_pe_size(4097))
        self.assertFalse(BlockDev.lvm_is_supported_pe_size(65535))
        self.assertFalse(BlockDev.lvm_is_supported_pe_size(32 * 1024**3))

    @tag_test(TestTags.NOSTORAGE)
    def test_get_supported_pe_sizes(self):
        """Verify that supported PE sizes are really supported"""

        for size in BlockDev.lvm_get_supported_pe_sizes():
            self.assertTrue(BlockDev.lvm_is_supported_pe_size(size))

    @tag_test(TestTags.NOSTORAGE)
    def test_get_max_lv_size(self):
        """Verify that max LV size is correctly determined"""

        if os.uname()[-1] == "i686":
            # 32-bit arch
            expected = 16 * 1024**4
        else:
            # 64-bit arch
            expected = 8 * 1024**6

        self.assertEqual(BlockDev.lvm_get_max_lv_size(), expected)

    @tag_test(TestTags.NOSTORAGE)
    def test_round_size_to_pe(self):
        """Verify that round_size_to_pe works as expected"""

        self.assertEqual(BlockDev.lvm_round_size_to_pe(11 * 1024**2, 4 * 1024**2, True), 12 * 1024**2)
        self.assertEqual(BlockDev.lvm_round_size_to_pe(11 * 1024**2, 4 * 1024**2, False), 8 * 1024**2)

        self.assertEqual(BlockDev.lvm_round_size_to_pe(11 * 1024**2, 6 * 1024**2, True), 12 * 1024**2)
        self.assertEqual(BlockDev.lvm_round_size_to_pe(11 * 1024**2, 6 * 1024**2, False), 6 * 1024**2)

        # default PE size is 4 MiB
        self.assertEqual(BlockDev.lvm_round_size_to_pe(11 * 1024**2, 0, True), 12 * 1024**2)
        self.assertEqual(BlockDev.lvm_round_size_to_pe(11 * 1024**2, 0, False), 8 * 1024**2)

        # cannot round up to GLib.MAXUINT64, but can round up over GLib.MAXUINT64 (should round down in that case)
        biggest_multiple = (GLib.MAXUINT64 // (4 * 1024**2)) * (4 * 1024**2)
        self.assertEqual(BlockDev.lvm_round_size_to_pe(biggest_multiple + (2 * 1024**2), 4 * 1024**2, True),
                         biggest_multiple)
        self.assertEqual(BlockDev.lvm_round_size_to_pe(biggest_multiple + (2 * 1024**2), 4 * 1024**2, False),
                         biggest_multiple)
        self.assertEqual(BlockDev.lvm_round_size_to_pe(biggest_multiple - (2 * 4 * 1024**2) + 1, 4 * 1024**2, True),
                         biggest_multiple - (4 * 1024**2))
        self.assertEqual(BlockDev.lvm_round_size_to_pe(biggest_multiple - (2 * 4 * 1024**2) + 1, 4 * 1024**2, False),
                         biggest_multiple - (2 * 4 * 1024**2))

    @tag_test(TestTags.NOSTORAGE)
    def test_get_lv_physical_size(self):
        """Verify that get_lv_physical_size works as expected"""

        self.assertEqual(BlockDev.lvm_get_lv_physical_size(25 * 1024**3, 4 * 1024**2),
                         25 * 1024**3)

        # default PE size is 4 MiB
        self.assertEqual(BlockDev.lvm_get_lv_physical_size(25 * 1024**3, 0),
                         25 * 1024**3)

        self.assertEqual(BlockDev.lvm_get_lv_physical_size(11 * 1024**2, 4 * 1024**2),
                         12 * 1024**2)

    @tag_test(TestTags.NOSTORAGE)
    def test_get_thpool_padding(self):
        """Verify that get_thpool_padding works as expected"""

        expected_padding = BlockDev.lvm_round_size_to_pe(int(math.ceil(11 * 1024**2 * 0.2)),
                                                         4 * 1024**2, True)
        self.assertEqual(BlockDev.lvm_get_thpool_padding(11 * 1024**2, 4 * 1024**2, False),
                         expected_padding)

        expected_padding = BlockDev.lvm_round_size_to_pe(int(math.ceil(11 * 1024**2 * (1.0/6.0))),
                                                         4 * 1024**2, True)
        self.assertEqual(BlockDev.lvm_get_thpool_padding(11 * 1024**2, 4 * 1024**2, True),
                         expected_padding)

    @tag_test(TestTags.NOSTORAGE)
    def test_get_thpool_meta_size(self):
        """Verify that getting recommended thin pool metadata size works as expected"""

        # metadata size is calculated as 64 * pool_size / chunk_size
        self.assertEqual(BlockDev.lvm_get_thpool_meta_size(1 * 1024**4, 64 * 1024), 1 * 1024**3)

        self.assertEqual(BlockDev.lvm_get_thpool_meta_size(1 * 1024**4, 128 * 1024),  512 * 1024**2)

        # lower limit is 4 MiB
        self.assertEqual(BlockDev.lvm_get_thpool_meta_size(100 * 1024**2, 128 * 1024),
                         BlockDev.LVM_MIN_THPOOL_MD_SIZE)

        # upper limit is 31.62 GiB
        self.assertEqual(BlockDev.lvm_get_thpool_meta_size(100 * 1024**4, 64 * 1024),
                         BlockDev.LVM_MAX_THPOOL_MD_SIZE)

    @tag_test(TestTags.NOSTORAGE)
    def test_is_valid_thpool_md_size(self):
        """Verify that is_valid_thpool_md_size works as expected"""

        self.assertTrue(BlockDev.lvm_is_valid_thpool_md_size(4 * 1024**2))
        self.assertTrue(BlockDev.lvm_is_valid_thpool_md_size(5 * 1024**2))
        self.assertTrue(BlockDev.lvm_is_valid_thpool_md_size(15 * 1024**3))

        self.assertFalse(BlockDev.lvm_is_valid_thpool_md_size(1 * 1024**2))
        self.assertFalse(BlockDev.lvm_is_valid_thpool_md_size(3 * 1024**2))
        self.assertFalse(BlockDev.lvm_is_valid_thpool_md_size(16 * 1024**3))
        self.assertFalse(BlockDev.lvm_is_valid_thpool_md_size(32 * 1024**3))

    @tag_test(TestTags.NOSTORAGE)
    def test_is_valid_thpool_chunk_size(self):
        """Verify that is_valid_thpool_chunk_size works as expected"""

        # 64 KiB is OK with or without discard
        self.assertTrue(BlockDev.lvm_is_valid_thpool_chunk_size(64 * 1024, True))
        self.assertTrue(BlockDev.lvm_is_valid_thpool_chunk_size(64 * 1024, False))

        # 192 KiB is OK without discard, but NOK with discard
        self.assertTrue(BlockDev.lvm_is_valid_thpool_chunk_size(192 * 1024, False))
        self.assertFalse(BlockDev.lvm_is_valid_thpool_chunk_size(192 * 1024, True))

        # 191 KiB is NOK in both cases
        self.assertFalse(BlockDev.lvm_is_valid_thpool_chunk_size(191 * 1024, False))
        self.assertFalse(BlockDev.lvm_is_valid_thpool_chunk_size(191 * 1024, True))

    def _store_log(self, lvl, msg):
        self._log += str((lvl, msg))

    @tag_test(TestTags.NOSTORAGE)
    def test_get_set_global_config(self):
        """Verify that getting and setting global config works as expected"""

        # setup logging
        self.assertTrue(BlockDev.reinit([self.ps], False, self._store_log))

        # no global config set initially
        self.assertEqual(BlockDev.lvm_get_global_config(), "")

        # make sure we don't leave the config in some problematic shape
        self.addCleanup(BlockDev.lvm_set_global_config, None)

        # set and try to get back
        succ = BlockDev.lvm_set_global_config("bla")
        self.assertTrue(succ)
        self.assertEqual(BlockDev.lvm_get_global_config(), "bla")

        # reset and try to get back
        succ = BlockDev.lvm_set_global_config(None)
        self.assertTrue(succ)
        self.assertEqual(BlockDev.lvm_get_global_config(), "")

        # set twice and try to get back twice
        succ = BlockDev.lvm_set_global_config("foo")
        self.assertTrue(succ)
        succ = BlockDev.lvm_set_global_config("bla")
        self.assertTrue(succ)
        self.assertEqual(BlockDev.lvm_get_global_config(), "bla")
        self.assertEqual(BlockDev.lvm_get_global_config(), "bla")

        # set something sane and check it's really used
        succ = BlockDev.lvm_set_global_config("backup {backup=0 archive=0}")
        BlockDev.lvm_pvscan(None, False, None)
        self.assertIn("'--config'", self._log)
        self.assertIn("'backup {backup=0 archive=0}'", self._log)

        # reset back to default
        succ = BlockDev.lvm_set_global_config(None)
        self.assertTrue(succ)

    @tag_test(TestTags.NOSTORAGE)
    def test_cache_get_default_md_size(self):
        """Verify that default cache metadata size is calculated properly"""

        # 1000x smaller than the data LV size, but at least 8 MiB
        self.assertEqual(BlockDev.lvm_cache_get_default_md_size(100 * 1024**3), (100 * 1024**3) // 1000)
        self.assertEqual(BlockDev.lvm_cache_get_default_md_size(80 * 1024**3), (80 * 1024**3) // 1000)
        self.assertEqual(BlockDev.lvm_cache_get_default_md_size(6 * 1024**3), 8 * 1024**2)

    @tag_test(TestTags.NOSTORAGE)
    def test_cache_mode_bijection(self):
        """Verify that cache modes and their string representations map to each other"""

        mode_strs = {BlockDev.LVMCacheMode.WRITETHROUGH: "writethrough",
                     BlockDev.LVMCacheMode.WRITEBACK: "writeback",
                     BlockDev.LVMCacheMode.UNKNOWN: "unknown",
        }
        for mode in mode_strs.keys():
            self.assertEqual(BlockDev.lvm_cache_get_mode_str(mode), mode_strs[mode])
            self.assertEqual(BlockDev.lvm_cache_get_mode_from_str(mode_strs[mode]), mode)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_cache_get_mode_from_str("bla")

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVonlyTestCase(LVMTestCase):
    # :TODO:
    #     * test pvmove (must create two PVs, a VG, a VG and some data in it
    #       first)
    #     * some complex test for pvs, vgs, lvs, pvinfo, vginfo and lvinfo
    def setUp(self):
        self.addCleanup(self._clean_up)
        self.dev_file = create_sparse_tempfile("lvm_test", 1024**3)
        self.dev_file2 = create_sparse_tempfile("lvm_test", 1024**3)
        try:
            self.loop_dev = create_lio_device(self.dev_file)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)
        try:
            self.loop_dev2 = create_lio_device(self.dev_file2)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)

    def _clean_up(self):
        try:
            BlockDev.lvm_pvremove(self.loop_dev, None)
        except:
            pass

        try:
            BlockDev.lvm_pvremove(self.loop_dev2, None)
        except:
            pass

        try:
            delete_lio_device(self.loop_dev)
        except RuntimeError:
            # just move on, we can do no better here
            pass
        os.unlink(self.dev_file)

        try:
            delete_lio_device(self.loop_dev2)
        except RuntimeError:
            # just move on, we can do no better here
            pass
        os.unlink(self.dev_file2)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestPVcreateRemove(LvmPVonlyTestCase):
    @tag_test(TestTags.CORE)
    def test_pvcreate_and_pvremove(self):
        """Verify that it's possible to create and destroy a PV"""

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_pvcreate("/non/existing/device", 0, 0, None)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvremove(self.loop_dev, None)
        self.assertTrue(succ)

        # this time try to specify data_alignment and metadata_size
        succ = BlockDev.lvm_pvcreate(self.loop_dev, 2*1024**2, 4*1024**2, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_pvremove("/non/existing/device", None)

        succ = BlockDev.lvm_pvremove(self.loop_dev, None)
        self.assertTrue(succ)

        # already removed -- not an issue
        succ = BlockDev.lvm_pvremove(self.loop_dev, None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestPVresize(LvmPVonlyTestCase):
    def test_pvresize(self):
        """Verify that it's possible to resize a PV"""

        with self.assertRaises(GLib.GError):
            succ = BlockDev.lvm_pvresize(self.loop_dev, 200 * 1024**2, None)

        with self.assertRaises(GLib.GError):
            succ = BlockDev.lvm_pvresize("/non/existing/device", 200 * 1024**2, None)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvresize(self.loop_dev, 200 * 1024**2, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvresize(self.loop_dev, 200 * 1024**3, None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestPVscan(LvmPVonlyTestCase):
    def test_pvscan(self):
        """Verify that pvscan runs without issues with cache or without"""

        succ = BlockDev.lvm_pvscan(None, False, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvscan(self.loop_dev, True, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvscan(None, True, None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestPVinfo(LvmPVonlyTestCase):
    def test_pvinfo(self):
        """Verify that it's possible to gather info about a PV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        info = BlockDev.lvm_pvinfo(self.loop_dev)
        self.assertTrue(info)
        self.assertEqual(info.pv_name, self.loop_dev)
        self.assertTrue(info.pv_uuid)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestPVs(LvmPVonlyTestCase):
    def test_pvs(self):
        """Verify that it's possible to gather info about PVs"""

        pvs = BlockDev.lvm_pvs()
        orig_len = len(pvs)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        pvs = BlockDev.lvm_pvs()
        self.assertTrue(len(pvs) > orig_len)
        self.assertTrue(any(info.pv_name == self.loop_dev for info in pvs))

        info = BlockDev.lvm_pvinfo(self.loop_dev)
        self.assertTrue(info)

        self.assertTrue(any(info.pv_uuid == all_info.pv_uuid for all_info in pvs))

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGTestCase(LvmPVonlyTestCase):
    def _clean_up(self):
        try:
            BlockDev.lvm_vgremove("testVG", None)
        except:
            pass

        LvmPVonlyTestCase._clean_up(self)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestVGcreateRemove(LvmPVVGTestCase):
    @tag_test(TestTags.CORE)
    def test_vgcreate_vgremove(self):
        """Verify that it is possible to create and destroy a VG"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgcreate("testVG", ["/non/existing/device"], 0, None)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        # VG already exists
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)

        succ = BlockDev.lvm_vgremove("testVG", None)
        self.assertTrue(succ)

        # no longer exists
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgremove("testVG", None)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestVGrename(LvmPVVGTestCase):
    def test_vgrename(self):
        """Verify that it is possible to rename a VG"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        # try rename
        succ = BlockDev.lvm_vgrename("testVG", "testVG_new", None)
        self.assertTrue(succ)

        # rename back
        succ = BlockDev.lvm_vgrename("testVG_new", "testVG", None)
        self.assertTrue(succ)

        # (hopefully) non-existing VG
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgrename("testVG_new", "testVG", None)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestVGactivateDeactivate(LvmPVVGTestCase):
    def test_vgactivate_vgdeactivate(self):
        """Verify that it is possible to (de)activate a VG"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgactivate("nonexistingVG", None)

        succ = BlockDev.lvm_vgactivate("testVG", None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgdeactivate("nonexistingVG", None)

        succ = BlockDev.lvm_vgdeactivate("testVG", None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgactivate("testVG", None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgdeactivate("testVG", None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestVGextendReduce(LvmPVVGTestCase):
    def test_vgextend_vgreduce(self):
        """Verify that it is possible to extend/reduce a VG"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev], 0, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgextend("nonexistingVG", self.loop_dev2, None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgextend("testVG", "/non/existing/device", None)

        succ = BlockDev.lvm_vgextend("testVG", self.loop_dev2, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgreduce("testVG", self.loop_dev, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgextend("testVG", self.loop_dev, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgreduce("testVG", self.loop_dev2, None)
        self.assertTrue(succ)

        # try to remove missing PVs (there are none)
        succ = BlockDev.lvm_vgreduce("testVG", None, None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestVGinfo(LvmPVVGTestCase):
    def test_vginfo(self):
        """Verify that it is possible to gather info about a VG"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        info = BlockDev.lvm_vginfo("testVG")
        self.assertTrue(info)
        self.assertEqual(info.name, "testVG")
        self.assertTrue(info.uuid)
        self.assertEqual(info.pv_count, 2)
        self.assertTrue(info.size < 2 * 1024**3)
        self.assertEqual(info.free, info.size)
        self.assertEqual(info.extent_size, 4 * 1024**2)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestVGs(LvmPVVGTestCase):
    def test_vgs(self):
        """Verify that it's possible to gather info about VGs"""

        vgs = BlockDev.lvm_vgs()
        orig_len = len(vgs)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev], 0, None)
        self.assertTrue(succ)

        vgs = BlockDev.lvm_vgs()
        self.assertTrue(len(vgs) > orig_len)
        self.assertTrue(any(info.name == "testVG" for info in vgs))

        info = BlockDev.lvm_vginfo("testVG")
        self.assertTrue(info)

        self.assertTrue(any(info.uuid == all_info.uuid for all_info in vgs))

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgremove("nonexistingVG", None)

        succ = BlockDev.lvm_vgremove("testVG", None)
        self.assertTrue(succ)

        # already removed
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vgremove("testVG", None)

        succ = BlockDev.lvm_pvremove(self.loop_dev, None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGLVTestCase(LvmPVVGTestCase):
    def _clean_up(self):
        try:
            BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        except:
            pass

        LvmPVVGTestCase._clean_up(self)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVcreateRemove(LvmPVVGLVTestCase):
    @tag_test(TestTags.CORE)
    def test_lvcreate_lvremove(self):
        """Verify that it's possible to create/destroy an LV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvcreate("nonexistingVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, ["/non/existing/device"], None)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        self.assertTrue(succ)

        # not enough space (only one PV)
        with six.assertRaisesRegex(self, GLib.GError, "Insufficient free space"):
            succ = BlockDev.lvm_lvcreate("testVG", "testLV", 1048 * 1024**2, None, [self.loop_dev], None)

        # enough space (two PVs)
        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 1048 * 1024**2, None, [self.loop_dev, self.loop_dev2], None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvremove("nonexistingVG", "testLV", True, None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvremove("testVG", "nonexistingLV", True, None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvremove("nonexistingVG", "nonexistingLV", True, None)

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        self.assertTrue(succ)

        # already removed
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvremove("testVG", "testLV", True, None)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVRemoveExtraArgs(LvmPVVGLVTestCase):
    def test_lvremove_extra_args(self):
        """Verify that specifying extra arguments for lvremove works as expected"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        # try multiple options together with --test, the LV should not be removed
        succ = BlockDev.lvm_lvremove("testVG", "testLV", False, [BlockDev.ExtraArg.new("--test", "")])
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertTrue(info)
        self.assertEqual(info.lv_name, "testLV")

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, [BlockDev.ExtraArg.new("--test", "")])
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertTrue(info)
        self.assertEqual(info.lv_name, "testLV")

        # try to remove without --force
        succ = BlockDev.lvm_lvremove("testVG", "testLV", False, None)
        self.assertTrue(succ)

        # already removed
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvremove("testVG", "testLV", True, None)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVcreateWithExtra(LvmPVVGLVTestCase):
    def __init__(self, *args, **kwargs):
        LvmPVVGLVTestCase.__init__(self, *args, **kwargs)
        self.log = ""
        self.ignore_log = True

    def my_log_func(self, level, msg):
        if self.ignore_log:
            return
        # not much to verify here
        self.assertTrue(isinstance(level, int))
        self.assertTrue(isinstance(msg, str))

        self.log += msg + "\n"

    def test_lvcreate_with_extra(self):
        """Verify that it's possible to create an LV with extra arguments"""

        self.ignore_log = True
        self.assertTrue(BlockDev.reinit([self.ps, self.ps2], False, self.my_log_func))

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvcreate("nonexistingVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, ["/non/existing/device"], None)

        self.ignore_log = False
        ea = BlockDev.ExtraArg.new("-Z", "y")
        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], [ea])
        self.assertTrue(succ)
        match = re.search(r"'-Z': <'y'>", self.log)
        self.assertIsNot(match, None)

        self.assertTrue(BlockDev.reinit([self.ps, self.ps2], False, None))

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVcreateType(LvmPVVGLVTestCase):
    def test_lvcreate_type(self):
        """Verify it's possible to create LVs with various types"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        # try to create a striped LV
        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, "striped", [self.loop_dev, self.loop_dev2], None)
        self.assertTrue(succ)

        # verify that the LV has the requested segtype
        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertEqual(info.segtype, "striped")

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        self.assertTrue(succ)

        # try to create a mirrored LV
        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, "mirror", [self.loop_dev, self.loop_dev2], None)
        self.assertTrue(succ)

        # verify that the LV has the requested segtype
        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertEqual(info.segtype, "mirror")

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        self.assertTrue(succ)

        # try to create a raid1 LV
        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, "raid1", [self.loop_dev, self.loop_dev2], None)
        self.assertTrue(succ)

        # verify that the LV has the requested segtype
        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertEqual(info.segtype, "raid1")

        succ = BlockDev.lvm_lvremove("testVG", "testLV", True, None)
        self.assertTrue(succ)


@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVactivateDeactivate(LvmPVVGLVTestCase):
    def test_lvactivate_lvdeactivate(self):
        """Verify it's possible to (de)actiavate an LV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvactivate("nonexistingVG", "testLV", True, None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvactivate("testVG", "nonexistingLV", True, None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvactivate("nonexistingVG", "nonexistingLV", True, None)

        succ = BlockDev.lvm_lvactivate("testVG", "testLV", True, None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvdeactivate("nonexistingVG", "testLV", None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvdeactivate("testVG", "nonexistingLV", None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvdeactivate("nonexistingVG", "nonexistingLV", None)

        succ = BlockDev.lvm_lvdeactivate("testVG", "testLV", None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvactivate("testVG", "testLV", True, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvdeactivate("testVG", "testLV", None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVresize(LvmPVVGLVTestCase):
    def test_lvresize(self):
        """Verify that it's possible to resize an LV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvresize("nonexistingVG", "testLV", 768 * 1024**2, None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvresize("testVG", "nonexistingLV", 768 * 1024**2, None)

        # grow
        succ = BlockDev.lvm_lvresize("testVG", "testLV", 768 * 1024**2, None)
        self.assertTrue(succ)

        # same size
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvresize("testVG", "testLV", 768 * 1024**2, None)

        # shrink
        succ = BlockDev.lvm_lvresize("testVG", "testLV", 512 * 1024**2, None)
        self.assertTrue(succ)

        # shrink, not a multiple of 512
        succ = BlockDev.lvm_lvresize("testVG", "testLV", 500 * 1024**2, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvdeactivate("testVG", "testLV", None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVrename(LvmPVVGLVTestCase):
    def test_lvrename(self):
        """Verify that it's possible to rename an LV"""

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvrename("nonexistingVG", "testLV", "newTestLV", None)

        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvrename("testVG", "nonexistingLV", "newTestLV", None)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        # rename
        succ = BlockDev.lvm_lvrename("testVG", "testLV", "newTestLV", None)
        self.assertTrue(succ)

        # and back
        succ = BlockDev.lvm_lvrename("testVG", "newTestLV", "testLV", None)
        self.assertTrue(succ)

        # needs a change
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_lvrename("testVG", "testLV", "testLV", None)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVsnapshots(LvmPVVGLVTestCase):
    @tag_test(TestTags.SLOW)
    def test_snapshotcreate_lvorigin_snapshotmerge(self):
        """Verify that LV snapshot support works"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvsnapshotcreate("testVG", "testLV", "testLV_bak", 256 * 1024**2, None)
        self.assertTrue(succ)

        origin_name = BlockDev.lvm_lvorigin("testVG", "testLV_bak")
        lvi = BlockDev.lvm_lvinfo("testVG", "testLV_bak")
        self.assertEqual(origin_name, "testLV")
        self.assertEqual(lvi.origin, "testLV")
        self.assertIn("snapshot", lvi.roles.split(","))

        succ = BlockDev.lvm_lvsnapshotmerge("testVG", "testLV_bak", None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVinfo(LvmPVVGLVTestCase):
    def test_lvinfo(self):
        """Verify that it is possible to gather info about an LV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertTrue(info)
        self.assertEqual(info.lv_name, "testLV")
        self.assertEqual(info.vg_name, "testVG")
        self.assertTrue(info.uuid)
        self.assertEqual(info.size, 512 * 1024**2)
        self.assertIn("public", info.roles.split(","))

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVs(LvmPVVGLVTestCase):
    def test_lvs(self):
        """Verify that it's possible to gather info about LVs"""

        lvs = BlockDev.lvm_lvs(None)
        orig_len = len(lvs)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        lvs = BlockDev.lvm_lvs(None)
        self.assertTrue(len(lvs) > orig_len)
        self.assertTrue(any(info.lv_name == "testLV" and info.vg_name == "testVG" for info in lvs))

        info = BlockDev.lvm_lvinfo("testVG", "testLV")
        self.assertTrue(info)

        self.assertTrue(any(info.uuid == all_info.uuid for all_info in lvs))

        lvs = BlockDev.lvm_lvs("testVG")
        self.assertEqual(len(lvs), 1)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGthpoolTestCase(LvmPVVGTestCase):
    def _clean_up(self):
        try:
            BlockDev.lvm_lvremove("testVG", "testPool", True, None)
        except:
            pass

        LvmPVVGTestCase._clean_up(self)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestLVsAll(LvmPVVGthpoolTestCase):
    def test_lvs_all(self):
        """Verify that info is gathered for all LVs"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpoolcreate("testVG", "testPool", 512 * 1024**2, 4 * 1024**2, 512 * 1024, "thin-performance", None)
        self.assertTrue(succ)

        # there should be at least 3 LVs -- testPool, [testPool_tdata], [testPool_tmeta] (plus probably some spare LVs)
        lvs = BlockDev.lvm_lvs("testVG")
        self.assertGreater(len(lvs), 3)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestThpoolCreate(LvmPVVGthpoolTestCase):
    @tag_test(TestTags.CORE)
    def test_thpoolcreate(self):
        """Verify that it is possible to create a thin pool"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpoolcreate("testVG", "testPool", 512 * 1024**2, 4 * 1024**2, 512 * 1024, "thin-performance", None)
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testPool")
        self.assertIn("t", info.attr)
        self.assertIn("private", info.roles.split(","))

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestThpoolConvert(LvmPVVGthpoolTestCase):
    def test_thpool_convert(self):
        """Verify that it is possible to create a thin pool by conversion"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        # the name of the data LV is used for the pool
        succ = BlockDev.lvm_lvcreate("testVG", "dataLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)
        succ = BlockDev.lvm_lvcreate("testVG", "metadataLV", 50 * 1024**2, None, [self.loop_dev2], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpool_convert("testVG", "dataLV", "metadataLV", "testPool", None)
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testPool")
        self.assertIn("t", info.attr)


@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestDataMetadataLV(LvmPVVGthpoolTestCase):
    def test_data_metadata_lv_name(self):
        """Verify that it is possible to get name of the data/metadata LV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpoolcreate("testVG", "testPool", 512 * 1024**2, 4 * 1024**2, 512 * 1024, "thin-performance", None)
        self.assertTrue(succ)

        name = BlockDev.lvm_data_lv_name("testVG", "testPool")
        lvi = BlockDev.lvm_lvinfo("testVG", "testPool")
        self.assertTrue(name)
        self.assertTrue(name.startswith("testPool"))
        self.assertIn("_tdata", name)
        self.assertEqual(name, lvi.data_lv)

        info = BlockDev.lvm_lvinfo("testVG", name)
        self.assertTrue(info.attr.startswith("T"))
        self.assertIn("private", info.roles.split(","))
        self.assertIn("data", info.roles.split(","))

        name = BlockDev.lvm_metadata_lv_name("testVG", "testPool")
        lvi = BlockDev.lvm_lvinfo("testVG", "testPool")
        self.assertTrue(name)
        self.assertTrue(name.startswith("testPool"))
        self.assertIn("_tmeta", name)
        self.assertEqual(name, lvi.metadata_lv)

        info = BlockDev.lvm_lvinfo("testVG", name)
        self.assertTrue(info.attr.startswith("e"))
        self.assertIn("private", info.roles.split(","))
        self.assertIn("metadata", info.roles.split(","))

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGLVthLVTestCase(LvmPVVGthpoolTestCase):
    def _clean_up(self):
        try:
            BlockDev.lvm_lvremove("testVG", "testThLV", True, None)
        except:
            pass

        LvmPVVGthpoolTestCase._clean_up(self)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestThLVcreate(LvmPVVGLVthLVTestCase):
    @tag_test(TestTags.CORE)
    def test_thlvcreate_thpoolname(self):
        """Verify that it is possible to create a thin LV and get its pool name"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpoolcreate("testVG", "testPool", 512 * 1024**2, 4 * 1024**2, 512 * 1024, None, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thlvcreate("testVG", "testPool", "testThLV", 1024**3, None)
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testPool")
        self.assertIn("t", info.attr)

        info = BlockDev.lvm_lvinfo("testVG", "testThLV")
        self.assertIn("V", info.attr)

        pool = BlockDev.lvm_thlvpoolname("testVG", "testThLV")
        lvi = BlockDev.lvm_lvinfo("testVG", "testThLV")
        self.assertEqual(pool, "testPool")
        self.assertEqual(lvi.pool_lv, "testPool")

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGLVthLVsnapshotTestCase(LvmPVVGLVthLVTestCase):
    def _clean_up(self):
        try:
            BlockDev.lvm_lvremove("testVG", "testThLV_bak", True, None)
        except:
            pass

        LvmPVVGLVthLVTestCase._clean_up(self)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestThSnapshotCreate(LvmPVVGLVthLVsnapshotTestCase):
    def test_thsnapshotcreate(self):
        """Verify that it is possible to create a thin LV snapshot"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)

        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpoolcreate("testVG", "testPool", 512 * 1024**2, 4 * 1024**2, 512 * 1024, None, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_thlvcreate("testVG", "testPool", "testThLV", 1024**3, None)
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testPool")
        self.assertIn("t", info.attr)

        info = BlockDev.lvm_lvinfo("testVG", "testThLV")
        self.assertIn("V", info.attr)

        succ = BlockDev.lvm_thsnapshotcreate("testVG", "testThLV", "testThLV_bak", "testPool", None)
        self.assertTrue(succ)

        info = BlockDev.lvm_lvinfo("testVG", "testThLV_bak")
        self.assertIn("V", info.attr)
        self.assertIn("snapshot", info.roles.split(","))
        self.assertIn("thinsnapshot", info.roles.split(","))

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGLVcachePoolTestCase(LvmPVVGLVTestCase):
    def _clean_up(self):
        try:
            BlockDev.lvm_lvremove("testVG", "testCache", True, None)
        except:
            pass

        # lets help udev with removing stale symlinks
        if not BlockDev.lvm_lvs("testVG") and os.path.exists("/dev/testVG/testCache_meta"):
            shutil.rmtree("/dev/testVG", ignore_errors=True)

        LvmPVVGLVTestCase._clean_up(self)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGLVcachePoolCreateRemoveTestCase(LvmPVVGLVcachePoolTestCase):
    @tag_test(TestTags.SLOW)
    def test_cache_pool_create_remove(self):
        """Verify that is it possible to create and remove a cache pool"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITETHROUGH, 0, [self.loop_dev])
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvremove("testVG", "testCache", True, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITEBACK,
                                              BlockDev.LVMCachePoolFlags.STRIPED|BlockDev.LVMCachePoolFlags.META_RAID1,
                                              [self.loop_dev, self.loop_dev2])
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestCachePoolConvert(LvmPVVGLVcachePoolTestCase):
    @tag_test(TestTags.SLOW)
    def test_cache_pool_convert(self):
        """Verify that it is possible to create a cache pool by conversion"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "dataLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)
        succ = BlockDev.lvm_lvcreate("testVG", "metadataLV", 50 * 1024**2, None, [self.loop_dev2], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_pool_convert("testVG", "dataLV", "metadataLV", "testCache", None)
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGLVcachePoolAttachDetachTestCase(LvmPVVGLVcachePoolTestCase):
    @tag_test(TestTags.SLOW)
    def test_cache_pool_attach_detach(self):
        """Verify that is it possible to attach and detach a cache pool"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITETHROUGH, 0, [self.loop_dev2])
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_attach("testVG", "testLV", "testCache", None)
        self.assertTrue(succ)

        # detach and destroy (the last arg)
        succ = BlockDev.lvm_cache_detach("testVG", "testLV", True, None)
        self.assertTrue(succ)

        # once more and do not destroy this time
        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITETHROUGH, 0, [self.loop_dev2])
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_attach("testVG", "testLV", "testCache", None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_detach("testVG", "testLV", False, None)
        self.assertTrue(succ)

        lvs = BlockDev.lvm_lvs("testVG")
        self.assertTrue(any(info.lv_name == "testCache" for info in lvs))

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGcachedLVTestCase(LvmPVVGLVTestCase):
    @tag_test(TestTags.SLOW)
    def test_create_cached_lv(self):
        """Verify that it is possible to create a cached LV in a single step"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_cached_lv("testVG", "testLV", 512 * 1024**2, 256 * 1024**2, 10 * 1024**2,
                                                   BlockDev.LVMCacheMode.WRITEBACK, 0,
                                                   [self.loop_dev], [self.loop_dev2])
        self.assertTrue(succ)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGcachedLVpoolTestCase(LvmPVVGLVTestCase):
    @tag_test(TestTags.SLOW)
    def test_cache_get_pool_name(self):
        """Verify that it is possible to get the name of the cache pool"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITETHROUGH, 0, [self.loop_dev2])
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_attach("testVG", "testLV", "testCache", None)
        self.assertTrue(succ)

        lvm_version = self._get_lvm_version()
        if lvm_version < LooseVersion("2.03.06"):
            cpool_name = "testCache"
        else:
            # since 2.03.06 LVM adds _cpool suffix to the cache pool after attaching it
            cpool_name = "testCache_cpool"

        self.assertEqual(BlockDev.lvm_cache_pool_name("testVG", "testLV"), cpool_name)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmPVVGcachedLVstatsTestCase(LvmPVVGLVTestCase):
    @tag_test(TestTags.SLOW)
    def test_cache_get_stats(self):
        """Verify that it is possible to get stats for a cached LV"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITETHROUGH, 0, [self.loop_dev2])
        self.assertTrue(succ)

        succ = BlockDev.lvm_lvcreate("testVG", "testLV", 512 * 1024**2, None, [self.loop_dev], None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_attach("testVG", "testLV", "testCache", None)
        self.assertTrue(succ)

        stats = BlockDev.lvm_cache_stats("testVG", "testLV")
        self.assertTrue(stats)
        self.assertEqual(stats.cache_size, 512 * 1024**2)
        self.assertEqual(stats.md_size, 8 * 1024**2)
        self.assertEqual(stats.mode, BlockDev.LVMCacheMode.WRITETHROUGH)

class LvmPVVGcachedThpoolstatsTestCase(LvmPVVGLVTestCase):
    @tag_test(TestTags.SLOW)
    def test_cache_get_stats(self):
        """Verify that it is possible to get stats for a cached thinpool"""

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_create_pool("testVG", "testCache", 512 * 1024**2, 0, BlockDev.LVMCacheMode.WRITETHROUGH, 0, [self.loop_dev2])
        self.assertTrue(succ)

        succ = BlockDev.lvm_thpoolcreate("testVG", "testPool", 512 * 1024**2, 4 * 1024**2, 512 * 1024, "thin-performance", None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_cache_attach("testVG", "testPool", "testCache", None)
        self.assertTrue(succ)

        # just ask for the pool itself even if it's not technically cached
        stats = BlockDev.lvm_cache_stats("testVG", "testPool")
        self.assertTrue(stats)
        self.assertEqual(stats.cache_size, 512 * 1024**2)
        self.assertEqual(stats.md_size, 8 * 1024**2)
        self.assertEqual(stats.mode, BlockDev.LVMCacheMode.WRITETHROUGH)

        # same should work when explicitly asking for the data LV
        stats = BlockDev.lvm_cache_stats("testVG", "testPool_tdata")
        self.assertTrue(stats)
        self.assertEqual(stats.cache_size, 512 * 1024**2)
        self.assertEqual(stats.md_size, 8 * 1024**2)
        self.assertEqual(stats.mode, BlockDev.LVMCacheMode.WRITETHROUGH)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LVMTechTest(LVMTestCase):

    def setUp(self):
        # set init checks to false -- we want runtime checks for this
        BlockDev.switch_init_checks(False)

        # set everything back and reinit just to be sure
        self.addCleanup(BlockDev.switch_init_checks, True)
        self.addCleanup(BlockDev.reinit, [self.ps, self.ps2], True, None)

    @tag_test(TestTags.NOSTORAGE)
    def test_tech_available(self):
        """Verify that checking lvm dbus availability by technology works as expected"""

        # stop the lvmdbusd service
        _ret, _out, _err = run_command("systemctl stop lvm2-lvmdbusd")

        # reinit libblockdev -- init checks are switched off so nothing should start the service
        self.assertTrue(BlockDev.reinit([self.ps, self.ps2], True, None))
        ret, _out, _err = run_command("systemctl status lvm2-lvmdbusd")
        self.assertNotEqual(ret, 0)

        # check tech availability -- service should be started
        succ = BlockDev.lvm_is_tech_avail(BlockDev.LVMTech.BASIC, BlockDev.LVMTechMode.CREATE)
        self.assertTrue(succ)

        ret, _out, _err = run_command("systemctl status lvm2-lvmdbusd")
        self.assertEqual(ret, 0)

        # only query is supported with calcs
        with six.assertRaisesRegex(self, GLib.GError, "Only 'query' supported for thin calculations"):
            BlockDev.lvm_is_tech_avail(BlockDev.LVMTech.THIN_CALCS, BlockDev.LVMTechMode.CREATE)

@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LvmTestPVremoveConfig(LvmPVonlyTestCase):
    def test_pvremove_with_config(self):
        """Verify that we correctly pass extra arguments when calling PvRemove"""

        # we add some extra arguments to PvRemove (like '-ff') and we want
        # to be sure that adding these works together with '--config'

        BlockDev.lvm_set_global_config("backup {backup=0 archive=0}")
        self.addCleanup(BlockDev.lvm_set_global_config, None)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvcreate(self.loop_dev2, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVG", [self.loop_dev, self.loop_dev2], 0, None)
        self.assertTrue(succ)

        # we are removing pv that is part of vg -- '-ff' option must be included
        succ = BlockDev.lvm_pvremove(self.loop_dev, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_pvremove(self.loop_dev2, None)
        self.assertTrue(succ)


@unittest.skipUnless(lvm_dbus_running, "LVM DBus not running")
class LVMVDOTest(LVMTestCase):

    loop_size = 8 * 1024**3

    @classmethod
    def setUpClass(cls):
        if not BlockDev.utils_have_kernel_module("kvdo"):
            raise unittest.SkipTest("VDO kernel module not available, skipping.")

        try:
            BlockDev.utils_load_kernel_module("kvdo")
        except GLib.GError as e:
            if "File exists" not in e.message:
                raise unittest.SkipTest("cannot load VDO kernel module, skipping.")

        lvm_version = cls._get_lvm_version()
        if lvm_version < LooseVersion("2.3.07"):
            raise unittest.SkipTest("LVM version 2.3.07 or newer needed for LVM VDO.")

        super().setUpClass()

    def setUp(self):
        self.addCleanup(self._clean_up)
        self.dev_file = create_sparse_tempfile("vdo_test", self.loop_size)
        try:
            self.loop_dev = create_lio_device(self.dev_file)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)

        succ = BlockDev.lvm_pvcreate(self.loop_dev, 0, 0, None)
        self.assertTrue(succ)

        succ = BlockDev.lvm_vgcreate("testVDOVG", [self.loop_dev], 0, None)
        self.assertTrue(succ)

    def _clean_up(self):
        BlockDev.lvm_vgremove("testVDOVG")
        BlockDev.lvm_pvremove(self.loop_dev)

        try:
            BlockDev.lvm_lvremove("testVDOVG", "vdoPool", True, None)
        except:
            pass

        try:
            delete_lio_device(self.loop_dev)
        except RuntimeError:
            # just move on, we can do no better here
            pass
        os.unlink(self.dev_file)

    @tag_test(TestTags.SLOW, TestTags.CORE)
    def test_vdo_pool_create(self):
        succ = BlockDev.lvm_vdo_pool_create("testVDOVG", "vdoLV", "vdoPool", 7 * 1024**3, 35 * 1024**3)
        self.assertTrue(succ)

        lv_info = BlockDev.lvm_lvinfo("testVDOVG", "vdoLV")
        self.assertIsNotNone(lv_info)
        self.assertEqual(lv_info.segtype, "vdo")
        self.assertEqual(lv_info.pool_lv, "vdoPool")

        pool_info = BlockDev.lvm_lvinfo("testVDOVG", "vdoPool")
        self.assertEqual(pool_info.segtype, "vdo-pool")
        self.assertEqual(pool_info.data_lv, "vdoPool_vdata")
        self.assertGreater(pool_info.data_percent, 0)

        pool = BlockDev.lvm_vdolvpoolname("testVDOVG", "vdoLV")
        self.assertEqual(pool, lv_info.pool_lv)

        vdo_info = BlockDev.lvm_vdo_info("testVDOVG", "vdoPool")
        self.assertIsNotNone(vdo_info)
        self.assertEqual(vdo_info.operating_mode, BlockDev.LVMVDOOperatingMode.NORMAL)
        self.assertEqual(vdo_info.compression_state, BlockDev.LVMVDOCompressionState.ONLINE)
        self.assertTrue(vdo_info.compression)
        self.assertTrue(vdo_info.deduplication)
        self.assertGreater(vdo_info.index_memory_size, 0)
        self.assertGreater(vdo_info.used_size, 0)
        self.assertTrue(0 <= vdo_info.saving_percent <= 100)

        lvs = BlockDev.lvm_lvs("testVDOVG")
        self.assertIn("vdoPool", [l.lv_name for l in lvs])
        self.assertIn("vdoLV", [l.lv_name for l in lvs])

        mode_str = BlockDev.lvm_get_vdo_operating_mode_str(vdo_info.operating_mode)
        self.assertEqual(mode_str, "normal")

        state_str = BlockDev.lvm_get_vdo_compression_state_str(vdo_info.compression_state)
        self.assertEqual(state_str, "online")

        policy_str = BlockDev.lvm_get_vdo_write_policy_str(vdo_info.write_policy)
        self.assertIn(policy_str, ["sync", "async", "auto"])

        state_str = BlockDev.lvm_get_vdo_compression_state_str(vdo_info.compression_state)
        self.assertEqual(state_str, "online")

    @tag_test(TestTags.SLOW)
    def test_vdo_pool_create_options(self):
        # set index size to 300 MiB, disable compression and write policy to sync
        policy = BlockDev.lvm_get_vdo_write_policy_from_str("sync")
        succ = BlockDev.lvm_vdo_pool_create("testVDOVG", "vdoLV", "vdoPool", 7 * 1024**3, 35 * 1024**3,
                                            300 * 1024**2, False, True, policy)
        self.assertTrue(succ)

        vdo_info = BlockDev.lvm_vdo_info("testVDOVG", "vdoPool")
        self.assertIsNotNone(vdo_info)
        self.assertEqual(vdo_info.index_memory_size, 300 * 1024**2)
        self.assertFalse(vdo_info.compression)
        self.assertTrue(vdo_info.deduplication)
        self.assertEqual(BlockDev.lvm_get_vdo_write_policy_str(vdo_info.write_policy), "sync")

    @tag_test(TestTags.SLOW)
    def test_resize(self):
        succ = BlockDev.lvm_vdo_pool_create("testVDOVG", "vdoLV", "vdoPool", 5 * 1024**3, 10 * 1024**3)
        self.assertTrue(succ)

        # "physical" resize first (pool), shrinking not allowed
        with self.assertRaises(GLib.GError):
            BlockDev.lvm_vdo_pool_resize("testVDOVG", "vdoPool", 4 * 1024**3)

        succ = BlockDev.lvm_vdo_pool_resize("testVDOVG", "vdoPool", 7 * 1024**3)
        self.assertTrue(succ)
        lv_info = BlockDev.lvm_lvinfo("testVDOVG", "vdoPool")
        self.assertEqual(lv_info.size, 7 * 1024**3)

        # "logical" resize (LV)
        succ = BlockDev.lvm_vdo_resize("testVDOVG", "vdoLV", 35 * 1024**3)
        self.assertTrue(succ)
        lv_info = BlockDev.lvm_lvinfo("testVDOVG", "vdoLV")
        self.assertEqual(lv_info.size, 35 * 1024**3)

    @tag_test(TestTags.SLOW)
    def test_enabla_disable_compression(self):
        self.skipTest("Enabling/disabling compression on LVM VDO not implemented in LVM DBus API.")

    @tag_test(TestTags.SLOW)
    def test_enable_disable_deduplication(self):
        self.skipTest("Enabling/disabling deduplication on LVM VDO not implemented in LVM DBus API.")

    @tag_test(TestTags.SLOW)
    def test_vdo_pool_convert(self):
        self.skipTest("LVM VDO pool convert not implemented in LVM DBus API.")

    @tag_test(TestTags.SLOW)
    def test_stats(self):
        succ = BlockDev.lvm_vdo_pool_create("testVDOVG", "vdoLV", "vdoPool", 7 * 1024**3, 35 * 1024**3)
        self.assertTrue(succ)

        vdo_info = BlockDev.lvm_vdo_info("testVDOVG", "vdoPool")
        self.assertIsNotNone(vdo_info)
        self.assertTrue(vdo_info.deduplication)

        vdo_stats = BlockDev.lvm_vdo_get_stats("testVDOVG", "vdoPool")

        # just sanity check
        self.assertNotEqual(vdo_stats.saving_percent, -1)
        self.assertNotEqual(vdo_stats.used_percent, -1)
        self.assertNotEqual(vdo_stats.block_size, -1)
        self.assertNotEqual(vdo_stats.logical_block_size, -1)
        self.assertNotEqual(vdo_stats.physical_blocks, -1)
        self.assertNotEqual(vdo_stats.data_blocks_used, -1)
        self.assertNotEqual(vdo_stats.overhead_blocks_used, -1)
        self.assertNotEqual(vdo_stats.logical_blocks_used, -1)
        self.assertNotEqual(vdo_stats.write_amplification_ratio, -1)

        full_stats = BlockDev.lvm_vdo_get_stats_full("testVDOVG", "vdoPool")
        self.assertIn("writeAmplificationRatio", full_stats.keys())