Blob Blame History Raw
#!/usr/bin/python3
"""
Source for downloading files from URLs.

The files are indexed by their content hash. Can download files
that require secrets. The only secret provider currently supported
is `org.osbuild.rhsm` for downloading Red Hat content that requires
a subscription.

Internally uses curl to download the files; the files are cached in
an internal cache. Multiple parallel connections are used to speed
up the download.
"""


import concurrent.futures
import glob
import hashlib
import itertools
import json
import math
import os
import subprocess
import sys
import tempfile
import time


# JSON-schema fragment describing the `options` this source accepts. The
# string holds the members of a schema object without the enclosing braces.
# It is not referenced anywhere else in this file; presumably the osbuild
# framework reads it to validate the options -- TODO confirm.
#
# `urls` maps "<algorithm>:<hex digest>" keys to either a plain URL string or
# an object with a required `url` and an optional `secrets` object naming the
# secrets provider.
#
# NOTE(review): the key pattern caps the hex part at 64 digits, while sha384
# and sha512 digests are 96 and 128 hex digits long. Such keys are only
# accepted if the validator treats the pattern as unanchored -- confirm.
SCHEMA = """
"additionalProperties": false,
"properties": {
  "urls": {
    "description": "The files to fetch indexed their content checksum",
    "type": "object",
    "additionalProperties": false,
    "patternProperties": {
      "(md5|sha1|sha256|sha384|sha512):[0-9a-f]{5,64}": {
        "oneOf": [{
          "type": "string",
          "description": "URL to download the file from."
        }, {
          "type": "object",
          "additionalProperties": false,
          "required": ["url"],
          "properties": {
            "url": {
              "type": "string",
              "description": "URL to download the file from."
            },
            "secrets": {
              "type": "object",
              "additionalProperties": false,
              "required": ["name"],
              "properties": {
                "name": {
                  "type": "string",
                  "description": "Name of the secrets provider."
                }
              }
            }
          }
        }]
      }
    }
  }
}
"""


def verify_checksum(filename, checksum):
    """Check that the contents of `filename` match `checksum`.

    `checksum` has the form "<algorithm>:<hex digest>", where the algorithm
    is one of md5, sha1, sha256, sha384 or sha512.

    Returns True if the digest of the file equals the expected digest, and
    False if it differs or the file cannot be read.

    Raises RuntimeError for an unsupported algorithm, and ValueError if
    `checksum` contains no ":" separator.
    """
    algorithm, expected = checksum.split(":", 1)
    if algorithm not in ("md5", "sha1", "sha256", "sha384", "sha512"):
        raise RuntimeError(f"unsupported checksum algorithm: {algorithm}")

    # Hash in-process rather than piping a hand-built line into
    # `<algorithm>sum -c`: no external binary is required and the result
    # does not depend on how that tool parses the file name.
    digest = hashlib.new(algorithm)
    try:
        with open(filename, "rb") as f:
            # Read in 1 MiB chunks so large downloads are never held in
            # memory as a whole.
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
    except OSError:
        # A missing or unreadable file simply does not verify.
        return False

    return digest.hexdigest() == expected.lower()


def fetch(url, checksum, directory):
    """Download one file into the cache `directory` unless it is already there.

    `url` is a dict holding the download location under "url" and,
    optionally, a "secrets" dict whose `ssl_ca_cert`, `ssl_client_cert` and
    `ssl_client_key` entries are handed to curl. `checksum` is the
    "<algorithm>:<digest>" string the download must match; it is also the
    name the file gets inside `directory`.

    Raises RuntimeError if no curl attempt succeeds or if the downloaded
    file does not match `checksum`. Note that the error messages embed the
    whole `url` dict, not just the location.
    """
    target = f"{directory}/{checksum}"

    # Invariant: every file in @directory is named after its (verified)
    # checksum, so an existing file means there is nothing left to do.
    if os.path.isfile(target):
        return

    secrets = url.get("secrets")
    url_path = url.get("url")

    # Download into a temporary directory until the checksum is verified. It
    # is a subdirectory of the cache so that the final rename never has to
    # copy across block devices.
    with tempfile.TemporaryDirectory(prefix="osbuild-unverified-file-", dir=directory) as tmpdir:
        # The client TLS options are identical for every attempt.
        tls_options = []
        if secrets:
            for flag, name in (
                ("--cacert", "ssl_ca_cert"),
                ("--cert", "ssl_client_cert"),
                ("--key", "ssl_client_key"),
            ):
                value = secrets.get(name)
                if value:
                    tls_options += [flag, value]

        # Some mirrors are sometimes broken. Retry manually, because a retry
        # may get redirected to a different, working mirror. All attempts
        # together share a budget of 300 seconds.
        began = time.monotonic()
        return_code = 0
        downloaded = False
        for _ in range(20):
            elapsed = time.monotonic() - began
            if elapsed >= 300:
                # Budget used up: no further attempt can be started.
                break

            curl_command = [
                "curl",
                "--silent",
                "--max-time", f"{int(math.ceil(300 - elapsed))}",
                "--connect-timeout", "60",
                "--fail",
                "--location",
                "--output", checksum,
            ]
            curl_command += tls_options
            # The url must follow the options.
            curl_command.append(url_path)

            attempt = subprocess.run(curl_command, encoding="utf-8", cwd=tmpdir, check=False)
            return_code = attempt.returncode
            if return_code == 0:
                downloaded = True
                break

        if not downloaded:
            raise RuntimeError(f"curl: error downloading {url}: error code {return_code}")

        unverified = f"{tmpdir}/{checksum}"
        if not verify_checksum(unverified, checksum):
            raise RuntimeError(f"checksum mismatch: {checksum} {url}")

        # The checksum has been verified, so move the file into place. If we
        # race another download of the same file, the error is ignored as
        # both files are guaranteed to have the same contents.
        try:
            os.rename(unverified, target)
        except FileExistsError:
            pass


