#!/usr/bin/python3
"""
Assemble a file system tree for a bootable iso

This stage prepares a file system tree for a bootable ISO, like the
Anaconda installer. It follows the convention used by Lorax to
create the boot isos: It takes an input `rootfs`, which will serve
as the root file system. This is copied into a file with an `ext4`
file system which in turn will be made into a squashfs file system.
Options for controlling the root file-system creation can be given
via `rootfs`, like its size and the compression to be used.

The boot loader is configured via the `isolinux` and `efi` options.
Which combination makes sense depends on the targeted platform and
architecture.
The kernel and initrd are taken from the tree given via the `kernel`
input, or if that was not specified, from `rootfs`. In either case
it will look for the specified kernel in the `/boot` directory.
Additionally, kernel command line flags can be passed via `kernel_opts`.

This stage has the `.mono` suffix to indicate that it is a monolithic
stage that could, and in the future will be, broken up into smaller
pieces.
"""

import contextlib
import os
import re
import shutil
import subprocess
import sys
import tempfile
import osbuild.remoteloop as remoteloop

import osbuild.api


# JSON schema fragments for this stage's `options` and `inputs`, kept as a
# raw string. Nothing in this file parses or otherwise references SCHEMA_2.
# NOTE(review): presumably osbuild reads it to validate the stage arguments
# before main() runs -- confirm against the osbuild stage loader.
SCHEMA_2 = """
"options": {
  "additionalProperties": false,
  "required": ["product", "kernel", "isolabel"],
  "properties": {
    "product": {
      "type": "object",
      "additionalProperties": false,
      "required": ["name", "version"],
      "properties": {
        "name": {"type": "string"},
        "version": {"type": "string"}
       }
    },
    "kernel": {
      "type": "string"
    },
    "isolabel": {
      "type": "string"
    },
    "efi": {
      "type": "object",
      "additionalProperties": false,
      "required": ["architectures", "vendor"],
      "properties": {
        "architectures": {
            "type": "array",
            "items": {
                "type": "string"
            }
        },
        "vendor": {
            "type": "string"
        }
      }
    },
    "isolinux": {
      "type": "object",
      "additionalProperties": false,
      "required": ["enabled"],
      "properties": {
        "enabled": {
          "type": "boolean"
        },
        "debug": {
            "type": "boolean"
        }
      }
    },
    "kernel_opts": {
      "description": "Additional kernel boot options",
      "type": "string"
    },
    "templates": {
      "type": "string",
      "default": "99-generic"
    },
    "rootfs": {
      "type": "object",
      "additionalProperties": false,
      "properties": {
        "compression": {
          "type": "object",
          "additionalProperties": false,
          "required": ["method"],
          "properties": {
            "method": {
              "enum": ["gzip", "xz"]
            },
            "options": {
              "type": "object",
              "additionalProperties": false,
              "properties": {
                "bcj": {
                  "enum": [
                    "x86",
                    "arm",
                    "armthumb",
                    "powerpc",
                    "sparc",
                    "ia64"
                  ]
                }
              }
            }
          }
        },
        "size": {
          "type": "integer",
          "description": "size in MB",
          "default": 3072
        }
      }
    }
  }
},
"inputs": {
  "type": "object",
  "additionalProperties": false,
  "required": ["rootfs"],
  "properties": {
    "rootfs": {
      "type": "object",
      "additionalProperties": true
    },
    "kernel": {
      "type": "object",
      "additionalProperties": true
    }
  }
}
"""


# Directory holding the Lorax template sets; main() joins the `templates`
# option onto it to locate that template's `config_files`.
LORAX_TEMPLATES = "/usr/share/lorax/templates.d"


@contextlib.contextmanager
def mount(source, dest):
    """Mount `source` on `dest` for the lifetime of the `with` block.

    Yields `dest`. When the block is left, normally or through an
    exception, `dest` is unmounted recursively (`umount -R`). Both
    commands run with `check=True`, so a non-zero exit status raises
    `subprocess.CalledProcessError`.
    """
    attach_cmd = ["mount", source, dest]
    detach_cmd = ["umount", "-R", dest]

    subprocess.run(attach_cmd, check=True)
    try:
        yield dest
    finally:
        # Always undo the mount, even if the body raised.
        subprocess.run(detach_cmd, check=True)


