Blame tools/image-info

Packit 63bb0d
#!/usr/bin/python3
Packit 63bb0d
Packit 63bb0d
import argparse
Packit Service bcdfb1
import configparser
Packit 63bb0d
import contextlib
Packit 63bb0d
import errno
Packit 63bb0d
import functools
Packit 63bb0d
import glob
Packit 63bb0d
import mimetypes
Packit 63bb0d
import json
Packit 63bb0d
import os
Packit 63bb0d
import platform
Packit 63bb0d
import subprocess
Packit 63bb0d
import sys
Packit 63bb0d
import tempfile
Packit 63bb0d
import xml.etree.ElementTree
Packit 63bb0d
Packit 63bb0d
from osbuild import loop
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def run_ostree(*args, _input=None, _check=True, **kwargs):
Packit 63bb0d
    args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
Packit 63bb0d
    print("ostree " + " ".join(args), file=sys.stderr)
Packit 63bb0d
    res = subprocess.run(["ostree"] + args,
Packit 63bb0d
                         encoding="utf-8",
Packit 63bb0d
                         stdout=subprocess.PIPE,
Packit 63bb0d
                         input=_input,
Packit 63bb0d
                         check=_check)
Packit 63bb0d
    return res
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
@contextlib.contextmanager
Packit 63bb0d
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
Packit 63bb0d
    while True:
Packit 63bb0d
        lo = loop.Loop(ctl.get_unbound())
Packit 63bb0d
        try:
Packit 63bb0d
            lo.set_fd(fd)
Packit 63bb0d
        except OSError as e:
Packit 63bb0d
            lo.close()
Packit 63bb0d
            if e.errno == errno.EBUSY:
Packit 63bb0d
                continue
Packit 63bb0d
            raise e
Packit 63bb0d
        try:
Packit 63bb0d
            lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
Packit 63bb0d
        except BlockingIOError:
Packit 63bb0d
            lo.clear_fd()
Packit 63bb0d
            lo.close()
Packit 63bb0d
            continue
Packit 63bb0d
        break
Packit 63bb0d
    try:
Packit 63bb0d
        yield lo
Packit 63bb0d
    finally:
Packit 63bb0d
        lo.close()
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
@contextlib.contextmanager
Packit 63bb0d
def loop_open(ctl, image, *, offset=None, size=None):
Packit 63bb0d
    with open(image, "rb") as f:
Packit 63bb0d
        fd = f.fileno()
Packit 63bb0d
        with loop_create_device(ctl, fd, offset=offset, sizelimit=size) as lo:
Packit 63bb0d
            yield os.path.join("/dev", lo.devname)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
@contextlib.contextmanager
Packit 63bb0d
def open_image(ctl, image, fmt):
Packit 63bb0d
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
Packit 63bb0d
        if fmt != "raw":
Packit 63bb0d
            target = os.path.join(tmp, "image.raw")
Packit 63bb0d
            # A bug exists in qemu that causes the conversion to raw to fail
Packit 63bb0d
            # on aarch64 systems with a LOT of CPUs. A workaround is to use
Packit 63bb0d
            # a single coroutine to do the conversion. It doesn't slow down
Packit 63bb0d
            # the conversion by much, but it hangs about half the time without
Packit 63bb0d
            # the limit set. 😢
Packit 63bb0d
            # Bug: https://bugs.launchpad.net/qemu/+bug/1805256
Packit 63bb0d
            if platform.machine() == 'aarch64':
Packit 63bb0d
                subprocess.run(
Packit 63bb0d
                    ["qemu-img", "convert", "-m", "1", "-O", "raw", image, target],
Packit 63bb0d
                    check=True
Packit 63bb0d
                )
Packit 63bb0d
            else:
Packit 63bb0d
                subprocess.run(
Packit 63bb0d
                    ["qemu-img", "convert", "-O", "raw", image, target],
Packit 63bb0d
                    check=True
Packit 63bb0d
                )
Packit 63bb0d
        else:
Packit 63bb0d
            target = image
Packit 63bb0d
Packit 63bb0d
        size = os.stat(target).st_size
Packit 63bb0d
Packit 63bb0d
        with loop_open(ctl, target, offset=0, size=size) as dev:
