Blob Blame History Raw
#!/usr/bin/python3

import datetime
import dnf
import hashlib
import hawkey
import json
import sys
import tempfile

# Process exit status used by exit_with_dnf_error() after it has reported a
# dnf failure as JSON on stdout.
DNF_ERROR_EXIT_CODE = 10


def timestamp_to_rfc3339(timestamp):
    """Format a Unix timestamp as an RFC 3339 string in UTC.

    Args:
        timestamp: Seconds since the Unix epoch (int or float).

    Returns:
        A string of the form ``YYYY-MM-DDTHH:MM:SSZ``; sub-second precision
        is dropped.
    """
    # Convert the argument itself rather than relying on any global state,
    # and build a timezone-aware datetime so the result is always UTC.
    d = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return d.strftime('%Y-%m-%dT%H:%M:%SZ')


def dnfrepo(desc, parent_conf=None):
    """Makes a dnf.repo.Repo out of a JSON repository description.

    Args:
        desc: Mapping describing the repository. It must contain "id" and
            at least one of "baseurl", "metalink" or "mirrorlist"; they are
            checked in that order and only the first one present is used.
            Optional keys: "ignoressl", "sslcacert", "sslclientkey",
            "sslclientcert" and "metadata_expire".
        parent_conf: Passed to dnf.repo.Repo as its parent configuration.

    Returns:
        The configured dnf.repo.Repo.

    Raises:
        KeyError: If desc has no "id".
        ValueError: If desc has none of "baseurl", "metalink" or
            "mirrorlist".
    """

    repo = dnf.repo.Repo(desc["id"], parent_conf)

    if "baseurl" in desc:
        repo.baseurl = desc["baseurl"]
    elif "metalink" in desc:
        repo.metalink = desc["metalink"]
    elif "mirrorlist" in desc:
        repo.mirrorlist = desc["mirrorlist"]
    else:
        # Raise explicitly instead of asserting: assertions are stripped
        # when Python runs with -O, which would let a repository without
        # any URL source through.
        raise ValueError(
            f"repository {desc['id']!r} has none of "
            "'baseurl', 'metalink' or 'mirrorlist'"
        )

    if desc.get("ignoressl", False):
        repo.sslverify = False
    if "sslcacert" in desc:
        repo.sslcacert = desc["sslcacert"]
    if "sslclientkey" in desc:
        repo.sslclientkey = desc["sslclientkey"]
    if "sslclientcert" in desc:
        repo.sslclientcert = desc["sslclientcert"]

    # In dnf, the default metadata expiration time is 48 hours. However,
    # some repositories never expire the metadata, and others expire it much
    # sooner than that. Therefore we must make this configurable. If nothing
    # is provided, we default to never expiring the metadata, as hardcoding
    # some arbitrary value does not seem very helpful.
    repo.metadata_expire = desc.get("metadata_expire", "-1")

    return repo


def create_base(repos, module_platform_id, persistdir, cachedir, arch):
    """Create a configured dnf.Base and fill its sack.

    Args:
        repos: Iterable of JSON repository descriptions, each turned into a
            repository with dnfrepo().
        module_platform_id: Value for the dnf ``module_platform_id`` option.
        persistdir: Value for the dnf ``persistdir`` option.
        cachedir: Value for the dnf ``cachedir`` option.
        arch: Architecture used for the ``arch`` substitution; the
            ``basearch`` substitution is derived from it.

    Returns:
        The dnf.Base after fill_sack() has run with the system repo
        excluded.
    """
    base = dnf.Base()
    conf = base.conf

    # Let dnf pick the fastest mirrors, both for the metadata fetched while
    # depsolving and for package downloads.
    conf.fastestmirror = True

    # Move on to another mirror when connecting takes more than 5 seconds.
    conf.timeout = 5

    # Remaining dnf settings; no configuration file is read.
    conf.module_platform_id = module_platform_id
    conf.config_file_path = "/dev/null"
    conf.persistdir = persistdir
    conf.cachedir = cachedir
    conf.substitutions['arch'] = arch
    conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

    for desc in repos:
        base.repos.add(dnfrepo(desc, conf))

    base.fill_sack(load_system_repo=False)
    return base


