import unittest
import os
import re
import time
from contextlib import contextmanager
from distutils.version import LooseVersion
from distutils.spawn import find_executable
from utils import create_sparse_tempfile, create_lio_device, delete_lio_device, wipe_all, fake_path, read_file, TestTags, tag_test
from bytesize import bytesize
import overrides_hack
from gi.repository import BlockDev, GLib
def _can_load_zram():
"""Test if we can load the zram module"""
if os.system("lsmod|grep zram >/dev/null") != 0:
# not loaded
return True
elif os.system("rmmod zram") == 0:
# successfully unloaded
return True
else:
# loaded and failed to unload
return False
@contextmanager
def _track_module_load(test_case, mod_name, loaded_attr):
setattr(test_case, loaded_attr, os.system("lsmod|grep %s > /dev/null" % mod_name) == 0)
try:
yield
finally:
setattr(test_case, loaded_attr, os.system("lsmod|grep %s > /dev/null" % mod_name) == 0)
def _wait_for_bcache_setup(bcache_dev):
i = 0
cache_dir = "/sys/block/%s/bcache/cache" % bcache_dev
while not os.access(cache_dir, os.R_OK):
time.sleep(1)
i += 1
if i >= 30:
print("WARNING: Giving up waiting for bcache setup!!!")
break
class KbdZRAMTestCase(unittest.TestCase):
requested_plugins = BlockDev.plugin_specs_from_names(("kbd", "swap"))
@classmethod
def setUpClass(cls):
if not BlockDev.is_initialized():
BlockDev.init(cls.requested_plugins, None)
else:
BlockDev.reinit(cls.requested_plugins, True, None)
def setUp(self):
self.addCleanup(self._clean_up)
self._loaded_zram_module = False
def _clean_up(self):
# make sure we unload the module if we loaded it
if self._loaded_zram_module:
os.system("rmmod zram")
class KbdZRAMDevicesTestCase(KbdZRAMTestCase):
@unittest.skipUnless(_can_load_zram(), "cannot load the 'zram' module")
@tag_test(TestTags.SLOW)
def test_create_destroy_devices(self):
# the easiest case
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(2, [10 * 1024**2, 10 * 1024**2], [1, 2]))
time.sleep(1)
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
time.sleep(1)
# no nstreams specified
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(2, [10 * 1024**2, 10 * 1024**2], None))
time.sleep(1)
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
time.sleep(1)
# with module pre-loaded, but unsed
self.assertEqual(os.system("modprobe zram num_devices=2"), 0)
time.sleep(1)
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(2, [10 * 1024**2, 10 * 1024**2], [1, 1]))
time.sleep(1)
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
time.sleep(1)
# with module pre-loaded, and devices used (as active swaps)
self.assertEqual(os.system("modprobe zram num_devices=2"), 0)
self.assertEqual(os.system("echo 10M > /sys/class/block/zram0/disksize"), 0)
self.assertEqual(os.system("echo 10M > /sys/class/block/zram1/disksize"), 0)
time.sleep(1)
for zram_dev in ("/dev/zram0", "/dev/zram1"):
self.assertTrue(BlockDev.swap_mkswap(zram_dev, None))
self.assertTrue(BlockDev.swap_swapon(zram_dev, -1))
with _track_module_load(self, "zram", "_loaded_zram_module"):
with self.assertRaises(GLib.GError):
self.assertTrue(BlockDev.kbd_zram_create_devices(2, [10 * 1024**2, 10 * 1024**2], [1, 1]))
for zram_dev in ("/dev/zram0", "/dev/zram1"):
self.assertTrue(BlockDev.swap_swapoff(zram_dev))
self.assertEqual(os.system("rmmod zram"), 0)
# should work just fine now
self.assertTrue(BlockDev.kbd_zram_create_devices(2, [10 * 1024**2, 10 * 1024**2], [1, 1]))
time.sleep(1)
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
time.sleep(1)
@unittest.skipUnless(_can_load_zram(), "cannot load the 'zram' module")
@tag_test(TestTags.SLOW)
def test_zram_add_remove_device(self):
"""Verify that it is possible to add and remove a zram device"""
# the easiest case
with _track_module_load(self, "zram", "_loaded_zram_module"):
succ, device = BlockDev.kbd_zram_add_device (10 * 1024**2, 4)
self.assertTrue(succ)
self.assertTrue(device.startswith("/dev/zram"))
time.sleep(5)
self.assertTrue(BlockDev.kbd_zram_remove_device(device))
# no nstreams specified
with _track_module_load(self, "zram", "_loaded_zram_module"):
succ, device = BlockDev.kbd_zram_add_device (10 * 1024**2, 0)
self.assertTrue(succ)
self.assertTrue(device.startswith("/dev/zram"))
time.sleep(5)
self.assertTrue(BlockDev.kbd_zram_remove_device(device))
# create two devices
with _track_module_load(self, "zram", "_loaded_zram_module"):
succ, device = BlockDev.kbd_zram_add_device (10 * 1024**2, 4)
self.assertTrue(succ)
self.assertTrue(device.startswith("/dev/zram"))
succ, device2 = BlockDev.kbd_zram_add_device (10 * 1024**2, 4)
self.assertTrue(succ)
self.assertTrue(device2.startswith("/dev/zram"))
time.sleep(5)
self.assertTrue(BlockDev.kbd_zram_remove_device(device))
self.assertTrue(BlockDev.kbd_zram_remove_device(device2))
# mixture of multiple devices and a single device
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(2, [10 * 1024**2, 10 * 1024**2], [1, 2]))
time.sleep(5)
succ, device = BlockDev.kbd_zram_add_device (10 * 1024**2, 4)
self.assertTrue(succ)
self.assertTrue(device.startswith("/dev/zram"))
time.sleep(5)
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
time.sleep(5)
class KbdZRAMStatsTestCase(KbdZRAMTestCase):
@unittest.skipUnless(_can_load_zram(), "cannot load the 'zram' module")
def test_zram_get_stats(self):
"""Verify that it is possible to get stats for a zram device"""
# location of some sysfs files we use is different since linux 4.11
kernel_version = os.uname()[2]
if LooseVersion(kernel_version) >= LooseVersion("4.11"):
self._zram_get_stats_new()
else:
self._zram_get_stats_old()
def _zram_get_stats_new(self):
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(1, [10 * 1024**2], [2]))
time.sleep(1)
# XXX: this needs to get more complex/serious
stats = BlockDev.kbd_zram_get_stats("zram0")
self.assertTrue(stats)
# /dev/zram0 should work too
stats = BlockDev.kbd_zram_get_stats("/dev/zram0")
self.assertTrue(stats)
self.assertEqual(stats.disksize, 10 * 1024**2)
# XXX: 'max_comp_streams' is currently broken on rawhide
# https://bugzilla.redhat.com/show_bug.cgi?id=1352567
# self.assertEqual(stats.max_comp_streams, 2)
self.assertTrue(stats.comp_algorithm)
# read 'num_reads' and 'num_writes' from '/sys/block/zram0/stat'
sys_stats = read_file("/sys/block/zram0/stat").strip().split()
self.assertGreaterEqual(len(sys_stats), 11) # 15 stats since 4.19
num_reads = int(sys_stats[0])
num_writes = int(sys_stats[4])
self.assertEqual(stats.num_reads, num_reads)
self.assertEqual(stats.num_writes, num_writes)
# read 'orig_data_size', 'compr_data_size', 'mem_used_total' and
# 'zero_pages' from '/sys/block/zram0/mm_stat'
sys_stats = read_file("/sys/block/zram0/mm_stat").strip().split()
self.assertGreaterEqual(len(sys_stats), 7) # since 4.18 we have 8 stats
orig_data_size = int(sys_stats[0])
compr_data_size = int(sys_stats[1])
mem_used_total = int(sys_stats[2])
zero_pages = int(sys_stats[5])
self.assertEqual(stats.orig_data_size, orig_data_size)
self.assertEqual(stats.compr_data_size, compr_data_size)
self.assertEqual(stats.mem_used_total, mem_used_total)
self.assertEqual(stats.zero_pages, zero_pages)
# read 'invalid_io' and 'num_writes' from '/sys/block/zram0/io_stat'
sys_stats = read_file("/sys/block/zram0/io_stat").strip().split()
self.assertEqual(len(sys_stats), 4)
invalid_io = int(sys_stats[2])
self.assertEqual(stats.invalid_io, invalid_io)
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
def _zram_get_stats_old(self):
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_create_devices(1, [10 * 1024**2], [2]))
time.sleep(1)
stats = BlockDev.kbd_zram_get_stats("zram0")
self.assertTrue(stats)
# /dev/zram0 should work too
stats = BlockDev.kbd_zram_get_stats("/dev/zram0")
self.assertTrue(stats)
self.assertEqual(stats.disksize, 10 * 1024**2)
self.assertEqual(stats.max_comp_streams, 2)
self.assertTrue(stats.comp_algorithm)
num_reads = int(read_file("/sys/block/zram0/num_reads").strip())
self.assertEqual(stats.num_reads, num_reads)
num_writes = int(read_file("/sys/block/zram0/num_writes").strip())
self.assertEqual(stats.num_writes, num_writes)
orig_data_size = int(read_file("/sys/block/zram0/orig_data_size").strip())
self.assertEqual(stats.orig_data_size, orig_data_size)
compr_data_size = int(read_file("/sys/block/zram0/compr_data_size").strip())
self.assertEqual(stats.compr_data_size, compr_data_size)
mem_used_total = int(read_file("/sys/block/zram0/mem_used_total").strip())
self.assertEqual(stats.mem_used_total, mem_used_total)
zero_pages = int(read_file("/sys/block/zram0/zero_pages").strip())
self.assertEqual(stats.zero_pages, zero_pages)
invalid_io = int(read_file("/sys/block/zram0/invalid_io").strip())
self.assertEqual(stats.invalid_io, invalid_io)
with _track_module_load(self, "zram", "_loaded_zram_module"):
self.assertTrue(BlockDev.kbd_zram_destroy_devices())
class KbdBcacheNodevTestCase(unittest.TestCase):
# no setUp/tearDown methods needed
requested_plugins = BlockDev.plugin_specs_from_names(("kbd", "swap"))
@classmethod
def setUpClass(cls):
if not find_executable("make-bcache"):
raise unittest.SkipTest("make-bcache executable not found in $PATH, skipping.")
if not BlockDev.is_initialized():
BlockDev.init(cls.requested_plugins, None)
else:
BlockDev.reinit(cls.requested_plugins, True, None)
@tag_test(TestTags.NOSTORAGE)
def test_bcache_mode_str_bijection(self):
"""Verify that it's possible to transform between cache modes and their string representations"""
mode_mapping = ((BlockDev.KBDBcacheMode.WRITETHROUGH, "writethrough"),
(BlockDev.KBDBcacheMode.WRITEBACK, "writeback"),
(BlockDev.KBDBcacheMode.WRITEAROUND, "writearound"),
(BlockDev.KBDBcacheMode.NONE, "none"),
(BlockDev.KBDBcacheMode.UNKNOWN, "unknown"),
)
for (mode, mode_str) in mode_mapping:
self.assertEqual(mode, BlockDev.kbd_bcache_get_mode_from_str(mode_str))
self.assertEqual(mode_str, BlockDev.kbd_bcache_get_mode_str(mode))
self.assertEqual(mode_str, BlockDev.kbd_bcache_get_mode_str(BlockDev.kbd_bcache_get_mode_from_str(mode_str)))
self.assertEqual(mode, BlockDev.kbd_bcache_get_mode_from_str(BlockDev.kbd_bcache_get_mode_str(mode)))
class KbdBcacheTestCase(unittest.TestCase):
requested_plugins = BlockDev.plugin_specs_from_names(("kbd", "swap"))
@classmethod
def setUpClass(cls):
if not find_executable("make-bcache"):
raise unittest.SkipTest("make-bcache executable not found in $PATH, skipping.")
if not BlockDev.is_initialized():
BlockDev.init(cls.requested_plugins, None)
else:
BlockDev.reinit(cls.requested_plugins, True, None)
def setUp(self):
self.addCleanup(self._clean_up)
self.dev_file = create_sparse_tempfile("lvm_test", 10 * 1024**3)
self.dev_file2 = create_sparse_tempfile("lvm_test", 10 * 1024**3)
try:
self.loop_dev = create_lio_device(self.dev_file)
except RuntimeError as e:
raise RuntimeError("Failed to setup loop device for testing: %s" % e)
try:
self.loop_dev2 = create_lio_device(self.dev_file2)
except RuntimeError as e:
raise RuntimeError("Failed to setup loop device for testing: %s" % e)
self.bcache_dev = None
def _clean_up(self):
if self.bcache_dev:
try:
BlockDev.kbd_bcache_destroy(self.bcache_dev)
except:
pass
try:
delete_lio_device(self.loop_dev)
except RuntimeError:
# just move on, we can do no better here
pass
os.unlink(self.dev_file)
try:
delete_lio_device(self.loop_dev2)
except RuntimeError:
# just move on, we can do no better here
pass
os.unlink(self.dev_file2)
class KbdTestBcacheCreate(KbdBcacheTestCase):
@tag_test(TestTags.UNSTABLE)
def test_bcache_create_destroy(self):
"""Verify that it's possible to create and destroy a bcache device"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(5)
wipe_all(self.loop_dev, self.loop_dev2)
@tag_test(TestTags.UNSTABLE)
def test_bcache_create_destroy_full_path(self):
"""Verify that it's possible to create and destroy a bcache device with full device path"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
succ = BlockDev.kbd_bcache_destroy("/dev/" + self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(5)
wipe_all(self.loop_dev, self.loop_dev2)
class KbdTestBcacheAttachDetach(KbdBcacheTestCase):
@tag_test(TestTags.UNSTABLE)
def test_bcache_attach_detach(self):
"""Verify that it's possible to detach/attach a cache from/to a bcache device"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
succ, c_set_uuid = BlockDev.kbd_bcache_detach(self.bcache_dev)
self.assertTrue(succ)
self.assertTrue(c_set_uuid)
succ = BlockDev.kbd_bcache_attach(c_set_uuid, self.bcache_dev)
self.assertTrue(succ)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
@tag_test(TestTags.UNSTABLE)
def test_bcache_attach_detach_full_path(self):
"""Verify that it's possible to detach/attach a cache from/to a bcache device with full device path"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
succ, c_set_uuid = BlockDev.kbd_bcache_detach("/dev/" + self.bcache_dev)
self.assertTrue(succ)
self.assertTrue(c_set_uuid)
succ = BlockDev.kbd_bcache_attach(c_set_uuid, "/dev/" + self.bcache_dev)
self.assertTrue(succ)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
@tag_test(TestTags.UNSTABLE)
def test_bcache_detach_destroy(self):
"""Verify that it's possible to destroy a bcache device with no cache attached"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
succ, c_set_uuid = BlockDev.kbd_bcache_detach(self.bcache_dev)
self.assertTrue(succ)
self.assertTrue(c_set_uuid)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
class KbdTestBcacheGetSetMode(KbdBcacheTestCase):
@tag_test(TestTags.UNSTABLE)
def test_bcache_get_set_mode(self):
"""Verify that it is possible to get and set Bcache mode"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
mode = BlockDev.kbd_bcache_get_mode(self.bcache_dev)
self.assertNotEqual(mode, BlockDev.KBDBcacheMode.UNKNOWN)
for mode_str in ("writethrough", "writeback", "writearound", "none"):
mode = BlockDev.kbd_bcache_get_mode_from_str(mode_str)
succ = BlockDev.kbd_bcache_set_mode(self.bcache_dev, mode)
self.assertTrue(succ)
new_mode = BlockDev.kbd_bcache_get_mode(self.bcache_dev)
self.assertEqual(mode, new_mode)
self.assertEqual(mode_str, BlockDev.kbd_bcache_get_mode_str(new_mode))
mode_str = "unknown"
mode = BlockDev.kbd_bcache_get_mode_from_str(mode_str)
with self.assertRaises(GLib.GError):
# cannot set mode to "uknown"
BlockDev.kbd_bcache_set_mode(self.bcache_dev, mode)
mode_str = "bla"
with self.assertRaises(GLib.GError):
mode = BlockDev.kbd_bcache_get_mode_from_str(mode_str)
# set back to some caching mode
mode_str = "writethrough"
mode = BlockDev.kbd_bcache_get_mode_from_str(mode_str)
succ = BlockDev.kbd_bcache_set_mode(self.bcache_dev, mode)
self.assertTrue(succ)
_wait_for_bcache_setup(dev)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
class KbdTestBcacheStatusTest(KbdBcacheTestCase):
def _get_size(self, bcache_name):
cache_dir = '/sys/block/%s/bcache/cache' % bcache_name
# sum sizes from all caches
caches = ['%s/%s' % (cache_dir, d) for d in os.listdir(cache_dir) if re.match('cache[0-9]*$', d)]
return sum(int(read_file(os.path.realpath(c) + '/../size')) for c in caches)
@tag_test(TestTags.UNSTABLE)
def test_bcache_status(self):
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
# should work with both "bcacheX" and "/dev/bcacheX"
status = BlockDev.kbd_bcache_status(self.bcache_dev)
self.assertTrue(status)
status = BlockDev.kbd_bcache_status("/dev/" + self.bcache_dev)
self.assertTrue(status)
# check some basic values
self.assertTrue(status.state)
sys_state = read_file("/sys/block/%s/bcache/state" % self.bcache_dev).strip()
self.assertEqual(status.state, sys_state)
sys_block = read_file("/sys/block/%s/bcache/cache/block_size" % self.bcache_dev).strip()
self.assertEqual(status.block_size, int(bytesize.Size(sys_block)))
sys_size = self._get_size(self.bcache_dev)
self.assertGreater(status.cache_size, sys_size)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
class KbdTestBcacheBackingCacheDevTest(KbdBcacheTestCase):
@tag_test(TestTags.UNSTABLE)
def test_bcache_backing_cache_dev(self):
"""Verify that is is possible to get the backing and cache devices for a Bcache"""
succ, dev = BlockDev.kbd_bcache_create(self.loop_dev, self.loop_dev2, None)
self.assertTrue(succ)
self.assertTrue(dev)
self.bcache_dev = dev
_wait_for_bcache_setup(dev)
self.assertEqual("/dev/" + BlockDev.kbd_bcache_get_backing_device(self.bcache_dev), self.loop_dev)
self.assertEqual("/dev/" + BlockDev.kbd_bcache_get_cache_device(self.bcache_dev), self.loop_dev2)
succ = BlockDev.kbd_bcache_destroy(self.bcache_dev)
self.assertTrue(succ)
self.bcache_dev = None
time.sleep(1)
wipe_all(self.loop_dev, self.loop_dev2)
class KbdUnloadTest(KbdBcacheTestCase):
def setUp(self):
# make sure the library is initialized with all plugins loaded for other
# tests
self.addCleanup(BlockDev.reinit, self.requested_plugins, True, None)
@tag_test(TestTags.NOSTORAGE)
def test_check_no_bcache_progs(self):
"""Verify that checking the availability of make-bcache works as expected"""
# unload all plugins first
self.assertTrue(BlockDev.reinit([], True, None))
with fake_path(all_but="make-bcache"):
with self.assertRaises(GLib.GError):
BlockDev.reinit(self.requested_plugins, True, None)
self.assertNotIn("kbd", BlockDev.get_available_plugin_names())
# load the plugins back
self.assertTrue(BlockDev.reinit(self.requested_plugins, True, None))
self.assertIn("kbd", BlockDev.get_available_plugin_names())