Blob Blame History Raw
#!/usr/bin/python3
"""
Assemble a bootable partitioned disk image with qemu-img

Assemble a bootable partitioned disk image using `qemu-img`.

Creates a sparse partitioned disk image of type `pttype` of a given `size`,
with a partition table according to `partitions` or a MBR partitioned disk
having a single bootable partition containing the root filesystem if the
`pttype` property is absent.

If the partition type is MBR it installs GRUB2 (using the buildhost's
`/usr/lib/grub/i386-pc/boot.img` etc.) as the bootloader.

Copies the tree contents into the root filesystem and then converts the raw
sparse image into the format requested with the `fmt` option.

Buildhost commands used: `truncate`, `mount`, `umount`, `sfdisk`,
`grub2-mkimage`, `mkfs.ext4` or `mkfs.xfs`, `qemu-img`.
"""


import contextlib
import json
import os
import shutil
import struct
import subprocess
import sys
import tempfile
from typing import List, BinaryIO
import osbuild.remoteloop as remoteloop

# JSON-schema style description of the `options` accepted by this assembler,
# kept as a string. It is not referenced anywhere else in this file
# (presumably it is read by the surrounding osbuild tooling -- confirm).
# Either `root_fs_uuid` (legacy single-partition layout) or both `pttype` and
# `partitions` must be supplied, see the "oneOf" clause.
SCHEMA = """
"additionalProperties": false,
"required": ["format", "filename", "ptuuid", "size"],
"oneOf": [{
  "required": ["root_fs_uuid"]
},{
  "required": ["pttype", "partitions"]
}],
"properties": {
  "bootloader": {
    "description": "Options specific to the bootloader",
    "type": "object",
    "properties": {
      "type": {
        "description": "What bootloader to install",
        "type": "string",
        "enum": ["grub2", "zipl"]
      }
    }
  },
  "format": {
    "description": "Image file format to use",
    "type": "string",
    "enum": ["raw", "raw.xz", "qcow2", "vdi", "vmdk", "vpc", "vhdx"]
  },
  "filename": {
    "description": "Image filename",
    "type": "string"
  },
  "partitions": {
    "description": "Partition layout ",
    "type": "array",
    "items": {
      "description": "Description of one partition",
      "type": "object",
      "properties": {
        "bootable": {
          "description": "Mark the partition as bootable (dos)",
          "type": "boolean"
        },
        "name": {
          "description": "The partition name (GPT)",
          "type": "string"
        },
        "size": {
          "description": "The size of this partition",
          "type": "integer"
        },
        "start": {
          "description": "The start offset of this partition",
          "type": "integer"
        },
        "type": {
          "description": "The partition type (UUID or identifier)",
          "type": "string"
        },
        "uuid": {
           "description": "UUID of the partition (GPT)",
           "type": "string"
        },
        "filesystem": {
          "description": "Description of the filesystem",
          "type": "object",
          "required": ["mountpoint", "type", "uuid"],
          "properties": {
            "label": {
              "description": "Label for the filesystem",
              "type": "string"
            },
            "mountpoint": {
              "description": "Where to mount the partition",
              "type": "string"
            },
            "type": {
              "description": "Type of the filesystem",
              "type": "string",
              "enum": ["ext4", "xfs", "vfat"]
            },
            "uuid": {
              "description": "UUID for the filesystem",
              "type": "string"
            }
          }
        }
      }
    }
  },
  "ptuuid": {
    "description": "UUID for the disk image's partition table",
    "type": "string"
  },
  "pttype": {
    "description": "The type of the partition table",
    "type": "string",
    "enum": ["mbr", "dos", "gpt"]
  },
  "root_fs_uuid": {
    "description": "UUID for the root filesystem",
    "type": "string"
  },
  "size": {
    "description": "Virtual disk size",
    "type": "integer"
  },
  "root_fs_type": {
    "description": "Type of the root filesystem",
    "type": "string",
    "enum": ["ext4", "xfs"],
    "default": "ext4"
  }
}
"""

@contextlib.contextmanager
def mount(source, dest):
    """Mount `source` at `dest` for the duration of a `with` block.

    Yields `dest`. When the block is left (normally or through an
    exception) `dest` is unmounted again with `umount -R`. Either command
    raises `subprocess.CalledProcessError` if it exits non-zero.
    """
    mount_cmd = ["mount", source, dest]
    unmount_cmd = ["umount", "-R", dest]

    subprocess.run(mount_cmd, check=True)
    try:
        yield dest
    finally:
        subprocess.run(unmount_cmd, check=True)

