#!/usr/bin/python3
"""
Assemble a bootable partitioned disk image with qemu-img
Assemble a bootable partitioned disk image using `qemu-img`.
Creates a sparse partitioned disk image of type `pttype` of a given `size`,
with a partition table according to `partitions` or a MBR partitioned disk
having a single bootable partition containing the root filesystem if the
`pttype` property is absent.
If the partition type is MBR it installs GRUB2 (using the buildhost's
`/usr/lib/grub/i386-pc/boot.img` etc.) as the bootloader.
Copies the tree contents into the root filesystem and then converts the raw
sparse image into the format requested with the `fmt` option.
Buildhost commands used: `truncate`, `mount`, `umount`, `sfdisk`,
`grub2-mkimage`, `mkfs.ext4` or `mkfs.xfs`, `qemu-img`.
"""
import contextlib
import json
import os
import shutil
import struct
import subprocess
import sys
import tempfile
from typing import List, BinaryIO
import osbuild.remoteloop as remoteloop
SCHEMA = """
"additionalProperties": false,
"required": ["format", "filename", "ptuuid", "size"],
"oneOf": [{
"required": ["root_fs_uuid"]
},{
"required": ["pttype", "partitions"]
}],
"properties": {
"bootloader": {
"description": "Options specific to the bootloader",
"type": "object",
"properties": {
"type": {
"description": "What bootloader to install",
"type": "string",
"enum": ["grub2", "zipl"]
}
}
},
"format": {
"description": "Image file format to use",
"type": "string",
"enum": ["raw", "raw.xz", "qcow2", "vdi", "vmdk", "vpc", "vhdx"]
},
"filename": {
"description": "Image filename",
"type": "string"
},
"partitions": {
"description": "Partition layout ",
"type": "array",
"items": {
"description": "Description of one partition",
"type": "object",
"properties": {
"bootable": {
"description": "Mark the partition as bootable (dos)",
"type": "boolean"
},
"name": {
"description": "The partition name (GPT)",
"type": "string"
},
"size": {
"description": "The size of this partition",
"type": "integer"
},
"start": {
"description": "The start offset of this partition",
"type": "integer"
},
"type": {
"description": "The partition type (UUID or identifier)",
"type": "string"
},
"uuid": {
"description": "UUID of the partition (GPT)",
"type": "string"
},
"filesystem": {
"description": "Description of the filesystem",
"type": "object",
"required": ["mountpoint", "type", "uuid"],
"properties": {
"label": {
"description": "Label for the filesystem",
"type": "string"
},
"mountpoint": {
"description": "Where to mount the partition",
"type": "string"
},
"type": {
"description": "Type of the filesystem",
"type": "string",
"enum": ["ext4", "xfs", "vfat"]
},
"uuid": {
"description": "UUID for the filesystem",
"type": "string"
}
}
}
}
}
},
"ptuuid": {
"description": "UUID for the disk image's partition table",
"type": "string"
},
"pttype": {
"description": "The type of the partition table",
"type": "string",
"enum": ["mbr", "dos", "gpt"]
},
"root_fs_uuid": {
"description": "UUID for the root filesystem",
"type": "string"
},
"size": {
"description": "Virtual disk size",
"type": "integer"
},
"root_fs_type": {
"description": "Type of the root filesystem",
"type": "string",
"enum": ["ext4", "xfs"],
"default": "ext4"
}
}
"""
@contextlib.contextmanager
def mount(source, dest):
subprocess.run(["mount", source, dest], check=True)
try:
yield dest
finally:
subprocess.run(["umount", "-R", dest], check=True)
def mkfs_ext4(device, uuid, label):
opts = []
if label:
opts = ["-L", label]
subprocess.run(["mkfs.ext4", "-U", uuid] + opts + [device],
input="y", encoding='utf-8', check=True)
def mkfs_xfs(device, uuid, label):
opts = []
if label:
opts = ["-L", label]
subprocess.run(["mkfs.xfs", "-m", f"uuid={uuid}"] + opts + [device],
encoding='utf-8', check=True)
def mkfs_vfat(device, uuid, label):
volid = uuid.replace('-', '')
opts = []
if label:
opts = ["-n", label]
subprocess.run(["mkfs.vfat", "-i", volid] + opts + [device], encoding='utf-8', check=True)
class Filesystem:
def __init__(self,
fstype: str,
uuid: str,
mountpoint: str,
label: str = None):
self.type = fstype
self.uuid = uuid
self.mountpoint = mountpoint
self.label = label
def make_at(self, device: str):
fs_type = self.type
if fs_type == "ext4":
maker = mkfs_ext4
elif fs_type == "xfs":
maker = mkfs_xfs
elif fs_type == "vfat":
maker = mkfs_vfat
else:
raise ValueError(f"Unknown filesystem type '{fs_type}'")
maker(device, self.uuid, self.label)
# pylint: disable=too-many-instance-attributes
class Partition:
def __init__(self,
pttype: str = None,
start: int = None,
size: int = None,
bootable: bool = False,
name: str = None,
uuid: str = None,
filesystem: Filesystem = None):
self.type = pttype
self.start = start
self.size = size
self.bootable = bootable
self.name = name
self.uuid = uuid
self.filesystem = filesystem
self.index = None
@property
def start_in_bytes(self):
return (self.start or 0) * 512
@property
def size_in_bytes(self):
return (self.size or 0) * 512
@property
def mountpoint(self):
if self.filesystem is None:
return None
return self.filesystem.mountpoint
@property
def fs_type(self):
if self.filesystem is None:
return None
return self.filesystem.type
@property
def fs_uuid(self):
if self.filesystem is None:
return None
return self.filesystem.uuid
class PartitionTable:
def __init__(self, label, uuid, partitions):
self.label = label
self.uuid = uuid
self.partitions = partitions or []
def __getitem__(self, key) -> Partition:
return self.partitions[key]
def partitions_with_filesystems(self) -> List[Partition]:
"""Return partitions with filesystems sorted by hierarchy"""
def mountpoint_len(p):
return len(p.mountpoint)
parts_fs = filter(lambda p: p.filesystem is not None, self.partitions)
return sorted(parts_fs, key=mountpoint_len)
def partition_containing_root(self) -> Partition:
"""Return the partition containing the root filesystem"""
for p in self.partitions:
if p.mountpoint and p.mountpoint == "/":
return p
return None
def partition_containing_boot(self) -> Partition:
"""Return the partition containing /boot"""
for p in self.partitions_with_filesystems():
if p.mountpoint == "/boot":
return p
# fallback to the root partition
return self.partition_containing_root()
def find_prep_partition(self) -> Partition:
"""Find the PReP partition'"""
if self.label == "dos":
prep_type = "41"
elif self.label == "gpt":
prep_type = "9E1A2D38-C612-4316-AA26-8B49521E5A8B"
for part in self.partitions:
if part.type.upper() == prep_type:
return part
return None
def find_bios_boot_partition(self) -> Partition:
"""Find the BIOS-boot Partition"""
bb_type = "21686148-6449-6E6F-744E-656564454649"
for part in self.partitions:
if part.type.upper() == bb_type:
return part
return None
def write_to(self, target, sync=True):
"""Write the partition table to disk"""
# generate the command for sfdisk to create the table
command = f"label: {self.label}\nlabel-id: {self.uuid}"
for partition in self.partitions:
fields = []
for field in ["start", "size", "type", "name", "uuid"]:
value = getattr(partition, field)
if value:
fields += [f'{field}="{value}"']
if partition.bootable:
fields += ["bootable"]
command += "\n" + ", ".join(fields)
subprocess.run(["sfdisk", "-q", target],
input=command,
encoding='utf-8',
check=True)
if sync:
self.update_from(target)
def update_from(self, target):
"""Update and fill in missing information from disk"""
r = subprocess.run(["sfdisk", "--json", target],
stdout=subprocess.PIPE,
encoding='utf-8',
check=True)
disk_table = json.loads(r.stdout)["partitiontable"]
disk_parts = disk_table["partitions"]
assert len(disk_parts) == len(self.partitions)
for i, part in enumerate(self.partitions):
part.index = i
part.start = disk_parts[i]["start"]
part.size = disk_parts[i]["size"]
part.type = disk_parts[i].get("type")
part.name = disk_parts[i].get("name")
def filesystem_from_json(js) -> Filesystem:
return Filesystem(js["type"], js["uuid"], js["mountpoint"], js.get("label"))
def partition_from_json(js) -> Partition:
p = Partition(pttype=js.get("type"),
start=js.get("start"),
size=js.get("size"),
bootable=js.get("bootable"),
name=js.get("name"),
uuid=js.get("uuid"))
fs = js.get("filesystem")
if fs:
p.filesystem = filesystem_from_json(fs)
return p
def partition_table_from_options(options) -> PartitionTable:
ptuuid = options["ptuuid"]
pttype = options.get("pttype", "dos")
partitions = options.get("partitions")
if pttype == "mbr":
pttype = "dos"
if partitions is None:
# legacy mode, create a correct
root_fs_uuid = options["root_fs_uuid"]
root_fs_type = options.get("root_fs_type", "ext4")
partitions = [{
"bootable": True,
"type": "83",
"filesystem": {
"type": root_fs_type,
"uuid": root_fs_uuid,
"mountpoint": "/"
}
}]
parts = [partition_from_json(p) for p in partitions]
return PartitionTable(pttype, ptuuid, parts)
def grub2_write_boot_image(boot_f: BinaryIO,
image_f: BinaryIO,
core_location: int):
"""Write the boot image (grub2 stage 1) to the MBR"""
# The boot.img file is 512 bytes, but we must only copy the first 440
# bytes, as these contain the bootstrapping code. The rest of the
# first sector contains the partition table, and must not be
# overwritten.
image_f.seek(0)
image_f.write(boot_f.read(440))
# Additionally, write the location (in sectors) of
# the grub core image, into the boot image, so the
# latter can find the former. To exact location is
# taken from grub2's "boot.S":
# GRUB_BOOT_MACHINE_KERNEL_SECTOR 0x5c (= 92)
image_f.seek(0x5c)
image_f.write(struct.pack("<Q", core_location))
def grub2_write_core_mbrgap(core_f: BinaryIO,
image_f: BinaryIO,
pt: PartitionTable):
"""Write the core into the MBR gap"""
# For historic and performance reasons the first partition
# is aligned to a specific sector number (used to be 64,
# now it is 2048), which leaves a gap between it and the MBR,
# where the core image can be embedded in; also check it fits
core_size = os.fstat(core_f.fileno()).st_size
partition_offset = pt[0].start_in_bytes
assert core_size < partition_offset - 512
image_f.seek(512)
shutil.copyfileobj(core_f, image_f)
return 1 # the location of the core image in sectors
def grub2_write_core_prep_part(core_f: BinaryIO,
image_f: BinaryIO,
pt: PartitionTable):
"""Write the core to the prep partition"""
# On ppc64le with Open Firmware a special partition called
# 'PrEP partition' is used the store the grub2 core; the
# firmware looks for this partition and directly loads and
# executes the core form it.
prep_part = pt.find_prep_partition()
if prep_part is None:
raise ValueError("PrEP partition missing")
core_size = os.fstat(core_f.fileno()).st_size
assert core_size < prep_part.size_in_bytes - 512
image_f.seek(prep_part.start_in_bytes)
shutil.copyfileobj(core_f, image_f)
return prep_part.start
def grub2_write_core_bios_boot(core_f: BinaryIO,
image_f: BinaryIO,
pt: PartitionTable):
"""Write the core to the bios boot partition"""
bb = pt.find_bios_boot_partition()
if bb is None:
raise ValueError("BIOS-boot partition missing")
core_size = os.fstat(core_f.fileno()).st_size
if bb.size_in_bytes < core_size:
raise ValueError("BIOS-boot partition too small")
image_f.seek(bb.start_in_bytes)
shutil.copyfileobj(core_f, image_f)
# The core image needs to know from where to load its
# second sector so that information needs to be embedded
# into the image itself at the right location, i.e.
# the "sector start parameter" ("size .long 2, 0"):
# 0x200 - GRUB_BOOT_MACHINE_LIST_SIZE (12) = 0x1F4 = 500
image_f.seek(bb.start_in_bytes + 500)
image_f.write(struct.pack("<Q", bb.start + 1))
return bb.start
def grub2_partition_id(pt: PartitionTable):
"""grub2 partition identifier for the partition table"""
label2grub = {
"dos": "msdos",
"gpt": "gpt"
}
if pt.label not in label2grub:
raise ValueError(f"Unknown partition type: {pt.label}")
return label2grub[pt.label]
def install_grub2(image: str, pt: PartitionTable, options):
"""Install grub2 to image"""
platform = options.get("platform", "i386-pc")
boot_path = f"/usr/lib/grub/{platform}/boot.img"
core_path = "/var/tmp/grub2-core.img"
# Create the level-2 & 3 stages of the bootloader, aka the core
# it consists of the kernel plus the core modules required to
# to locate and load the rest of the grub modules, specifically
# the "normal.mod" (Stage 4) module.
# The exact list of modules required to be built into the core
# depends on the system: it is the minimal set needed to find
# read the partition and its filesystem containing said modules
# and the grub configuration [NB: efi systems work differently]
# find the partition containing /boot/grub2
boot_part = pt.partition_containing_boot()
# modules: access the disk and read the partition table:
# on x86 'biosdisk' is used to access the disk, on ppc64le
# with "Open Firmware" the latter is directly loading core
if platform == "i386-pc":
modules = ["biosdisk"]
else:
modules = []
if pt.label == "dos":
modules += ["part_msdos"]
elif pt.label == "gpt":
modules += ["part_gpt"]
# modules: grubs needs to access the filesystems of /boot/grub2
fs_type = boot_part.fs_type or "unknown"
if fs_type == "ext4":
modules += ["ext2"]
elif fs_type == "xfs":
modules += ["xfs"]
else:
raise ValueError(f"unknown boot filesystem type: '{fs_type}'")
# identify the partition containing boot for grub2
partid = grub2_partition_id(pt) + str(boot_part.index + 1)
print(f"grub2 prefix {partid}")
# the path containing the grub files relative partition
grub_path = os.path.relpath("/boot/grub2", boot_part.mountpoint)
# now created the core image
subprocess.run(["grub2-mkimage",
"--verbose",
"--directory", f"/usr/lib/grub/{platform}",
"--prefix", f"(,{partid})/{grub_path}",
"--format", platform,
"--compression", "auto",
"--output", core_path] +
modules,
check=True)
with open(image, "rb+") as image_f:
# Write the newly created grub2 core to the image
with open(core_path, "rb") as core_f:
if platform == "powerpc-ieee1275":
# write the core to the PrEP partition
core_loc = grub2_write_core_prep_part(core_f, image_f, pt)
elif pt.label == "gpt":
# gpt requires a bios-boot partition
core_loc = grub2_write_core_bios_boot(core_f, image_f, pt)
else:
# embed the core in the MBR gap
core_loc = grub2_write_core_mbrgap(core_f, image_f, pt)
# On certain platforms (x86) a level 1 boot loader is required
# to load to the core image (on ppc64le & Open Firmware this is
# done by the firmware itself)
if platform == "i386-pc":
# On x86, the boot image just jumps to core image
with open(boot_path, "rb") as boot_f:
grub2_write_boot_image(boot_f, image_f, core_loc)
def parse_blsfile(blsfile):
params = {}
with open(blsfile, "r") as bls:
for line in bls:
key, value = line.split(' ', 1)
params[key] = value.strip()
return params
def find_kernel(root):
base = f"{root}/boot/loader/entries"
for dirent in os.scandir(base):
fn, ext = os.path.splitext(dirent.name)
if ext != ".conf" or fn.endswith("rescue"):
continue
blsfile = f"{base}/{dirent.name}"
params = parse_blsfile(blsfile)
linux = root + params["linux"]
initrd = root + params["initrd"]
options = params.get("options", "")
return linux, initrd, options
def install_zipl(root: str, device: str, pt: PartitionTable):
"""Install the bootloader on s390x via zipl"""
kernel, initrd, kopts = find_kernel(root)
part_with_boot = pt.partition_containing_boot()
subprocess.run(["/usr/sbin/zipl",
"--verbose",
"--target", f"{root}/boot",
"--image", kernel,
"--ramdisk", initrd,
"--parameters", kopts,
"--targetbase", device,
"--targettype", "SCSI",
"--targetblocksize", "512",
"--targetoffset", str(part_with_boot.start)],
check=True)
def main(tree, output_dir, options, loop_client):
fmt = options["format"]
filename = options["filename"]
size = options["size"]
bootloader = options.get("bootloader", {"type": "none"})
# sfdisk works on sectors of 512 bytes and ignores excess space - be explicit about this
if size % 512 != 0:
raise ValueError("`size` must be a multiple of sector size (512)")
if fmt not in ["raw", "raw.xz", "qcow2", "vdi", "vmdk", "vpc", "vhdx"]:
raise ValueError("`format` must be one of raw, qcow, vdi, vmdk, vpc, vhdx")
image = "/var/tmp/osbuild-image.raw"
# Create an empty image file
subprocess.run(["truncate", "--size", str(size), image], check=True)
# The partition table
pt = partition_table_from_options(options)
pt.write_to(image)
# For backwards comparability assume that if bootloader is not
# set and partition scheme is dos (MBR) grub2 is being used
if bootloader["type"] == "none" and pt.label == "dos":
bootloader["type"] = "grub2"
# Install the bootloader
if bootloader["type"] == "grub2":
install_grub2(image, pt, bootloader)
# Now assemble the filesystem hierarchy and copy the tree into the image
with contextlib.ExitStack() as cm:
root = cm.enter_context(tempfile.TemporaryDirectory(dir=output_dir, prefix="osbuild-mnt-"))
disk = cm.enter_context(loop_client.device(image, 0, size))
# iterate the partition according to their position in the filesystem tree
for partition in pt.partitions_with_filesystems():
offset, size = partition.start_in_bytes, partition.size_in_bytes
loop = cm.enter_context(loop_client.device(image, offset, size))
# make the specified filesystem, if any
if partition.filesystem is None:
continue
partition.filesystem.make_at(loop)
# now mount it
mountpoint = os.path.normpath(f"{root}/{partition.mountpoint}")
os.makedirs(mountpoint, exist_ok=True)
cm.enter_context(mount(loop, mountpoint))
# the filesystem tree should now be properly setup,
# copy the tree into the target image
subprocess.run(["cp", "-a", f"{tree}/.", root], check=True)
# zipl needs access to the /boot directory and the whole image
# via the loopback device node
if bootloader["type"] == "zipl":
install_zipl(root, disk, pt)
if fmt == "raw":
subprocess.run(["cp", image, f"{output_dir}/{filename}"], check=True)
elif fmt == "raw.xz":
with open(f"{output_dir}/{filename}", "w") as f:
subprocess.run(
["xz", "--keep", "--stdout", "-0", image],
stdout=f,
check=True,
env={
"XZ_OPT": "--threads 0"
}
)
else:
extra_args = {
"qcow2": ["-c"],
"vdi": [],
"vmdk": ["-c"],
"vpc": ["-o", "subformat=fixed,force_size"],
"vhdx": []
}
subprocess.run([
"qemu-img",
"convert",
"-O", fmt,
*extra_args[fmt],
image,
f"{output_dir}/{filename}"
], check=True)
if __name__ == '__main__':
args = json.load(sys.stdin)
ret = main(args["tree"], args["output_dir"], args["options"], remoteloop.LoopClient("/run/osbuild/api/remoteloop"))
sys.exit(ret)