#!/usr/bin/python3
"""Fetch OSTree commits from a repository

Uses ostree to pull specific commits from (remote) repositories
at the provided `url`. Can verify the commit, if one or more
gpg keys are provided via `gpgkeys`.
"""


import json
import os
import sys
import subprocess
import uuid


# JSON-schema fragment describing the input accepted by this source.
#
# The text is a list of schema members without the enclosing braces
# (presumably the consumer wraps it into a full object - TODO confirm).
#
# Shape described by the fragment:
#   - exactly one of `items` or `commits` must be present (`oneOf`);
#   - both share the `item` definition: an object whose keys match
#     `[0-9a-f]{5,64}` (the commit checksum);
#   - each entry requires a `remote` object with a mandatory `url` and an
#     optional `gpgkeys` array of strings.
SCHEMA = """
"additionalProperties": false,
"definitions": {
  "item": {
    "description": "The commits to fetch indexed their checksum",
    "type": "object",
    "additionalProperties": false,
    "patternProperties": {
      "[0-9a-f]{5,64}": {
        "type": "object",
        "additionalProperties": false,
        "required": ["remote"],
        "properties": {
          "remote": {
            "type": "object",
            "additionalProperties": false,
            "required": ["url"],
            "properties": {
              "url": {
                "type": "string",
                "description": "URL of the repository."
              },
              "gpgkeys": {
                "type": "array",
                "items": {
                  "type": "string",
                  "description": "GPG keys to verify the commits"
                }
              }
            }
          }
        }
      }
    }
  }
},
"properties": {
  "items": {"$ref": "#/definitions/item"},
  "commits": {"$ref": "#/definitions/item"}
},
"oneOf": [{
  "required": ["items"]
}, {
  "required": ["commits"]
}]
"""


def ostree(*args, _input=None, **kwargs):
    """Run the `ostree` command line tool and wait for it to finish.

    Positional arguments are passed through verbatim; every keyword
    argument `key=value` is appended as a `--key=value` option. The
    resulting command line is echoed to stderr before it is executed.

    `_input`, if given, is fed to the process on stdin. Standard error
    is folded into standard output and both are captured, so nothing the
    tool prints reaches our own stdout. Nothing is returned.

    Raises `subprocess.CalledProcessError` on a non-zero exit status; its
    `output` attribute then carries the captured text.
    """
    options = [f'--{name}={value}' for name, value in kwargs.items()]
    argv = [*args, *options]

    print("ostree " + " ".join(argv), file=sys.stderr)

    subprocess.run(
        ["ostree", *argv],
        input=_input,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf-8",
        check=True,
    )


def download(commits, checksums, cache):
    """Pull the requested commits from their remotes into the cache repo.

    `commits` maps a commit checksum to its description, which must hold
    a `remote` dictionary with a `url` and, optionally, a list of
    `gpgkeys`. `checksums` selects which of these commits to fetch and
    `cache` is the directory containing the cache repository (`repo`).

    For every commit a temporary remote with a random name is added to
    the cache repository. GPG verification is disabled for that remote
    when no keys were supplied; otherwise each key is imported for it.
    The temporary remote is removed again afterwards, also when importing
    the keys or pulling the commit fails, so failed runs do not leave
    stale remotes behind in the shared cache repository.

    Raises `subprocess.CalledProcessError` if any `ostree` call fails and
    `KeyError` if a checksum has no entry in `commits`.
    """
    # Prepare the cache repo
    repo_cache = os.path.join(cache, "repo")
    ostree("init", mode="archive", repo=repo_cache)

    # Make sure the cache repository uses locks to protect the metadata during
    # shared access. This is the default since `2018.5`, but let's document
    # this explicitly here.
    ostree("config", "set", "repo.locking", "true", repo=repo_cache)

    for commit in checksums:
        remote = commits[commit]["remote"]
        url = remote["url"]
        gpg = remote.get("gpgkeys", [])
        uid = str(uuid.uuid4())

        # Without keys there is nothing to verify against
        verify_args = [] if gpg else ["--no-gpg-verify"]

        ostree("remote", "add",
               uid, url,
               *verify_args,
               repo=repo_cache)

        try:
            for key in gpg:
                ostree("remote", "gpg-import", "--stdin", uid,
                       repo=repo_cache, _input=key)

            # Transfer the commit: remote → cache
            print(f"pulling {commit}", file=sys.stderr)
            ostree("pull", uid, commit, repo=repo_cache)
        except subprocess.CalledProcessError:
            # Best-effort removal of the temporary remote. A failure here
            # is deliberately ignored so that the original error, which is
            # the one worth reporting, is not masked.
            try:
                ostree("remote", "delete", uid, repo=repo_cache)
            except subprocess.CalledProcessError:
                pass
            raise

        # Remove the temporary remote again
        ostree("remote", "delete", uid,
               repo=repo_cache)


def export(checksums, cache, output):
    """Copy the given commits from the cache repository to the output.

    A new archive-mode repository is initialised at `<output>/repo` and
    every commit listed in `checksums` is pulled into it from the
    repository at `<cache>/repo`. Progress is logged to stderr; once all
    commits are transferred an empty JSON object is written to stdout.

    Raises `subprocess.CalledProcessError` if an `ostree` call fails.
    """
    source_repo = os.path.join(cache, "repo")
    target_repo = os.path.join(output, "repo")

    ostree("init", mode="archive", repo=target_repo)

    for checksum in checksums:
        # Transfer the commit: cache → output
        print(f"exporting {checksum}", file=sys.stderr)
        ostree("pull-local", source_repo, checksum, repo=target_repo)

    # Emit an empty JSON object on stdout as the result
    json.dump({}, sys.stdout)


def main(commits, options, checksums, cache, output):
    """Download commits into the cache and optionally export them.

    `commits` maps checksums to commit descriptions; when it is empty
    the `commits` entry of `options` is used instead. `checksums` lists
    the commits to act on, `cache` is the base cache directory (a
    `org.osbuild.ostree` sub-directory is used inside it) and `output`
    is the export directory.

    With a falsy `output` only the download is performed; if in addition
    no `checksums` were given, all known commits are downloaded.
    Otherwise the commits in `checksums` are exported to `output` after
    the download.

    The result is written to stdout as JSON: an empty object on success,
    or an object with an `error` entry holding the captured `ostree`
    output when the download failed. Returns 0 on success and 1 when the
    download failed.
    """
    cache = os.path.join(cache, "org.osbuild.ostree")
    want_export = bool(output)

    # Fall back to the commits given via `options`
    commits = commits or options.get("commits", {})

    if commits:
        # Download-only without an explicit selection: fetch everything
        if not (checksums or want_export):
            checksums = list(commits)

        os.makedirs(cache, exist_ok=True)
        try:
            download(commits, checksums, cache)
        except subprocess.CalledProcessError as err:
            json.dump({"error": err.output.strip()}, sys.stdout)
            return 1

    if not want_export:
        json.dump({}, sys.stdout)
        return 0

    os.makedirs(output, exist_ok=True)
    export(checksums, cache, output)

    return 0


if __name__ == '__main__':
    # The arguments arrive as a single JSON document on stdin; only
    # `output` is optional. The return value of `main` becomes the exit
    # status of the process.
    request = json.load(sys.stdin)
    sys.exit(main(request["items"],
                  request["options"],
                  request["checksums"],
                  request["cache"],
                  request.get("output")))