Blob Blame History Raw
"""
Pipeline inputs

A pipeline input provides data in various forms to a `Stage`, like
files, OSTree commits or trees. The content can either be obtained
via a `Source` or have been built by a `Pipeline`. Thus an `Input`
is the bridge between various types of content that originate from
different types of sources.

The acceptable origin of the data is determined by the `Input`
itself. What types of input are allowed and required is determined
by the `Stage`.

To osbuild itself this is all transparent. The only data visible to
osbuild is the path. The input options are just passed to the
`Input` as is and the result is forwarded to the `Stage`.
"""


import hashlib
import importlib
import json
import os
import subprocess

from typing import Dict, Optional, Tuple

from .objectstore import StoreServer


class Input:
    """
    A single input with its corresponding options.
    """

    def __init__(self, info, origin: str, options: Dict):
        self.info = info
        self.origin = origin
        self.refs = {}
        self.options = options or {}
        self.id = self.calc_id()

    def add_reference(self, ref, options: Optional[Dict] = None):
        self.refs[ref] = options or {}
        self.id = self.calc_id()

    def calc_id(self):
        m = hashlib.sha256()
        m.update(json.dumps(self.name, sort_keys=True).encode())
        m.update(json.dumps(self.origin, sort_keys=True).encode())
        m.update(json.dumps(self.refs, sort_keys=True).encode())
        m.update(json.dumps(self.options, sort_keys=True).encode())
        return m.hexdigest()

    @property
    def name(self) -> str:
        return self.info.name

    def run(self, storeapi: StoreServer) -> Tuple[str, Dict]:
        name = self.info.name
        msg = {
            # mandatory bits
            "origin": self.origin,
            "refs": self.refs,

            # global options
            "options": self.options,

            # API endpoints
            "api": {
                "store": storeapi.socket_address
            }
        }

        # We want the `osbuild` python package that contains this
        # very module, which might be different from the system wide
        # installed one, to be accessible to the Input programs so
        # we detect our origin and set the `PYTHONPATH` accordingly
        modorigin = importlib.util.find_spec("osbuild").origin
        modpath = os.path.dirname(modorigin)
        env = os.environ.copy()
        env["PYTHONPATH"] = os.path.dirname(modpath)

        r = subprocess.run([self.info.path],
                           env=env,
                           input=json.dumps(msg),
                           stdout=subprocess.PIPE,
                           encoding="utf-8",
                           check=False)

        try:
            reply = json.loads(r.stdout)
        except ValueError:
            raise RuntimeError(f"{name}: error: {r.stderr}") from None

        if "error" in reply:
            raise RuntimeError(f"{name}: " + reply["error"])

        if r.returncode != 0:
            raise RuntimeError(f"{name}: error {r.returncode}")

        path, data = reply["path"], reply.get("data", {})

        return path, data