#!/usr/bin/python3

"""Manifest-Pre-Processor - Depsolving

This manifest-pre-processor consumes a manifest on stdin, processes it, and
produces the resulting manifest on stdout.

This tool adjusts the `org.osbuild.rpm` stage. It consumes the `mpp-depsolve`
option and produces a package-list and source-entries.

It supports version "1" and version "2" of the manifest description format.

The parameters for this pre-processor, version "1", look like this:

```
...
    {
      "name": "org.osbuild.rpm",
      ...
      "options": {
        ...
        "mpp-depsolve": {
          "architecture": "x86_64",
          "module-platform-id": "f32",
          "baseurl": "http://mirrors.kernel.org/fedora/releases/32/Everything/x86_64/os",
          "repos": [
            {
              "id": "default",
              "metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-32&arch=$basearch"
            }
          ],
          "packages": [
            "@core",
            "dracut-config-generic",
            "grub2-pc",
            "kernel"
          ],
          "excludes": [
            (optional excludes)
          ]
        }
      }
    }
...
```

The parameters for this pre-processor, version "2", look like this:

```
...
    {
      "type": "org.osbuild.rpm",
      ...
      "inputs": {
        "packages": {
          "mpp-depsolve": {
              see above
          }
        }
      }
    }
...
```
"""

import argparse
import contextlib
import json
import os
import sys
import tempfile
import urllib.parse

import dnf
import hawkey


class State:
    """Working state shared by the processing steps of one run.

    All attributes are created per instance, so that the mutable to-do list
    is never shared between two `State` objects.
    """

    def __init__(self):
        self.dnf_cache = None           # DNF Cache Directory

        self.manifest = None            # Input/Working Manifest
        self.manifest_urls = None       # Link to sources URL dict
        self.manifest_todo = []         # Array of links to RPM stages


def _dnf_repo(conf, desc):
    """Create a DNF repository object from a manifest repo description.

    `desc` must contain an `id` and at least one of `baseurl`, `metalink` or
    `mirrorlist`. If several are given, the first one in that order is used.
    `conf` is the DNF configuration the repository is created with.

    Raises `KeyError` if `id` is missing and `ValueError` if none of the
    location keys is present.
    """
    repo_id = desc["id"]

    # Reject unusable descriptions before any DNF object is created.
    if not any(key in desc for key in ("baseurl", "metalink", "mirrorlist")):
        raise ValueError("repo description does not contain baseurl, metalink, or mirrorlist keys")

    repo = dnf.repo.Repo(repo_id, conf)
    if "baseurl" in desc:
        repo.baseurl = desc["baseurl"]
    elif "metalink" in desc:
        repo.metalink = desc["metalink"]
    else:
        # A mirrorlist is a different option than a metalink; store it in
        # the repo option of the same name.
        repo.mirrorlist = desc["mirrorlist"]
    return repo


def _dnf_base(repos, module_platform_id, persistdir, cachedir, arch):
    """Create a `dnf.Base` with the given repositories and a filled sack.

    `repos` is an iterable of repo descriptions as accepted by `_dnf_repo()`.
    `cachedir` is only applied when it is truthy; `persistdir`,
    `module_platform_id` and `arch` are always written to the configuration.
    The system repository is not loaded into the sack.
    """
    dnf_base = dnf.Base()
    conf = dnf_base.conf

    if cachedir:
        conf.cachedir = cachedir
    # NOTE(review): presumably /dev/null keeps host configuration out of the
    # depsolve - confirm against the DNF documentation.
    conf.config_file_path = "/dev/null"
    conf.module_platform_id = module_platform_id
    conf.persistdir = persistdir

    # Values used by DNF when expanding $arch / $basearch in repo URLs.
    conf.substitutions['arch'] = arch
    conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

    for desc in repos:
        dnf_base.repos.add(_dnf_repo(conf, desc))

    dnf_base.fill_sack(load_system_repo=False)
    return dnf_base


