| #!/usr/bin/python3 |
| |
| import datetime |
| import dnf |
| import hashlib |
| import hawkey |
| import json |
| import sys |
| import tempfile |
| |
| DNF_ERROR_EXIT_CODE = 10 |
| |
| |
def timestamp_to_rfc3339(timestamp):
    """Format a Unix timestamp as an RFC 3339 string in UTC.

    Args:
        timestamp: Seconds since the Unix epoch.

    Returns:
        The time as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    # Convert the argument that was passed in (not any module-level state),
    # using an aware UTC datetime; utcfromtimestamp() is deprecated since
    # Python 3.12.
    d = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return d.strftime('%Y-%m-%dT%H:%M:%SZ')
| |
| |
def dnfrepo(desc, parent_conf=None):
    """Makes a dnf.repo.Repo out of a JSON repository description.

    Args:
        desc: Mapping describing the repository. Must contain "id" and one
            of "baseurl", "metalink" or "mirrorlist" (the first one present,
            in that order, is used). Optional keys: "ignoressl",
            "sslcacert", "sslclientkey", "sslclientcert" and
            "metadata_expire".
        parent_conf: Passed to dnf.repo.Repo as its parent configuration.

    Returns:
        The configured dnf.repo.Repo.

    Raises:
        KeyError: If "id" is missing from desc.
        ValueError: If desc contains none of "baseurl", "metalink" or
            "mirrorlist".
    """

    repo = dnf.repo.Repo(desc["id"], parent_conf)

    if "baseurl" in desc:
        repo.baseurl = desc["baseurl"]
    elif "metalink" in desc:
        repo.metalink = desc["metalink"]
    elif "mirrorlist" in desc:
        repo.mirrorlist = desc["mirrorlist"]
    else:
        # Raise explicitly instead of using `assert`, which is removed when
        # Python runs with -O and would let a source-less repo through.
        raise ValueError(
            f"repository {desc['id']!r} has none of "
            "baseurl, metalink or mirrorlist"
        )

    if desc.get("ignoressl", False):
        repo.sslverify = False

    # Copy over whichever SSL options the description provides.
    for option in ("sslcacert", "sslclientkey", "sslclientcert"):
        if option in desc:
            setattr(repo, option, desc[option])

    # In dnf, the default metadata expiration time is 48 hours. However,
    # some repositories never expire the metadata, and others expire it much
    # sooner than that. Therefore we must make this configurable. If nothing
    # is provided, we default to never expiring the metadata, as hardcoding
    # some arbitrary value does not seem very helpful.
    repo.metadata_expire = desc.get("metadata_expire", "-1")

    return repo
| |
| |
def create_base(repos, module_platform_id, persistdir, cachedir, arch):
    """Create a dnf.Base configured for the given repositories.

    Args:
        repos: Iterable of JSON repository descriptions, each converted with
            dnfrepo().
        module_platform_id: Value for the dnf ``module_platform_id`` option.
        persistdir: Value for the dnf ``persistdir`` option.
        cachedir: Value for the dnf ``cachedir`` option.
        arch: Architecture used for the ``arch`` substitution; the
            ``basearch`` substitution is derived from it.

    Returns:
        The dnf.Base, after its sack has been filled without the system
        repository.
    """
    base = dnf.Base()
    conf = base.conf

    # Prefer the fastest mirrors, both for metadata (when depsolving) and
    # for package downloads.
    conf.fastestmirror = True

    # Give up on a mirror that takes more than 5 seconds to connect and
    # move on to the next one.
    conf.timeout = 5

    # Remaining dnf configuration.
    conf.module_platform_id = module_platform_id
    conf.config_file_path = "/dev/null"
    conf.persistdir = persistdir
    conf.cachedir = cachedir
    conf.substitutions['arch'] = arch
    conf.substitutions['basearch'] = dnf.rpm.basearch(arch)

    for desc in repos:
        base.repos.add(dnfrepo(desc, conf))

    base.fill_sack(load_system_repo=False)
    return base
| |
| |
def exit_with_dnf_error(kind: str, reason: str):
    """Report an error as JSON on stdout and terminate the process.

    Args:
        kind: Short identifier of the error, emitted under the "kind" key.
        reason: Human-readable explanation, emitted under the "reason" key.

    Raises:
        SystemExit: Always, with DNF_ERROR_EXIT_CODE as the exit status.
    """
    error = {"kind": kind, "reason": reason}
    sys.stdout.write(json.dumps(error))
    raise SystemExit(DNF_ERROR_EXIT_CODE)
| |
| |
def repo_checksums(base):
    """Compute a SHA-256 checksum of each enabled repo's cached repomd.xml.

    Args:
        base: Object exposing ``repos.iter_enabled()`` and ``conf.cachedir``
            (a dnf.Base); the repositories' metadata is read from
            ``conf.cachedir``.

    Returns:
        A dict mapping each enabled repository id to ``"sha256:<hexdigest>"``
        of its ``repodata/repomd.xml`` file.

    Raises:
        ValueError: If an enabled repository has no metalink, mirrorlist or
            baseurl to derive its cache directory from.
        OSError: If a repository's cached repomd.xml cannot be opened.
    """
    checksums = {}
    for repo in base.repos.iter_enabled():
        # Uses the same algorithm as libdnf to find cache dir:
        #   https://github.com/rpm-software-management/libdnf/blob/master/libdnf/repo/Repo.cpp#L1288
        if repo.metalink:
            url = repo.metalink
        elif repo.mirrorlist:
            url = repo.mirrorlist
        elif repo.baseurl:
            url = repo.baseurl[0]
        else:
            # Raise explicitly instead of using `assert`: with -O the assert
            # disappears and `url` would be unbound or left over from the
            # previous repository.
            raise ValueError(
                f"repository {repo.id!r} has no metalink, mirrorlist or "
                "baseurl"
            )

        # The cache directory name is the repo id plus the first 16 hex
        # digits of the SHA-256 of the URL chosen above.
        digest = hashlib.sha256(url.encode()).hexdigest()[:16]

        repomd_file = f"{repo.id}-{digest}/repodata/repomd.xml"
        with open(f"{base.conf.cachedir}/{repomd_file}", "rb") as f:
            repomd = f.read()

        checksums[repo.id] = "sha256:" + hashlib.sha256(repomd).hexdigest()

    return checksums
| |
| |
# Script body: read one JSON request from stdin, run the requested command
# against the described repositories and write the JSON result to stdout.
call = json.load(sys.stdin)
command = call["command"]
arguments = call["arguments"]
repos = arguments.get("repos", {})
arch = arguments["arch"]
cachedir = arguments["cachedir"]
module_platform_id = arguments["module_platform_id"]

# Reject unknown commands before doing any repository work; otherwise the
# script would set everything up and then exit successfully with no output.
# sys.exit() with a string prints it to stderr and exits with status 1.
if command not in ("dump", "depsolve"):
    sys.exit(f"Unknown command: {command!r}")

# dnf's persistent state goes to a throwaway directory that is removed when
# the block exits (including via SystemExit).
with tempfile.TemporaryDirectory() as persistdir:
    try:
        base = create_base(
            repos,
            module_platform_id,
            persistdir,
            cachedir,
            arch
        )
    except dnf.exceptions.Error as e:
        exit_with_dnf_error(
            type(e).__name__,
            f"Error occurred when setting up repo: {e}"
        )

    if command == "dump":
        # List every available package together with the repo checksums.
        packages = []
        for package in base.sack.query().available():
            packages.append({
                "name": package.name,
                "summary": package.summary,
                "description": package.description,
                "url": package.url,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "buildtime": timestamp_to_rfc3339(package.buildtime),
                "license": package.license
            })
        json.dump({
            "checksums": repo_checksums(base),
            "packages": packages
        }, sys.stdout)

    elif command == "depsolve":
        # Mark the requested specs for installation, honouring excludes.
        try:
            base.install_specs(
                arguments["package-specs"],
                exclude=arguments.get("exclude-specs", [])
            )
        except dnf.exceptions.MarkingErrors as e:
            exit_with_dnf_error(
                "MarkingErrors",
                f"Error occurred when marking packages for installation: {e}"
            )

        try:
            base.resolve()
        except dnf.exceptions.DepsolveError as e:
            exit_with_dnf_error(
                "DepsolveError",
                (
                    "There was a problem depsolving "
                    f"{arguments['package-specs']}: {e}"
                )
            )

        dependencies = []
        for tsi in base.transaction:
            # Avoid using the install_set() helper, as it does not guarantee
            # a stable order
            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                continue
            package = tsi.pkg

            # package.chksum is indexed as (type, raw digest); emit it as
            # "<algorithm name>:<hex digest>".
            dependencies.append({
                "name": package.name,
                "epoch": package.epoch,
                "version": package.version,
                "release": package.release,
                "arch": package.arch,
                "repo_id": package.reponame,
                "path": package.relativepath,
                "remote_location": package.remote_location(),
                "checksum": (
                    f"{hawkey.chksum_name(package.chksum[0])}:"
                    f"{package.chksum[1].hex()}"
                )
            })
        json.dump({
            "checksums": repo_checksums(base),
            "dependencies": dependencies
        }, sys.stdout)