#!/usr/bin/python3
import argparse
import contextlib
import errno
import functools
import glob
import mimetypes
import json
import os
import platform
import subprocess
import sys
import tempfile
import xml.etree.ElementTree
from osbuild import loop
def run_ostree(*args, _input=None, _check=True, **kwargs):
    """Run the `ostree` command line tool and return the completed process.

    Positional arguments are passed through verbatim; every keyword
    argument `k=v` is appended as a `--k=v` option. The full command line
    is echoed to stderr before it is executed.

    Args:
        *args: positional command line arguments (strings).
        _input: text fed to the process on stdin, if any.
        _check: when true, a non-zero exit status raises
            `subprocess.CalledProcessError`.
        **kwargs: converted to `--key=value` options.

    Returns:
        The `subprocess.CompletedProcess`; stdout is captured as text.
    """
    options = [f'--{key}={value}' for key, value in kwargs.items()]
    argv = [*args, *options]
    print("ostree " + " ".join(argv), file=sys.stderr)
    return subprocess.run(
        ["ostree", *argv],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        input=_input,
        check=_check,
    )
@contextlib.contextmanager
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
    """Context manager binding `fd` to an unbound loop device.

    Keeps requesting unbound devices from `ctl` until one accepts both the
    backing file descriptor and the requested status (offset, size limit,
    autoclear). The resulting `loop.Loop` object is yielded and closed
    again when the context exits.

    Args:
        ctl: loop control object providing `get_unbound()`.
        fd: file descriptor of the backing file.
        offset: passed to `set_status()` as `offset`.
        sizelimit: passed to `set_status()` as `sizelimit`.

    Raises:
        OSError: if `set_fd()` fails with an errno other than EBUSY.
    """
    while True:
        lo = loop.Loop(ctl.get_unbound())
        try:
            lo.set_fd(fd)
        except OSError as e:
            lo.close()
            # EBUSY: presumably the device got bound by someone else in the
            # meantime (TODO confirm); retry with another unbound device
            if e.errno == errno.EBUSY:
                continue
            raise e
        try:
            lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
        except BlockingIOError:
            # undo the binding, release the device and start over
            lo.clear_fd()
            lo.close()
            continue
        break
    try:
        yield lo
    finally:
        lo.close()
@contextlib.contextmanager
def loop_open(ctl, image, *, offset=None, size=None):
    """Context manager exposing (a window of) `image` as a loop device.

    Opens `image` read-only in binary mode, attaches it to a loop device
    via `loop_create_device()` and yields the device node path below
    `/dev`. The backing file stays open for the lifetime of the context.

    Args:
        ctl: loop control object handed to `loop_create_device()`.
        image: path of the backing file.
        offset: forwarded as the loop device offset.
        size: forwarded as the loop device size limit.
    """
    with open(image, "rb") as backing:
        descriptor = backing.fileno()
        device_ctx = loop_create_device(ctl, descriptor, offset=offset, sizelimit=size)
        with device_ctx as device:
            yield os.path.join("/dev", device.devname)
@contextlib.contextmanager
def open_image(ctl, image, fmt):
    """Context manager yielding `(raw_image_path, loop_device_path)`.

    Images whose format is not "raw" are first converted with `qemu-img`
    into a temporary directory below /var/tmp; raw images are used in
    place. The whole raw image is then attached to a loop device.

    Args:
        ctl: loop control object handed to `loop_open()`.
        image: path of the image file.
        fmt: image format name; "raw" skips the conversion.
    """
    with tempfile.TemporaryDirectory(dir="/var/tmp") as workdir:
        if fmt == "raw":
            target = image
        else:
            target = os.path.join(workdir, "image.raw")
            convert = ["qemu-img", "convert"]
            # qemu has a bug that makes the conversion to raw hang about
            # half of the time on aarch64 machines with a lot of CPUs.
            # Limiting the conversion to a single coroutine works around
            # it without slowing things down by much.
            # Bug: https://bugs.launchpad.net/qemu/+bug/1805256
            if platform.machine() == 'aarch64':
                convert += ["-m", "1"]
            convert += ["-O", "raw", image, target]
            subprocess.run(convert, check=True)

        image_size = os.stat(target).st_size

        with loop_open(ctl, target, offset=0, size=image_size) as dev:
            yield target, dev
