import unittest
import os
import tempfile
import overrides_hack
import shutil
import subprocess
import six
import locale
import re
import tarfile
from utils import create_sparse_tempfile, create_lio_device, delete_lio_device, get_avail_locales, requires_locales, run_command, read_file, TestTags, tag_test
from gi.repository import BlockDev, GLib
PASSWD = "myshinylittlepassword"
PASSWD2 = "myshinylittlepassword2"
PASSWD3 = "myshinylittlepassword3"
def have_luks2():
try:
succ = BlockDev.utils_check_util_version("cryptsetup", "2.0.3", "--version", r"cryptsetup ([0-9+\.]+)")
except GLib.GError:
return False
else:
return succ
def have_bitlk():
try:
succ = BlockDev.utils_check_util_version("cryptsetup", "2.3.0", "--version", r"cryptsetup ([0-9+\.]+)")
except GLib.GError:
return False
else:
return succ
HAVE_LUKS2 = have_luks2()
HAVE_BITLK = have_bitlk()
class CryptoTestCase(unittest.TestCase):
requested_plugins = BlockDev.plugin_specs_from_names(("crypto", "loop"))
@classmethod
def setUpClass(cls):
unittest.TestCase.setUpClass()
cls.avail_locales = get_avail_locales()
if not BlockDev.is_initialized():
BlockDev.init(cls.requested_plugins, None)
else:
BlockDev.reinit(cls.requested_plugins, True, None)
def setUp(self):
self.addCleanup(self._clean_up)
self.dev_file = create_sparse_tempfile("crypto_test", 1024**3)
self.dev_file2 = create_sparse_tempfile("crypto_test2", 1024**3)
try:
self.loop_dev = create_lio_device(self.dev_file)
except RuntimeError as e:
raise RuntimeError("Failed to setup loop device for testing: %s" % e)
try:
self.loop_dev2 = create_lio_device(self.dev_file2)
except RuntimeError as e:
raise RuntimeError("Failed to setup loop device for testing: %s" % e)
# make a key file
handle, self.keyfile = tempfile.mkstemp(prefix="libblockdev_test_keyfile", text=False)
os.write(handle, b"nobodyknows")
os.close(handle)
def _clean_up(self):
try:
BlockDev.crypto_luks_close("libblockdevTestLUKS")
except:
pass
try:
delete_lio_device(self.loop_dev)
except RuntimeError:
# just move on, we can do no better here
pass
os.unlink(self.dev_file)
try:
delete_lio_device(self.loop_dev2)
except RuntimeError:
# just move on, we can do no better here
pass
os.unlink(self.dev_file2)
os.unlink(self.keyfile)
def _luks_format(self, device, passphrase, keyfile):
return BlockDev.crypto_luks_format(device, None, 0, passphrase, keyfile, 0)
def _luks2_format(self, device, passphrase, keyfile):
return BlockDev.crypto_luks_format(device, None, 0, passphrase, keyfile, 0, BlockDev.CryptoLUKSVersion.LUKS2, None)
class CryptoTestGenerateBackupPassphrase(CryptoTestCase):
def setUp(self):
# we don't need block devices for this test
pass
@tag_test(TestTags.NOSTORAGE)
def test_generate_backup_passhprase(self):
"""Verify that backup passphrase generation works as expected"""
exp = r"^([0-9A-Za-z./]{5}-){3}[0-9A-Za-z./]{5}$"
for _i in range(100):
bp = BlockDev.crypto_generate_backup_passphrase()
six.assertRegex(self, bp, exp)
class CryptoTestFormat(CryptoTestCase):
@tag_test(TestTags.SLOW, TestTags.CORE)
def test_luks_format(self):
"""Verify that formating device as LUKS works"""
# no passphrase nor keyfile
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_format(self.loop_dev, None, 0, None, None, 0)
# the simple case with password
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, PASSWD, None, 0)
self.assertTrue(succ)
# create with a keyfile
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0)
self.assertTrue(succ)
# the simple case with password blob
succ = BlockDev.crypto_luks_format_blob(self.loop_dev, "aes-xts-plain64", 0, [ord(c) for c in PASSWD], 0)
self.assertTrue(succ)
@tag_test(TestTags.SLOW, TestTags.CORE)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_format(self):
"""Verify that formating device as LUKS 2 works"""
# no passphrase nor keyfile
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_format(self.loop_dev, None, 0, None, None, 0)
# the simple case with password
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, PASSWD, None, 0)
self.assertTrue(succ)
# create with a keyfile
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0)
self.assertTrue(succ)
# the simple case with password blob
succ = BlockDev.crypto_luks_format_blob(self.loop_dev, "aes-xts-plain64", 0, [ord(c) for c in PASSWD], 0)
self.assertTrue(succ)
# simple case with extra options
extra = BlockDev.CryptoLUKSExtra(label="blockdevLUKS")
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
_ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
m = re.search(r"Label:\s*(\S+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get label information from:\n%s %s" % (out, err))
self.assertEqual(m.group(1), "blockdevLUKS")
# different key derivation function
pbkdf = BlockDev.CryptoLUKSPBKDF(type="pbkdf2")
extra = BlockDev.CryptoLUKSExtra(pbkdf=pbkdf)
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
_ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
m = re.search(r"PBKDF:\s*(\S+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
self.assertEqual(m.group(1), "pbkdf2")
# different options for argon2 -- all parameters set
pbkdf = BlockDev.CryptoLUKSPBKDF(type="argon2id", max_memory_kb=100*1024, iterations=10, parallel_threads=1)
extra = BlockDev.CryptoLUKSExtra(pbkdf=pbkdf)
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
_ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
m = re.search(r"PBKDF:\s*(\S+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
self.assertEqual(m.group(1), "argon2id")
m = re.search(r"Memory:\s*(\d+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
# both iterations and memory is set --> cryptsetup will use exactly max_memory_kb
self.assertEqual(int(m.group(1)), 100*1024)
m = re.search(r"Threads:\s*(\d+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
self.assertEqual(int(m.group(1)), 1)
m = re.search(r"Time cost:\s*(\d+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
self.assertEqual(int(m.group(1)), 10)
# different options for argon2 -- only memory set
pbkdf = BlockDev.CryptoLUKSPBKDF(max_memory_kb=100*1024)
extra = BlockDev.CryptoLUKSExtra(pbkdf=pbkdf)
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
_ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
m = re.search(r"Memory:\s*(\d+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
# only memory is set -> cryptsetup will run a benchmark and use
# at most max_memory_kb
self.assertLessEqual(int(m.group(1)), 100*1024)
# different options for argon2 -- only miterations set
pbkdf = BlockDev.CryptoLUKSPBKDF(iterations=5)
extra = BlockDev.CryptoLUKSExtra(pbkdf=pbkdf)
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-xts-plain64", 0, None, self.keyfile, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
_ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
m = re.search(r"Time cost:\s*(\d+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get pbkdf information from:\n%s %s" % (out, err))
self.assertEqual(int(m.group(1)), 5)
class CryptoTestResize(CryptoTestCase):
def _get_key_location(self, device):
_ret, out, err = run_command("cryptsetup status %s" % device)
m = re.search(r"\s*key location:\s*(\S+)\s*", out)
if not m or len(m.groups()) != 1:
self.fail("Failed to get key locaton from:\n%s %s" % (out, err))
return m.group(1)
@tag_test(TestTags.SLOW)
def test_luks_resize(self):
"""Verify that resizing LUKS device works"""
# the simple case with password
succ = self._luks_format(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
# resize to 512 KiB (1024 * 512B sectors)
succ = BlockDev.crypto_luks_resize("libblockdevTestLUKS", 1024)
self.assertTrue(succ)
# resize back to full size
succ = BlockDev.crypto_luks_resize("libblockdevTestLUKS", 0)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_resize(self):
"""Verify that resizing LUKS 2 device works"""
# the simple case with password
succ = self._luks2_format(self.loop_dev, PASSWD, self.keyfile)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
# resize without passphrase should fail if key is saved in keyring
if self._get_key_location("libblockdevTestLUKS") == "keyring":
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_resize("libblockdevTestLUKS", 1024)
# resize to 512 KiB (1024 * 512B sectors)
succ = BlockDev.crypto_luks_resize("libblockdevTestLUKS", 1024, PASSWD)
self.assertTrue(succ)
# resize back to full size (using the keyfile)
succ = BlockDev.crypto_luks_resize("libblockdevTestLUKS", 0, None, self.keyfile)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
class CryptoTestOpenClose(CryptoTestCase):
def _luks_open_close(self, create_fn):
"""Verify that opening/closing LUKS device works"""
succ = create_fn(self.loop_dev, PASSWD, self.keyfile)
self.assertTrue(succ)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open("/non/existing/device", "libblockdevTestLUKS", PASSWD, None, False)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", None, None, False)
with six.assertRaisesRegex(self, GLib.GError, r"Incorrect passphrase"):
BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", "wrong-passhprase", None, False)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", None, "wrong-keyfile", False)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
# use the full /dev/mapper/ path
succ = BlockDev.crypto_luks_close("/dev/mapper/libblockdevTestLUKS")
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", None, self.keyfile, False)
self.assertTrue(succ)
# use just the LUKS device name
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW, TestTags.CORE)
def test_luks_open_close(self):
self._luks_open_close(self._luks_format)
@tag_test(TestTags.SLOW, TestTags.CORE)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_open_close(self):
self._luks_open_close(self._luks2_format)
class CryptoTestAddKey(CryptoTestCase):
def _add_key(self, create_fn):
"""Verify that adding key to LUKS device works"""
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_add_key(self.loop_dev, "wrong-passphrase", None, PASSWD2, None)
succ = BlockDev.crypto_luks_add_key(self.loop_dev, PASSWD, None, PASSWD2, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_add_key_blob(self.loop_dev, [ord(c) for c in PASSWD2], [ord(c) for c in PASSWD3])
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_add_key(self):
self._add_key(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_add_key(self):
self._add_key(self._luks2_format)
class CryptoTestRemoveKey(CryptoTestCase):
def _remove_key(self, create_fn):
"""Verify that removing key from LUKS device works"""
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_add_key(self.loop_dev, PASSWD, None, PASSWD2, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_add_key(self.loop_dev, PASSWD, None, PASSWD3, None)
self.assertTrue(succ)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_remove_key(self.loop_dev, "wrong-passphrase", None)
succ = BlockDev.crypto_luks_remove_key(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_remove_key_blob(self.loop_dev, [ord(c) for c in PASSWD2])
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_remove_key(self):
self._remove_key(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_remove_key(self):
self._remove_key(self._luks2_format)
class CryptoTestErrorLocale(CryptoTestCase):
def setUp(self):
self._orig_loc = None
CryptoTestCase.setUp(self)
self._orig_loc = ".".join(locale.getdefaultlocale())
def _clean_up(self):
CryptoTestCase._clean_up(self)
if self._orig_loc:
locale.setlocale(locale.LC_ALL, self._orig_loc)
@tag_test(TestTags.SLOW)
@requires_locales({"cs_CZ.UTF-8"})
def test_error_locale_key(self):
"""Verify that the error msg is locale agnostic"""
succ = BlockDev.crypto_luks_format(self.loop_dev, None, 0, PASSWD, None, 0)
self.assertTrue(succ)
locale.setlocale(locale.LC_ALL, "cs_CZ.UTF-8")
try:
BlockDev.crypto_luks_remove_key(self.loop_dev, "wrong-passphrase", None)
except GLib.GError as e:
self.assertIn("Operation not permitted", str(e))
class CryptoTestChangeKey(CryptoTestCase):
def _change_key(self, create_fn):
"""Verify that changing key in LUKS device works"""
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
with six.assertRaisesRegex(self, GLib.GError, r"No keyslot with given passphrase found."):
BlockDev.crypto_luks_change_key(self.loop_dev, "wrong-passphrase", PASSWD2)
succ = BlockDev.crypto_luks_change_key(self.loop_dev, PASSWD, PASSWD2)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_change_key_blob(self.loop_dev, [ord(c) for c in PASSWD2], [ord(c) for c in PASSWD3])
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_change_key(self):
self._change_key(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_change_key(self):
self._change_key(self._luks2_format)
class CryptoTestIsLuks(CryptoTestCase):
def _is_luks(self, create_fn):
"""Verify that LUKS device recognition works"""
with self.assertRaises(GLib.GError):
BlockDev.crypto_device_is_luks("/non/existing/device")
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
is_luks = BlockDev.crypto_device_is_luks(self.loop_dev)
self.assertTrue(is_luks)
is_luks = BlockDev.crypto_device_is_luks(self.loop_dev2)
self.assertFalse(is_luks)
@tag_test(TestTags.SLOW)
def test_is_luks(self):
self._is_luks(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_is_luks2(self):
self._is_luks(self._luks2_format)
class CryptoTestLuksStatus(CryptoTestCase):
def _luks_status(self, create_fn):
"""Verify that LUKS device status reporting works"""
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_status("/non/existing/device")
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
# use the full /dev/mapper path
status = BlockDev.crypto_luks_status("/dev/mapper/libblockdevTestLUKS")
self.assertEqual(status, "active")
# use just the LUKS device name
status = BlockDev.crypto_luks_status("libblockdevTestLUKS")
self.assertEqual(status, "active")
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_status("libblockdevTestLUKS")
@tag_test(TestTags.SLOW)
def test_luks_status(self):
self._luks_status(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_status(self):
self._luks_status(self._luks2_format)
class CryptoTestGetUUID(CryptoTestCase):
def _get_uuid(self, create_fn):
"""Verify that getting LUKS device UUID works"""
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
uuid = BlockDev.crypto_luks_uuid(self.loop_dev)
self.assertTrue(uuid)
with self.assertRaises(GLib.GError):
uuid = BlockDev.crypto_luks_uuid(self.loop_dev2)
@tag_test(TestTags.SLOW)
def test_luks_get_uuid(self):
self._get_uuid(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_get_uuid(self):
self._get_uuid(self._luks2_format)
class CryptoTestGetMetadataSize(CryptoTestCase):
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_get_metadata_size(self):
"""Verify that getting LUKS 2 device metadata size works"""
succ = self._luks2_format(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
meta_size = BlockDev.crypto_luks_get_metadata_size(self.loop_dev)
ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
if ret != 0:
self.fail("Failed to get LUKS 2 header from %s:\n%s %s" % (self.loop_dev, out, err))
m = re.search(r"offset:\s*([0-9]+)\s*\[bytes\]", out)
if m is None:
self.fail("Failed to get LUKS 2 offset information from %s:\n%s %s" % (self.loop_dev, out, err))
offset = int(m.group(1))
self.assertEquals(meta_size, offset, "LUKS 2 metadata sizes differ")
@tag_test(TestTags.SLOW)
def test_luks_get_metadata_size(self):
"""Verify that getting LUKS device metadata size works"""
succ = self._luks_format(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
meta_size = BlockDev.crypto_luks_get_metadata_size(self.loop_dev)
ret, out, err = run_command("cryptsetup luksDump %s" % self.loop_dev)
if ret != 0:
self.fail("Failed to get LUKS header from %s:\n%s %s" % (self.loop_dev, out, err))
m = re.search(r"Payload offset:\s*([0-9]+)", out)
if m is None:
self.fail("Failed to get LUKS 2 offset information from %s:\n%s %s" % (self.loop_dev, out, err))
# offset value is in 512B blocks; we need to multiply to get the real metadata size
offset = int(m.group(1)) * 512
self.assertEquals(meta_size, offset, "LUKS metadata sizes differ")
class CryptoTestLuksOpenRW(CryptoTestCase):
def _luks_open_rw(self, create_fn):
"""Verify that a LUKS device can be activated as RW as well as RO"""
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
# tests that we can write something to the raw LUKS device
succ = BlockDev.utils_exec_and_report_error(["dd", "if=/dev/zero", "of=/dev/mapper/libblockdevTestLUKS", "bs=1M", "count=1"])
self.assertTrue(succ)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
# now try the same with LUKS device opened as RO
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, True)
self.assertTrue(succ)
# tests that we can write something to the raw LUKS device
with self.assertRaises(GLib.GError):
BlockDev.utils_exec_and_report_error(["dd", "if=/dev/zero", "of=/dev/mapper/libblockdevTestLUKS", "bs=1M", "count=1"])
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_open_rw(self):
self._luks_open_rw(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_open_rw(self):
self._luks_open_rw(self._luks2_format)
class CryptoTestEscrow(CryptoTestCase):
def setUp(self):
super(CryptoTestEscrow, self).setUp()
# Create the certificate used to encrypt the escrow packet and backup passphrase.
# volume_key requires a nss database directory to decrypt any of the
# packet files, and python-nss is python2 only, just do everything with
# shell commands.
self.nss_dir = tempfile.mkdtemp(prefix='libblockdev_test_escrow')
self.addCleanup(shutil.rmtree, self.nss_dir)
subprocess.check_call(['certutil', '-d', self.nss_dir, '--empty-password', '-N'])
# Gather some entropy to keep certutil from asking for input
with tempfile.NamedTemporaryFile() as noise_file:
noise_file.write(os.urandom(20))
noise_file.flush()
subprocess.check_call(['certutil', '-d', self.nss_dir, '-S', '-x', '-n',
'escrow_cert', '-s', 'CN=Escrow Test', '-t', ',,TC', '-z',
noise_file.name])
# Export the public certificate
handle, self.public_cert = tempfile.mkstemp(prefix='libblockdev_test_escrow')
os.close(handle)
subprocess.check_call(['certutil', '-d', self.nss_dir, '-L', '-n', 'escrow_cert',
'-a', '-o', self.public_cert])
self.addCleanup(os.unlink, self.public_cert)
@tag_test(TestTags.SLOW)
def test_escrow_packet(self):
"""Verify that an escrow packet can be created for a device"""
succ = BlockDev.crypto_luks_format(self.loop_dev, None, 0, PASSWD, None, 0)
self.assertTrue(succ)
escrow_dir = tempfile.mkdtemp(prefix='libblockdev_test_escrow')
self.addCleanup(shutil.rmtree, escrow_dir)
with open(self.public_cert, 'rb') as cert_file:
succ = BlockDev.crypto_escrow_device(self.loop_dev, PASSWD, cert_file.read(),
escrow_dir, None)
self.assertTrue(succ)
# Find the escrow packet
escrow_packet_file = '%s/%s-escrow' % (escrow_dir, BlockDev.crypto_luks_uuid(self.loop_dev))
self.assertTrue(os.path.isfile(escrow_packet_file))
# Use the volume_key utility (see note in setUp about why not python)
# to decrypt the escrow packet and restore access to the volume under
# a new passphrase
# Just use the existing temp directory to output the re-encrypted packet
# PASSWD2 is the passphrase of the new escrow packet
p = subprocess.Popen(['volume_key', '--reencrypt', '-b', '-d', self.nss_dir,
escrow_packet_file, '-o', '%s/escrow-out' % escrow_dir],
stdin=subprocess.PIPE)
p.communicate(input=('%s\0%s\0' % (PASSWD2, PASSWD2)).encode('utf-8'))
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, 'volume_key'
)
# Restore access to the volume
# PASSWD3 is the new passphrase for the LUKS device
p = subprocess.Popen(['volume_key', '--restore', '-b', self.loop_dev,
'%s/escrow-out' % escrow_dir], stdin=subprocess.PIPE)
p.communicate(input=('%s\0%s\0%s\0' % (PASSWD2, PASSWD3, PASSWD3)).encode('utf-8'))
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, 'volume_key')
# Open the volume with the new passphrase
succ = BlockDev.crypto_luks_open(self.loop_dev, 'libblockdevTestLUKS', PASSWD3, None)
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_backup_passphrase(self):
"""Verify that a backup passphrase can be created for a device"""
succ = BlockDev.crypto_luks_format(self.loop_dev, None, 0, PASSWD, None, 0)
self.assertTrue(succ)
escrow_dir = tempfile.mkdtemp(prefix='libblockdev_test_escrow')
self.addCleanup(shutil.rmtree, escrow_dir)
backup_passphrase = BlockDev.crypto_generate_backup_passphrase()
with open(self.public_cert, 'rb') as cert_file:
succ = BlockDev.crypto_escrow_device(self.loop_dev, PASSWD, cert_file.read(),
escrow_dir, backup_passphrase)
self.assertTrue(succ)
# Find the backup passphrase
escrow_backup_passphrase = "%s/%s-escrow-backup-passphrase" % (escrow_dir, BlockDev.crypto_luks_uuid(self.loop_dev))
self.assertTrue(os.path.isfile(escrow_backup_passphrase))
# Check that the encrypted file contains what we put in
env = {k: v for k, v in os.environ.items()}
env.update({"LC_ALL": "C"})
passphrase = subprocess.check_output(
['volume_key', '--secrets', '-d', self.nss_dir, escrow_backup_passphrase],
env=env)
passphrase = passphrase.strip().split()[1].decode('ascii')
self.assertEqual(passphrase, backup_passphrase)
# Check that the backup passphrase works
succ = BlockDev.crypto_luks_open(self.loop_dev, 'libblockdevTestLUKS', backup_passphrase, None)
self.assertTrue(succ)
class CryptoTestSuspendResume(CryptoTestCase):
def _luks_suspend_resume(self, create_fn):
succ = create_fn(self.loop_dev, PASSWD, self.keyfile)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None)
self.assertTrue(succ)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_suspend("/non/existing/device")
# use the full /dev/mapper/ path
succ = BlockDev.crypto_luks_suspend("/dev/mapper/libblockdevTestLUKS")
self.assertTrue(succ)
_ret, state, _err = run_command("lsblk -oSTATE -n /dev/mapper/libblockdevTestLUKS")
self.assertEqual(state, "suspended")
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_resume("libblockdevTestLUKS", None, None)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_resume("libblockdevTestLUKS", "wrong-passhprase", None)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_resume("libblockdevTestLUKS", None, "wrong-keyfile")
succ = BlockDev.crypto_luks_resume("libblockdevTestLUKS", PASSWD, None)
self.assertTrue(succ)
_ret, state, _err = run_command("lsblk -oSTATE -n /dev/mapper/libblockdevTestLUKS")
self.assertEqual(state, "running")
# use just the LUKS device name
succ = BlockDev.crypto_luks_suspend("libblockdevTestLUKS")
self.assertTrue(succ)
_ret, state, _err = run_command("lsblk -oSTATE -n /dev/mapper/libblockdevTestLUKS")
self.assertEqual(state, "suspended")
succ = BlockDev.crypto_luks_resume("libblockdevTestLUKS", None, self.keyfile)
self.assertTrue(succ)
_ret, state, _err = run_command("lsblk -oSTATE -n /dev/mapper/libblockdevTestLUKS")
self.assertEqual(state, "running")
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_suspend_resume(self):
"""Verify that suspending/resuming LUKS device works"""
self._luks_suspend_resume(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_suspend_resume(self):
"""Verify that suspending/resuming LUKS 2 device works"""
self._luks_suspend_resume(self._luks2_format)
class CryptoTestKillSlot(CryptoTestCase):
def _luks_kill_slot(self, create_fn):
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_add_key(self.loop_dev, PASSWD, None, PASSWD2, None)
self.assertTrue(succ)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_kill_slot("/non/existing/device", -1)
# invalid slot
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_kill_slot(self.loop_dev, -1)
# unused slot
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_kill_slot(self.loop_dev, 2)
# destroy second keyslot
succ = BlockDev.crypto_luks_kill_slot(self.loop_dev, 1)
self.assertTrue(succ)
# opening with the second passphrase should fail
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD2)
# opening with passphrase should still work
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_kill_slot(self):
"""Verify that killing a key slot on LUKS device works"""
self._luks_kill_slot(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_kill_slot(self):
"""Verify that killing a key slot on LUKS 2 device works"""
self._luks_kill_slot(self._luks2_format)
class CryptoTestHeaderBackupRestore(CryptoTestCase):
def setUp(self):
super(CryptoTestHeaderBackupRestore, self).setUp()
self.backup_dir = tempfile.mkdtemp(prefix='libblockdev_test_header')
self.addCleanup(shutil.rmtree, self.backup_dir)
def _luks_header_backup_restore(self, create_fn):
succ = create_fn(self.loop_dev, PASSWD, None)
self.assertTrue(succ)
backup_file = os.path.join(self.backup_dir, "luks-header.txt")
succ = BlockDev.crypto_luks_header_backup(self.loop_dev, backup_file)
self.assertTrue(succ)
self.assertTrue(os.path.isfile(backup_file))
# now completely destroy the luks header
ret, out, err = run_command("cryptsetup erase %s -q && wipefs -a %s" % (self.loop_dev, self.loop_dev))
if ret != 0:
self.fail("Failed to erase LUKS header from %s:\n%s %s" % (self.loop_dev, out, err))
_ret, fstype, _err = run_command("blkid -p -ovalue -sTYPE %s" % self.loop_dev)
self.assertFalse(fstype) # false == empty
# header is destroyed, should not be possible to open
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None)
# and restore the header back
succ = BlockDev.crypto_luks_header_restore(self.loop_dev, backup_file)
self.assertTrue(succ)
_ret, fstype, _err = run_command("blkid -p -ovalue -sTYPE %s" % self.loop_dev)
self.assertEqual(fstype, "crypto_LUKS")
# opening should now work
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW)
def test_luks_header_backup_restore(self):
"""Verify that header backup/restore with LUKS works"""
self._luks_header_backup_restore(self._luks_format)
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_header_backup_restore(self):
"""Verify that header backup/restore with LUKS2 works"""
self._luks_header_backup_restore(self._luks2_format)
class CryptoTestInfo(CryptoTestCase):
@tag_test(TestTags.SLOW, TestTags.CORE)
def test_luks_format(self):
"""Verify that we can get information about a LUKS device"""
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-cbc-essiv:sha256", 256, PASSWD, None, 0)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
info = BlockDev.crypto_luks_info("libblockdevTestLUKS")
self.assertIsNotNone(info)
self.assertEqual(info.version, BlockDev.CryptoLUKSVersion.LUKS1)
self.assertEqual(info.cipher, "aes")
self.assertEqual(info.mode, "cbc-essiv:sha256")
self.assertEqual(info.backing_device, self.loop_dev)
_ret, uuid, _err = run_command("blkid -p -ovalue -sUUID %s" % self.loop_dev)
self.assertEqual(info.uuid, uuid)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
@tag_test(TestTags.SLOW, TestTags.CORE)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_format(self):
"""Verify that we can get information about a LUKS 2 device"""
extra = BlockDev.CryptoLUKSExtra()
extra.sector_size = 4096
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-cbc-essiv:sha256", 256, PASSWD, None, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
info = BlockDev.crypto_luks_info("libblockdevTestLUKS")
self.assertIsNotNone(info)
self.assertEqual(info.version, BlockDev.CryptoLUKSVersion.LUKS2)
self.assertEqual(info.cipher, "aes")
self.assertEqual(info.mode, "cbc-essiv:sha256")
self.assertEqual(info.backing_device, self.loop_dev)
self.assertEqual(info.sector_size, 4096)
_ret, uuid, _err = run_command("blkid -p -ovalue -sUUID %s" % self.loop_dev)
self.assertEqual(info.uuid, uuid)
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
class CryptoTestIntegrity(CryptoTestCase):
@tag_test(TestTags.SLOW)
@unittest.skipUnless(HAVE_LUKS2, "LUKS 2 not supported")
def test_luks2_integrity(self):
"""Verify that we can get create a LUKS 2 device with integrity"""
if not BlockDev.utils_have_kernel_module("dm-integrity"):
self.skipTest('dm-integrity kernel module not available, skipping.')
extra = BlockDev.CryptoLUKSExtra()
extra.integrity = "hmac(sha256)"
succ = BlockDev.crypto_luks_format(self.loop_dev, "aes-cbc-essiv:sha256", 512, PASSWD, None, 0,
BlockDev.CryptoLUKSVersion.LUKS2, extra)
self.assertTrue(succ)
succ = BlockDev.crypto_luks_open(self.loop_dev, "libblockdevTestLUKS", PASSWD, None, False)
self.assertTrue(succ)
info = BlockDev.crypto_integrity_info("libblockdevTestLUKS")
self.assertIsNotNone(info)
self.assertEqual(info.algorithm, "hmac(sha256)")
# get integrity device dm name
_ret, int_name, _err = run_command('ls /sys/block/%s/holders/' % self.loop_dev.split("/")[-1])
self.assertTrue(int_name) # true == not empty
tag_size = read_file("/sys/block/%s/integrity/tag_size" % int_name)
self.assertEqual(info.tag_size, int(tag_size))
succ = BlockDev.crypto_luks_close("libblockdevTestLUKS")
self.assertTrue(succ)
class CryptoTestTrueCrypt(CryptoTestCase):
# we can't create TrueCrypt/VeraCrypt formats using libblockdev
# so we are using these images from cryptsetup test suite
# https://gitlab.com/cryptsetup/cryptsetup/blob/master/tests/tcrypt-images.tar.bz2
tc_img = "tc-sha512-xts-aes"
vc_img = "vc-sha512-xts-aes"
passphrase = "aaaaaaaaaaaa"
tempdir = None
@classmethod
def setUpClass(cls):
super(CryptoTestTrueCrypt, cls).setUpClass()
cls.tempdir = tempfile.mkdtemp(prefix="bd_test_tcrypt")
images = os.path.join(os.path.dirname(__file__), "truecrypt-images.tar.gz")
with tarfile.open(images, "r") as tar:
tar.extractall(cls.tempdir)
@classmethod
def tearDownClass(cls):
super(CryptoTestTrueCrypt, cls).tearDownClass()
shutil.rmtree(cls.tempdir)
def setUp(self):
self.addCleanup(self._clean_up)
succ, loop = BlockDev.loop_setup(os.path.join(self.tempdir, self.tc_img))
if not succ:
raise RuntimeError("Failed to setup loop device for testing")
self.tc_dev = "/dev/%s" % loop
succ, loop = BlockDev.loop_setup(os.path.join(self.tempdir, self.vc_img))
if not succ:
raise RuntimeError("Failed to setup loop device for testing")
self.vc_dev = "/dev/%s" % loop
def _clean_up(self):
try:
BlockDev.crypto_tc_close("libblockdevTestTC")
except:
pass
succ = BlockDev.loop_teardown(self.tc_dev)
if not succ:
raise RuntimeError("Failed to tear down loop device used for testing")
succ = BlockDev.loop_teardown(self.vc_dev)
if not succ:
raise RuntimeError("Failed to tear down loop device used for testing")
@tag_test(TestTags.NOSTORAGE)
def test_truecrypt_open_close(self):
"""Verify that opening/closing TrueCrypt device works"""
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open("/non/existing/device", "libblockdevTestTC", self.passphrase)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.tc_dev, "libblockdevTestTC", None)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.tc_dev, "libblockdevTestTC", "wrong-passhprase")
succ = BlockDev.crypto_tc_open(self.tc_dev, "libblockdevTestTC", self.passphrase)
self.assertTrue(succ)
self.assertTrue(os.path.exists("/dev/mapper/libblockdevTestTC"))
succ = BlockDev.crypto_tc_close("libblockdevTestTC")
self.assertTrue(succ)
self.assertFalse(os.path.exists("/dev/mapper/libblockdevTestTC"))
@tag_test(TestTags.NOSTORAGE)
def test_veracrypt_open_close(self):
"""Verify that opening/closing VeraCrypt device works"""
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open("/non/existing/device", "libblockdevTestTC", self.passphrase)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.vc_dev, "libblockdevTestTC", None)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.vc_dev, "libblockdevTestTC", "wrong-passhprase")
succ = BlockDev.crypto_tc_open(self.vc_dev, "libblockdevTestTC", self.passphrase, veracrypt=True)
self.assertTrue(succ)
self.assertTrue(os.path.exists("/dev/mapper/libblockdevTestTC"))
succ = BlockDev.crypto_tc_close("libblockdevTestTC")
self.assertTrue(succ)
self.assertFalse(os.path.exists("/dev/mapper/libblockdevTestTC"))
class CryptoTestBitlk(CryptoTestCase):
# we can't create BitLocker formats using libblockdev
# so we are using these images from cryptsetup test suite
# https://gitlab.com/cryptsetup/cryptsetup/blob/master/tests/bitlk-images.tar.xz
bitlk_img = "bitlk-aes-xts-128.img"
passphrase = "anaconda"
tempdir = None
@classmethod
def setUpClass(cls):
super(CryptoTestBitlk, cls).setUpClass()
cls.tempdir = tempfile.mkdtemp(prefix="bd_test_bitlk")
images = os.path.join(os.path.dirname(__file__), "bitlk-images.tar.gz")
with tarfile.open(images, "r") as tar:
tar.extractall(cls.tempdir)
@classmethod
def tearDownClass(cls):
super(CryptoTestBitlk, cls).tearDownClass()
shutil.rmtree(cls.tempdir)
def setUp(self):
self.addCleanup(self._clean_up)
succ, loop = BlockDev.loop_setup(os.path.join(self.tempdir, self.bitlk_img))
if not succ:
raise RuntimeError("Failed to setup loop device for testing")
self.bitlk_dev = "/dev/%s" % loop
def _clean_up(self):
try:
BlockDev.crypto_bitlk_close("libblockdevTestBitlk")
except:
pass
succ = BlockDev.loop_teardown(self.bitlk_dev)
if not succ:
raise RuntimeError("Failed to tear down loop device used for testing")
@unittest.skipUnless(HAVE_BITLK, "BITLK not supported")
def test_bitlk_open_close(self):
"""Verify that opening/closing a BitLocker device works"""
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open("/non/existing/device", "libblockdevTestBitlk", self.passphrase)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.bitlk_dev, "libblockdevTestBitlk", None)
with self.assertRaises(GLib.GError):
BlockDev.crypto_luks_open(self.bitlk_dev, "libblockdevTestBitlk", "wrong-passhprase")
succ = BlockDev.crypto_bitlk_open(self.bitlk_dev, "libblockdevTestBitlk", self.passphrase)
self.assertTrue(succ)
self.assertTrue(os.path.exists("/dev/mapper/libblockdevTestBitlk"))
succ = BlockDev.crypto_bitlk_close("libblockdevTestBitlk")
self.assertTrue(succ)
self.assertFalse(os.path.exists("/dev/mapper/libblockdevTestBitlk"))