Packit 63bb0d
            yield target, dev
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
@contextlib.contextmanager
Packit 63bb0d
def mount_at(device, mountpoint, options=[], extra=[]):
Packit 63bb0d
    opts = ",".join(["ro"] + options)
Packit 63bb0d
    subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True)
Packit 63bb0d
    try:
Packit 63bb0d
        yield mountpoint
Packit 63bb0d
    finally:
Packit 63bb0d
        subprocess.run(["umount", "--lazy", mountpoint], check=True)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
@contextlib.contextmanager
Packit 63bb0d
def mount(device):
Packit 63bb0d
    with tempfile.TemporaryDirectory() as mountpoint:
Packit 63bb0d
        subprocess.run(["mount", "-o", "ro", device, mountpoint], check=True)
Packit 63bb0d
        try:
Packit 63bb0d
            yield mountpoint
Packit 63bb0d
        finally:
Packit 63bb0d
            subprocess.run(["umount", "--lazy", mountpoint], check=True)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def parse_environment_vars(s):
Packit 63bb0d
    r = {}
Packit 63bb0d
    for line in s.split("\n"):
Packit 63bb0d
        line = line.strip()
Packit 63bb0d
        if not line:
Packit 63bb0d
            continue
Packit 63bb0d
        if line[0] == '#':
Packit 63bb0d
            continue
Packit 63bb0d
        key, value = line.split("=", 1)
Packit 63bb0d
        r[key] = value.strip('"')
Packit 63bb0d
    return r
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def parse_unit_files(s, expected_state):
Packit 63bb0d
    r = []
Packit 63bb0d
    for line in s.split("\n")[1:]:
Packit 63bb0d
        try:
Packit 63bb0d
            unit, state, *_ = line.split()
Packit 63bb0d
        except ValueError:
Packit 63bb0d
            pass
Packit 63bb0d
        if state != expected_state:
Packit 63bb0d
            continue
Packit 63bb0d
        r.append(unit)
Packit 63bb0d
Packit 63bb0d
    return r
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def subprocess_check_output(argv, parse_fn=None):
Packit 63bb0d
    try:
Packit 63bb0d
        output = subprocess.check_output(argv, encoding="utf-8")
Packit 63bb0d
    except subprocess.CalledProcessError as e:
Packit 63bb0d
        sys.stderr.write(f"--- Output from {argv}:\n")
Packit 63bb0d
        sys.stderr.write(e.stdout)
Packit 63bb0d
        sys.stderr.write("\n--- End of the output\n")
Packit 63bb0d
        raise
Packit 63bb0d
Packit 63bb0d
    return parse_fn(output) if parse_fn else output
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_image_format(device):
Packit 63bb0d
    qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads)
Packit 63bb0d
    return qemu["format"]
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_partition(device, partition):
Packit 63bb0d
    res = subprocess.run(["blkid", "--output", "export", device],
Packit 63bb0d
                         check=False, encoding="utf-8",
Packit 63bb0d
                         stdout=subprocess.PIPE)
Packit 63bb0d
    if res.returncode == 0:
Packit 63bb0d
        blkid = parse_environment_vars(res.stdout)
Packit 63bb0d
    else:
Packit 63bb0d
        blkid = {}
Packit 63bb0d
Packit 63bb0d
    partition["label"] = blkid.get("LABEL") # doesn't exist for mbr
Packit 63bb0d
    partition["uuid"] = blkid.get("UUID")
Packit 63bb0d
    partition["fstype"] = blkid.get("TYPE")
Packit 63bb0d
    return partition
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_partition_table(device):
Packit 63bb0d
    partitions = []
Packit 63bb0d
    info = {"partition-table": None,
Packit 63bb0d
            "partition-table-id": None,
Packit 63bb0d
            "partitions": partitions}
Packit 63bb0d
    try:
Packit 63bb0d
        sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
Packit 63bb0d
    except subprocess.CalledProcessError:
Packit 63bb0d
        partitions.append(read_partition(device, False))
Packit 63bb0d
        return info
Packit 63bb0d
Packit 63bb0d
    ptable = sfdisk["partitiontable"]
Packit 63bb0d
    assert ptable["unit"] == "sectors"
Packit 63bb0d
    is_dos = ptable["label"] == "dos"
Packit 63bb0d
    ssize = ptable.get("sectorsize", 512)
