Blob Blame History Raw
#!/usr/bin/python3
"""
Assemble an OCI image archive

Assemble an Open Container Initiative[1] image[2] archive, i.e. a
tarball whose contents are in the OCI image layout.

Currently the only required options are `filename` and `architecture`.
The execution parameters for the image, which then should form the base
for the container, can be given via `config`. They have the same format
as the `config` option for the "OCI Image Configuration" (see [2]),
except those that map to the "Go type map[string]struct{}", which are
represented as array of strings.

The final resulting tarball, aka an "oci-archive", can be imported via
podman[3] with `podman pull oci-archive:<archive>`.

[1] https://www.opencontainers.org/
[2] https://github.com/opencontainers/image-spec/
[3] https://podman.io/
"""


import datetime
import json
import os
import subprocess
import sys
import tempfile


# Search path used to build the default `PATH=...` entry of the image's
# `Env` when the options do not supply an `Env` list themselves.
DEFAULT_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"


# JSON-schema fragment (the inside of an object, without the enclosing
# braces) describing the accepted options. The property names under
# `config` must match the keys read by `config_from_options`; since
# `additionalProperties` is false, any mismatch makes the option unusable.
SCHEMA = """
"additionalProperties": false,
"required": ["architecture", "filename"],
"properties": {
  "architecture": {
    "description": "The CPU architecture of the image",
    "type": "string"
  },
  "filename": {
    "description": "Resulting image filename",
    "type": "string"
  },
  "config": {
    "description": "The execution parameters",
    "type": "object",
    "additionalProperties": false,
    "properties": {
      "Cmd": {
        "type": "array",
        "default": ["sh"],
        "items": {
          "type": "string"
        }
      },
      "Env": {
        "type": "array",
        "default": ["PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"],
        "items": {
          "type": "string"
        }
      },
      "ExposedPorts": {
        "type": "array",
        "items": {
          "type": "string"
        }
      },
      "User": {
        "type": "string"
      },
      "Labels": {
        "type": "object",
        "additionalProperties": true
      },
      "StopSignal": {
        "type": "string"
      },
      "Volumes": {
        "type": "array",
        "items": {
          "type": "string"
        }
      },
      "WorkingDir": {
        "type": "string"
      }
    }
  }
}
"""


# Media types recorded in blob descriptors, keyed by the short blob kind
# that callers pass as `mtype`.
MEDIA_TYPES = {
    # uncompressed layer tarball; a compression suffix is appended by the
    # code that creates the layer
    "layer": "application/vnd.oci.image.layer.v1.tar",
    # JSON documents
    "manifest": "application/vnd.oci.image.manifest.v1+json",
    "config": "application/vnd.oci.image.config.v1+json",
}


def sha256sum(path: str) -> str:
    """Return the hex SHA-256 checksum of the file at `path`.

    The checksum is obtained by running the external `sha256sum` tool and
    taking the first space-separated field of its output.

    Raises `subprocess.CalledProcessError` if the tool exits non-zero.
    """
    result = subprocess.run(
        ["sha256sum", path],
        stdout=subprocess.PIPE,
        encoding="utf-8",
        check=True,
    )

    # Output has the shape "<checksum>  <path>"; only the checksum is wanted.
    checksum, *_ = result.stdout.strip().split(" ")
    return checksum


def blobs_add_file(blobs: str, path: str, mtype: str):
    """Move the file at `path` into `blobs`, named after its SHA-256 digest.

    `mtype` is a key of `MEDIA_TYPES` selecting the media type to record.

    Returns a descriptor dict with the keys `digest` ("sha256:"-prefixed),
    `size` (in bytes) and `mediaType`.
    """
    checksum = sha256sum(path)
    # Size has to be read before the file is moved away from `path`.
    nbytes = os.path.getsize(path)

    target = os.path.join(blobs, checksum)
    os.rename(path, target)

    descriptor = {
        "digest": "sha256:" + checksum,
        "size": nbytes,
        "mediaType": MEDIA_TYPES[mtype]
    }

    print(f"blobs: +{mtype} ({nbytes}, {checksum})")
    return descriptor


def blobs_add_json(blobs: str, js: dict, mtype: str) -> dict:
    """Serialize `js` as JSON and store it as a blob in `blobs`.

    `js` is the JSON-serializable object to store (callers pass dicts) and
    `mtype` the blob kind handed on to `blobs_add_file`.

    Returns the descriptor dict produced by `blobs_add_file`.
    """
    # Written under a fixed temporary name first; `blobs_add_file` then
    # renames it to its digest.
    js_file = os.path.join(blobs, "temporary.js")
    # Pin the encoding so the written bytes do not depend on the locale.
    with open(js_file, "w", encoding="utf-8") as f:
        json.dump(js, f)

    return blobs_add_file(blobs, js_file, mtype)


