|
Packit |
a20ca0 |
#
|
|
Packit |
a20ca0 |
# Runtime tests for the individual assemblers.
|
|
Packit |
a20ca0 |
#
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
import contextlib
|
|
Packit |
a20ca0 |
import errno
|
|
Packit |
a20ca0 |
import hashlib
|
|
Packit |
a20ca0 |
import json
|
|
Packit |
a20ca0 |
import os
|
|
Packit |
a20ca0 |
import subprocess
|
|
Packit |
a20ca0 |
import tempfile
|
|
Packit |
a20ca0 |
import unittest
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
from osbuild import loop
|
|
Packit |
a20ca0 |
from .. import test
|
|
Packit |
a20ca0 |
|
|
Packit Service |
2d981f |
MEBIBYTE = 1024 * 1024
|
|
Packit Service |
2d981f |
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@unittest.skipUnless(test.TestBase.have_test_data(), "no test-data access")
|
|
Packit |
a20ca0 |
class TestAssemblers(test.TestBase):
|
|
Packit |
a20ca0 |
@classmethod
|
|
Packit |
a20ca0 |
def setUpClass(cls):
|
|
Packit |
a20ca0 |
super().setUpClass()
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def setUp(self):
|
|
Packit |
a20ca0 |
self.osbuild = test.OSBuild(self)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@contextlib.contextmanager
|
|
Packit Service |
2d981f |
def run_assembler(self, osb, name, options, output_path):
|
|
Packit Service |
2d981f |
with open(os.path.join(self.locate_test_data(),
|
|
Packit Service |
2d981f |
"manifests/filesystem.json")) as f:
|
|
Packit Service |
2d981f |
manifest = json.load(f)
|
|
Packit Service |
2d981f |
manifest["pipeline"] = dict(
|
|
Packit Service |
2d981f |
manifest["pipeline"],
|
|
Packit Service |
2d981f |
assembler={"name": name, "options": options}
|
|
Packit Service |
2d981f |
)
|
|
Packit Service |
2d981f |
data = json.dumps(manifest)
|
|
Packit Service |
2d981f |
|
|
Packit Service |
2d981f |
treeid = osb.treeid_from_manifest(data)
|
|
Packit Service |
2d981f |
assert treeid
|
|
Packit Service |
2d981f |
|
|
Packit Service |
2d981f |
with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
|
|
Packit Service |
2d981f |
osb.compile(data, output_dir=output_dir, checkpoints=[treeid])
|
|
Packit Service |
2d981f |
with osb.map_object(treeid) as tree:
|
|
Packit Service |
2d981f |
yield tree, os.path.join(output_dir, output_path)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def assertImageFile(self, filename, fmt, expected_size=None):
|
|
Packit |
a20ca0 |
info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename]))
|
|
Packit |
a20ca0 |
self.assertEqual(info["format"], fmt)
|
|
Packit |
a20ca0 |
self.assertEqual(info["virtual-size"], expected_size)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def assertFilesystem(self, device, uuid, fstype, tree):
|
|
Packit |
a20ca0 |
output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf-8")
|
|
Packit |
a20ca0 |
blkid = dict(line.split("=") for line in output.strip().split("\n"))
|
|
Packit |
a20ca0 |
self.assertEqual(blkid["UUID"], uuid)
|
|
Packit |
a20ca0 |
self.assertEqual(blkid["TYPE"], fstype)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
with mount(device) as target_tree:
|
|
Packit |
a20ca0 |
diff = self.tree_diff(tree, target_tree)
|
|
Packit Service |
2d981f |
if fstype == 'ext4':
|
|
Packit Service |
2d981f |
added_files = ["/lost+found"]
|
|
Packit Service |
2d981f |
else:
|
|
Packit Service |
2d981f |
added_files = []
|
|
Packit Service |
2d981f |
self.assertEqual(diff["added_files"], added_files)
|
|
Packit |
a20ca0 |
self.assertEqual(diff["deleted_files"], [])
|
|
Packit |
a20ca0 |
self.assertEqual(diff["differences"], {})
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def assertGRUB2(self, device, l1hash, l2hash, size):
|
|
Packit |
a20ca0 |
m1 = hashlib.sha256()
|
|
Packit |
a20ca0 |
m2 = hashlib.sha256()
|
|
Packit |
a20ca0 |
with open(device, "rb") as d:
|
|
Packit |
a20ca0 |
sectors = d.read(size)
|
|
Packit |
a20ca0 |
self.assertEqual(len(sectors), size)
|
|
Packit |
a20ca0 |
m1.update(sectors[:440])
|
|
Packit |
a20ca0 |
m2.update(sectors[512:size])
|
|
Packit |
a20ca0 |
self.assertEqual(m1.hexdigest(), l1hash)
|
|
Packit |
a20ca0 |
self.assertEqual(m2.hexdigest(), l2hash)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def assertPartitionTable(self, ptable, label, uuid, n_partitions, boot_partition=None):
|
|
Packit |
a20ca0 |
self.assertEqual(ptable["label"], label)
|
|
Packit |
a20ca0 |
self.assertEqual(ptable["id"][2:], uuid[:8])
|
|
Packit |
a20ca0 |
self.assertEqual(len(ptable["partitions"]), n_partitions)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
if boot_partition:
|
|
Packit |
a20ca0 |
bootable = [p.get("bootable", False) for p in ptable["partitions"]]
|
|
Packit |
a20ca0 |
self.assertEqual(bootable.count(True), 1)
|
|
Packit |
a20ca0 |
self.assertEqual(bootable.index(True) + 1, boot_partition)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def read_partition_table(self, device):
|
|
Packit |
a20ca0 |
sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device]))
|
|
Packit |
a20ca0 |
ptable = sfdisk["partitiontable"]
|
|
Packit |
a20ca0 |
self.assertIsNotNone(ptable)
|
|
Packit |
a20ca0 |
return ptable
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
Packit |
a20ca0 |
def test_rawfs(self):
|
|
Packit Service |
2d981f |
for fs_type in ["ext4", "xfs", "btrfs"]:
|
|
Packit Service |
2d981f |
with self.subTest(fs_type=fs_type):
|
|
Packit Service |
2d981f |
print(f" {fs_type}", flush=True)
|
|
Packit Service |
2d981f |
options = {
|
|
Packit Service |
2d981f |
"filename": "image.raw",
|
|
Packit Service |
2d981f |
"root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0",
|
|
Packit Service |
2d981f |
"size": 1024 * MEBIBYTE,
|
|
Packit Service |
2d981f |
"fs_type": fs_type,
|
|
Packit Service |
2d981f |
}
|
|
Packit Service |
2d981f |
with self.osbuild as osb:
|
|
Packit Service |
2d981f |
with self.run_assembler(osb, "org.osbuild.rawfs", options, "image.raw") as (tree, image):
|
|
Packit Service |
2d981f |
self.assertImageFile(image, "raw", options["size"])
|
|
Packit Service |
2d981f |
self.assertFilesystem(image, options["root_fs_uuid"], fs_type, tree)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
Packit |
a20ca0 |
def test_ostree(self):
|
|
Packit |
a20ca0 |
with self.osbuild as osb:
|
|
Packit |
a20ca0 |
with open(os.path.join(self.locate_test_data(),
|
|
Packit |
a20ca0 |
"manifests/fedora-ostree-commit.json")) as f:
|
|
Packit |
a20ca0 |
manifest = json.load(f)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
data = json.dumps(manifest)
|
|
Packit Service |
2d981f |
with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir:
|
|
Packit Service |
2d981f |
result = osb.compile(data, output_dir=output_dir)
|
|
Packit Service |
2d981f |
compose_file = os.path.join(output_dir, "compose.json")
|
|
Packit Service |
2d981f |
repo = os.path.join(output_dir, "repo")
|
|
Packit Service |
2d981f |
|
|
Packit Service |
2d981f |
with open(compose_file) as f:
|
|
Packit |
a20ca0 |
compose = json.load(f)
|
|
Packit Service |
2d981f |
commit_id = compose["ostree-commit"]
|
|
Packit Service |
2d981f |
ref = compose["ref"]
|
|
Packit Service |
2d981f |
rpmostree_inputhash = compose["rpm-ostree-inputhash"]
|
|
Packit Service |
2d981f |
os_version = compose["ostree-version"]
|
|
Packit Service |
2d981f |
assert commit_id
|
|
Packit Service |
2d981f |
assert ref
|
|
Packit Service |
2d981f |
assert rpmostree_inputhash
|
|
Packit Service |
2d981f |
assert os_version
|
|
Packit Service |
2d981f |
self.assertIn("metadata", result["assembler"])
|
|
Packit Service |
2d981f |
metadata = result["assembler"]["metadata"]
|
|
Packit Service |
2d981f |
self.assertIn("compose", metadata)
|
|
Packit Service |
2d981f |
self.assertEqual(compose, metadata["compose"])
|
|
Packit Service |
2d981f |
|
|
Packit |
a20ca0 |
md = subprocess.check_output(
|
|
Packit |
a20ca0 |
[
|
|
Packit |
a20ca0 |
"ostree",
|
|
Packit |
a20ca0 |
"show",
|
|
Packit |
a20ca0 |
"--repo", repo,
|
|
Packit |
a20ca0 |
"--print-metadata-key=rpmostree.inputhash",
|
|
Packit |
a20ca0 |
commit_id
|
|
Packit |
a20ca0 |
], encoding="utf-8").strip()
|
|
Packit Service |
2d981f |
self.assertEqual(md, f"'{rpmostree_inputhash}'")
|
|
Packit Service |
2d981f |
|
|
Packit Service |
2d981f |
md = subprocess.check_output(
|
|
Packit Service |
2d981f |
[
|
|
Packit Service |
2d981f |
"ostree",
|
|
Packit Service |
2d981f |
"show",
|
|
Packit Service |
2d981f |
"--repo", repo,
|
|
Packit Service |
2d981f |
"--print-metadata-key=version",
|
|
Packit Service |
2d981f |
commit_id
|
|
Packit Service |
2d981f |
], encoding="utf-8").strip()
|
|
Packit Service |
2d981f |
self.assertEqual(md, f"'{os_version}'")
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
|
|
Packit |
a20ca0 |
def test_qemu(self):
|
|
Packit |
a20ca0 |
loctl = loop.LoopControl()
|
|
Packit Service |
2d981f |
with self.osbuild as osb:
|
|
Packit Service |
2d981f |
for fmt in ["raw", "raw.xz", "qcow2", "vmdk", "vdi"]:
|
|
Packit Service |
2d981f |
for fs_type in ["ext4", "xfs", "btrfs"]:
|
|
Packit Service |
2d981f |
with self.subTest(fmt=fmt, fs_type=fs_type):
|
|
Packit Service |
2d981f |
print(f" {fmt} {fs_type}", flush=True)
|
|
Packit Service |
2d981f |
options = {
|
|
Packit Service |
2d981f |
"format": fmt,
|
|
Packit Service |
2d981f |
"filename": f"image.{fmt}",
|
|
Packit Service |
2d981f |
"ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162",
|
|
Packit Service |
2d981f |
"root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033",
|
|
Packit Service |
2d981f |
"size": 1024 * MEBIBYTE,
|
|
Packit Service |
2d981f |
"root_fs_type": fs_type,
|
|
Packit Service |
2d981f |
}
|
|
Packit Service |
2d981f |
with self.run_assembler(osb,
|
|
Packit Service |
2d981f |
"org.osbuild.qemu",
|
|
Packit Service |
2d981f |
options,
|
|
Packit Service |
2d981f |
f"image.{fmt}") as (tree, image):
|
|
Packit Service |
2d981f |
if fmt == "raw.xz":
|
|
Packit Service |
2d981f |
subprocess.run(["unxz", "--keep", "--force", image], check=True)
|
|
Packit Service |
2d981f |
image = image[:-3]
|
|
Packit Service |
2d981f |
fmt = "raw"
|
|
Packit Service |
2d981f |
self.assertImageFile(image, fmt, options["size"])
|
|
Packit Service |
2d981f |
with open_image(loctl, image, fmt) as (target, device):
|
|
Packit Service |
2d981f |
ptable = self.read_partition_table(device)
|
|
Packit Service |
2d981f |
self.assertPartitionTable(ptable,
|
|
Packit Service |
2d981f |
"dos",
|
|
Packit Service |
2d981f |
options["ptuuid"],
|
|
Packit Service |
2d981f |
1,
|
|
Packit Service |
2d981f |
boot_partition=1)
|
|
Packit Service |
2d981f |
if fs_type == "btrfs":
|
|
Packit Service |
2d981f |
l2hash = "daa74a424a41e2a13c6c4f6bada0e80d84a9865b12d3369470fc5e74004ed329"
|
|
Packit Service |
2d981f |
elif fs_type == "xfs":
|
|
Packit Service |
2d981f |
l2hash = "58ebc5a9b594607f49c290572e027c353a6359da83099020e6f3b9b1f22a897a"
|
|
Packit Service |
2d981f |
else:
|
|
Packit Service |
2d981f |
l2hash = "9b31c8fbc59602a38582988bf91c3948ae9c6f2a231ab505ea63a7005e302147"
|
|
Packit Service |
2d981f |
self.assertGRUB2(device,
|
|
Packit Service |
2d981f |
"26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee",
|
|
Packit Service |
2d981f |
l2hash,
|
|
Packit Service |
2d981f |
1024 * 1024)
|
|
Packit Service |
2d981f |
|
|
Packit Service |
2d981f |
p1 = ptable["partitions"][0]
|
|
Packit Service |
2d981f |
ssize = ptable.get("sectorsize", 512)
|
|
Packit Service |
2d981f |
start, size = p1["start"] * ssize, p1["size"] * ssize
|
|
Packit Service |
2d981f |
with loop_open(loctl, target, offset=start, size=size) as dev:
|
|
Packit Service |
2d981f |
self.assertFilesystem(dev, options["root_fs_uuid"], fs_type, tree)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
def test_tar(self):
|
|
Packit |
a20ca0 |
cases = [
|
|
Packit |
a20ca0 |
("tree.tar.gz", None, ["application/x-tar"]),
|
|
Packit |
a20ca0 |
("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"])
|
|
Packit |
a20ca0 |
]
|
|
Packit Service |
2d981f |
with self.osbuild as osb:
|
|
Packit Service |
2d981f |
for filename, compression, expected_mimetypes in cases:
|
|
Packit Service |
2d981f |
options = {"filename": filename}
|
|
Packit Service |
2d981f |
if compression:
|
|
Packit Service |
2d981f |
options["compression"] = compression
|
|
Packit Service |
2d981f |
with self.run_assembler(osb,
|
|
Packit Service |
2d981f |
"org.osbuild.tar",
|
|
Packit Service |
2d981f |
options,
|
|
Packit Service |
2d981f |
filename) as (_, image):
|
|
Packit Service |
2d981f |
output = subprocess.check_output(["file", "--mime-type", image], encoding="utf-8")
|
|
Packit Service |
2d981f |
_, mimetype = output.strip().split(": ") # "filename: mimetype"
|
|
Packit Service |
2d981f |
self.assertIn(mimetype, expected_mimetypes)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@contextlib.contextmanager
|
|
Packit |
a20ca0 |
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
|
|
Packit |
a20ca0 |
while True:
|
|
Packit |
a20ca0 |
lo = loop.Loop(ctl.get_unbound())
|
|
Packit |
a20ca0 |
try:
|
|
Packit |
a20ca0 |
lo.set_fd(fd)
|
|
Packit |
a20ca0 |
except OSError as e:
|
|
Packit |
a20ca0 |
lo.close()
|
|
Packit |
a20ca0 |
if e.errno == errno.EBUSY:
|
|
Packit |
a20ca0 |
continue
|
|
Packit |
a20ca0 |
raise e
|
|
Packit |
a20ca0 |
try:
|
|
Packit |
a20ca0 |
lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
|
|
Packit |
a20ca0 |
except BlockingIOError:
|
|
Packit |
a20ca0 |
lo.clear_fd()
|
|
Packit |
a20ca0 |
lo.close()
|
|
Packit |
a20ca0 |
continue
|
|
Packit |
a20ca0 |
break
|
|
Packit |
a20ca0 |
try:
|
|
Packit |
a20ca0 |
yield lo
|
|
Packit |
a20ca0 |
finally:
|
|
Packit |
a20ca0 |
lo.close()
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@contextlib.contextmanager
|
|
Packit |
a20ca0 |
def loop_open(ctl, image, *, offset=None, size=None):
|
|
Packit |
a20ca0 |
with open(image, "rb") as f:
|
|
Packit |
a20ca0 |
fd = f.fileno()
|
|
Packit |
a20ca0 |
with loop_create_device(ctl, fd, offset=offset, sizelimit=size) as lo:
|
|
Packit |
a20ca0 |
yield os.path.join("/dev", lo.devname)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@contextlib.contextmanager
|
|
Packit |
a20ca0 |
def mount(device):
|
|
Packit |
a20ca0 |
with tempfile.TemporaryDirectory() as mountpoint:
|
|
Packit |
a20ca0 |
subprocess.run(["mount", "-o", "ro", device, mountpoint], check=True)
|
|
Packit |
a20ca0 |
try:
|
|
Packit |
a20ca0 |
yield mountpoint
|
|
Packit |
a20ca0 |
finally:
|
|
Packit |
a20ca0 |
subprocess.run(["umount", "--lazy", mountpoint], check=True)
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
@contextlib.contextmanager
|
|
Packit |
a20ca0 |
def open_image(ctl, image, fmt):
|
|
Packit |
a20ca0 |
with tempfile.TemporaryDirectory() as tmp:
|
|
Packit |
a20ca0 |
if fmt != "raw":
|
|
Packit |
a20ca0 |
target = os.path.join(tmp, "image.raw")
|
|
Packit |
a20ca0 |
subprocess.run(["qemu-img", "convert", "-O", "raw", image, target],
|
|
Packit |
a20ca0 |
check=True)
|
|
Packit |
a20ca0 |
else:
|
|
Packit |
a20ca0 |
target = image
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
size = os.stat(target).st_size
|
|
Packit |
a20ca0 |
|
|
Packit |
a20ca0 |
with loop_open(ctl, target, offset=0, size=size) as dev:
|
|
Packit |
a20ca0 |
yield target, dev
|