#!/usr/bin/python3 """ Assemble a bootable partitioned disk image with qemu-img Assemble a bootable partitioned disk image using `qemu-img`. Creates a sparse partitioned disk image of type `pttype` of a given `size`, with a partition table according to `partitions` or a MBR partitioned disk having a single bootable partition containing the root filesystem if the `pttype` property is absent. If the partition type is MBR it installs GRUB2 (using the buildhost's `/usr/lib/grub/i386-pc/boot.img` etc.) as the bootloader. Copies the tree contents into the root filesystem and then converts the raw sparse image into the format requested with the `fmt` option. Buildhost commands used: `truncate`, `mount`, `umount`, `sfdisk`, `grub2-mkimage`, `mkfs.ext4` or `mkfs.xfs`, `qemu-img`. """ import contextlib import json import os import shutil import struct import subprocess import sys import tempfile from typing import List, BinaryIO import osbuild.remoteloop as remoteloop SCHEMA = """ "additionalProperties": false, "required": ["format", "filename", "ptuuid", "size"], "oneOf": [{ "required": ["root_fs_uuid"] },{ "required": ["pttype", "partitions"] }], "properties": { "bootloader": { "description": "Options specific to the bootloader", "type": "object", "properties": { "type": { "description": "What bootloader to install", "type": "string", "enum": ["grub2", "zipl"] } } }, "format": { "description": "Image file format to use", "type": "string", "enum": ["raw", "raw.xz", "qcow2", "vdi", "vmdk", "vpc", "vhdx"] }, "filename": { "description": "Image filename", "type": "string" }, "partitions": { "description": "Partition layout ", "type": "array", "items": { "description": "Description of one partition", "type": "object", "properties": { "bootable": { "description": "Mark the partition as bootable (dos)", "type": "boolean" }, "name": { "description": "The partition name (GPT)", "type": "string" }, "size": { "description": "The size of this partition", "type": "integer" }, "start": { "description": "The start offset of this partition", "type": "integer" }, "type": { "description": "The partition type (UUID or identifier)", "type": "string" }, "uuid": { "description": "UUID of the partition (GPT)", "type": "string" }, "filesystem": { "description": "Description of the filesystem", "type": "object", "required": ["mountpoint", "type", "uuid"], "properties": { "label": { "description": "Label for the filesystem", "type": "string" }, "mountpoint": { "description": "Where to mount the partition", "type": "string" }, "type": { "description": "Type of the filesystem", "type": "string", "enum": ["ext4", "xfs", "vfat"] }, "uuid": { "description": "UUID for the filesystem", "type": "string" } } } } } }, "ptuuid": { "description": "UUID for the disk image's partition table", "type": "string" }, "pttype": { "description": "The type of the partition table", "type": "string", "enum": ["mbr", "dos", "gpt"] }, "root_fs_uuid": { "description": "UUID for the root filesystem", "type": "string" }, "size": { "description": "Virtual disk size", "type": "integer" }, "root_fs_type": { "description": "Type of the root filesystem", "type": "string", "enum": ["ext4", "xfs"], "default": "ext4" } } """ @contextlib.contextmanager def mount(source, dest): subprocess.run(["mount", source, dest], check=True) try: yield dest finally: subprocess.run(["umount", "-R", dest], check=True) def mkfs_ext4(device, uuid, label): opts = [] if label: opts = ["-L", label] subprocess.run(["mkfs.ext4", "-U", uuid] + opts + [device], input="y", encoding='utf-8', check=True) def mkfs_xfs(device, uuid, label): opts = [] if label: opts = ["-L", label] subprocess.run(["mkfs.xfs", "-m", f"uuid={uuid}"] + opts + [device], encoding='utf-8', check=True) def mkfs_vfat(device, uuid, label): volid = uuid.replace('-', '') opts = [] if label: opts = ["-n", label] subprocess.run(["mkfs.vfat", "-i", volid] + opts + [device], encoding='utf-8', check=True) class Filesystem: def __init__(self, fstype: str, uuid: str, mountpoint: str, label: str = None): self.type = fstype self.uuid = uuid self.mountpoint = mountpoint self.label = label def make_at(self, device: str): fs_type = self.type if fs_type == "ext4": maker = mkfs_ext4 elif fs_type == "xfs": maker = mkfs_xfs elif fs_type == "vfat": maker = mkfs_vfat else: raise ValueError(f"Unknown filesystem type '{fs_type}'") maker(device, self.uuid, self.label) # pylint: disable=too-many-instance-attributes class Partition: def __init__(self, pttype: str = None, start: int = None, size: int = None, bootable: bool = False, name: str = None, uuid: str = None, filesystem: Filesystem = None): self.type = pttype self.start = start self.size = size self.bootable = bootable self.name = name self.uuid = uuid self.filesystem = filesystem self.index = None @property def start_in_bytes(self): return (self.start or 0) * 512 @property def size_in_bytes(self): return (self.size or 0) * 512 @property def mountpoint(self): if self.filesystem is None: return None return self.filesystem.mountpoint @property def fs_type(self): if self.filesystem is None: return None return self.filesystem.type @property def fs_uuid(self): if self.filesystem is None: return None return self.filesystem.uuid class PartitionTable: def __init__(self, label, uuid, partitions): self.label = label self.uuid = uuid self.partitions = partitions or [] def __getitem__(self, key) -> Partition: return self.partitions[key] def partitions_with_filesystems(self) -> List[Partition]: """Return partitions with filesystems sorted by hierarchy""" def mountpoint_len(p): return len(p.mountpoint) parts_fs = filter(lambda p: p.filesystem is not None, self.partitions) return sorted(parts_fs, key=mountpoint_len) def partition_containing_root(self) -> Partition: """Return the partition containing the root filesystem""" for p in self.partitions: if p.mountpoint and p.mountpoint == "/": return p return None def partition_containing_boot(self) -> Partition: """Return the partition containing /boot""" for p in self.partitions_with_filesystems(): if p.mountpoint == "/boot": return p # fallback to the root partition return self.partition_containing_root() def find_prep_partition(self) -> Partition: """Find the PReP partition'""" if self.label == "dos": prep_type = "41" elif self.label == "gpt": prep_type = "9E1A2D38-C612-4316-AA26-8B49521E5A8B" for part in self.partitions: if part.type.upper() == prep_type: return part return None def find_bios_boot_partition(self) -> Partition: """Find the BIOS-boot Partition""" bb_type = "21686148-6449-6E6F-744E-656564454649" for part in self.partitions: if part.type.upper() == bb_type: return part return None def write_to(self, target, sync=True): """Write the partition table to disk""" # generate the command for sfdisk to create the table command = f"label: {self.label}\nlabel-id: {self.uuid}" for partition in self.partitions: fields = [] for field in ["start", "size", "type", "name", "uuid"]: value = getattr(partition, field) if value: fields += [f'{field}="{value}"'] if partition.bootable: fields += ["bootable"] command += "\n" + ", ".join(fields) subprocess.run(["sfdisk", "-q", target], input=command, encoding='utf-8', check=True) if sync: self.update_from(target) def update_from(self, target): """Update and fill in missing information from disk""" r = subprocess.run(["sfdisk", "--json", target], stdout=subprocess.PIPE, encoding='utf-8', check=True) disk_table = json.loads(r.stdout)["partitiontable"] disk_parts = disk_table["partitions"] assert len(disk_parts) == len(self.partitions) for i, part in enumerate(self.partitions): part.index = i part.start = disk_parts[i]["start"] part.size = disk_parts[i]["size"] part.type = disk_parts[i].get("type") part.name = disk_parts[i].get("name") def filesystem_from_json(js) -> Filesystem: return Filesystem(js["type"], js["uuid"], js["mountpoint"], js.get("label")) def partition_from_json(js) -> Partition: p = Partition(pttype=js.get("type"), start=js.get("start"), size=js.get("size"), bootable=js.get("bootable"), name=js.get("name"), uuid=js.get("uuid")) fs = js.get("filesystem") if fs: p.filesystem = filesystem_from_json(fs) return p def partition_table_from_options(options) -> PartitionTable: ptuuid = options["ptuuid"] pttype = options.get("pttype", "dos") partitions = options.get("partitions") if pttype == "mbr": pttype = "dos" if partitions is None: # legacy mode, create a correct root_fs_uuid = options["root_fs_uuid"] root_fs_type = options.get("root_fs_type", "ext4") partitions = [{ "bootable": True, "type": "83", "filesystem": { "type": root_fs_type, "uuid": root_fs_uuid, "mountpoint": "/" } }] parts = [partition_from_json(p) for p in partitions] return PartitionTable(pttype, ptuuid, parts) def grub2_write_boot_image(boot_f: BinaryIO, image_f: BinaryIO, core_location: int): """Write the boot image (grub2 stage 1) to the MBR""" # The boot.img file is 512 bytes, but we must only copy the first 440 # bytes, as these contain the bootstrapping code. The rest of the # first sector contains the partition table, and must not be # overwritten. image_f.seek(0) image_f.write(boot_f.read(440)) # Additionally, write the location (in sectors) of # the grub core image, into the boot image, so the # latter can find the former. To exact location is # taken from grub2's "boot.S": # GRUB_BOOT_MACHINE_KERNEL_SECTOR 0x5c (= 92) image_f.seek(0x5c) image_f.write(struct.pack("