@contextlib.contextmanager
def mount_at(device, mountpoint, options=None, extra=None):
    """Context manager mounting `device` read-only at `mountpoint`.

    Args:
        device: source handed to `mount` (device node or directory).
        mountpoint: existing directory to mount on; also the yielded value.
        options: additional `-o` mount options; "ro" is always prepended.
        extra: additional arguments placed before the device, e.g.
            `["--bind"]`.

    Raises:
        subprocess.CalledProcessError: if `mount` or `umount` fails.
    """
    # None instead of `[]` defaults: default values are created once at
    # definition time and would be shared between all calls.
    opts = ",".join(["ro", *(options or [])])
    argv = ["mount", "-o", opts, *(extra or []), device, mountpoint]
    subprocess.run(argv, check=True)
    try:
        yield mountpoint
    finally:
        subprocess.run(["umount", "--lazy", mountpoint], check=True)
@contextlib.contextmanager
def mount(device):
    """Context manager mounting `device` read-only on a temporary directory.

    Yields the path of the temporary mountpoint. On exit the filesystem is
    lazily unmounted and the directory removed.

    Raises:
        subprocess.CalledProcessError: if `mount` or `umount` fails.
    """
    with tempfile.TemporaryDirectory() as target:
        mount_cmd = ["mount", "-o", "ro", device, target]
        umount_cmd = ["umount", "--lazy", target]
        subprocess.run(mount_cmd, check=True)
        try:
            yield target
        finally:
            subprocess.run(umount_cmd, check=True)
def parse_environment_vars(s):
    """Parse `KEY=VALUE` lines into a dict.

    Blank lines and lines whose first non-blank character is `#` are
    skipped. Each remaining line is split at the first `=`; double quotes
    surrounding the value are removed.

    Args:
        s: text containing one assignment per line.

    Returns:
        Dict mapping keys to values; later assignments win.

    Raises:
        ValueError: if a non-comment line contains no `=`.
    """
    stripped = (raw.strip() for raw in s.split("\n"))
    assignments = (
        entry.split("=", 1)
        for entry in stripped
        if entry and not entry.startswith('#')
    )
    return {name: val.strip('"') for name, val in assignments}
def parse_unit_files(s, expected_state):
    """Extract the units in a given state from `systemctl list-unit-files`.

    The first line of `s` is treated as the column header and skipped.
    Every following line is split on whitespace; the first field is the
    unit name and the second its state. Lines with fewer than two fields
    (blank separator lines, for instance) are ignored.

    Args:
        s: textual output of `systemctl list-unit-files`.
        expected_state: state to select, e.g. "enabled".

    Returns:
        Sorted list of unique unit names whose state is `expected_state`.
    """
    units = set()
    for line in s.split("\n")[1:]:
        fields = line.split()
        if len(fields) < 2:
            # Skip short lines: they carry no unit/state pair, and using
            # stale values from a previous line would be wrong.
            continue
        unit, state = fields[:2]
        if state == expected_state:
            units.add(unit)
    return sorted(units)
def subprocess_check_output(argv, parse_fn=None):
    """Run `argv` and return its stdout, optionally parsed.

    If the command exits non-zero, its captured stdout is written to
    stderr (framed by marker lines) and the exception is re-raised.

    Args:
        argv: command line as a list.
        parse_fn: optional callable applied to the text output.

    Returns:
        `parse_fn(output)` if `parse_fn` is given, else the raw text.

    Raises:
        subprocess.CalledProcessError: if the command fails.
    """
    try:
        text = subprocess.check_output(argv, encoding="utf-8")
    except subprocess.CalledProcessError as error:
        sys.stderr.write(f"--- Output from {argv}:\n")
        sys.stderr.write(error.stdout)
        sys.stderr.write("\n--- End of the output\n")
        raise

    if parse_fn:
        return parse_fn(text)
    return text