def _dnf_resolve(state, mpp_depsolve):
    """Depsolve one `mpp-depsolve` description into a list of packages.

    Reads the required `architecture` and `module-platform-id` entries and
    the optional `repos`, `packages`, `excludes` and `baseurl` entries from
    `mpp_depsolve`. `state.dnf_cache` is used as DNF cache directory.

    Returns a list of dictionaries with the keys `checksum`
    ("<algorithm>:<hexdigest>"), `name` and `url`, one for every package in
    the resolved transaction whose action is a forward action. The list is
    empty if no packages are requested; DNF is not set up in that case.

    Raises `KeyError` if a required entry is missing and `ValueError` if a
    resolved package belongs to a repository without a known base URL.
    """
    arch = mpp_depsolve["architecture"]
    mpid = mpp_depsolve["module-platform-id"]
    repos = mpp_depsolve.get("repos", [])
    packages = mpp_depsolve.get("packages", [])
    excludes = mpp_depsolve.get("excludes", [])
    baseurl = mpp_depsolve.get("baseurl")

    # Per-repository base URL, falling back to the description-wide one.
    baseurls = {
        repo["id"]: repo.get("baseurl", baseurl) for repo in repos
    }

    if not packages:
        return []

    deps = []
    with tempfile.TemporaryDirectory() as persistdir:
        dnf_base = _dnf_base(repos, mpid, persistdir, state.dnf_cache, arch)
        try:
            dnf_base.install_specs(packages, exclude=excludes)
            dnf_base.resolve()

            for tsi in dnf_base.transaction:
                if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                    continue

                pkg = tsi.pkg
                checksum_type = hawkey.chksum_name(pkg.chksum[0])
                checksum_hex = pkg.chksum[1].hex()

                repo_baseurl = baseurls[pkg.reponame]
                if repo_baseurl is None:
                    raise ValueError(
                        f"no baseurl known for repo '{pkg.reponame}': set "
                        "'baseurl' on the repo or on the mpp-depsolve description"
                    )

                # The package path often starts with a "/", even though it
                # is meant to be relative to the base URL. Strip slashes on
                # both sides of the seam so that exactly one separates them.
                url = urllib.parse.urljoin(
                    repo_baseurl.rstrip("/") + "/",
                    pkg.relativepath.lstrip("/"),
                )

                deps.append({
                    "checksum": f"{checksum_type}:{checksum_hex}",
                    "name": pkg.name,
                    "url": url,
                })
        finally:
            # Release DNF's handles while the persist directory still exists.
            dnf_base.close()

    return deps


def _manifest_enter(manifest, key, default):
    """Return `manifest[key]`, storing `default` under `key` first if absent.

    The returned object is the one stored in `manifest`, so modifying it
    modifies the manifest.
    """
    return manifest.setdefault(key, default)


def _manifest_parse_v1(state, data):
    """Locate the parts of a version "1" manifest that need processing.

    Creates `sources`.`org.osbuild.files`.`urls` and `pipeline`.`stages` in
    `data` if they are missing. Every `org.osbuild.rpm` stage gets an
    `options` dictionary if it has none; those whose options contain
    `mpp-depsolve` are collected.

    Stores the manifest, its URL dictionary and the collected stages in
    `state.manifest`, `state.manifest_urls` and `state.manifest_todo`.
    """
    # Make sure "sources"."org.osbuild.files"."urls" exists.
    sources = _manifest_enter(data, "sources", {})
    files = _manifest_enter(sources, "org.osbuild.files", {})
    url_map = _manifest_enter(files, "urls", {})

    # Make sure "pipeline"."stages" exists.
    pipeline = _manifest_enter(data, "pipeline", {})
    stages = _manifest_enter(pipeline, "stages", [])

    def wants_depsolve(stage):
        # Only RPM stages carrying the pre-processor option are relevant.
        if stage.get("name", "") != "org.osbuild.rpm":
            return False
        return "mpp-depsolve" in _manifest_enter(stage, "options", {})

    pending = [stage for stage in stages if wants_depsolve(stage)]

    # Remember links of interest.
    state.manifest = data
    state.manifest_urls = url_map
    state.manifest_todo = pending


def _manifest_process_v1(state, stage):
    """Replace the `mpp-depsolve` option of one version "1" RPM stage.

    The option is removed from the stage and depsolved. The checksum of
    every resulting package is appended to the stage's `packages` option and
    registered with its URL in `state.manifest_urls`.
    """
    options = _manifest_enter(stage, "options", {})
    package_list = _manifest_enter(options, "packages", [])

    # Detach the pre-processor option; it must not reach the output.
    depsolve_desc = options.pop("mpp-depsolve", {})

    for dep in _dnf_resolve(state, depsolve_desc):
        checksum = dep["checksum"]
        package_list.append(checksum)
        state.manifest_urls[checksum] = dep["url"]


def _manifest_depsolve_v1(state, src):
    """Depsolve all marked RPM stages of the version "1" manifest `src`."""
    _manifest_parse_v1(state, src)

    # The parse step has filled `state.manifest_todo` with the stages to do.
    for rpm_stage in state.manifest_todo:
        _manifest_process_v1(state, rpm_stage)