def install(src, dst, mode=None):
    """Copy the contents of `src` to `dst` and optionally set its mode.

    Only the file data is copied (`shutil.copyfile`); permissions of
    `src` are not carried over.

    Arguments:
      src: path of the file to copy.
      dst: path of the destination file (overwritten if it exists).
      mode: permission bits to apply to `dst` via `os.chmod`, or `None`
        to leave the mode of the freshly written file untouched.
    """
    shutil.copyfile(src, dst)
    # Compare against None explicitly: a mode of 0 is a valid request
    # (no permission bits) and must not be confused with "not given".
    if mode is not None:
        os.chmod(dst, mode)


def replace(target, patterns):
    """Rewrite the file `target` in place, substituting text line by line.

    Arguments:
      target: path of the text file to modify.
      patterns: iterable of `(pattern, replacement)` pairs. `pattern` is
        compiled as a regular expression; every match in every line is
        replaced with `replacement`, which is inserted literally.

    The result is first written to `<target>.replace` and then renamed
    over `target`.
    """
    # Wrap each replacement in a callable: `re.sub` inserts the return
    # value of a callable verbatim, whereas a plain string is processed
    # as a template, so a backslash in e.g. a kernel option would either
    # be mangled ("\n") or raise re.error ("\d").
    finder = [(re.compile(p), (lambda _match, text=s: text))
              for p, s in patterns]
    newfile = target + ".replace"

    with open(target, "r") as i, open(newfile, "w") as o:
        for line in i:
            for p, literal in finder:
                line = p.sub(literal, line)
            o.write(line)
    os.rename(newfile, target)


def make_rootfs(tree, image, size, workdir, loop_client):
    """Create an ext4 image file at `image` containing a copy of `tree`.

    Arguments:
      tree: directory whose contents are copied into the new file system.
      image: path of the image file to create.
      size: size of the image file in bytes.
      workdir: scratch directory; `<workdir>/rootfs` is created and used
        as the temporary mount point.
      loop_client: object whose `device(image, offset, size)` context
        manager yields the path of a block device backed by `image`.
    """
    mountpoint = os.path.join(workdir, "rootfs")

    # Create the image file and extend it to `size` bytes.
    with open(image, "w") as backing:
        os.ftruncate(backing.fileno(), size)

    os.makedirs(mountpoint)

    with loop_client.device(image, 0, size) as dev:
        mkfs_cmd = [
            "mkfs.ext4",
            "-L", "Anaconda",
            "-b", "4096",
            "-m", "0",
            dev,
        ]
        # "y" is fed on stdin -- presumably to answer any confirmation
        # prompt from mkfs.
        subprocess.run(mkfs_cmd, input="y", encoding='utf-8', check=True)

        with mount(dev, mountpoint):
            print("copying tree")
            # The "/." suffix makes cp copy the contents of `tree`
            # rather than the directory itself.
            copy_cmd = ["cp", "-a", f"{tree}/.", mountpoint]
            subprocess.run(copy_cmd, check=True)
            print("done")