def read_image_format(device):
    """Return the image format reported by `qemu-img info` for `device`.

    Raises:
        subprocess.CalledProcessError: if `qemu-img` fails.
    """
    command = ["qemu-img", "info", "--output=json", device]
    image_info = subprocess_check_output(command, json.loads)
    return image_info["format"]
def read_partition(device, partition):
    """Fill in filesystem details of `device` as reported by `blkid`.

    Sets the "label", "uuid" and "fstype" keys of `partition` from the
    LABEL, UUID and TYPE values printed by `blkid --output export`. If
    `blkid` exits non-zero, or a value is absent, the key is set to None.

    Args:
        device: path of the block device to probe.
        partition: dict to update in place.

    Returns:
        The same `partition` dict.
    """
    probe = subprocess.run(["blkid", "--output", "export", device],
                           check=False, encoding="utf-8",
                           stdout=subprocess.PIPE)
    blkid = parse_environment_vars(probe.stdout) if probe.returncode == 0 else {}

    # report key -> blkid tag; LABEL doesn't exist for mbr
    for key, tag in (("label", "LABEL"), ("uuid", "UUID"), ("fstype", "TYPE")):
        partition[key] = blkid.get(tag)
    return partition
def read_partition_table(device):
    """Describe the partition table of `device` using `sfdisk --json`.

    Args:
        device: path of the block device (or image) to inspect.

    Returns:
        Dict with the keys "partition-table" (label type such as "dos" or
        "gpt"), "partition-table-id" and "partitions" (list of dicts with
        "bootable", "type", "start", "size" in bytes, and "partuuid").
        When `sfdisk` fails, both table keys are None and "partitions"
        holds a single entry describing the whole device as probed by
        `read_partition()`.
    """
    partitions = []
    info = {"partition-table": None,
            "partition-table-id": None,
            "partitions": partitions}
    try:
        sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
    except subprocess.CalledProcessError:
        # sfdisk could not read a partition table; probe the device as a
        # whole. read_partition() fills in the dict it is given, so it
        # must receive a dict here.
        partitions.append(read_partition(device, {}))
        return info

    ptable = sfdisk["partitiontable"]
    assert ptable["unit"] == "sectors"
    is_dos = ptable["label"] == "dos"
    ssize = ptable.get("sectorsize", 512)

    for number, p in enumerate(ptable["partitions"], start=1):
        partuuid = p.get("uuid")
        if not partuuid and is_dos:
            # For dos/mbr partition layouts the partition uuid
            # is generated. Normally this would be done by
            # udev+blkid, when the partition table is scanned.
            # 'sfdisk' prefixes the partition id with '0x' but
            # 'blkid' does not; remove it to mimic 'blkid'
            table_id = ptable['id'][2:]
            partuuid = "%.33s-%02x" % (table_id, number)

        partitions.append({
            "bootable": p.get("bootable", False),
            "type": p["type"],
            # sfdisk reports sectors; the report uses bytes
            "start": p["start"] * ssize,
            "size": p["size"] * ssize,
            "partuuid": partuuid
        })

    info["partition-table"] = ptable["label"]
    info["partition-table-id"] = ptable["id"]
    return info
def read_bootloader_type(device):
    """Guess the bootloader from the first 512 bytes of `device`.

    Returns:
        "grub" if those bytes contain the byte string `GRUB`, otherwise
        "unknown".
    """
    with open(device, "rb") as f:
        first_sector = f.read(512)
    return "grub" if b"GRUB" in first_sector else "unknown"
def read_boot_entries(boot_dir):
    """Read the boot loader entries below `boot_dir`.

    Every `loader/entries/*.conf` file is parsed into a dict: each line is
    stripped and split at the first space into key and value.

    Args:
        boot_dir: directory containing `loader/entries`.

    Returns:
        List of entry dicts sorted by their "title".

    Raises:
        ValueError: if a line has no space separating key and value.
        KeyError: if an entry has no "title".
    """
    parsed = []
    for path in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
        with open(path) as f:
            entry = {}
            for line in f:
                key, value = line.strip().split(" ", 1)
                entry[key] = value
            parsed.append(entry)
    parsed.sort(key=lambda entry: entry["title"])
    return parsed