Packit 63bb0d
Packit 63bb0d
    for i, p in enumerate(ptable["partitions"]):
Packit 63bb0d
Packit 63bb0d
        partuuid = p.get("uuid")
Packit 63bb0d
        if not partuuid and is_dos:
Packit 63bb0d
            # For dos/mbr partition layouts the partition uuid
Packit 63bb0d
            # is generated. Normally this would be done by
Packit 63bb0d
            # udev+blkid, when the partition table is scanned.
Packit 63bb0d
            # 'sfdisk' prefixes the partition id with '0x' but
Packit 63bb0d
            # 'blkid' does not; remove it to mimic 'blkid'
Packit 63bb0d
            table_id = ptable['id'][2:]
Packit 63bb0d
            partuuid = "%.33s-%02x" % (table_id, i+1)
Packit 63bb0d
Packit 63bb0d
        partitions.append({
Packit 63bb0d
            "bootable": p.get("bootable", False),
Packit 63bb0d
            "type": p["type"],
Packit 63bb0d
            "start": p["start"] * ssize,
Packit 63bb0d
            "size": p["size"] * ssize,
Packit 63bb0d
            "partuuid": partuuid
Packit 63bb0d
        })
Packit 63bb0d
Packit 63bb0d
    info["partition-table"] = ptable["label"]
Packit 63bb0d
    info["partition-table-id"] = ptable["id"]
Packit 63bb0d
Packit 63bb0d
    return info
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_bootloader_type(device):
Packit 63bb0d
    with open(device, "rb") as f:
Packit 63bb0d
        if b"GRUB" in f.read(512):
Packit 63bb0d
            return "grub"
Packit 63bb0d
        else:
Packit 63bb0d
            return "unknown"
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_boot_entries(boot_dir):
Packit 63bb0d
    entries = []
Packit 63bb0d
    for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
Packit 63bb0d
        with open(conf) as f:
Packit 63bb0d
           entries.append(dict(line.strip().split(" ", 1) for line in f))
Packit 63bb0d
Packit 63bb0d
    return sorted(entries, key=lambda e: e["title"])
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def rpm_verify(tree):
Packit 63bb0d
    # cannot use `rpm --root` here, because rpm uses passwd from the host to
Packit 63bb0d
    # verify user and group ownership:
Packit 63bb0d
    #   https://github.com/rpm-software-management/rpm/issues/882
Packit 63bb0d
    rpm = subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"],
Packit 63bb0d
            stdout=subprocess.PIPE, encoding="utf-8")
Packit 63bb0d
Packit 63bb0d
    changed = {}
Packit 63bb0d
    missing = []
Packit 63bb0d
    for line in rpm.stdout:
Packit 63bb0d
        # format description in rpm(8), under `--verify`
Packit 63bb0d
        attrs = line[:9]
Packit 63bb0d
        if attrs == "missing  ":
Packit 63bb0d
            missing.append(line[12:].rstrip())
Packit 63bb0d
        else:
Packit 63bb0d
            changed[line[13:].rstrip()] = attrs
Packit 63bb0d
Packit 63bb0d
    # ignore return value, because it returns non-zero when it found changes
Packit 63bb0d
    rpm.wait()
Packit 63bb0d
Packit 63bb0d
    return {
Packit 63bb0d
        "missing": sorted(missing),
Packit 63bb0d
        "changed": changed
Packit 63bb0d
    }
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def rpm_packages(tree, is_ostree):
Packit 63bb0d
    cmd = ["rpm", "--root", tree, "-qa"]
Packit 63bb0d
    if is_ostree:
Packit 63bb0d
        cmd += ["--dbpath", "/usr/share/rpm"]
Packit 63bb0d
    pkgs = subprocess_check_output(cmd, str.split)
Packit 63bb0d
    return list(sorted(pkgs))
Packit 63bb0d
Packit 63bb0d
Packit Service 3a6627
@contextlib.contextmanager
Packit Service 3a6627
def change_root(root):
Packit Service 3a6627
    real_root = os.open("/", os.O_RDONLY)
Packit Service 3a6627
    try:
Packit Service 3a6627
        os.chroot(root)
Packit Service 3a6627
        yield None
Packit Service 3a6627
    finally:
