#!/usr/bin/python3
"""Query dnf repositories and report the result as JSON.

A JSON request of the form ``{"command": ..., "arguments": {...}}`` is read
from stdin.  Two commands are handled:

* ``dump``     -- list every available package in the given repositories.
* ``depsolve`` -- resolve ``package-specs`` (minus ``exclude-specs``) into the
  list of packages that the transaction would install.

The result is written to stdout as JSON.  When dnf reports an error, a
``{"kind": ..., "reason": ...}`` object is written to stdout instead and the
process exits with ``DNF_ERROR_EXIT_CODE``.
"""

import datetime
import hashlib
import json
import sys
import tempfile

import dnf
import hawkey

# Exit status used when a dnf error has been reported as JSON on stdout.
DNF_ERROR_EXIT_CODE = 10


def timestamp_to_rfc3339(timestamp):
    """Format a POSIX timestamp as a UTC ``YYYY-MM-DDTHH:MM:SSZ`` string."""
    # Use the argument itself (not a global) and a timezone-aware conversion;
    # the formatted output is the same as a naive UTC conversion would give.
    d = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return d.strftime('%Y-%m-%dT%H:%M:%SZ')


def dnfrepo(desc, parent_conf=None):
    """Makes a dnf.repo.Repo out of a JSON repository description.

    ``desc`` must contain ``id`` and one of ``baseurl``, ``metalink`` or
    ``mirrorlist`` (checked in that order).  The optional keys ``ignoressl``,
    ``sslcacert``, ``sslclientkey``, ``sslclientcert`` and ``metadata_expire``
    are copied onto the repo when present.

    Raises:
        ValueError: if ``desc`` names no repository source.
    """
    repo = dnf.repo.Repo(desc["id"], parent_conf)

    if "baseurl" in desc:
        repo.baseurl = desc["baseurl"]
    elif "metalink" in desc:
        repo.metalink = desc["metalink"]
    elif "mirrorlist" in desc:
        repo.mirrorlist = desc["mirrorlist"]
    else:
        # An explicit exception (rather than an assert) so the check is not
        # stripped when Python runs with -O.
        raise ValueError(
            f"repository {desc['id']!r} has none of "
            "baseurl, metalink or mirrorlist"
        )

    if desc.get("ignoressl", False):
        repo.sslverify = False
    if "sslcacert" in desc:
        repo.sslcacert = desc["sslcacert"]
    if "sslclientkey" in desc:
        repo.sslclientkey = desc["sslclientkey"]
    if "sslclientcert" in desc:
        repo.sslclientcert = desc["sslclientcert"]

    # In dnf, the default metadata expiration time is 48 hours. However,
    # some repositories never expire the metadata, and others expire it much
    # sooner than that. Therefore we must make this configurable. If nothing
    # is provided, we default to never expiring the metadata, as hardcoding
    # some arbitrary value does not seem very helpful.
    repo.metadata_expire = desc.get("metadata_expire", "-1")

    return repo


def create_base(repos, module_platform_id, persistdir, cachedir, arch):
    """Build a ``dnf.Base`` configured for ``repos`` and fill its sack.

    ``repos`` is an iterable of repository descriptions as accepted by
    ``dnfrepo``.  The remaining arguments are written into the dnf
    configuration and its ``arch`` / ``basearch`` substitutions.  The sack is
    filled with ``load_system_repo=False``.
    """
    base = dnf.Base()

    # Enable fastestmirror to ensure we choose the fastest mirrors for
    # downloading metadata (when depsolving) and downloading packages.
    base.conf.fastestmirror = True

    # Try another mirror if it takes longer than 5 seconds to connect.
    base.conf.timeout = 5

    # Set the rest of the dnf configuration.
    base.conf.module_platform_id = module_platform_id
    base.conf.config_file_path = "/dev/null"
    base.conf.persistdir = persistdir
    base.conf.cachedir = cachedir
    base.conf.substitutions['arch'] = arch
    base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

    for repo in repos:
        base.repos.add(dnfrepo(repo, base.conf))

    base.fill_sack(load_system_repo=False)
    return base


