#
# Runtime tests for the individual assemblers.
#
import contextlib
import errno
import hashlib
import json
import os
import subprocess
import tempfile
import unittest
from osbuild import loop
from .. import test
MEBIBYTE = 1024 * 1024
@unittest.skipUnless(test.TestBase.have_test_data(), "no test-data access")
class TestAssemblers(test.TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
def setUp(self):
self.osbuild = test.OSBuild(self)
@contextlib.contextmanager
def run_assembler(self, osb, name, options, output_path):
with open(os.path.join(self.locate_test_data(),
"manifests/filesystem.json")) as f:
manifest = json.load(f)
manifest["pipeline"] = dict(
manifest["pipeline"],
assembler={"name": name, "options": options}
)
data = json.dumps(manifest)
treeid = osb.treeid_from_manifest(data)
assert treeid
with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
osb.compile(data, output_dir=output_dir, checkpoints=[treeid])
with osb.map_object(treeid) as tree:
yield tree, os.path.join(output_dir, output_path)
def assertImageFile(self, filename, fmt, expected_size=None):
info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename]))
self.assertEqual(info["format"], fmt)
self.assertEqual(info["virtual-size"], expected_size)
def assertFilesystem(self, device, uuid, fstype, tree):
output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf-8")
blkid = dict(line.split("=") for line in output.strip().split("\n"))
self.assertEqual(blkid["UUID"], uuid)
self.assertEqual(blkid["TYPE"], fstype)
with mount(device) as target_tree:
diff = self.tree_diff(tree, target_tree)
if fstype == 'ext4':
added_files = ["/lost+found"]
else:
added_files = []
self.assertEqual(diff["added_files"], added_files)
self.assertEqual(diff["deleted_files"], [])
self.assertEqual(diff["differences"], {})
def assertGRUB2(self, device, l1hash, l2hash, size):
m1 = hashlib.sha256()
m2 = hashlib.sha256()
with open(device, "rb") as d:
sectors = d.read(size)
self.assertEqual(len(sectors), size)
m1.update(sectors[:440])
m2.update(sectors[512:size])
self.assertEqual(m1.hexdigest(), l1hash)
self.assertEqual(m2.hexdigest(), l2hash)
def assertPartitionTable(self, ptable, label, uuid, n_partitions, boot_partition=None):
self.assertEqual(ptable["label"], label)
self.assertEqual(ptable["id"][2:], uuid[:8])
self.assertEqual(len(ptable["partitions"]), n_partitions)
if boot_partition:
bootable = [p.get("bootable", False) for p in ptable["partitions"]]
self.assertEqual(bootable.count(True), 1)
self.assertEqual(bootable.index(True) + 1, boot_partition)
def read_partition_table(self, device):
sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device]))
ptable = sfdisk["partitiontable"]
self.assertIsNotNone(ptable)
return ptable
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
def test_rawfs(self):
for fs_type in ["ext4", "xfs", "btrfs"]:
with self.subTest(fs_type=fs_type):
print(f" {fs_type}", flush=True)
options = {
"filename": "image.raw",
"root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0",
"size": 1024 * MEBIBYTE,
"fs_type": fs_type,
}
with self.osbuild as osb:
with self.run_assembler(osb, "org.osbuild.rawfs", options, "image.raw") as (tree, image):
self.assertImageFile(image, "raw", options["size"])
self.assertFilesystem(image, options["root_fs_uuid"], fs_type, tree)
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
def test_ostree(self):
with self.osbuild as osb:
with open(os.path.join(self.locate_test_data(),
"manifests/fedora-ostree-commit.json")) as f:
manifest = json.load(f)
data = json.dumps(manifest)
with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
result = osb.compile(data, output_dir=output_dir)
compose_file = os.path.join(output_dir, "compose.json")
repo = os.path.join(output_dir, "repo")
with open(compose_file) as f:
compose = json.load(f)
commit_id = compose["ostree-commit"]
ref = compose["ref"]
rpmostree_inputhash = compose["rpm-ostree-inputhash"]
os_version = compose["ostree-version"]
assert commit_id
assert ref
assert rpmostree_inputhash
assert os_version
self.assertIn("metadata", result["assembler"])
metadata = result["assembler"]["metadata"]
self.assertIn("compose", metadata)
self.assertEqual(compose, metadata["compose"])
md = subprocess.check_output(
[
"ostree",
"show",
"--repo", repo,
"--print-metadata-key=rpmostree.inputhash",
commit_id
], encoding="utf-8").strip()
self.assertEqual(md, f"'{rpmostree_inputhash}'")
md = subprocess.check_output(
[
"ostree",
"show",
"--repo", repo,
"--print-metadata-key=version",
commit_id
], encoding="utf-8").strip()
self.assertEqual(md, f"'{os_version}'")
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
def test_qemu(self):
loctl = loop.LoopControl()
with self.osbuild as osb:
for fmt in ["raw", "raw.xz", "qcow2", "vmdk", "vdi"]:
for fs_type in ["ext4", "xfs", "btrfs"]:
with self.subTest(fmt=fmt, fs_type=fs_type):
print(f" {fmt} {fs_type}", flush=True)
options = {
"format": fmt,
"filename": f"image.{fmt}",
"ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162",
"root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033",
"size": 1024 * MEBIBYTE,
"root_fs_type": fs_type,
}
with self.run_assembler(osb,
"org.osbuild.qemu",
options,
f"image.{fmt}") as (tree, image):
if fmt == "raw.xz":
subprocess.run(["unxz", "--keep", "--force", image], check=True)
image = image[:-3]
fmt = "raw"
self.assertImageFile(image, fmt, options["size"])
with open_image(loctl, image, fmt) as (target, device):
ptable = self.read_partition_table(device)
self.assertPartitionTable(ptable,
"dos",
options["ptuuid"],
1,
boot_partition=1)
if fs_type == "btrfs":
l2hash = "daa74a424a41e2a13c6c4f6bada0e80d84a9865b12d3369470fc5e74004ed329"
elif fs_type == "xfs":
l2hash = "58ebc5a9b594607f49c290572e027c353a6359da83099020e6f3b9b1f22a897a"
else:
l2hash = "9b31c8fbc59602a38582988bf91c3948ae9c6f2a231ab505ea63a7005e302147"
self.assertGRUB2(device,
"26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee",
l2hash,
1024 * 1024)
p1 = ptable["partitions"][0]
ssize = ptable.get("sectorsize", 512)
start, size = p1["start"] * ssize, p1["size"] * ssize
with loop_open(loctl, target, offset=start, size=size) as dev:
self.assertFilesystem(dev, options["root_fs_uuid"], fs_type, tree)
def test_tar(self):
cases = [
("tree.tar.gz", None, ["application/x-tar"]),
("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"])
]
with self.osbuild as osb:
for filename, compression, expected_mimetypes in cases:
options = {"filename": filename}
if compression:
options["compression"] = compression
with self.run_assembler(osb,
"org.osbuild.tar",
options,
filename) as (_, image):
output = subprocess.check_output(["file", "--mime-type", image], encoding="utf-8")
_, mimetype = output.strip().split(": ") # "filename: mimetype"
self.assertIn(mimetype, expected_mimetypes)
@contextlib.contextmanager
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
while True:
lo = loop.Loop(ctl.get_unbound())
try:
lo.set_fd(fd)
except OSError as e:
lo.close()
if e.errno == errno.EBUSY:
continue
raise e
try:
lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
except BlockingIOError:
lo.clear_fd()
lo.close()
continue
break
try:
yield lo
finally:
lo.close()
@contextlib.contextmanager
def loop_open(ctl, image, *, offset=None, size=None):
with open(image, "rb") as f:
fd = f.fileno()
with loop_create_device(ctl, fd, offset=offset, sizelimit=size) as lo:
yield os.path.join("/dev", lo.devname)
@contextlib.contextmanager
def mount(device):
with tempfile.TemporaryDirectory() as mountpoint:
subprocess.run(["mount", "-o", "ro", device, mountpoint], check=True)
try:
yield mountpoint
finally:
subprocess.run(["umount", "--lazy", mountpoint], check=True)
@contextlib.contextmanager
def open_image(ctl, image, fmt):
with tempfile.TemporaryDirectory() as tmp:
if fmt != "raw":
target = os.path.join(tmp, "image.raw")
subprocess.run(["qemu-img", "convert", "-O", "raw", image, target],
check=True)
else:
target = image
size = os.stat(target).st_size
with loop_open(ctl, target, offset=0, size=size) as dev:
yield target, dev