def mkfs_ext4(device, uuid, label):
    """Run `mkfs.ext4` on `device`, setting the filesystem UUID to `uuid`.

    A truthy `label` is passed on via `-L`. The string "y" is supplied on
    stdin (presumably to answer a confirmation prompt of the tool).
    Raises `subprocess.CalledProcessError` if the command fails.
    """
    label_args = ["-L", label] if label else []
    command = ["mkfs.ext4", "-U", uuid, *label_args, device]
    subprocess.run(command, input="y", encoding='utf-8', check=True)


def mkfs_xfs(device, uuid, label):
    """Run `mkfs.xfs` on `device`, setting the filesystem UUID to `uuid`.

    The UUID is handed over as `-m uuid=<uuid>`; a truthy `label` is
    passed on via `-L`. Raises `subprocess.CalledProcessError` if the
    command fails.
    """
    label_args = ["-L", label] if label else []
    command = ["mkfs.xfs", "-m", f"uuid={uuid}", *label_args, device]
    subprocess.run(command, encoding='utf-8', check=True)


def mkfs_vfat(device, uuid, label):
    """Run `mkfs.vfat` on `device`, using `uuid` as the volume id.

    All dashes are removed from `uuid` before it is given to `-i`; a
    truthy `label` is passed on via `-n`. Raises
    `subprocess.CalledProcessError` if the command fails.
    """
    volume_id = "".join(uuid.split('-'))
    label_args = ["-n", label] if label else []
    command = ["mkfs.vfat", "-i", volume_id, *label_args, device]
    subprocess.run(command, encoding='utf-8', check=True)


class Filesystem:
    """Description of a filesystem that is to be created on a device.

    Holds the filesystem type, its UUID, the mountpoint inside the image
    and an optional label.
    """

    def __init__(self,
                 fstype: str,
                 uuid: str,
                 mountpoint: str,
                 label: str = None):
        self.label = label
        self.mountpoint = mountpoint
        self.uuid = uuid
        self.type = fstype

    def make_at(self, device: str):
        """Create this filesystem on `device`.

        Raises `ValueError` when the filesystem type is not one of
        "ext4", "xfs" or "vfat".
        """
        kind = self.type
        # reject unsupported types before looking up any mkfs helper
        if kind not in ("ext4", "xfs", "vfat"):
            raise ValueError(f"Unknown filesystem type '{kind}'")

        makers = {
            "ext4": mkfs_ext4,
            "xfs": mkfs_xfs,
            "vfat": mkfs_vfat,
        }
        makers[kind](device, self.uuid, self.label)


# pylint: disable=too-many-instance-attributes
class Partition:
    """One entry of a partition table, optionally carrying a filesystem.

    `start` and `size` are converted to bytes by multiplying with 512 in
    the `*_in_bytes` properties. `index` starts out as `None`; it is
    assigned by `PartitionTable.update_from`.
    """

    def __init__(self,
                 pttype: str = None,
                 start: int = None,
                 size: int = None,
                 bootable: bool = False,
                 name: str = None,
                 uuid: str = None,
                 filesystem: Filesystem = None):
        self.index = None
        self.filesystem = filesystem
        self.uuid = uuid
        self.name = name
        self.bootable = bootable
        self.size = size
        self.start = start
        self.type = pttype

    @property
    def start_in_bytes(self):
        """Start offset in bytes; 0 when `start` is unset."""
        sectors = self.start if self.start else 0
        return sectors * 512

    @property
    def size_in_bytes(self):
        """Size in bytes; 0 when `size` is unset."""
        sectors = self.size if self.size else 0
        return sectors * 512

    @property
    def mountpoint(self):
        """Mountpoint of the filesystem, `None` without a filesystem."""
        fs = self.filesystem
        return None if fs is None else fs.mountpoint

    @property
    def fs_type(self):
        """Type of the filesystem, `None` without a filesystem."""
        fs = self.filesystem
        return None if fs is None else fs.type

    @property
    def fs_uuid(self):
        """UUID of the filesystem, `None` without a filesystem."""
        fs = self.filesystem
        return None if fs is None else fs.uuid


