Blob Blame History Raw
#!/usr/bin/python3
"""
Inputs for individual files

Provides all the files, named via their content hash, specified
via `references` in a new directory.

The returned data in `refs` is a dictionary where the keys are
the content hash of a file and the values dictionaries with
metadata for it. The input itself currently does not set any
metadata itself, but will forward any metadata set via the
`metadata` property. Keys in that must start with a prefix,
like `rpm.` to avoid namespace clashes. This is enforced via
schema validation.
"""


import json
import sys
import subprocess

from osbuild.objectstore import StoreClient


SCHEMA = r"""
"additionalProperties": false,
"required": ["type", "origin", "references"],
"properties": {
  "type": {
    "enum": ["org.osbuild.files"]
  },
  "origin": {
    "description": "The origin of the input (must be 'org.osbuild.source')",
    "type": "string",
    "enum": ["org.osbuild.source"]
  },
  "references": {
    "description": "Checksums of files to use as files input",
    "oneOf": [{
      "type": "array",
      "items": {
        "type": "string"
      }
    }, {
      "type": "object",
      "additionalProperties": false,
      "minProperties": 1,
      "patternProperties": {
        ".*": {
          "type": "object",
          "additionalProperties": false,
          "properties": {
            "metadata": {
              "description": "Additinal metadata to forward to the stage",
              "type": "object",
              "additionalProperties": false,
              "patternProperties": {
                "^\\w+[.]{1}\\w+$": {
                  "additionalProperties": false
                }
              }
            }
          }
        }
      }
    }]
  }
}
"""


def main():
    args = json.load(sys.stdin)
    refs = args["refs"]

    store = StoreClient(connect_to=args["api"]["store"])
    source = store.source("org.osbuild.files")
    output = store.mkdtemp(prefix="files-input-")

    for checksum in refs:
        try:
            subprocess.run(
                [
                    "ln",
                    f"{source}/{checksum}",
                    f"{output}/{checksum}",
                ],
                check=True,
            )
        except subprocess.CalledProcessError as e:
            json.dump({"error": e.output}, sys.stdout)
            return 1


    reply = {
        "path": output,
        "data": {
          "refs": refs
        }
    }

    json.dump(reply, sys.stdout)
    return 0


if __name__ == '__main__':
    r = main()
    sys.exit(r)