#!/usr/bin/python3
"""
Verify, and install RPM packages
The `exclude.docs` option can be used to tell rpm to not install docs.
`gpgkeys` should be an array of strings containing each GPG key to be used
to verify the packages.
`packages` is an array of objects representing RPMs. Each RPM is identified by
its checksums. Specifically, the content hash of the rpm, not the checksums
found in the rpm header. The `check_gpg` property indicates that the RPM's
must be signed by one of the given GPG keys, and that the transaction should
fail otherwise.
This stage will fail if any of the packages can't be found, or if any
RPM fails signature verification.
Uses the following binaries from the host:
* `rpmkeys` to import keys and to verify signatures for each package
* `sh`, `mkdir`, `mount`, `chmod` to prepare the target tree for `rpm`
* `rpm` to install packages into the target tree
This stage will return the following metadata via the osbuild API:
packages: a list of objects representing the installed packages,
with information about:
name, version, release,
and optionally:
epoch, arch, sigmd5, sigpgp, siggpg
"""
import contextlib
import json
import os
import pathlib
import subprocess
import sys
import tempfile
from osbuild import api
SCHEMA = """
"additionalProperties": false,
"properties": {
"disable_dracut": {
"description": "Prevent dracut from running",
"type": "boolean"
},
"exclude": {
"type": "object",
"additionalProperties": false,
"properties": {
"docs": {
"type": "boolean",
"description": "Do not install documentation.",
"default": false
}
}
},
"gpgkeys": {
"description": "Array of GPG key contents to import",
"type": "array",
"items": { "type": "string" }
},
"packages": {
"description": "Array of RPM content hashes",
"type": "array",
"items": {
"oneOf": [
{
"type": "string",
"description": ".rpm file checksum, prefixed with 'md5:', 'sha1:', 'sha256:', 'sha384:', or 'sha512:', indicating the algorithm used."
},
{
"type": "object",
"additionalProperties": false,
"required": ["checksum"],
"properties": {
"checksum": {
"type": "string",
"description": ".rpm file checksum, prefixed with 'md5:', 'sha1:', 'sha256:', 'sha384:', or 'sha512:', indicating the algorithm used."
},
"check_gpg": {
"type": "boolean",
"description": "Whether the GPG signatures of the RPM should be verified.",
"default": false
}
}
}
]
}
}
}
"""
SCHEMA_2 = """
"options": {
"type": "object",
"additionalProperties": false,
"properties": {
"gpgkeys": {
"description": "Array of GPG key contents to import",
"type": "array",
"items": { "type": "string" }
},
"disable_dracut": {
"description": "Prevent dracut from running",
"type": "boolean"
},
"exclude": {
"type": "object",
"additionalProperties": false,
"properties": {
"docs": {
"type": "boolean",
"description": "Do not install documentation.",
"default": false
}
}
}
}
},
"inputs": {
"type": "object",
"additionalProperties": false,
"required": ["packages"],
"properties": {
"packages": {
"type": "object",
"additionalProperties": true
}
}
}
"""
def generate_package_metadata(tree):
query = r"""\{
"name": "%{NAME}",
"version": "%{VERSION}",
"release": "%{RELEASE}",
"epoch": %|EPOCH?{"%{EPOCH}"}:{null}|,
"arch": %|ARCH?{"%{ARCH}"}:{null}|,
"sigmd5": %|SIGMD5?{"%{SIGMD5}"}:{null}|,
"sigpgp": %|SIGPGP?{"%{SIGPGP}"}:{null}|,
"siggpg": %|SIGGPG?{"%{SIGGPG}"}:{null}|
\},
"""
cmd = [
"rpm",
"-qa",
"--root", tree,
"--qf=" + query
]
res = subprocess.run(cmd, stdout=subprocess.PIPE,
check=True, encoding="utf-8")
raw = res.stdout.strip()
jsdata = '{"packages": [' + raw[:-1] + "]}"
return json.loads(jsdata)
def disable_dracut(tree):
kernel_install_d = f"{tree}/etc/kernel/install.d"
files = []
os.makedirs(kernel_install_d, exist_ok=True)
for path in ["50-dracut.install", "51-dracut-rescue.install"]:
target = os.path.join(kernel_install_d, path)
os.symlink("/dev/null", target)
files.append(target)
return files
def enable_dracut(masked_files):
for path in masked_files:
os.unlink(path)
def remove_unowned_etc_kernel(tree):
# if installed, /etc/kernel is owned by systemd-udev; but
# in case the directory is un-owned, remove it again
res = subprocess.run(["rpm",
"--root", tree,
"-qf", "/etc/kernel"],
stdout=subprocess.PIPE,
check=False)
owner = res.stdout.strip()
if res == 0 and owner:
print(f"/etc/kernel is owned by {owner}")
return
with contextlib.suppress(OSError):
os.rmdir("etc/kernel/install.d")
os.rmdir("etc/kernel")
def parse_input(inputs):
packages = inputs["packages"]
path = packages["path"]
data = packages["data"]
refs = data["refs"]
return path, refs
def main(tree, inputs, options):
pkgpath, packages = parse_input(inputs)
for key in options.get("gpgkeys", []):
with tempfile.NamedTemporaryFile(prefix="gpgkey.", mode="w") as keyfile:
keyfile.write(key)
keyfile.flush()
subprocess.run([
"rpmkeys",
"--root", tree,
"--import", keyfile.name
], check=True)
print("imported gpg key")
for checksum, data in packages.items():
if data.get("rpm.check_gpg"):
subprocess.run([
"rpmkeys",
"--root", tree,
"--checksig",
checksum
], cwd=pkgpath, stdout=subprocess.DEVNULL, check=True)
script = f"""
set -e
mkdir -p {tree}/dev {tree}/sys {tree}/proc
mount -o bind /dev {tree}/dev
mount -o bind /sys {tree}/sys
mount -o bind /proc {tree}/proc
"""
machine_id_set_previously = os.path.exists(f"{tree}/etc/machine-id")
if not machine_id_set_previously:
# create a fake machine ID to improve reproducibility
print("creating a fake machine id")
script += f"""
mkdir -p {tree}/etc
echo "ffffffffffffffffffffffffffffffff" > {tree}/etc/machine-id
chmod 0444 {tree}/etc/machine-id
"""
subprocess.run(["/bin/sh", "-c", script], check=True)
extra_args = []
if options.get("exclude", {}).get("docs"):
extra_args += ["--excludedocs"]
# prevent dracut from running, if enabled
no_dracut = options.get("disable_dracut", False)
if no_dracut:
masked_files = disable_dracut(tree)
with tempfile.NamedTemporaryFile(prefix="manifest.", mode='w') as manifest:
manifest.writelines(c+'\n' for c in packages)
manifest.flush()
subprocess.run([
"rpm",
"--verbose",
"--root", tree,
*extra_args,
# The content hash of the rpms has been verified, default to not
# verifying again (see /usr/lib/rpm/macros for more info)
"--define", "_pkgverify_level none",
"--install", manifest.name
], cwd=pkgpath, check=True)
# re-enabled dracut
if no_dracut:
enable_dracut(masked_files)
remove_unowned_etc_kernel(tree)
# remove temporary machine ID if it was created by us
if not machine_id_set_previously:
print("deleting the fake machine id")
machine_id_file = pathlib.Path(f"{tree}/etc/machine-id")
machine_id_file.unlink()
machine_id_file.touch()
# remove random seed from the tree if exists
with contextlib.suppress(FileNotFoundError):
os.unlink(f"{tree}/var/lib/systemd/random-seed")
# generate the metadata
md = generate_package_metadata(tree)
api.metadata(md)
return 0
if __name__ == '__main__':
args = api.arguments()
r = main(args["tree"], args["inputs"], args["options"])
sys.exit(r)