def _manifest_parse_v2(state, manifest):
    """Locate the parts of a version "2" manifest that need processing.

    Collects the `inputs`.`packages` dictionary of every `org.osbuild.rpm`
    stage that contains `mpp-depsolve`. Stages without a `type`, and RPM
    stages without such an input, are skipped and left unmodified.

    Creates `sources`.`org.osbuild.curl`.`items` in `manifest` if missing.
    Stores the manifest, the collected inputs and the items dictionary in
    `state.manifest`, `state.manifest_todo` and `state.manifest_urls`.
    """
    todo = []

    for pipeline in manifest.get("pipelines", []):
        for stage in pipeline.get("stages", []):
            if stage.get("type", "") != "org.osbuild.rpm":
                continue

            # Look the input up without creating it, so that stages which
            # do not use the pre-processor keep their exact content.
            packages = stage.get("inputs", {}).get("packages", {})
            if "mpp-depsolve" in packages:
                todo.append(packages)

    sources = manifest.setdefault("sources", {})
    curl = sources.setdefault("org.osbuild.curl", {})
    urls = curl.setdefault("items", {})

    state.manifest = manifest
    state.manifest_todo = todo
    state.manifest_urls = urls


def _manifest_process_v2(state, ip):
    """Replace the `mpp-depsolve` entry of one version "2" packages input.

    `ip` is the input dictionary of an RPM stage. Every depsolved package is
    added as an empty entry, keyed by its checksum, to the input's
    `references` and registered with its URL in `state.manifest_urls`. The
    `mpp-depsolve` entry is removed once depsolving has succeeded.
    """
    url_map = state.manifest_urls
    references = _manifest_enter(ip, "references", {})

    for dep in _dnf_resolve(state, ip["mpp-depsolve"]):
        key = dep["checksum"]
        references[key] = {}
        url_map[key] = dep["url"]

    # The pre-processor entry must not reach the output.
    del ip["mpp-depsolve"]


def _manifest_depsolve_v2(state, src):
    """Depsolve all marked packages inputs of the version "2" manifest `src`."""
    _manifest_parse_v2(state, src)

    # The parse step has filled `state.manifest_todo` with the inputs to do.
    for packages_input in state.manifest_todo:
        _manifest_process_v2(state, packages_input)


def _main_args(argv):
    """Parse the command line.

    `argv` is the full argument vector including the program name at index
    0. Returns the parsed namespace; its `dnf_cache` attribute is the
    absolute path given via `--dnf-cache`, or `None` if the option is unset.
    """
    # Describe this tool in `--help`, in line with the module documentation.
    parser = argparse.ArgumentParser(description="Manifest-Pre-Processor - Depsolving")

    parser.add_argument(
        "--dnf-cache",
        metavar="PATH",
        type=os.path.abspath,
        default=None,
        help="Path to DNF cache-directory to use",
    )

    return parser.parse_args(argv[1:])


@contextlib.contextmanager
def _main_state(args):
    """Context manager that yields a fresh `State` for one run.

    The DNF cache directory is taken from `args.dnf_cache` if it is set.
    Nothing is done when the context is left.
    """
    run_state = State()

    cache_dir = args.dnf_cache
    if cache_dir:
        run_state.dnf_cache = cache_dir

    yield run_state


def _main_process(state):
    """Read a manifest from stdin, depsolve it and write it to stdout.

    The manifest format is selected by the top-level `version` entry, which
    defaults to "1". Returns 0 on success. For an unknown version a message
    is printed to stderr, nothing is written to stdout and 1 is returned.
    """
    src = json.load(sys.stdin)
    version = src.get("version", "1")

    # Reject unsupported formats before doing any work.
    if version not in ("1", "2"):
        print(f"Unknown manifest version {version}", file=sys.stderr)
        return 1

    if version == "1":
        _manifest_depsolve_v1(state, src)
    else:
        _manifest_depsolve_v2(state, src)

    json.dump(state.manifest, sys.stdout, indent=2)
    sys.stdout.write("\n")
    return 0


def main() -> int:
    """Run the pre-processor and return its exit status.

    The status is the one reported by `_main_process()`: 0 on success, 1 if
    the manifest has an unknown version.
    """
    args = _main_args(sys.argv)
    with _main_state(args) as state:
        # Hand the result on, so that a failure reaches the exit status.
        return _main_process(state)


# Script entry point: exit with the status returned by `main()`.
if __name__ == "__main__":
    raise SystemExit(main())