class PartitionTable:
    """A partition table together with its partitions.

    `label` and `uuid` are written as the `label:` and `label-id:` header
    lines of the `sfdisk` script produced by `write_to`; `partitions` is a
    list of `Partition` objects (an empty list if `None` is given).
    """

    def __init__(self, label, uuid, partitions):
        self.label = label
        self.uuid = uuid
        self.partitions = partitions or []

    def __getitem__(self, key) -> Partition:
        return self.partitions[key]

    def partitions_with_filesystems(self) -> List[Partition]:
        """Return partitions with filesystems sorted by hierarchy

        "Hierarchy" is approximated by the length of the mountpoint, so
        that "/" sorts before "/boot", which sorts before "/boot/efi".
        """
        parts_fs = [p for p in self.partitions if p.filesystem is not None]
        return sorted(parts_fs, key=lambda p: len(p.mountpoint))

    def partition_containing_root(self) -> Partition:
        """Return the partition mounted at "/", or `None` if there is none"""
        for p in self.partitions:
            if p.mountpoint == "/":
                return p
        return None

    def partition_containing_boot(self) -> Partition:
        """Return the partition containing /boot

        This is the partition mounted at "/boot" if one exists, otherwise
        the root partition (which may be `None`).
        """
        for p in self.partitions_with_filesystems():
            if p.mountpoint == "/boot":
                return p
        # fallback to the root partition
        return self.partition_containing_root()

    def _find_partition_by_type(self, type_id: str) -> Partition:
        """Return the first partition whose type equals `type_id`

        The comparison is case-insensitive with respect to the partition's
        type (`type_id` must be upper case). Partitions without a type are
        skipped instead of raising `AttributeError`.
        """
        for part in self.partitions:
            if part.type and part.type.upper() == type_id:
                return part
        return None

    def find_prep_partition(self) -> Partition:
        """Find the PReP partition

        Returns `None` if no such partition exists or if the table's label
        is neither "dos" nor "gpt" (no PReP type id is known then).
        """
        prep_types = {
            "dos": "41",
            "gpt": "9E1A2D38-C612-4316-AA26-8B49521E5A8B",
        }
        prep_type = prep_types.get(self.label)
        if prep_type is None:
            return None
        return self._find_partition_by_type(prep_type)

    def find_bios_boot_partition(self) -> Partition:
        """Find the BIOS-boot Partition, `None` if there is none"""
        return self._find_partition_by_type("21686148-6449-6E6F-744E-656564454649")

    def _sfdisk_script(self) -> str:
        """Generate the `sfdisk` script describing this table

        Two header lines followed by one line per partition. A partition
        line lists every truthy field as `field="value"` and, for bootable
        partitions, a single trailing `bootable` flag.
        """
        lines = [f"label: {self.label}", f"label-id: {self.uuid}"]
        for partition in self.partitions:
            fields = []
            for field in ("start", "size", "type", "name", "uuid"):
                value = getattr(partition, field)
                if value:
                    fields.append(f'{field}="{value}"')
            # once per partition, not once per field
            if partition.bootable:
                fields.append("bootable")
            lines.append(", ".join(fields))
        return "\n".join(lines)

    def write_to(self, target, sync=True):
        """Write the partition table to disk

        Feeds the generated script to `sfdisk -q target`. With `sync` set,
        the table is read back afterwards via `update_from`. Raises
        `subprocess.CalledProcessError` if `sfdisk` fails.
        """
        subprocess.run(["sfdisk", "-q", target],
                       input=self._sfdisk_script(),
                       encoding='utf-8',
                       check=True)

        if sync:
            self.update_from(target)

    def update_from(self, target):
        """Update and fill in missing information from disk

        Reads the table of `target` with `sfdisk --json` and copies index,
        start, size, type and name into the partitions, matching them by
        position.
        """
        r = subprocess.run(["sfdisk", "--json", target],
                           stdout=subprocess.PIPE,
                           encoding='utf-8',
                           check=True)
        disk_table = json.loads(r.stdout)["partitiontable"]
        disk_parts = disk_table["partitions"]

        assert len(disk_parts) == len(self.partitions), \
            "partition count on disk differs from the partition table"
        for i, (part, on_disk) in enumerate(zip(self.partitions, disk_parts)):
            part.index = i
            part.start = on_disk["start"]
            part.size = on_disk["size"]
            part.type = on_disk.get("type")
            part.name = on_disk.get("name")