def exit_with_dnf_error(kind: str, reason: str):
    """Write a JSON error object to stdout and terminate the process.

    The object has the keys "kind" and "reason"; the process then exits
    with status DNF_ERROR_EXIT_CODE, so this function never returns.
    """
    payload = {"kind": kind, "reason": reason}
    sys.stdout.write(json.dumps(payload))
    raise SystemExit(DNF_ERROR_EXIT_CODE)


def repo_checksums(base):
    """Map each enabled repository id to the checksum of its cached repomd.xml.

    For every enabled repository in ``base.repos`` the cached
    ``repodata/repomd.xml`` under ``base.conf.cachedir`` is read and hashed.

    Returns:
        A dict of ``repo.id`` to ``"sha256:<hex digest>"``.
    """
    result = {}
    for repo in base.repos.iter_enabled():
        # The cache directory name is derived the same way libdnf does it:
        #   https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
        # The first configured source wins: metalink, mirrorlist, baseurl.
        if repo.metalink:
            source = repo.metalink
        elif repo.mirrorlist:
            source = repo.mirrorlist
        elif repo.baseurl:
            source = repo.baseurl[0]
        else:
            assert False

        # Only the first 16 hex characters of the URL hash are part of the
        # directory name.
        suffix = hashlib.sha256(source.encode()).hexdigest()[:16]
        path = f"{base.conf.cachedir}/{repo.id}-{suffix}/repodata/repomd.xml"

        with open(path, "rb") as repomd_fp:
            contents = repomd_fp.read()

        result[repo.id] = f"sha256:{hashlib.sha256(contents).hexdigest()}"

    return result


# Script body: read one JSON request from stdin, run the requested command
# ("dump" or "depsolve") against the given repositories and write the JSON
# result to stdout. Any other command produces no output.
call = json.load(sys.stdin)
command = call["command"]
arguments = call["arguments"]
repos = arguments.get("repos", {})
arch = arguments["arch"]
cachedir = arguments["cachedir"]
module_platform_id = arguments["module_platform_id"]

# dnf's persistent state goes to a throwaway directory that is removed when
# the block is left.
with tempfile.TemporaryDirectory() as persistdir:
    try:
        base = create_base(
            repos, module_platform_id, persistdir, cachedir, arch
        )
    except dnf.exceptions.Error as err:
        # Report the concrete dnf exception class as the error kind.
        exit_with_dnf_error(
            type(err).__name__,
            f"Error occurred when setting up repo: {err}"
        )

    if command == "dump":
        # List every available package together with its metadata.
        packages = []
        for package in base.sack.query().available():
            entry = {
                "name": package.name,
                "summary": package.summary,
                "description": package.description,
                "url": package.url,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "buildtime": timestamp_to_rfc3339(package.buildtime),
                "license": package.license
            }
            packages.append(entry)

        result = {
            "checksums": repo_checksums(base),
            "packages": packages
        }
        json.dump(result, sys.stdout)

    elif command == "depsolve":
        errors = []
        specs = arguments["package-specs"]

        try:
            base.install_specs(
                specs,
                exclude=arguments.get("exclude-specs", [])
            )
        except dnf.exceptions.MarkingErrors as err:
            exit_with_dnf_error(
                "MarkingErrors",
                f"Error occurred when marking packages for installation: {err}"
            )

        try:
            base.resolve()
        except dnf.exceptions.DepsolveError as err:
            exit_with_dnf_error(
                "DepsolveError",
                f"There was a problem depsolving {specs}: {err}"
            )

        dependencies = []
        # Walk the transaction directly: the install_set() helper does not
        # guarantee a stable order.
        for item in base.transaction:
            if item.action in dnf.transaction.FORWARD_ACTIONS:
                pkg = item.pkg
                chksum = pkg.chksum
                algorithm = hawkey.chksum_name(chksum[0])

                dependencies.append({
                    "name": pkg.name,
                    "epoch": pkg.epoch,
                    "version": pkg.version,
                    "release": pkg.release,
                    "arch": pkg.arch,
                    "repo_id": pkg.reponame,
                    "path": pkg.relativepath,
                    "remote_location": pkg.remote_location(),
                    "checksum": f"{algorithm}:{chksum[1].hex()}"
                })

        result = {
            "checksums": repo_checksums(base),
            "dependencies": dependencies
        }
        json.dump(result, sys.stdout)