def rpm_verify(tree):
    """Run `rpm --verify --all` inside `tree` and collect the differences.

    Args:
        tree: root directory of the filesystem tree to chroot into.

    Returns:
        Dict with "missing" (sorted list of paths reported as missing) and
        "changed" (dict mapping each other reported path to the leading
        nine-character attribute string of its line).
    """
    # cannot use `rpm --root` here, because rpm uses passwd from the host to
    # verify user and group ownership:
    #   https://github.com/rpm-software-management/rpm/issues/882
    changed = {}
    missing = []
    # the context manager closes the pipe once the output is consumed
    with subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"],
                          stdout=subprocess.PIPE, encoding="utf-8") as rpm:
        for line in rpm.stdout:
            # format description in rpm(8), under `--verify`
            # Match on the word itself: comparing the 9-character
            # attribute slice with a shorter literal can never succeed.
            if line.startswith("missing "):
                # the path starts one column earlier on "missing" lines
                missing.append(line[12:].rstrip())
            else:
                changed[line[13:].rstrip()] = line[:9]

        # ignore return value, because it returns non-zero when it found changes
        rpm.wait()

    return {
        "missing": sorted(missing),
        "changed": changed
    }
def rpm_packages(tree, is_ostree):
    """Return the sorted list of packages installed in `tree`.

    Args:
        tree: root directory passed to `rpm --root`.
        is_ostree: when true, the rpm database is read from
            `/usr/share/rpm` instead of rpm's default location.
    """
    argv = ["rpm", "--root", tree, "-qa"]
    if is_ostree:
        argv.extend(["--dbpath", "/usr/share/rpm"])
    return sorted(subprocess_check_output(argv, str.split))
def read_services(tree, state):
    """List the unit files in `tree` that are in the given `state`.

    Runs `systemctl --root=<tree> list-unit-files` and filters its output
    with `parse_unit_files()`.
    """
    def units_in_state(listing):
        return parse_unit_files(listing, state)

    command = ["systemctl", f"--root={tree}", "list-unit-files"]
    return subprocess_check_output(command, units_in_state)
def read_firewall_zone(tree):
    """Return the services listed in the default firewalld zone of `tree`.

    The default zone name is read from `DefaultZone` in
    `etc/firewalld/firewalld.conf`; if that file does not exist, "public"
    is used. The zone definition is loaded from `etc/firewalld/zones`,
    falling back to `usr/lib/firewalld/zones`.

    Returns:
        List with the `name` attribute of every `<service>` element that
        is a direct child of the zone's root element.

    Raises:
        FileNotFoundError: if neither zone file exists.
        KeyError: if firewalld.conf has no `DefaultZone` entry.
    """
    try:
        with open(f"{tree}/etc/firewalld/firewalld.conf") as f:
            zone = parse_environment_vars(f.read())["DefaultZone"]
    except FileNotFoundError:
        zone = "public"

    load = xml.etree.ElementTree.parse
    try:
        document = load(f"{tree}/etc/firewalld/zones/{zone}.xml")
    except FileNotFoundError:
        document = load(f"{tree}/usr/lib/firewalld/zones/{zone}.xml")

    return [service.get("name") for service in document.getroot().findall("service")]
def read_fstab(tree):
    """Return the entries of `etc/fstab` in `tree`.

    Blank lines and comment lines (first non-blank character `#`) are
    skipped; every other line is split on whitespace into its fields.

    Returns:
        Sorted list of field lists; empty if the file does not exist.
    """
    try:
        with open(f"{tree}/etc/fstab") as f:
            # Strip first: lines read from a file keep their newline, so
            # an unstripped blank line is never falsy.
            lines = [line.strip() for line in f]
    except FileNotFoundError:
        return []
    return sorted(line.split() for line in lines
                  if line and not line.startswith("#"))