def blobs_add_layer(blobs: str, tree: str):
    """Pack `tree` into a gzip-compressed layer blob stored in `blobs`.

    The top-level entries of `tree` are archived with `tar` (keeping
    SELinux contexts, ACLs and extended attributes), the archive is
    compressed, and the result is stored via `blobs_add_file`.

    Returns a tuple `(digest, info)`: `digest` is the "sha256:"-prefixed
    checksum of the *uncompressed* tarball and `info` is the descriptor of
    the compressed blob, with the compression appended to its media type.

    Raises `subprocess.CalledProcessError` if `tar` or the compressor
    exits non-zero.
    """
    compression = "gzip"

    layer_file = os.path.join(blobs, "layer.tar")

    command = [
        "tar",
        "--selinux",
        "--acls",
        "--xattrs",
        "-cf", layer_file,
        "-C", tree,
    ] + os.listdir(tree)

    print("creating layer")
    subprocess.run(command,
                   stdout=subprocess.DEVNULL,
                   check=True)

    # Must be taken before compressing: this is the digest of the plain tar.
    digest = "sha256:" + sha256sum(layer_file)

    print("compressing layer")
    suffix = ".compressed"
    subprocess.run([compression,
                    "-S", suffix,
                    layer_file],
                   stdout=subprocess.DEVNULL,
                   check=True)

    # The compressed output is the input name with `suffix` appended.
    layer_file += suffix

    info = blobs_add_file(blobs, layer_file, "layer")
    # OCI media types separate the compression with "+", i.e.
    # "application/vnd.oci.image.layer.v1.tar+gzip", not "...targzip".
    info["mediaType"] += "+" + compression

    return digest, info


def config_from_options(options):
    """Build the image `config` section from the user supplied `options`.

    `Cmd` and `Env` are always present, falling back to `["sh"]` and a
    `PATH` built from `DEFAULT_PATH`. The remaining keys are copied only
    when they are set to a truthy value; `ExposedPorts` and `Volumes` are
    converted from a list of strings into a mapping of each string to an
    empty object.

    The resulting dict is printed and returned.
    """
    config = {
        "Env": options.get("Env", ["PATH=" + DEFAULT_PATH]),
        "Cmd": options.get("Cmd", ["sh"])
    }

    # Values that are taken over unchanged.
    for key in ("User", "Labels", "StopSignal", "WorkingDir"):
        value = options.get(key)
        if not value:
            continue
        config[key] = value

    # Lists of strings that are stored as `{name: {}}` mappings.
    for key in ("ExposedPorts", "Volumes"):
        entries = options.get(key)
        if not entries:
            continue
        config[key] = dict((entry, {}) for entry in entries)

    print(config)
    return config


def create_oci_dir(tree, output_dir, options):
    """Write an OCI image layout for `tree` into `output_dir`.

    Creates `blobs/sha256` below `output_dir` and stores three blobs there:
    the compressed layer built from `tree`, the image configuration and the
    manifest. Afterwards `index.json` and the `oci-layout` marker file are
    written to `output_dir`.

    `options` must contain `architecture`; the optional `config` entry
    holds the execution parameters passed to `config_from_options`.
    """
    architecture = options["architecture"]

    # Timezone-aware replacement for the deprecated `datetime.utcnow()`;
    # the tzinfo is dropped again so the string keeps its "...Z" form.
    now = datetime.datetime.now(datetime.timezone.utc)
    created = now.replace(tzinfo=None).isoformat() + "Z"

    config = {
        "created": created,
        "architecture": architecture,
        "os": "linux",
        # `config` is not a required option, so fall back to an empty
        # mapping and let `config_from_options` apply its defaults.
        "config": config_from_options(options.get("config", {})),
        "rootfs": {
            "type": "layers",
            "diff_ids": []
        }
    }

    manifest = {
        "schemaVersion": 2,
        "config": None,
        "layers": []
    }

    index = {
        "schemaVersion": 2,
        "manifests": []
    }

    blobs = os.path.join(output_dir, "blobs", "sha256")
    os.makedirs(blobs)

    # layers / rootfs: the config references the uncompressed layer digest,
    # the manifest the descriptor of the compressed blob
    digest, info = blobs_add_layer(blobs, tree)

    config["rootfs"]["diff_ids"] = [digest]
    manifest["layers"].append(info)

    # config: has to be stored before the manifest, which references it
    info = blobs_add_json(blobs, config, "config")
    manifest["config"] = info

    # manifest: has to be stored before the index, which references it
    info = blobs_add_json(blobs, manifest, "manifest")
    index["manifests"].append(info)

    # index
    print("writing index")
    with open(os.path.join(output_dir, "index.json"), "w") as f:
        json.dump(index, f)

    # oci-layout tag
    with open(os.path.join(output_dir, "oci-layout"), "w") as f:
        json.dump({"imageLayoutVersion": "1.0.0"}, f)


def main(tree, output_dir, options):
    """Assemble the archive `options["filename"]` inside `output_dir`.

    The OCI layout for `tree` is first created in a temporary directory
    below `output_dir` and then packed with `tar`, which also removes the
    files it has archived.

    Raises `subprocess.CalledProcessError` if `tar` exits non-zero.
    """
    archive = os.path.join(output_dir, options["filename"])

    with tempfile.TemporaryDirectory(dir=output_dir) as scratch:
        layout = os.path.join(scratch, "output")
        os.makedirs(layout)

        create_oci_dir(tree, layout, options)

        tar_cmd = [
            "tar",
            "--remove-files",
            "-cf", archive,
            f"--directory={layout}",
        ]
        # Archive the top-level entries of the layout directory.
        tar_cmd.extend(os.listdir(layout))

        print("creating final archive")
        subprocess.run(tar_cmd, stdout=subprocess.DEVNULL, check=True)


if __name__ == '__main__':
    # The invocation arguments arrive as a single JSON document on stdin.
    request = json.load(sys.stdin)
    sys.exit(main(request["tree"],
                  request["output_dir"],
                  request["options"]))