def exit_with_dnf_error(kind: str, reason: str):
    """Write ``{"kind", "reason"}`` as JSON to stdout and exit the process.

    Exits with ``DNF_ERROR_EXIT_CODE``; this function does not return.
    """
    json.dump({"kind": kind, "reason": reason}, sys.stdout)
    sys.exit(DNF_ERROR_EXIT_CODE)


def repo_checksums(base):
    """Map each enabled repo id to ``"sha256:<hex>"`` of its cached repomd.xml.

    The file is read from
    ``<cachedir>/<repo.id>-<digest>/repodata/repomd.xml``, where ``digest`` is
    the first 16 hex characters of the SHA-256 of the repo's metalink,
    mirrorlist or first baseurl (in that order of preference).

    Raises:
        ValueError: if an enabled repo has no metalink, mirrorlist or baseurl.
        OSError: if the cached repomd.xml cannot be opened.
    """
    checksums = {}
    for repo in base.repos.iter_enabled():
        # Uses the same algorithm as libdnf to find cache dir:
        #   https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
        if repo.metalink:
            url = repo.metalink
        elif repo.mirrorlist:
            url = repo.mirrorlist
        elif repo.baseurl:
            url = repo.baseurl[0]
        else:
            # An explicit exception (rather than an assert) so ``url`` can
            # never be left unbound when Python runs with -O.
            raise ValueError(
                f"repository {repo.id!r} has none of "
                "metalink, mirrorlist or baseurl"
            )

        digest = hashlib.sha256(url.encode()).hexdigest()[:16]

        repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
        with open(f"{base.conf.cachedir}/{repomd_file}", "rb") as f:
            repomd = f.read()

        checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()

    return checksums


# Read the request from stdin and pull out the arguments every command needs.
call = json.load(sys.stdin)
command = call["command"]
arguments = call["arguments"]
repos = arguments.get("repos", {})
arch = arguments["arch"]
cachedir = arguments["cachedir"]
module_platform_id = arguments["module_platform_id"]

# dnf's persistdir is a throwaway directory that lives only for this run.
with tempfile.TemporaryDirectory() as persistdir:
    try:
        base = create_base(
            repos,
            module_platform_id,
            persistdir,
            cachedir,
            arch
        )
    except dnf.exceptions.Error as e:
        exit_with_dnf_error(
            type(e).__name__,
            f"Error occurred when setting up repo: {e}"
        )

    if command == "dump":
        packages = [
            {
                "name": package.name,
                "summary": package.summary,
                "description": package.description,
                "url": package.url,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "buildtime": timestamp_to_rfc3339(package.buildtime),
                "license": package.license
            }
            for package in base.sack.query().available()
        ]
        json.dump({
            "checksums": repo_checksums(base),
            "packages": packages
        }, sys.stdout)

    elif command == "depsolve":
        try:
            base.install_specs(
                arguments["package-specs"],
                exclude=arguments.get("exclude-specs", [])
            )
        except dnf.exceptions.MarkingErrors as e:
            exit_with_dnf_error(
                "MarkingErrors",
                f"Error occurred when marking packages for installation: {e}"
            )

        try:
            base.resolve()
        except dnf.exceptions.DepsolveError as e:
            exit_with_dnf_error(
                "DepsolveError",
                (
                    "There was a problem depsolving "
                    f"{arguments['package-specs']}: {e}"
                )
            )

        dependencies = []
        for tsi in base.transaction:
            # Avoid using the install_set() helper, as it does not guarantee
            # a stable order
            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                continue
            package = tsi.pkg

            dependencies.append({
                "name": package.name,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "repo_id": package.reponame,
                "path": package.relativepath,
                "remote_location": package.remote_location(),
                "checksum": (
                    f"{hawkey.chksum_name(package.chksum[0])}:"
                    f"{package.chksum[1].hex()}"
                )
            })
        json.dump({
            "checksums": repo_checksums(base),
            "dependencies": dependencies
        }, sys.stdout)