def filesystem_from_json(js) -> Filesystem:
    """Build a `Filesystem` from its JSON (dict) description.

    The keys "type", "uuid" and "mountpoint" are mandatory (`KeyError` if
    absent); "label" is optional.
    """
    fstype = js["type"]
    uuid = js["uuid"]
    mountpoint = js["mountpoint"]
    return Filesystem(fstype=fstype,
                      uuid=uuid,
                      mountpoint=mountpoint,
                      label=js.get("label"))


def partition_from_json(js) -> Partition:
    """Build a `Partition` from its JSON (dict) description.

    All keys are optional. A truthy "filesystem" entry is converted with
    `filesystem_from_json` and attached to the partition.
    """
    partition = Partition(
        pttype=js.get("type"),
        start=js.get("start"),
        size=js.get("size"),
        bootable=js.get("bootable"),
        name=js.get("name"),
        uuid=js.get("uuid"),
    )

    fs_description = js.get("filesystem")
    if not fs_description:
        return partition

    partition.filesystem = filesystem_from_json(fs_description)
    return partition


def partition_table_from_options(options) -> PartitionTable:
    """Create the `PartitionTable` described by the assembler `options`.

    The table type defaults to "dos"; "mbr" is accepted as an alias for
    "dos". Without a "partitions" option a legacy layout is generated: a
    single bootable partition of type "83" whose root filesystem uses
    `root_fs_uuid` and `root_fs_type` (default "ext4").
    """
    ptuuid = options["ptuuid"]
    requested_type = options.get("pttype", "dos")
    label = "dos" if requested_type == "mbr" else requested_type

    layout = options.get("partitions")
    if layout is None:
        # legacy mode: synthesize the single root partition
        root_fs_uuid = options["root_fs_uuid"]
        root_fs_type = options.get("root_fs_type", "ext4")
        root_filesystem = {
            "type": root_fs_type,
            "uuid": root_fs_uuid,
            "mountpoint": "/"
        }
        layout = [{
            "bootable": True,
            "type": "83",
            "filesystem": root_filesystem
        }]

    parts = list(map(partition_from_json, layout))
    return PartitionTable(label, ptuuid, parts)


def grub2_write_boot_image(boot_f: BinaryIO,
                           image_f: BinaryIO,
                           core_location: int):
    """Write the boot image (grub2 stage 1) to the MBR

    Copies the bootstrap code from `boot_f` to the very start of
    `image_f` and then patches in `core_location`, the sector number at
    which the core image can be found.
    """

    # Only the first 440 bytes of the 512 byte boot.img are bootstrap
    # code; the remainder of the first sector holds the partition table
    # and has to be left alone.
    image_f.seek(0)
    bootstrap_code = boot_f.read(440)
    image_f.write(bootstrap_code)

    # The boot image locates the core image through a sector number
    # stored inside of it. The exact position is taken from grub2's
    # "boot.S":
    #  GRUB_BOOT_MACHINE_KERNEL_SECTOR 0x5c (= 92)
    # and the value is stored as little-endian unsigned 64 bit integer.
    image_f.seek(0x5c)
    kernel_sector = struct.pack("<Q", core_location)
    image_f.write(kernel_sector)


def grub2_write_core_mbrgap(core_f: BinaryIO,
                            image_f: BinaryIO,
                            pt: PartitionTable):
    """Write the core into the MBR gap

    Copies `core_f` into `image_f` starting at the second sector (byte
    offset 512) and returns the location of the core image in sectors,
    which is always 1.

    Raises `ValueError` if the core image does not fit between the first
    sector and the start of the first partition of `pt`.
    """
    # For historic and performance reasons the first partition
    # is aligned to a specific sector number (used to be 64,
    # now it is 2048), which leaves a gap between it and the MBR,
    # where the core image can be embedded in; also check it fits
    core_size = os.fstat(core_f.fileno()).st_size
    partition_offset = pt[0].start_in_bytes
    # An explicit check instead of `assert`: it must also hold when
    # running with `python -O`, otherwise the core would silently
    # overwrite the beginning of the first partition.
    if core_size >= partition_offset - 512:
        raise ValueError("grub2 core image does not fit into the MBR gap")
    image_f.seek(512)
    shutil.copyfileobj(core_f, image_f)

    return 1  # the location of the core image in sectors