def make_efi(efi, info, root, loop_client):
    """Set up the UEFI boot files in the ISO tree and build `efiboot.img`.

    Steps, in order:
      * create `<root>/EFI/BOOT` and, for every requested architecture,
        copy `shim<arch>.efi` (as `BOOT<ARCH>.EFI`), `mm<arch>.efi` and
        `gcd<arch>.efi` (as `grub<arch>.efi`) from
        `/boot/efi/EFI/<vendor>/`;
      * copy the grub font `/usr/share/grub/unicode.pf2`;
      * install `grub2-efi.cfg` from `info["configdir"]` as `grub.cfg`
        and fill in its `@...@` placeholders; if the IA32 architecture
        was requested the result is also copied to `BOOT.cfg`;
      * estimate the space needed, create `efiboot.img` in
        `info["imgdir"]`, format it as FAT on a loop device and copy
        the `EFI/BOOT` directory into it.

    Arguments:
      efi: dict with `architectures` (list of str) and `vendor` (str).
      info: dict; the keys `configdir`, `version`, `name`, `isolabel`,
        `cmdline`, `kerneldir`, `imgdir` and `workdir` are read.
      root: root directory of the ISO tree being assembled.
      loop_client: object whose `device(image, offset, size)` context
        manager yields the path of a block device backed by `image`.

    External commands run with `check=True` and therefore raise
    `subprocess.CalledProcessError` on failure.
    """
    arches = efi["architectures"]
    vendor = efi["vendor"]

    efidir = os.path.join(root, "EFI", "BOOT")
    os.makedirs(efidir)

    # arch related data
    for arch in arches:
        arch = arch.lower()
        targets = [
            (f"shim{arch}.efi", f"BOOT{arch}.EFI".upper()),
            (f"mm{arch}.efi", f"mm{arch}.efi"),
            (f"gcd{arch}.efi", f"grub{arch}.efi")
        ]

        for src, dst in targets:
            shutil.copy2(os.path.join("/boot/efi/EFI/", vendor, src),
                         os.path.join(efidir, dst))

    # the font
    fontdir = os.path.join(efidir, "fonts")
    os.makedirs(fontdir, exist_ok=True)
    shutil.copy2("/usr/share/grub/unicode.pf2", fontdir)

    # the config
    configdir = info["configdir"]
    version = info["version"]
    name = info["name"]
    isolabel = info["isolabel"]
    cmdline = info["cmdline"]

    # Path of the kernel directory as seen from the root of the ISO.
    kdir = "/" + os.path.relpath(info["kerneldir"], start=root)
    print(f"kernel dir at {kdir}")

    config = os.path.join(efidir, "grub.cfg")
    shutil.copy2(os.path.join(configdir, "grub2-efi.cfg"), config)

    replace(config, [
        ("@VERSION@", version),
        ("@PRODUCT@", name),
        ("@KERNELNAME@", "vmlinuz"),
        ("@KERNELPATH@", os.path.join(kdir, "vmlinuz")),
        ("@INITRDPATH@", os.path.join(kdir, "initrd.img")),
        ("@ISOLABEL@", isolabel),
        ("@ROOT@", cmdline)
    ])

    # Architecture names are matched case-insensitively, consistent with
    # the copy loop above which lower-cases them before use.
    if any(a.upper() == "IA32" for a in arches):
        shutil.copy2(config, os.path.join(efidir, "BOOT.cfg"))

    # estimate the size: every file and directory is rounded up to whole
    # blocks (empty entries count as one block), plus 256 blocks overhead
    blocksize = 2048
    size = blocksize * 256  # blocksize * overhead
    for parent, dirs, files in os.walk(efidir):
        for entry in files + dirs:
            t = os.path.join(parent, entry)
            s = os.stat(t).st_size
            d = s % blocksize
            if not s or d:
                s += blocksize - d
            size += s
    print(f"Estimates efiboot size to be {size}")

    # create the image
    image = os.path.join(info["imgdir"], "efiboot.img")
    with open(image, "w") as f:
        os.ftruncate(f.fileno(), size)

    mountpoint = os.path.join(info["workdir"], "mnt")
    os.makedirs(mountpoint)

    with loop_client.device(image, 0, size) as dev:
        subprocess.run(["mkfs.fat",
                        "-n", "ANACONDA",
                        dev],
                       input="y", encoding='utf-8', check=True)

        with mount(dev, mountpoint):
            target = os.path.join(mountpoint, "EFI", "BOOT")
            shutil.copytree(efidir, target)
        # Runs after the image has been unmounted again.
        subprocess.run(["ls", mountpoint], check=True)


def make_isolinux(cfg, root, info, tree):
    """Populate `<root>/isolinux` with the isolinux boot loader setup.

    Arguments:
      cfg: the `isolinux` options dict; a truthy `debug` entry replaces
        `isolinux.bin` with the debug build.
      root: root directory of the ISO tree being assembled.
      info: dict; the keys `configdir`, `version`, `name`, `cmdline` and
        `kerneldir` are read.
      tree: root file system tree, source of the splash image.
    """
    template_dir = info["configdir"]
    product_version = info["version"]
    product_name = info["name"]
    boot_args = info["cmdline"]
    kernel_src = info["kerneldir"]

    # boot loader
    dest_dir = os.path.join(root, "isolinux")
    os.makedirs(dest_dir)

    loader_files = ("isolinux.bin",
                    "ldlinux.c32",
                    "libcom32.c32",
                    "libutil.c32",
                    "vesamenu.c32")
    for fname in loader_files:
        install(os.path.join("/usr/share/syslinux/", fname),
                os.path.join(dest_dir, fname),
                0o755)

    if cfg.get("debug"):
        # Overwrite the regular binary with the debug build.
        install("/usr/share/syslinux/isolinux-debug.bin",
                os.path.join(dest_dir, "isolinux.bin"),
                0o755)

    # the config: copy each template and fill in its placeholders
    substitutions = [
        ("@VERSION@", product_version),
        ("@PRODUCT@", product_name),
        ("@ROOT@", boot_args)
    ]
    for fname in ("isolinux.cfg", "boot.msg", "grub.conf"):
        installed = os.path.join(dest_dir, fname)
        install(os.path.join(template_dir, fname), installed)
        replace(installed, substitutions)

    install(os.path.join(tree, "usr/share/anaconda/boot/syslinux-splash.png"),
            os.path.join(dest_dir, "splash.png"))

    # link the kernel: hard links to the already installed files
    for fname in ("vmlinuz", "initrd.img"):
        os.link(os.path.join(kernel_src, fname),
                os.path.join(dest_dir, fname))