def append_filesystem(report, tree, *, is_ostree=False):
    """Add information about the filesystem mounted at `tree` to `report`.

    Two independent parts are collected:

    * If `etc/os-release` exists: packages, rpm verification (skipped for
      ostree trees), os-release, enabled/disabled services, hostname,
      timezone, firewall services, fstab, passwd and group; for ostree
      trees additionally the passwd/group files below `usr/lib`.
    * Boot information: boot environment and boot menu, read either from
      a non-empty `boot` directory or, if `vmlinuz-*` files exist at the
      top level, from the tree itself.

    Args:
        report: dict that is updated in place.
        tree: root directory of the mounted filesystem.
        is_ostree: whether `tree` is an ostree checkout.
    """
    if os.path.exists(f"{tree}/etc/os-release"):
        report["packages"] = rpm_packages(tree, is_ostree)
        if not is_ostree:
            report["rpm-verify"] = rpm_verify(tree)

        with open(f"{tree}/etc/os-release") as f:
            report["os-release"] = parse_environment_vars(f.read())

        report["services-enabled"] = read_services(tree, "enabled")
        report["services-disabled"] = read_services(tree, "disabled")

        # hostname, timezone and firewall configuration are optional:
        # the keys are simply left out when the files are missing
        with contextlib.suppress(FileNotFoundError):
            with open(f"{tree}/etc/hostname") as f:
                report["hostname"] = f.read().strip()

        with contextlib.suppress(FileNotFoundError):
            # only the last path component of the symlink target is kept
            report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime"))

        with contextlib.suppress(FileNotFoundError):
            report["firewall-enabled"] = read_firewall_zone(tree)

        # an empty fstab is not reported at all
        fstab = read_fstab(tree)
        if fstab:
            report["fstab"] = fstab

        with open(f"{tree}/etc/passwd") as f:
            report["passwd"] = sorted(f.read().strip().split("\n"))

        with open(f"{tree}/etc/group") as f:
            report["groups"] = sorted(f.read().strip().split("\n"))

        if is_ostree:
            with open(f"{tree}/usr/lib/passwd") as f:
                report["passwd-system"] = sorted(f.read().strip().split("\n"))

            with open(f"{tree}/usr/lib/group") as f:
                report["groups-system"] = sorted(f.read().strip().split("\n"))

    if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0:
        # a populated boot directory inside this filesystem
        assert "bootmenu" not in report
        with contextlib.suppress(FileNotFoundError):
            with open(f"{tree}/boot/grub2/grubenv") as f:
                report["boot-environment"] = parse_environment_vars(f.read())
        report["bootmenu"] = read_boot_entries(f"{tree}/boot")

    elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0:
        # kernels at the top level: presumably a separate boot partition
        # (TODO confirm); note that grubenv is required in this branch
        assert "bootmenu" not in report
        with open(f"{tree}/grub2/grubenv") as f:
            report["boot-environment"] = parse_environment_vars(f.read())
        report["bootmenu"] = read_boot_entries(tree)
    elif len(glob.glob(f"{tree}/EFI")):
        # only noted on stderr, nothing is added to the report
        print("EFI partition", file=sys.stderr)
def partition_is_esp(partition):
    """Tell whether `partition` has the EFI System Partition type GUID.

    The comparison against `partition["type"]` is exact (case sensitive).
    """
    esp_type_guid = "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
    return partition["type"] == esp_type_guid
def find_esp(partitions):
    """Find the first EFI System Partition in `partitions`.

    Returns:
        Tuple `(partition, index)` of the first match, or `(None, 0)` if
        there is none.
    """
    matches = (
        (candidate, index)
        for index, candidate in enumerate(partitions)
        if partition_is_esp(candidate)
    )
    return next(matches, (None, 0))