def grub2_write_core_prep_part(core_f: BinaryIO,
                               image_f: BinaryIO,
                               pt: PartitionTable):
    """Write the core to the prep partition

    Copies `core_f` to the start of the PReP partition inside `image_f`
    and returns the start of that partition (in sectors).

    Raises `ValueError` if `pt` has no PReP partition or if the core
    image does not fit into it.
    """
    # On ppc64le with Open Firmware a special partition called
    # 'PReP partition' is used to store the grub2 core; the
    # firmware looks for this partition and directly loads and
    # executes the core from it.
    prep_part = pt.find_prep_partition()
    if prep_part is None:
        raise ValueError("PrEP partition missing")

    core_size = os.fstat(core_f.fileno()).st_size
    # An explicit check instead of `assert`: it must also hold when
    # running with `python -O`, otherwise the core would silently be
    # written past the end of the partition.
    if core_size >= prep_part.size_in_bytes - 512:
        raise ValueError("PrEP partition too small")
    image_f.seek(prep_part.start_in_bytes)
    shutil.copyfileobj(core_f, image_f)

    return prep_part.start


def grub2_write_core_bios_boot(core_f: BinaryIO,
                               image_f: BinaryIO,
                               pt: PartitionTable):
    """Write the core to the bios boot partition

    Copies `core_f` to the start of the BIOS-boot partition inside
    `image_f`, patches the sector number of the core's second sector into
    the copy and returns the start of the partition (in sectors).

    Raises `ValueError` if `pt` has no BIOS-boot partition or if that
    partition is smaller than the core image.
    """
    bios_boot = pt.find_bios_boot_partition()
    if bios_boot is None:
        raise ValueError("BIOS-boot partition missing")

    core_size = os.fstat(core_f.fileno()).st_size
    if core_size > bios_boot.size_in_bytes:
        raise ValueError("BIOS-boot partition too small")

    partition_offset = bios_boot.start_in_bytes
    image_f.seek(partition_offset)
    shutil.copyfileobj(core_f, image_f)

    # The first sector of the core has to know where to load the
    # second one from, so that sector number is embedded into the
    # image itself at the position of the "sector start parameter"
    # ("size .long 2, 0"):
    # 0x200 - GRUB_BOOT_MACHINE_LIST_SIZE (12) = 0x1F4 = 500
    second_sector = struct.pack("<Q", bios_boot.start + 1)
    image_f.seek(partition_offset + 500)
    image_f.write(second_sector)

    return bios_boot.start


def grub2_partition_id(pt: PartitionTable):
    """grub2 partition identifier for the partition table

    Maps the label of `pt` to the name grub2 uses for that kind of
    partition table ("dos" -> "msdos", "gpt" -> "gpt"). Raises
    `ValueError` for any other label.
    """
    grub_name = {"dos": "msdos", "gpt": "gpt"}.get(pt.label)
    if grub_name is None:
        raise ValueError(f"Unknown partition type: {pt.label}")
    return grub_name