Packit Service 3a6627
        os.fchdir(real_root)
Packit Service 3a6627
        os.chroot(".")
Packit Service 3a6627
        os.close(real_root)
Packit Service 3a6627
Packit Service 3a6627
Packit 63bb0d
def read_services(tree, state):
Packit Service 3a6627
    services_state = subprocess_check_output(["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, state)))
Packit Service 3a6627
Packit Service 3a6627
    # Since systemd v246, some services previously reported as "enabled" /
Packit Service 3a6627
    # "disabled" are now reported as "alias". There is no systemd command, that
Packit Service 3a6627
    # would take an "alias" unit and report its state as enabled/disabled
Packit Service 3a6627
    # and could run on a different tree (with "--root" option).
Packit Service 3a6627
    # To make the produced list of services in the given state consistent on
Packit Service 3a6627
    # pre/post v246 systemd versions, check all "alias" units and append them
Packit Service 3a6627
    # to the list, if their target is also listed in 'services_state'.
Packit Service 3a6627
    if state != "alias":
Packit Service 3a6627
        services_alias = subprocess_check_output(["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, "alias")))
Packit Service 3a6627
Packit Service 3a6627
        for alias in services_alias:
Packit Service 3a6627
            # The service may be in one of the following places (output of
Packit Service 3a6627
            # "systemd-analyze unit-paths", it should not change too often).
Packit Service 3a6627
            unit_paths = [
Packit Service 3a6627
                "/etc/systemd/system.control",
Packit Service 3a6627
                "/run/systemd/system.control",
Packit Service 3a6627
                "/run/systemd/transient",
Packit Service 3a6627
                "/run/systemd/generator.early",
Packit Service 3a6627
                "/etc/systemd/system",
Packit Service 3a6627
                "/run/systemd/system",
Packit Service 3a6627
                "/run/systemd/generator",
Packit Service 3a6627
                "/usr/local/lib/systemd/system",
Packit Service 3a6627
                "/usr/lib/systemd/system",
Packit Service 3a6627
                "/run/systemd/generator.late"
Packit Service 3a6627
            ]
Packit Service 3a6627
Packit Service 3a6627
            with change_root(tree):
Packit Service 3a6627
                for path in unit_paths:
Packit Service 3a6627
                    unit_path = os.path.join(path, alias)
Packit Service 3a6627
                    if os.path.exists(unit_path):
Packit Service 3a6627
                        real_unit_path = os.path.realpath(unit_path)
Packit Service 3a6627
                        # Skip the alias, if there was a symlink cycle.
Packit Service 3a6627
                        # When symbolic link cycles occur, the returned path will
Packit Service 3a6627
                        # be one member of the cycle, but no guarantee is made about
Packit Service 3a6627
                        # which member that will be.
Packit Service 3a6627
                        if os.path.islink(real_unit_path):
Packit Service 3a6627
                            continue
Packit Service 3a6627
Packit Service 3a6627
                        # Append the alias unit to the list, if its target is
Packit Service 3a6627
                        # already there.
Packit Service 3a6627
                        if os.path.basename(real_unit_path) in services_state:
Packit Service 3a6627
                            services_state.append(alias)
Packit Service 3a6627
Packit Service 3a6627
    # deduplicate and sort
Packit Service 3a6627
    services_state = list(set(services_state))
Packit Service 3a6627
    services_state.sort()
Packit Service 3a6627
Packit Service 3a6627
    return services_state
Packit Service 3a6627
Packit Service 3a6627
Packit Service 3a6627
def read_default_target(tree):
Packit Service 3a6627
    return subprocess_check_output(["systemctl", f"--root={tree}", "get-default"]).rstrip()
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_firewall_zone(tree):
Packit 63bb0d
    try:
Packit 63bb0d
        with open(f"{tree}/etc/firewalld/firewalld.conf") as f:
Packit 63bb0d
            conf = parse_environment_vars(f.read())
Packit 63bb0d
            default = conf["DefaultZone"]
Packit 63bb0d
    except FileNotFoundError:
Packit 63bb0d
        default = "public"
Packit 63bb0d
Packit 63bb0d
    r = []
Packit 63bb0d
    try:
Packit 63bb0d
        root = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml").getroot()
Packit 63bb0d
    except FileNotFoundError:
Packit 63bb0d
        root = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml").getroot()
Packit 63bb0d
Packit 63bb0d
    for element in root.findall("service"):
Packit 63bb0d
        r.append(element.get("name"))
Packit 63bb0d
Packit 63bb0d
    return r
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def read_fstab(tree):
Packit 63bb0d
    result = []
Packit 63bb0d
    with contextlib.suppress(FileNotFoundError):
Packit 63bb0d
        with open(f"{tree}/etc/fstab") as f:
Packit 63bb0d
            result = sorted([line.split() for line in f if line and not line.startswith("#")])
Packit 63bb0d
    return result
Packit 63bb0d
Packit 63bb0d
Packit Service bcdfb1
# Read configuration changes possible via org.osbuild.rhsm stage
Packit Service bcdfb1
def read_rhsm(tree):
Packit Service bcdfb1
    result = {}
Packit Service bcdfb1
Packit Service bcdfb1
    # Check RHSM DNF plugins configuration and allowed options
Packit Service bcdfb1
    dnf_plugins_config = {
Packit Service bcdfb1
        "product-id": f"{tree}/etc/dnf/plugins/product-id.conf",
Packit Service bcdfb1
        "subscription-manager": f"{tree}/etc/dnf/plugins/subscription-manager.conf"
Packit Service bcdfb1
    }
Packit Service bcdfb1
Packit Service bcdfb1
    for plugin_name, plugin_path in dnf_plugins_config.items():
Packit Service bcdfb1
        with contextlib.suppress(FileNotFoundError):
Packit Service bcdfb1
            with open(plugin_path) as f:
Packit Service bcdfb1
                parser = configparser.ConfigParser()
Packit Service bcdfb1
                parser.read_file(f)
Packit Service bcdfb1
                # only read "enabled" option from "main" section
Packit Service bcdfb1
                with contextlib.suppress(configparser.NoSectionError, configparser.NoOptionError):
Packit Service bcdfb1
                    # get the value as the first thing, in case it raises an exception
Packit Service bcdfb1
                    enabled = parser.getboolean("main", "enabled")
Packit Service bcdfb1
Packit Service bcdfb1
                    try:
Packit Service bcdfb1
                        dnf_plugins_dict = result["dnf-plugins"]
Packit Service bcdfb1
                    except KeyError as _:
Packit Service bcdfb1
                        dnf_plugins_dict = result["dnf-plugins"] = {}
Packit Service bcdfb1
Packit Service bcdfb1
                    try:
Packit Service bcdfb1
                        plugin_dict = dnf_plugins_dict[plugin_name]
Packit Service bcdfb1
                    except KeyError as _:
Packit Service bcdfb1
                        plugin_dict = dnf_plugins_dict[plugin_name] = {}
Packit Service bcdfb1
Packit Service bcdfb1
                    plugin_dict["enabled"] = enabled
Packit Service bcdfb1
Packit Service bcdfb1
    return result
Packit Service bcdfb1
Packit Service bcdfb1
Packit Service bcdfb1
# Create a nested dictionary for all supported sysconfigs 
Packit Service bcdfb1
def read_sysconfig(tree):
Packit Service bcdfb1
    result = {}
Packit Service bcdfb1
    sysconfig_paths = {
Packit Service bcdfb1
        "kernel": f"{tree}/etc/sysconfig/kernel",
Packit Service bcdfb1
        "network": f"{tree}/etc/sysconfig/network"
Packit Service bcdfb1
    }
Packit Service bcdfb1
    # iterate through supported configs
Packit Service bcdfb1
    # based on https://github.com/osbuild/osbuild/blob/main/osbuild/util/osrelease.py#L17
Packit Service bcdfb1
    for name, path in sysconfig_paths.items():
Packit Service bcdfb1
        with contextlib.suppress(FileNotFoundError):
Packit Service bcdfb1
            with open(path) as f:
Packit Service bcdfb1
                # if file exists start with empty array of values
Packit Service bcdfb1
                result[name] = {}
Packit Service bcdfb1
                for line in f:
Packit Service bcdfb1
                    line = line.strip()
Packit Service bcdfb1
                    if not line:
Packit Service bcdfb1
                        continue
Packit Service bcdfb1
                    if line[0] == "#":
Packit Service bcdfb1
                        continue
Packit Service bcdfb1
                    key, value = line.split("=", 1)
Packit Service bcdfb1
                    result[name][key] = value.strip('"')
Packit Service bcdfb1
    return result
Packit Service bcdfb1
 
Packit Service bcdfb1
Packit 63bb0d
def append_filesystem(report, tree, *, is_ostree=False):
Packit 63bb0d
    if os.path.exists(f"{tree}/etc/os-release"):
Packit 63bb0d
        report["packages"] = rpm_packages(tree, is_ostree)
Packit 63bb0d
        if not is_ostree:
Packit 63bb0d
            report["rpm-verify"] = rpm_verify(tree)
Packit 63bb0d
Packit 63bb0d
        with open(f"{tree}/etc/os-release") as f:
Packit 63bb0d
            report["os-release"] = parse_environment_vars(f.read())
Packit 63bb0d
Packit 63bb0d
        report["services-enabled"] = read_services(tree, "enabled")
Packit 63bb0d
        report["services-disabled"] = read_services(tree, "disabled")
Packit 63bb0d
Packit Service 3a6627
        default_target = read_default_target(tree)
Packit Service 3a6627
        if default_target:
Packit Service 3a6627
            report["default-target"] = default_target
Packit Service 3a6627
Packit 63bb0d
        with contextlib.suppress(FileNotFoundError):
Packit 63bb0d
            with open(f"{tree}/etc/hostname") as f:
Packit 63bb0d
                report["hostname"] = f.read().strip()
Packit 63bb0d
Packit 63bb0d
        with contextlib.suppress(FileNotFoundError):
Packit 63bb0d
            report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime"))
Packit 63bb0d
Packit 63bb0d
        with contextlib.suppress(FileNotFoundError):
Packit 63bb0d
            report["firewall-enabled"] = read_firewall_zone(tree)
Packit 63bb0d
Packit 63bb0d
        fstab = read_fstab(tree)
Packit 63bb0d
        if fstab:
Packit 63bb0d
            report["fstab"] = fstab
Packit 63bb0d
Packit Service bcdfb1
        rhsm = read_rhsm(tree)
Packit Service bcdfb1
        if rhsm:
Packit Service bcdfb1
            report["rhsm"] = rhsm
Packit Service bcdfb1
Packit Service bcdfb1
        sysconfig = read_sysconfig(tree)
Packit Service bcdfb1
        if sysconfig:
Packit Service bcdfb1
            report["sysconfig"] = sysconfig
Packit Service bcdfb1
            
Packit 63bb0d
        with open(f"{tree}/etc/passwd") as f:
Packit 63bb0d
            report["passwd"] = sorted(f.read().strip().split("\n"))
Packit 63bb0d
Packit 63bb0d
        with open(f"{tree}/etc/group") as f:
Packit 63bb0d
            report["groups"] = sorted(f.read().strip().split("\n"))
Packit 63bb0d
Packit 63bb0d
        if is_ostree:
Packit 63bb0d
            with open(f"{tree}/usr/lib/passwd") as f:
Packit 63bb0d
                report["passwd-system"] = sorted(f.read().strip().split("\n"))
Packit 63bb0d
Packit 63bb0d
            with open(f"{tree}/usr/lib/group") as f:
Packit 63bb0d
                report["groups-system"] = sorted(f.read().strip().split("\n"))
Packit 63bb0d
Packit 63bb0d
        if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0:
Packit 63bb0d
            assert "bootmenu" not in report
Packit 63bb0d
            with contextlib.suppress(FileNotFoundError):
Packit 63bb0d
                with open(f"{tree}/boot/grub2/grubenv") as f:
Packit 63bb0d
                    report["boot-environment"] = parse_environment_vars(f.read())
Packit 63bb0d
            report["bootmenu"] = read_boot_entries(f"{tree}/boot")
Packit 63bb0d
Packit 63bb0d
    elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0:
Packit 63bb0d
        assert "bootmenu" not in report
Packit 63bb0d
        with open(f"{tree}/grub2/grubenv") as f:
Packit 63bb0d
            report["boot-environment"] = parse_environment_vars(f.read())
Packit 63bb0d
        report["bootmenu"] = read_boot_entries(tree)
Packit 63bb0d
    elif len(glob.glob(f"{tree}/EFI")):
Packit 63bb0d
        print("EFI partition", file=sys.stderr)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def partition_is_esp(partition):
Packit 63bb0d
    return partition["type"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def find_esp(partitions):
Packit 63bb0d
    for i, p in enumerate(partitions):
Packit 63bb0d
        if partition_is_esp(p):
Packit 63bb0d
            return p, i
Packit 63bb0d
    return None, 0
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def append_partitions(report, device, loctl):
Packit 63bb0d
    partitions = report["partitions"]
Packit 63bb0d
    esp, esp_id = find_esp(partitions)
Packit 63bb0d
Packit 63bb0d
    with contextlib.ExitStack() as cm:
Packit 63bb0d
Packit 63bb0d
        devices = {}
Packit 63bb0d
        for n, part in enumerate(partitions):
Packit 63bb0d
            start, size = part["start"], part["size"]
Packit 63bb0d
            dev = cm.enter_context(loop_open(loctl, device, offset=start, size=size))
Packit 63bb0d
            devices[n] = dev
Packit 63bb0d
            read_partition(dev, part)
Packit 63bb0d
Packit 63bb0d
        for n, part in enumerate(partitions):
Packit 63bb0d
            if not part["fstype"]:
Packit 63bb0d
                continue
Packit 63bb0d
Packit 63bb0d
            with mount(devices[n]) as tree:
Packit 63bb0d
                if esp and os.path.exists(f"{tree}/boot/efi"):
Packit 63bb0d
                    with mount_at(devices[esp_id], f"{tree}/boot/efi", options=['umask=077']):
Packit 63bb0d
                        append_filesystem(report, tree)
Packit 63bb0d
                else:
Packit 63bb0d
                    append_filesystem(report, tree)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def analyse_image(image):
Packit 63bb0d
    loctl = loop.LoopControl()
Packit 63bb0d
Packit 63bb0d
    imgfmt = read_image_format(image)
Packit 63bb0d
    report = {"image-format": imgfmt}
Packit 63bb0d
Packit 63bb0d
    with open_image(loctl, image, imgfmt) as (_, device):
Packit 63bb0d
        report["bootloader"] = read_bootloader_type(device)
Packit 63bb0d
        report.update(read_partition_table(device))
Packit 63bb0d
Packit 63bb0d
        if report["partition-table"]:
Packit 63bb0d
            append_partitions(report, device, loctl)
Packit 63bb0d
        else:
Packit 63bb0d
            with mount(device) as tree:
Packit 63bb0d
                append_filesystem(report, tree)
Packit 63bb0d
Packit 63bb0d
    return report
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def append_directory(report, tree):
Packit 63bb0d
    if os.path.lexists(f"{tree}/ostree"):
Packit 63bb0d
        os.makedirs(f"{tree}/etc", exist_ok=True)
Packit 63bb0d
        with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]):
Packit 63bb0d
            append_filesystem(report, tree, is_ostree=True)
Packit 63bb0d
    else:
Packit 63bb0d
        append_filesystem(report, tree)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def append_ostree_repo(report, repo):
Packit 63bb0d
    ostree = functools.partial(run_ostree, repo=repo)
Packit 63bb0d
Packit 63bb0d
    r = ostree("config", "get", "core.mode")
Packit 63bb0d
    report["ostree"] = {
Packit 63bb0d
        "repo": {
Packit 63bb0d
            "core.mode": r.stdout.strip()
Packit 63bb0d
        }
Packit 63bb0d
    }
Packit 63bb0d
Packit 63bb0d
    r = ostree("refs")
Packit 63bb0d
    refs = r.stdout.strip().split("\n")
Packit 63bb0d
    report["ostree"]["refs"] = refs
Packit 63bb0d
Packit 63bb0d
    resolved = {r: ostree("rev-parse", r).stdout.strip() for r in refs}
Packit 63bb0d
    commit = resolved[refs[0]]
Packit 63bb0d
Packit 63bb0d
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
Packit 63bb0d
        tree = os.path.join(tmpdir, "tree")
Packit 63bb0d
        ostree("checkout", "--force-copy", commit, tree)
Packit 63bb0d
        append_directory(report, tree)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def analyse_directory(path):
Packit 63bb0d
    report = {}
Packit 63bb0d
Packit 63bb0d
    if os.path.exists(os.path.join(path, "compose.json")):
Packit 63bb0d
        report["type"] = "ostree/commit"
Packit 63bb0d
        repo = os.path.join(path, "repo")
Packit 63bb0d
        append_ostree_repo(report, repo)
Packit 63bb0d
    elif os.path.isdir(os.path.join(path, "refs")):
Packit 63bb0d
        report["type"] = "ostree/repo"
Packit 63bb0d
        append_ostree_repo(report, repo)
Packit 63bb0d
    else:
Packit 63bb0d
        append_directory(report, path)
Packit 63bb0d
Packit 63bb0d
    return report
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def is_tarball(path):
Packit 63bb0d
    mtype, encoding = mimetypes.guess_type(path)
Packit 63bb0d
    return mtype == "application/x-tar"
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def analyse_tarball(path):
Packit 63bb0d
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
Packit 63bb0d
        tree = os.path.join(tmpdir, "root")
Packit 63bb0d
        os.makedirs(tree)
Packit 63bb0d
        command = [
Packit 63bb0d
            "tar",
Packit 63bb0d
            "-x",
Packit 63bb0d
            "--auto-compress",
Packit 63bb0d
            "-f", path,
Packit 63bb0d
            "-C", tree
Packit 63bb0d
        ]
Packit 63bb0d
        subprocess.run(command,
Packit 63bb0d
                       stdout=sys.stderr,
Packit 63bb0d
                       check=True)
Packit 63bb0d
        return analyse_directory(tree)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def is_compressed(path):
Packit 63bb0d
    _, encoding = mimetypes.guess_type(path)
Packit 63bb0d
    return encoding in ["xz", "gzip", "bzip2"]
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def analyse_compressed(path):
Packit 63bb0d
    _, encoding = mimetypes.guess_type(path)
Packit 63bb0d
Packit 63bb0d
    if encoding == "xz":
Packit 63bb0d
        command = ["unxz", "--force"]
Packit 63bb0d
    elif encoding == "gzip":
Packit 63bb0d
        command = ["gunzip", "--force"]
Packit 63bb0d
    elif encoding == "bzip2":
Packit 63bb0d
        command = ["bunzip2", "--force"]
Packit 63bb0d
    else:
Packit 63bb0d
        raise ValueError(f"Unsupported compression: {encoding}")
Packit 63bb0d
Packit 63bb0d
    with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
Packit 63bb0d
        subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir],
Packit 63bb0d
                       check=True)
Packit 63bb0d
Packit 63bb0d
        files = os.listdir(tmpdir)
Packit 63bb0d
        archive = os.path.join(tmpdir, files[0])
Packit 63bb0d
        subprocess.run(command + [archive], check=True)
Packit 63bb0d
Packit 63bb0d
        files = os.listdir(tmpdir)
Packit 63bb0d
        assert len(files) == 1
Packit 63bb0d
        image = os.path.join(tmpdir, files[0])
Packit 63bb0d
        return analyse_image(image)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
def main():
Packit 63bb0d
    parser = argparse.ArgumentParser(description="Inspect an image")
Packit 63bb0d
    parser.add_argument("target", metavar="TARGET",
Packit 63bb0d
                        help="The file or directory to analyse",
Packit 63bb0d
                        type=os.path.abspath)
Packit 63bb0d
Packit 63bb0d
    args = parser.parse_args()
Packit 63bb0d
    target = args.target
Packit 63bb0d
Packit 63bb0d
    if os.path.isdir(target):
Packit 63bb0d
        report = analyse_directory(target)
Packit 63bb0d
    elif is_tarball(target):
Packit 63bb0d
        report = analyse_tarball(target)
Packit 63bb0d
    elif is_compressed(target):
Packit 63bb0d
        report = analyse_compressed(target)
Packit 63bb0d
    else:
Packit 63bb0d
        report = analyse_image(target)
Packit 63bb0d
Packit 63bb0d
    json.dump(report, sys.stdout, sort_keys=True, indent=2)
Packit 63bb0d
Packit 63bb0d
Packit 63bb0d
if __name__ == "__main__":
Packit 63bb0d
    main()
Packit 63bb0d