# pylint: disable=too-many-statements
def main(inputs, root, options, workdir, loop_client):
    """Assemble the file system tree of a bootable ISO under `root`.

    The kernel and initrd are installed into `images/pxeboot`, the
    isolinux and EFI boot loaders are set up when requested, and the
    root file system is packed into `images/install.img` (an ext4
    `LiveOS/rootfs.img` wrapped in a squashfs).

    Arguments:
      inputs: dict of stage inputs; `rootfs` is required, `kernel` is
        optional and defaults to `rootfs`. Each provides a `path`.
      root: output directory, i.e. the root of the ISO tree.
      options: stage options as described by the schema in this file.
      workdir: scratch directory for intermediate files.
      loop_client: object providing a `device(image, offset, size)`
        context manager for loop devices.

    Returns None. External commands run with `check=True` and raise
    `subprocess.CalledProcessError` on failure.
    """
    tree = inputs["rootfs"]["path"]
    name = options["product"]["name"]
    version = options["product"]["version"]
    kernel = options["kernel"]
    isolabel = options["isolabel"]
    # `templates` is optional in the schema, which declares "99-generic"
    # as its default; apply that default here instead of failing with a
    # KeyError when the option is omitted.
    templates = options.get("templates", "99-generic")
    efi = options.get("efi")
    isolinux = options.get("isolinux", {})
    kopts = options.get("kernel_opts")
    rootfs = options.get("rootfs", {})

    # input directories
    templatedir = os.path.join(LORAX_TEMPLATES, templates)
    configdir = os.path.join(templatedir, "config_files", "x86")

    # output directories
    imgdir = os.path.join(root, "images")
    pxedir = os.path.join(imgdir, "pxeboot")

    os.makedirs(imgdir)

    # boot configuration
    cmdline = f"inst.stage2=hd:LABEL={isolabel}"
    if kopts:
        cmdline += " " + kopts

    info = {
        "version": version,
        "name": name,
        "isolabel": isolabel,
        "workdir": workdir,
        "configdir": configdir,
        "kerneldir": pxedir,
        "imgdir": imgdir,
        "cmdline": cmdline
    }

    # install the kernel, taken from the `kernel` input if present and
    # from the `rootfs` input otherwise
    kerneldir = pxedir
    kernel_input = inputs.get("kernel", inputs["rootfs"])
    kernel_tree = kernel_input["path"]
    bootdir = os.path.join(kernel_tree, "boot")

    os.makedirs(kerneldir)
    install(os.path.join(bootdir, f"vmlinuz-{kernel}"),
            os.path.join(kerneldir, "vmlinuz"))

    install(os.path.join(bootdir, f"initramfs-{kernel}.img"),
            os.path.join(kerneldir, "initrd.img"))

    # iso linux boot
    if isolinux.get("enabled"):
        make_isolinux(isolinux, root, info, tree)

    # efi boot
    if efi:
        make_efi(efi, info, root, loop_client)

    # install.img
    #   rootfs.img
    liveos_work = os.path.join(workdir, "liveos")
    liveos = os.path.join(liveos_work, "LiveOS")
    os.makedirs(liveos)

    # the `size` option is given in MB; convert to bytes
    rootfs_size = rootfs.get("size", 3072) * 1024*1024
    compression = rootfs.get("compression", {})

    rootfs_img = os.path.join(liveos, "rootfs.img")
    make_rootfs(tree, rootfs_img, rootfs_size, workdir, loop_client)

    installimg = os.path.join(imgdir, "install.img")
    cmd = ["mksquashfs", liveos_work, installimg]

    if compression:
        method = compression["method"]
        opts = compression.get("options", {})
        cmd += ["-comp", method]
        # compressor specific options are passed as `-X<name> <value>`
        for opt, val in opts.items():
            cmd += [f"-X{opt}", val]

    subprocess.run(cmd, check=True)


if __name__ == '__main__':
    # Script entry point: fetch the stage arguments from the osbuild API,
    # run the stage with a scratch directory created inside the output
    # tree, and exit with whatever main() returned.
    _stage_args = osbuild.api.arguments()
    _output_dir = _stage_args["tree"]
    with tempfile.TemporaryDirectory(dir=_output_dir) as _workdir:
        _loop_client = remoteloop.LoopClient("/run/osbuild/api/remoteloop")
        ret = main(
            _stage_args["inputs"],
            _output_dir,
            _stage_args["options"],
            _workdir,
            _loop_client,
        )
    sys.exit(ret)