def install_grub2(image: str, pt: PartitionTable, options):
    """Install grub2 to image

    Builds a grub2 core image with `grub2-mkimage` and embeds it into the
    file `image`: into the PReP partition for platform
    "powerpc-ieee1275", into the BIOS-boot partition for gpt tables and
    into the MBR gap otherwise. For platform "i386-pc" the boot image is
    written to the start of `image` as well.

    `options` is the bootloader options dict; only "platform" (default
    "i386-pc") is read. Raises `ValueError` if the filesystem containing
    /boot is neither ext4 nor xfs.
    """
    platform = options.get("platform", "i386-pc")
    is_x86 = platform == "i386-pc"

    boot_path = f"/usr/lib/grub/{platform}/boot.img"
    core_path = "/var/tmp/grub2-core.img"

    # The core image made below forms the level-2 & 3 stages of the
    # bootloader: the kernel plus those modules that are needed to
    # locate and load all remaining grub modules, specifically
    # "normal.mod" (Stage 4). Which modules must be built in depends on
    # the system; it is the minimal set required to read the partition
    # and filesystem holding said modules and the grub configuration
    # [NB: efi systems work differently]

    # the partition containing /boot/grub2
    boot_part = pt.partition_containing_boot()

    # disk access: x86 uses 'biosdisk'; on ppc64le with "Open Firmware"
    # the firmware loads the core directly, so nothing is needed
    modules = ["biosdisk"] if is_x86 else []

    # reading the partition table
    part_module = {"dos": "part_msdos", "gpt": "part_gpt"}.get(pt.label)
    if part_module is not None:
        modules.append(part_module)

    # reading the filesystem that contains /boot/grub2
    fs_modules = {"ext4": "ext2", "xfs": "xfs"}
    fs_type = boot_part.fs_type or "unknown"
    if fs_type not in fs_modules:
        raise ValueError(f"unknown boot filesystem type: '{fs_type}'")
    modules.append(fs_modules[fs_type])

    # how grub2 refers to the partition containing boot (1-based index)
    partid = grub2_partition_id(pt) + str(boot_part.index + 1)
    print(f"grub2 prefix {partid}")

    # location of the grub files relative to the root of that partition
    grub_path = os.path.relpath("/boot/grub2", boot_part.mountpoint)

    # create the core image
    mkimage_cmd = [
        "grub2-mkimage",
        "--verbose",
        "--directory", f"/usr/lib/grub/{platform}",
        "--prefix", f"(,{partid})/{grub_path}",
        "--format", platform,
        "--compression", "auto",
        "--output", core_path,
        *modules,
    ]
    subprocess.run(mkimage_cmd, check=True)

    with open(image, "rb+") as image_f:
        # embed the freshly built core into the image
        with open(core_path, "rb") as core_f:
            if platform == "powerpc-ieee1275":
                # the core lives in the PrEP partition
                write_core = grub2_write_core_prep_part
            elif pt.label == "gpt":
                # gpt requires a bios-boot partition
                write_core = grub2_write_core_bios_boot
            else:
                # the core lives in the MBR gap
                write_core = grub2_write_core_mbrgap
            core_loc = write_core(core_f, image_f, pt)

        # Some platforms (x86) need a level 1 boot loader that loads
        # the core image (on ppc64le & Open Firmware the firmware does
        # this itself); on x86 the boot image just jumps to the core
        if is_x86:
            with open(boot_path, "rb") as boot_f:
                grub2_write_boot_image(boot_f, image_f, core_loc)


def parse_blsfile(blsfile):
    """Parse a boot loader entry file into a dict.

    Every non-empty line that is not a comment (starting with `#`) is
    split at the first run of whitespace into a key and its value. A key
    without a value maps to the empty string. If a key appears more than
    once the last occurrence wins.
    """
    params = {}
    with open(blsfile, "r", encoding="utf-8") as bls:
        for raw_line in bls:
            line = raw_line.strip()
            # blank lines and comments carry no key/value pair
            if not line or line.startswith("#"):
                continue
            key, *rest = line.split(None, 1)
            params[key] = rest[0].strip() if rest else ""
    return params


def find_kernel(root):
    """Locate kernel, initrd and kernel options of the tree at `root`.

    Scans `<root>/boot/loader/entries` and uses the first `*.conf` entry
    whose name (without extension) does not end in "rescue". Returns a
    tuple `(linux, initrd, options)` where the two paths are prefixed
    with `root` and `options` defaults to the empty string.

    Raises `ValueError` if no usable entry exists (instead of returning
    `None`, which callers unpacking the result cannot handle) and
    `KeyError` if the entry lacks a "linux" or "initrd" key.
    """
    base = f"{root}/boot/loader/entries"
    # the context manager makes sure the directory handle gets closed
    with os.scandir(base) as entries:
        for dirent in entries:
            fn, ext = os.path.splitext(dirent.name)
            if ext != ".conf" or fn.endswith("rescue"):
                continue
            blsfile = f"{base}/{dirent.name}"
            params = parse_blsfile(blsfile)
            linux = root + params["linux"]
            initrd = root + params["initrd"]
            options = params.get("options", "")
            return linux, initrd, options
    raise ValueError(f"no bootloader entry found in '{base}'")


def install_zipl(root: str, device: str, pt: PartitionTable):
    """Install the bootloader on s390x via zipl

    `root` is the directory holding the image's filesystem tree and
    `device` the device node given to zipl as `--targetbase`. Kernel,
    initrd and kernel parameters are looked up with `find_kernel`; the
    start of the partition containing /boot is passed as target offset.
    Raises `subprocess.CalledProcessError` if zipl fails.
    """
    kernel, initrd, kopts = find_kernel(root)
    boot_partition = pt.partition_containing_boot()
    target_offset = str(boot_partition.start)

    zipl_cmd = [
        "/usr/sbin/zipl",
        "--verbose",
        "--target", f"{root}/boot",
        "--image", kernel,
        "--ramdisk", initrd,
        "--parameters", kopts,
        "--targetbase", device,
        "--targettype", "SCSI",
        "--targetblocksize", "512",
        "--targetoffset", target_offset,
    ]
    subprocess.run(zipl_cmd, check=True)


