from __future__ import print_function
import os
import yaml
import unittest
import overrides_hack
import six
from utils import run_command, read_file, fake_path, create_sparse_tempfile, create_lio_device, delete_lio_device, TestTags, tag_test
from gi.repository import BlockDev, GLib
from bytesize import bytesize
from distutils.spawn import find_executable
class VDOTestCase(unittest.TestCase):
    """Common fixture for the VDO plugin tests.

    ``setUpClass`` skips the whole class when the ``kvdo`` kernel module or the
    ``vdo`` executable is not usable and (re)initializes libblockdev with the
    plugins listed in ``requested_plugins``.  ``setUp`` creates a sparse backing
    file of ``loop_size`` bytes and exposes it as an LIO device whose path is
    stored in ``self.loop_dev``.
    """

    # plugins the library gets (re)initialized with for these tests
    requested_plugins = BlockDev.plugin_specs_from_names(("vdo", "part",))

    # size of the sparse backing file in bytes
    loop_size = 8 * 1024**3

    @classmethod
    def setUpClass(cls):
        """Check the VDO prerequisites and initialize libblockdev.

        Raises:
            unittest.SkipTest: if the ``kvdo`` module is not available or
                cannot be loaded, or if no ``vdo`` executable is found.
        """
        if not BlockDev.utils_have_kernel_module("kvdo"):
            raise unittest.SkipTest("VDO kernel module not available, skipping.")

        try:
            BlockDev.utils_load_kernel_module("kvdo")
        except GLib.GError as e:
            # a "File exists" error is tolerated, anything else means the
            # module cannot be used
            if "File exists" not in e.message:
                raise unittest.SkipTest("cannot load VDO kernel module, skipping.")

        if not find_executable("vdo"):
            raise unittest.SkipTest("vdo executable not found in $PATH, skipping.")

        if not BlockDev.is_initialized():
            BlockDev.init(cls.requested_plugins, None)
        else:
            BlockDev.reinit(cls.requested_plugins, True, None)

    def setUp(self):
        """Create the sparse backing file and the LIO device used by a test.

        Raises:
            RuntimeError: if the LIO device cannot be set up.
        """
        # registered first so that a partially finished setup is cleaned too
        self.addCleanup(self._clean_up)
        self.dev_file = create_sparse_tempfile("vdo_test", self.loop_size)
        try:
            self.loop_dev = create_lio_device(self.dev_file)
        except RuntimeError as e:
            raise RuntimeError("Failed to setup loop device for testing: %s" % e)

    def _clean_up(self):
        """Tear down whatever ``setUp`` managed to create.

        The cleanup is registered before the resources exist, so either
        attribute may be missing when ``setUp`` failed half-way; only the
        resources that were really created are removed.
        """
        loop_dev = getattr(self, "loop_dev", None)
        if loop_dev is not None:
            try:
                delete_lio_device(loop_dev)
            except RuntimeError:
                # just move on, we can do no better here
                pass

        dev_file = getattr(self, "dev_file", None)
        if dev_file is not None:
            os.unlink(dev_file)