def get_rhsm_secrets(entitlement_dir="/etc/pki/entitlement"):
    """Locate the RHSM client key and certificate used for downloads.

    Looks in `entitlement_dir` for files named "<id>-key.pem" and returns
    the first one (in sorted order) that has a matching "<id>.pem"
    certificate next to it.

    Returns a dict with the keys `ssl_ca_cert`, `ssl_client_key` and
    `ssl_client_cert`, each holding a file path.

    Raises RuntimeError if no key with a matching certificate is found.
    """
    rhsm_secrets = {
        'ssl_ca_cert': "/etc/rhsm/ca/redhat-uep.pem",
        'ssl_client_key': "",
        'ssl_client_cert': ""
    }

    key_suffix = "-key.pem"
    # Escape the directory so that only the "*" written here acts as a wildcard.
    pattern = os.path.join(glob.escape(entitlement_dir), "*" + key_suffix)

    # Sorted, so the choice is deterministic when several keys are present.
    for key in sorted(glob.glob(pattern)):
        # The key and cert share the same prefix. Slice the suffix off:
        # str.rstrip() would strip a *set of characters* and eat into any
        # prefix that happens to end in one of "-key.pm".
        cert = key[:-len(key_suffix)] + ".pem"
        # The key is only valid if it has a matching cert
        if os.path.exists(cert):
            rhsm_secrets['ssl_client_key'] = key
            rhsm_secrets['ssl_client_cert'] = cert
            return rhsm_secrets

    raise RuntimeError("no matching rhsm key and cert")


def main(options, checksums, cache, output):
    """Fetch every requested file into `cache` and copy it to `output`.

    `options` may contain a "urls" dict mapping each checksum to either a
    URL string or a dict with "url" and optional "secrets". `checksums`
    lists the files to provide. `cache` and `output` are directories; both
    are created if missing.

    Writes a single JSON object to stdout: `{}` on success, or
    `{"error": ...}` on failure. Returns 0 on success and 1 on failure.
    """
    urls = options.get("urls", {})

    os.makedirs(cache, exist_ok=True)
    os.makedirs(output, exist_ok=True)

    # Resolve and validate all requests before starting any worker, so that
    # a bad request fails fast without spinning up the process pool.
    requested_urls = []
    rhsm_secrets = None
    for checksum in checksums:
        try:
            url = urls[checksum]
        except KeyError:
            json.dump({"error": f"unknown file: {checksum}"}, sys.stdout)
            return 1

        if isinstance(url, dict):
            # check if url needs rhsm secrets
            if url.get("secrets", {}).get("name") == "org.osbuild.rhsm":
                # rhsm secrets only need to be retrieved once and can then be reused
                if rhsm_secrets is None:
                    try:
                        rhsm_secrets = get_rhsm_secrets()
                    except RuntimeError as e:
                        json.dump({"error": e.args[0]}, sys.stdout)
                        return 1
                # Build a new descriptor instead of writing the resolved
                # secrets back into the caller's `options`.
                url = {**url, "secrets": rhsm_secrets}
        else:
            url = {"url": url}

        requested_urls.append(url)

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(fetch, requested_urls, checksums, itertools.repeat(cache))

        # Consuming the iterator re-raises any exception from a worker.
        try:
            for _ in results:
                pass
        except RuntimeError as e:
            json.dump({"error": e.args[0]}, sys.stdout)
            return 1

    for checksum in checksums:
        try:
            subprocess.run(
                [
                    "cp",
                    "--reflink=auto",
                    f"{cache}/{checksum}",
                    f"{output}/{checksum}",
                ],
                check=True,
                # Capture stderr so the failure reason can be reported;
                # without capturing, the exception carries no output at all.
                stderr=subprocess.PIPE,
                encoding="utf-8",
                errors="replace",
            )
        except subprocess.CalledProcessError as e:
            message = (e.stderr or "").strip() or str(e)
            json.dump({"error": message}, sys.stdout)
            return 1

    json.dump({}, sys.stdout)
    return 0


if __name__ == '__main__':
    # The request arrives as a single JSON document on stdin; the exit
    # status of the process is whatever main() returns.
    request = json.load(sys.stdin)
    sys.exit(main(
        request["options"],
        request["checksums"],
        request["cache"],
        request["output"],
    ))