#
# Runtime tests for the individual stages.
#
import difflib
import glob
import json
import os
import pprint
import shutil
import tempfile
import unittest
from typing import Dict
from osbuild.util import selinux
from .. import initrd
from .. import test
def find_stage(result, stageid):
    """Look up the stage with id `stageid` in an osbuild `result`.

    The nested "build" result, if present and non-empty, is searched
    first (recursively); only when nothing is found there is the
    "stages" list of `result` itself consulted.

    Returns the matching stage dictionary, or None if there is none.
    """
    nested = result.get("build")
    if nested:
        found = find_stage(nested, stageid)
        if found:
            return found

    # First stage of this level whose id matches, or None.
    return next(
        (candidate for candidate in result.get("stages", [])
         if candidate["id"] == stageid),
        None,
    )
@unittest.skipUnless(test.TestBase.have_test_data(), "no test-data access")
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing")
@unittest.skipUnless(test.TestBase.can_bind_mount(), "root-only")
class TestStages(test.TestBase):
    """Runtime tests that build manifests and inspect the resulting trees.

    The whole class is skipped unless the three `test.TestBase` checks in
    the decorators above succeed. Source data downloaded during a test is
    copied into a store shared by all tests of the class (`cls.store`),
    which is then used to seed the osbuild cache of later tests.
    """

    def assertTreeDiffsEqual(self, tree_diff1, tree_diff2):
        """
        Asserts two tree diffs for equality.

        Before assertion, the two trees are sorted, therefore order of files
        doesn't matter.

        There's a special rule for asserting differences where we don't
        know the exact before/after value. This is useful for example if
        the content of file is dependent on current datetime. You can use this
        feature by putting null value in difference you don't care about.

        Example:
            "/etc/shadow": {content: ["sha256:xxx", null]}

            In this case the after content of /etc/shadow doesn't matter.
            The only thing that matters is the before content and that
            the content modification happened.

        Raises AssertionError, with an ndiff of both pretty-printed trees
        appended to the message, on the first mismatch found.
        """

        def _sorted_tree(tree):
            # The JSON round-trip yields a copy whose dictionaries are
            # ordered by key; the two file lists are sorted explicitly.
            sorted_tree = json.loads(json.dumps(tree, sort_keys=True))
            sorted_tree["added_files"] = sorted(sorted_tree["added_files"])
            sorted_tree["deleted_files"] = sorted(sorted_tree["deleted_files"])
            return sorted_tree

        tree_diff1 = _sorted_tree(tree_diff1)
        tree_diff2 = _sorted_tree(tree_diff2)

        def raise_assertion(msg):
            diff = '\n'.join(
                difflib.ndiff(
                    pprint.pformat(tree_diff1).splitlines(),
                    pprint.pformat(tree_diff2).splitlines(),
                )
            )
            raise AssertionError(f"{msg}\n\n{diff}")

        self.assertEqual(tree_diff1['added_files'], tree_diff2['added_files'])
        self.assertEqual(tree_diff1['deleted_files'], tree_diff2['deleted_files'])

        if len(tree_diff1['differences']) != len(tree_diff2['differences']):
            raise_assertion('length of differences different')

        # Both mappings are key-sorted and of equal length, so walking
        # them in lockstep pairs up entries for the same file.
        for (file1, differences1), (file2, differences2) in \
                zip(tree_diff1['differences'].items(), tree_diff2['differences'].items()):

            if file1 != file2:
                raise_assertion(f"filename different: {file1}, {file2}")

            if len(differences1) != len(differences2):
                raise_assertion("length of file differences different")

            for (kind1, values1), (kind2, values2) in \
                    zip(differences1.items(), differences2.items()):
                if kind1 != kind2:
                    raise_assertion(f"different difference kinds: {kind1}, {kind2}")

                # values are [before, after]; a None on either side is
                # a wildcard and skips the comparison of that slot.
                for index, label in enumerate(("before", "after")):
                    value1 = values1[index]
                    value2 = values2[index]
                    if value1 is None or value2 is None:
                        continue
                    if value1 != value2:
                        raise_assertion(f"{label} values are different: {value1}, {value2}")

    def assertMetadata(self, metadata: Dict, result: Dict):
        """Assert all of `metadata` is found in `result`.

        Metadata must be a dictionary with stage ids as keys and
        the metadata as values. For each of those stage, metadata
        pairs the corresponding stage is looked up in the result
        and its metadata compared with the one given in metadata.

        Fails the test if a stage cannot be found or if its metadata
        differs from the expected one.
        """
        for stageid, want in metadata.items():
            stage = find_stage(result, stageid)
            if stage is None:
                js = json.dumps(result, indent=2)
                self.fail(f"stage {stageid} not found in results:\n{js}\n")

            # A stage without a "metadata" entry compares as None, so the
            # mismatch is reported below rather than raising KeyError.
            have = stage.get("metadata")
            if have != want:
                diff = difflib.ndiff(pprint.pformat(have).splitlines(),
                                     pprint.pformat(want).splitlines())
                txt = "\n".join(diff)
                self.fail(f"metadata for {stageid} differs:\n{txt}")

    @classmethod
    def setUpClass(cls):
        """Pick the shared store: $OSBUILD_TEST_STORE or a fresh temp dir."""
        cls.store = os.getenv("OSBUILD_TEST_STORE")
        if not cls.store:
            cls.store = tempfile.mkdtemp(prefix="osbuild-test-", dir="/var/tmp")

    @classmethod
    def tearDownClass(cls):
        """Remove the store, but only if it was not supplied externally."""
        if not os.getenv("OSBUILD_TEST_STORE"):
            shutil.rmtree(cls.store)

    def setUp(self):
        """Create the osbuild wrapper, seeding its cache from the store."""
        self.osbuild = test.OSBuild(self, cache_from=self.store)

    def run_stage_diff_test(self, test_dir: str):
        """Build `a.json` and `b.json` from `test_dir` and compare the trees.

        The tree diff between both builds must match `diff.json`; if a
        `metadata.json` exists in `test_dir`, the result of the `b.json`
        build must additionally contain that metadata.
        """
        with self.osbuild as osb:

            def run(path):
                checkpoints = []
                context = None

                with open(path, "r") as f:
                    data = f.read()

                tree = osb.treeid_from_manifest(data)
                if tree:
                    checkpoints += [tree]
                    context = osb.map_object(tree)

                result = osb.compile(data, checkpoints=checkpoints)
                return context, result

            ctx_a, _ = run(f"{test_dir}/a.json")
            ctx_b, res = run(f"{test_dir}/b.json")

            # A manifest without a tree id is represented by an empty
            # temporary directory.
            ctx_a = ctx_a or tempfile.TemporaryDirectory()
            ctx_b = ctx_b or tempfile.TemporaryDirectory()

            with ctx_a as tree1, ctx_b as tree2:
                actual_diff = self.tree_diff(tree1, tree2)

            with open(f"{test_dir}/diff.json") as f:
                expected_diff = json.load(f)

            self.assertTreeDiffsEqual(expected_diff, actual_diff)

            md_path = os.path.join(test_dir, "metadata.json")
            if os.path.exists(md_path):
                with open(md_path, "r") as f:
                    metadata = json.load(f)

                self.assertMetadata(metadata, res)

            # cache the downloaded data for the sources by copying
            # it to self.store, which is going to be used to initialize
            # the osbuild cache with.
            osb.copy_source_data(self.store, "org.osbuild.files")

    def test_stages(self):
        """Run the diff test for every stage directory with a diff.json."""
        path = os.path.join(self.locate_test_data(), "stages")

        # glob() returns matches in arbitrary order; sort them so the
        # sub-tests run in the same order everywhere.
        for t in sorted(glob.glob(f"{path}/*/diff.json")):
            test_path = os.path.dirname(t)
            test_name = os.path.basename(test_path)

            with self.subTest(stage=test_name):
                self.run_stage_diff_test(test_path)

    def test_dracut(self):
        """Check modules and kmods of the built initrds against vanilla.json."""
        datadir = self.locate_test_data()
        base = os.path.join(datadir, "stages/dracut")

        with open(f"{base}/vanilla.json", "r") as f:
            refs = json.load(f)

        with self.osbuild as osb:
            with open(f"{base}/template.json", "r") as f:
                manifest = f.read()

            tree = osb.treeid_from_manifest(manifest)
            osb.compile(manifest, checkpoints=[tree])

            with osb.map_object(tree) as root:
                for name, want in refs.items():
                    image = initrd.Initrd(f"{root}/boot/{name}")
                    have = image.as_dict()

                    # Compared as sets, so order is irrelevant.
                    for key in ["modules", "kmods"]:
                        a = set(have[key])
                        b = set(want[key])
                        self.assertEqual(a, b, msg=key)

            # cache the downloaded data for the files source
            osb.copy_source_data(self.store, "org.osbuild.files")

    def test_selinux(self):
        """Append the selinux stage to the base manifest and check labels.

        Every `test_*.json` in the selinux test directory provides the
        stage "options" and the expected "labels" (path -> context).
        """
        datadir = self.locate_test_data()
        testdir = os.path.join(datadir, "stages", "selinux")

        def load_manifest(manifest_name):
            with open(os.path.join(datadir, f"manifests/{manifest_name}")) as f:
                manifest = json.load(f)
            return manifest

        with self.osbuild as osb:

            for t in glob.glob(f"{testdir}/test_*.json"):
                # Loaded afresh for every check because the manifest is
                # modified below.
                manifest = load_manifest("f32-base.json")
                with open(t) as f:
                    check = json.load(f)
                manifest["pipeline"]["stages"].append({
                    "name": "org.osbuild.selinux",
                    "options": check["options"]
                })

                jsdata = json.dumps(manifest)
                treeid = osb.treeid_from_manifest(jsdata)
                osb.compile(jsdata, checkpoints=[treeid])
                ctx = osb.map_object(treeid)

                with ctx as tree:
                    for path, want in check["labels"].items():
                        have = selinux.getfilecon(f"{tree}/{path}")
                        self.assertEqual(have, want)

            # cache the downloaded data for the files source
            osb.copy_source_data(self.store, "org.osbuild.files")