def append_partitions(report, device, loctl):
    """Inspect every partition listed in `report["partitions"]`.

    Each partition is exposed through its own loop device and probed with
    `read_partition()`. Afterwards every partition that has a filesystem
    type is mounted and fed to `append_filesystem()`. If an EFI System
    Partition exists and the mounted tree has a `boot/efi` directory, the
    ESP is mounted there for the duration of the inspection.

    Args:
        report: dict that is updated in place.
        device: path of the device holding the partition table.
        loctl: loop control object handed to `loop_open()`.
    """
    partitions = report["partitions"]
    esp, esp_id = find_esp(partitions)

    with contextlib.ExitStack() as stack:
        # first pass: one loop device per partition, kept open until the end
        devices = []
        for part in partitions:
            window = loop_open(loctl, device, offset=part["start"], size=part["size"])
            dev = stack.enter_context(window)
            devices.append(dev)
            read_partition(dev, part)

        # second pass: look into everything that carries a filesystem
        for dev, part in zip(devices, partitions):
            if not part["fstype"]:
                continue

            with mount(dev) as tree, contextlib.ExitStack() as nested:
                if esp and os.path.exists(f"{tree}/boot/efi"):
                    nested.enter_context(
                        mount_at(devices[esp_id], f"{tree}/boot/efi", options=['umask=077'])
                    )
                append_filesystem(report, tree)
def analyse_image(image):
    """Produce the report for a disk image file.

    Records the image format, the bootloader type and the partition
    table, then inspects the contained filesystems: every partition when
    a partition table was found, otherwise the device as a whole.

    Returns:
        The report dict.
    """
    loctl = loop.LoopControl()

    image_format = read_image_format(image)
    report = {"image-format": image_format}

    with open_image(loctl, image, image_format) as (_, device):
        report["bootloader"] = read_bootloader_type(device)
        report.update(read_partition_table(device))

        if not report["partition-table"]:
            with mount(device) as tree:
                append_filesystem(report, tree)
        else:
            append_partitions(report, device, loctl)

    return report
def append_directory(report, tree):
    """Add information about the directory `tree` to `report`.

    If `tree` contains an `ostree` entry, `usr/etc` is bind-mounted onto
    `etc` (created if needed) and the tree is inspected as an ostree
    tree; otherwise it is inspected as a regular filesystem tree.
    """
    if not os.path.lexists(f"{tree}/ostree"):
        append_filesystem(report, tree)
        return

    os.makedirs(f"{tree}/etc", exist_ok=True)
    with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]):
        append_filesystem(report, tree, is_ostree=True)
def append_ostree_repo(report, repo):
    """Add information about the ostree repository `repo` to `report`.

    Stores the repository's `core.mode` and, for every ref, the
    `rpmostree.inputhash` metadata of the commit it resolves to under
    `report["ostree"]`. The commit of the first ref is then checked out
    into a temporary directory and inspected with `append_directory()`.

    Args:
        report: dict that is updated in place.
        repo: path of the ostree repository.
    """
    ostree = functools.partial(run_ostree, repo=repo)

    mode = ostree("config", "get", "core.mode").stdout.strip()
    report["ostree"] = {
        "repo": {
            "core.mode": mode
        }
    }

    ref_names = ostree("refs").stdout.strip().split("\n")
    report["ostree"]["refs"] = ref_names

    # resolve all refs first, then query the metadata of each commit
    resolved = {}
    for name in ref_names:
        resolved[name] = ostree("rev-parse", name).stdout.strip()
    commit = resolved[ref_names[0]]

    ref_info = {}
    for name in ref_names:
        shown = ostree("show", "--print-metadata-key=rpmostree.inputhash", resolved[name])
        ref_info[name] = {"inputhash": shown.stdout.strip("'\n")}
    report["ostree"]["refs"] = ref_info

    with tempfile.TemporaryDirectory(dir="/var/tmp") as workdir:
        checkout = os.path.join(workdir, "tree")
        ostree("checkout", "--force-copy", commit, checkout)
        append_directory(report, checkout)