def main(tree, output_dir, options, loop_client):
    """Assemble the disk image described by `options` from `tree`.

    Creates a sparse raw image, writes the partition table, installs the
    bootloader, creates and mounts all filesystems, copies `tree` into
    them and finally stores the image as `<output_dir>/<filename>` in the
    requested format.

    `loop_client` must provide `device(image, offset, size)` returning a
    context manager that yields a device node for that part of the image.

    Raises `ValueError` if `size` is not a multiple of 512 or `format`
    is not supported; failing external commands raise
    `subprocess.CalledProcessError`.
    """
    fmt = options["format"]
    filename = options["filename"]
    size = options["size"]
    # Work on a copy so that defaulting the type below neither fails for
    # a bootloader object without "type" nor modifies the caller's options
    bootloader = dict(options.get("bootloader") or {})
    bootloader.setdefault("type", "none")

    # sfdisk works on sectors of 512 bytes and ignores excess space - be explicit about this
    if size % 512 != 0:
        raise ValueError("`size` must be a multiple of sector size (512)")

    supported_formats = ("raw", "raw.xz", "qcow2", "vdi", "vmdk", "vpc", "vhdx")
    if fmt not in supported_formats:
        raise ValueError("`format` must be one of " + ", ".join(supported_formats))

    image = "/var/tmp/osbuild-image.raw"
    target = f"{output_dir}/{filename}"

    # Create an empty image file
    subprocess.run(["truncate", "--size", str(size), image], check=True)

    # The partition table
    pt = partition_table_from_options(options)
    pt.write_to(image)

    # For backwards compatibility assume that if bootloader is not
    # set and partition scheme is dos (MBR) grub2 is being used
    if bootloader["type"] == "none" and pt.label == "dos":
        bootloader["type"] = "grub2"

    # Install the bootloader
    if bootloader["type"] == "grub2":
        install_grub2(image, pt, bootloader)

    # Now assemble the filesystem hierarchy and copy the tree into the image
    with contextlib.ExitStack() as cm:
        root = cm.enter_context(tempfile.TemporaryDirectory(dir=output_dir, prefix="osbuild-mnt-"))
        disk = cm.enter_context(loop_client.device(image, 0, size))
        # iterate the partitions according to their position in the
        # filesystem tree, so that parents are mounted before children
        for partition in pt.partitions_with_filesystems():
            part_offset = partition.start_in_bytes
            part_size = partition.size_in_bytes
            loop = cm.enter_context(loop_client.device(image, part_offset, part_size))
            partition.filesystem.make_at(loop)
            # now mount it
            mountpoint = os.path.normpath(f"{root}/{partition.mountpoint}")
            os.makedirs(mountpoint, exist_ok=True)
            cm.enter_context(mount(loop, mountpoint))
        # the filesystem tree should now be properly setup,
        # copy the tree into the target image
        subprocess.run(["cp", "-a", f"{tree}/.", root], check=True)

        # zipl needs access to the /boot directory and the whole image
        # via the loopback device node
        if bootloader["type"] == "zipl":
            install_zipl(root, disk, pt)

    if fmt == "raw":
        subprocess.run(["cp", image, target], check=True)
    elif fmt == "raw.xz":
        # xz writes compressed (binary) data to the file
        with open(target, "wb") as f:
            subprocess.run(
                ["xz", "--keep", "--stdout", "-0", image],
                stdout=f,
                check=True,
                env={
                    "XZ_OPT": "--threads 0"
                }
            )
    else:
        # format specific arguments for `qemu-img convert`
        extra_args = {
            "qcow2": ["-c"],
            "vdi": [],
            "vmdk": ["-c"],
            "vpc": ["-o", "subformat=fixed,force_size"],
            "vhdx": []
        }
        subprocess.run([
            "qemu-img",
            "convert",
            "-O", fmt,
            *extra_args[fmt],
            image,
            target
        ], check=True)


# Script entry point: the request is read as JSON from stdin and has to
# contain the keys "tree", "output_dir" and "options".
if __name__ == '__main__':
    request = json.load(sys.stdin)
    tree_dir = request["tree"]
    out_dir = request["output_dir"]
    assembler_options = request["options"]
    client = remoteloop.LoopClient("/run/osbuild/api/remoteloop")
    sys.exit(main(tree_dir, out_dir, assembler_options, client))