class VDOTest(VDOTestCase):
    """Tests of the VDO plugin operations on a volume named ``vdo_name``.

    Every test creates its own VDO volume on top of the device prepared by
    the base class ``setUp`` and registers ``_remove_vdo`` as a cleanup.
    """

    # name of the VDO volume created by the tests
    vdo_name = "bd-test-vdo"

    def _remove_vdo(self, name):
        """Force-remove the VDO volume `name` with the ``vdo`` command line tool.

        The command result is ignored on purpose: this runs as a cleanup and
        the volume may already have been removed by the test itself.
        """
        run_command("vdo remove --force -n %s" % name)

    @tag_test(TestTags.SLOW, TestTags.CORE)
    def test_create_remove(self):
        """Verify that it is possible to create and remove a VDO volume"""
        mapper_path = "/dev/mapper/%s" % self.vdo_name

        # a non-existing backing device must be rejected
        with self.assertRaises(GLib.GError):
            BlockDev.vdo_create(self.vdo_name, '/non/existing')

        # so must be an unknown write policy
        with self.assertRaises(GLib.GError):
            BlockDev.vdo_create(self.vdo_name, self.loop_dev,
                                write_policy=BlockDev.VDOWritePolicy.UNKNOWN)

        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev, 3 * self.loop_size, 0,
                                  True, True, BlockDev.VDOWritePolicy.AUTO)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)
        self.assertTrue(os.path.exists(mapper_path))

        # the reported information must match what was requested
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertEqual(info.name, self.vdo_name)
        self.assertEqual(info.device, self.loop_dev)
        self.assertTrue(info.deduplication)
        self.assertTrue(info.compression)
        self.assertEqual(info.logical_size, 3 * self.loop_size)
        self.assertEqual(info.write_policy, BlockDev.VDOWritePolicy.AUTO)

        ret = BlockDev.vdo_remove(self.vdo_name, True)
        self.assertTrue(ret)
        self.assertFalse(os.path.exists(mapper_path))

    @tag_test(TestTags.SLOW)
    def test_enable_disable_compression(self):
        """Verify that it is possible to enable/disable compression on an existing VDO volume"""
        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev, 3 * self.loop_size, 0,
                                  True, True, BlockDev.VDOWritePolicy.AUTO)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        info = BlockDev.vdo_info(self.vdo_name)
        self.assertTrue(info.compression)

        # now disable the compression
        ret = BlockDev.vdo_disable_compression(self.vdo_name)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertFalse(info.compression)

        # and enable it again
        ret = BlockDev.vdo_enable_compression(self.vdo_name)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertTrue(info.compression)

    @tag_test(TestTags.SLOW)
    def test_enable_disable_deduplication(self):
        """Verify that it is possible to enable/disable deduplication on an existing VDO volume"""
        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev, 3 * self.loop_size, 0,
                                  True, True, BlockDev.VDOWritePolicy.AUTO)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        info = BlockDev.vdo_info(self.vdo_name)
        self.assertTrue(info.deduplication)

        # now disable the deduplication
        ret = BlockDev.vdo_disable_deduplication(self.vdo_name)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertFalse(info.deduplication)

        # and enable it again
        ret = BlockDev.vdo_enable_deduplication(self.vdo_name)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertTrue(info.deduplication)

    @tag_test(TestTags.SLOW)
    def test_activate_deactivate(self):
        """Verify that it is possible to activate/deactivate an existing VDO volume"""
        mapper_path = "/dev/mapper/%s" % self.vdo_name

        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev, 3 * self.loop_size, 0,
                                  True, True, BlockDev.VDOWritePolicy.AUTO)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        info = BlockDev.vdo_info(self.vdo_name)
        self.assertTrue(info.active)

        # now disable the volume
        ret = BlockDev.vdo_deactivate(self.vdo_name)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertFalse(info.active)

        # and stop it too
        ret = BlockDev.vdo_stop(self.vdo_name)
        self.assertTrue(ret)
        self.assertFalse(os.path.exists(mapper_path))

        # enable it again
        ret = BlockDev.vdo_activate(self.vdo_name)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertTrue(info.active)

        # and start it
        ret = BlockDev.vdo_start(self.vdo_name)
        self.assertTrue(ret)
        self.assertTrue(os.path.exists(mapper_path))

    @tag_test(TestTags.SLOW)
    def test_change_write_policy(self):
        """Verify that it is possible to change write policy of an existing VDO volume"""
        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev, 3 * self.loop_size, 0,
                                  True, True, BlockDev.VDOWritePolicy.AUTO)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        # unknown volume
        with self.assertRaises(GLib.GError):
            BlockDev.vdo_change_write_policy("definitely-not-a-vdo",
                                             BlockDev.VDOWritePolicy.SYNC)

        # unknown policy
        with self.assertRaises(GLib.GError):
            BlockDev.vdo_change_write_policy(self.vdo_name,
                                             BlockDev.VDOWritePolicy.UNKNOWN)

        ret = BlockDev.vdo_change_write_policy(self.vdo_name, BlockDev.VDOWritePolicy.SYNC)
        self.assertTrue(ret)
        info = BlockDev.vdo_info(self.vdo_name)
        self.assertEqual(info.write_policy, BlockDev.VDOWritePolicy.SYNC)

    def _get_vdo_info(self, name):
        """Return the ``vdo status`` entry of the volume `name`.

        Runs ``vdo status -n <name>`` and parses its output as YAML.

        Returns:
            The value stored under ``VDOs -> <name>`` in the parsed output, or
            None when the command fails, prints nothing, or its output does
            not contain such an entry.
        """
        ret, out, _err = run_command("vdo status -n %s" % name)
        if ret != 0 or not out:
            return None

        info = yaml.load(out, Loader=yaml.SafeLoader)

        # the output may parse to something else than a mapping (e.g. a plain
        # string), treat that as a parse failure instead of crashing
        vdos = info.get("VDOs") if isinstance(info, dict) else None
        if not isinstance(vdos, dict) or name not in vdos:
            print("Failed to parse output of 'vdo status'")
            return None

        return vdos[name]

    @tag_test(TestTags.SLOW, TestTags.CORE)
    def test_get_info(self):
        """Verify that it is possible to get information about an existing VDO volume"""
        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        bd_info = BlockDev.vdo_info(self.vdo_name)
        self.assertIsNotNone(bd_info)

        sys_info = self._get_vdo_info(self.vdo_name)
        self.assertIsNotNone(sys_info)

        # each flag is compared with its own field of the 'vdo status' output
        self.assertEqual(bd_info.deduplication, sys_info["Deduplication"] == "enabled")
        self.assertEqual(bd_info.compression, sys_info["Compression"] == "enabled")

        self.assertEqual(BlockDev.vdo_get_write_policy_str(bd_info.write_policy),
                         sys_info["Configured write policy"])

        # index memory is printed in gigabytes without the unit
        self.assertEqual(bd_info.index_memory, sys_info["Index memory setting"] * 1000**3)

        # logical and physical size are printed with units
        self.assertEqual(bd_info.physical_size, bytesize.Size(sys_info["Physical size"]))
        self.assertEqual(bd_info.logical_size, bytesize.Size(sys_info["Logical size"]))

    @tag_test(TestTags.SLOW)
    def test_grow_logical(self):
        """Verify that it is possible to grow logical size of an existing VDO volume"""
        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        info = BlockDev.vdo_info(self.vdo_name)
        self.assertIsNotNone(info)

        new_size = info.logical_size * 2

        ret = BlockDev.vdo_grow_logical(self.vdo_name, new_size)
        self.assertTrue(ret)

        info = BlockDev.vdo_info(self.vdo_name)
        self.assertIsNotNone(info)
        self.assertEqual(info.logical_size, new_size)

    @tag_test(TestTags.SLOW, TestTags.UNSTABLE)
    def test_grow_physical(self):
        """Verify that it is possible to grow physical size of an existing VDO volume"""
        # create a partition that we can grow later
        succ = BlockDev.part_create_table(self.loop_dev, BlockDev.PartTableType.GPT, True)
        self.assertTrue(succ)

        # the requested size is a byte count, truncate the float product
        # explicitly instead of relying on an implicit conversion in the binding
        part_size = int(5.1 * 1024**3)
        part_spec = BlockDev.part_create_part(self.loop_dev, BlockDev.PartTypeReq.NORMAL,
                                              2048, part_size, BlockDev.PartAlign.OPTIMAL)
        self.assertIsNotNone(part_spec)

        vdo_part_dev = self.loop_dev + '1'
        self.assertTrue(os.path.exists(vdo_part_dev))

        ret = BlockDev.vdo_create(self.vdo_name, vdo_part_dev)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        info_before = BlockDev.vdo_info(self.vdo_name)
        self.assertIsNotNone(info_before)

        # grow the partition
        # NOTE(review): size 0 presumably requests the maximum size -- confirm
        succ = BlockDev.part_resize_part(self.loop_dev, vdo_part_dev, 0,
                                         BlockDev.PartAlign.OPTIMAL)
        self.assertTrue(succ)

        # growing just the partition must not change the sizes VDO reports
        info_after = BlockDev.vdo_info(self.vdo_name)
        self.assertIsNotNone(info_after)
        self.assertEqual(info_before.logical_size, info_after.logical_size)
        self.assertEqual(info_before.physical_size, info_after.physical_size)

        # perform the real grow and get new sizes
        succ = BlockDev.vdo_grow_physical(self.vdo_name)
        self.assertTrue(succ)

        info_after = BlockDev.vdo_info(self.vdo_name)
        self.assertIsNotNone(info_after)
        self.assertEqual(info_before.logical_size, info_after.logical_size)
        self.assertGreater(info_after.physical_size, info_before.physical_size)

    @tag_test(TestTags.SLOW)
    def test_statistics(self):
        """Verify that it is possible to retrieve statistics of an existing VDO volume"""
        ret = BlockDev.vdo_create(self.vdo_name, self.loop_dev)
        self.addCleanup(self._remove_vdo, self.vdo_name)
        self.assertTrue(ret)

        # the call is expected to raise, so there is no result to keep
        with six.assertRaisesRegex(self, GLib.GError, "No such file or directory"):
            BlockDev.vdo_get_stats("nonexistingxxx")

        stats = BlockDev.vdo_get_stats(self.vdo_name)
        self.assertIsNotNone(stats)
        # assuming block_size is always greater than zero
        self.assertGreater(stats.block_size, 0)

        stats = BlockDev.vdo_get_stats_full(self.vdo_name)
        self.assertIsNotNone(stats)
        self.assertGreater(len(stats), 0)
        self.assertIn("block_size", stats)
class VDOUnloadTest(VDOTestCase):
    """Check how the VDO plugin loads when the ``vdo`` tool cannot be found."""

    def setUp(self):
        # No backing device is created here; the only thing to do is to
        # schedule a reinit with the requested plugins so that the library is
        # back in a usable state for other tests once this one finishes.
        self.addCleanup(BlockDev.reinit, self.requested_plugins, True, None)

    @tag_test(TestTags.NOSTORAGE)
    def test_check_no_vdo(self):
        """Verify that checking vdo tool availability works as expected"""
        plugins = self.requested_plugins

        # start from a state with no plugins loaded at all
        unloaded = BlockDev.reinit([], True, None)
        self.assertTrue(unloaded)

        with fake_path(all_but="vdo"):
            # with the vdo tool hidden, loading the requested plugins has to fail
            with self.assertRaises(GLib.GError):
                BlockDev.reinit(plugins, True, None)

            available = BlockDev.get_available_plugin_names()
            self.assertNotIn("vdo", available)

        # outside of the fake PATH the plugins load again
        reloaded = BlockDev.reinit(plugins, True, None)
        self.assertTrue(reloaded)
        available = BlockDev.get_available_plugin_names()
        self.assertIn("vdo", available)