def analyse_directory(path):
    """Produce the report for a directory.

    Three layouts are recognised:

    * a `compose.json` file: the directory is reported as "ostree/commit"
      and its `repo` sub-directory is inspected as the repository,
    * a `refs` sub-directory: the directory itself is reported and
      inspected as "ostree/repo",
    * anything else is inspected as a plain filesystem tree.

    Returns:
        The report dict.
    """
    report = {}

    if os.path.exists(os.path.join(path, "compose.json")):
        report["type"] = "ostree/commit"
        append_ostree_repo(report, os.path.join(path, "repo"))
    elif os.path.isdir(os.path.join(path, "refs")):
        report["type"] = "ostree/repo"
        # the directory itself is the repository in this layout
        append_ostree_repo(report, path)
    else:
        append_directory(report, path)

    return report
def is_tarball(path):
    """Tell whether the file name of `path` maps to the tar MIME type.

    Only the name is considered (via `mimetypes.guess_type`); the file is
    not opened.
    """
    return mimetypes.guess_type(path)[0] == "application/x-tar"
def analyse_tarball(path):
    """Extract the tarball at `path` and produce the report for its content.

    The archive is unpacked with `tar` into a temporary directory below
    /var/tmp; tar's stdout is redirected to stderr.

    Returns:
        The report dict of `analyse_directory()`.

    Raises:
        subprocess.CalledProcessError: if `tar` fails.
    """
    with tempfile.TemporaryDirectory(dir="/var/tmp") as workdir:
        root = os.path.join(workdir, "root")
        os.makedirs(root)

        extract = ["tar", "-x", "--auto-compress", "-f", path, "-C", root]
        subprocess.run(extract, stdout=sys.stderr, check=True)

        return analyse_directory(root)
def is_compressed(path):
    """Tell whether the file name of `path` indicates xz, gzip or bzip2.

    Only the name is considered (via `mimetypes.guess_type`); the file is
    not opened.
    """
    supported = ("xz", "gzip", "bzip2")
    return mimetypes.guess_type(path)[1] in supported
def analyse_compressed(path):
    """Decompress the image at `path` and produce the report for it.

    The file is copied into a temporary directory below /var/tmp,
    decompressed in place with the tool matching its encoding, and the
    single resulting file is inspected with `analyse_image()`.

    Returns:
        The report dict of `analyse_image()`.

    Raises:
        ValueError: if the encoding guessed from the file name is not xz,
            gzip or bzip2.
        subprocess.CalledProcessError: if copying or decompressing fails.
    """
    decompressors = {"xz": "unxz", "gzip": "gunzip", "bzip2": "bunzip2"}

    encoding = mimetypes.guess_type(path)[1]
    tool = decompressors.get(encoding)
    if tool is None:
        raise ValueError(f"Unsupported compression: {encoding}")

    with tempfile.TemporaryDirectory(dir="/var/tmp") as workdir:
        subprocess.run(["cp", "--reflink=auto", "-a", path, workdir],
                       check=True)

        # the copy is the only file in the fresh directory
        archive = os.path.join(workdir, os.listdir(workdir)[0])
        subprocess.run([tool, "--force", archive], check=True)

        # the decompressor replaces the archive with the plain file
        remaining = os.listdir(workdir)
        assert len(remaining) == 1
        image = os.path.join(workdir, remaining[0])

        return analyse_image(image)
def main():
    """Command line entry point: inspect TARGET and print a JSON report.

    TARGET is made absolute and dispatched on its kind: directory,
    tarball, compressed image, or (by default) disk image. The report is
    written to stdout as indented JSON with sorted keys.
    """
    parser = argparse.ArgumentParser(description="Inspect an image")
    parser.add_argument("target", metavar="TARGET",
                        help="The file or directory to analyse",
                        type=os.path.abspath)
    target = parser.parse_args().target

    # pick the analyser first, run it afterwards
    if os.path.isdir(target):
        analyse = analyse_directory
    elif is_tarball(target):
        analyse = analyse_tarball
    elif is_compressed(target):
        analyse = analyse_compressed
    else:
        analyse = analyse_image

    report = analyse(target)
    json.dump(report, sys.stdout, sort_keys=True, indent=2)
# Run the inspection only when executed as a script, not when imported.
if __name__ == "__main__":
    main()