Blob Blame History Raw
#!/usr/bin/python3
"""
Lorax related utilities: Template parsing and execution

This module contains a re-implementation of the Lorax
template engine, but for osbuild. Not all commands in
the original scripting language are support, but all
needed to run the post install and cleanup scripts.
"""

import contextlib
import glob
import os
import re
import shlex
import shutil
import subprocess

import mako.template


def replace(target, patterns):
    finder = [(re.compile(p), s) for p, s in patterns]
    newfile = target + ".replace"

    with open(target, "r") as i, open(newfile, "w") as o:
        for line in i:
            for p, s in finder:
                line = p.sub(s, line)
            o.write(line)
    os.rename(newfile, target)


def rglob(pathname, *, fatal=False):
    seen = set()
    for f in glob.iglob(pathname):
        if f not in seen:
            seen.add(f)
            yield f
    if fatal and not seen:
        raise IOError(f"nothing matching {pathname}")


class Script:

    # all built-in commands in a name to method map
    commands = {}

    # helper decorator to register builtin methods
    class command:
        def __init__(self, fn):
            self.fn = fn

        def __set_name__(self, owner, name):
            bultins = getattr(owner, "commands")
            bultins[name] = self.fn
            setattr(owner, name, self.fn)

    # Script class starts here
    def __init__(self, script, build, tree):
        self.script = script
        self.tree = tree
        self.build = build

    def __call__(self):
        for i, line in enumerate(self.script):
            cmd, args = line[0], line[1:]
            ignore_error = False
            if cmd.startswith("-"):
                cmd = cmd[1:]
                ignore_error = True

            method = self.commands.get(cmd)

            if not method:
                raise ValueError(f"Unknown command: '{cmd}'")

            try:
                method(self, *args)
            except Exception:
                if ignore_error:
                    continue
                print(f"Error on line: {i} " + str(line))
                raise

    def tree_path(self, target):
        dest = os.path.join(self.tree, target.lstrip("/"))
        return dest

    @command
    def append(self, filename, data):
        target = self.tree_path(filename)
        dirname = os.path.dirname(target)
        os.makedirs(dirname, exist_ok=True)
        print(f"append '{target}' '{data}'")
        with open(target, "a", encoding="utf-8") as f:
            f.write(bytes(data, "utf8").decode("unicode_escape"))
            f.write("\n")

    @command
    def mkdir(self, *dirs):
        for d in dirs:
            print(f"mkdir '{d}'")
            os.makedirs(self.tree_path(d), exist_ok=True)

    @command
    def move(self, src, dst):
        src = self.tree_path(src)
        dst = self.tree_path(dst)

        if os.path.isdir(dst):
            dst = os.path.join(dst, os.path.basename(src))

        print(f"move '{src}' -> '{dst}'")
        os.rename(src, dst)

    @command
    def install(self, src, dst):
        dst = self.tree_path(dst)
        for s in rglob(os.path.join(self.build, src.lstrip("/")), fatal=True):
            with contextlib.suppress(shutil.Error):
                print(f"install {s} -> {dst}")
                shutil.copy2(os.path.join(self.build, s), dst)

    @command
    def remove(self, *files):
        for g in files:
            for f in rglob(self.tree_path(g)):
                if os.path.isdir(f) and not os.path.islink(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
                print(f"remove '{f}'")

    @command
    def replace(self, pat, repl, *files):
        found = False
        for g in files:
            for f in rglob(self.tree_path(g)):
                found = True
                print(f"replace {f}: {pat} -> {repl}")
                replace(f, [(pat, repl)])

        if not found:
            assert found, f"No match for {pat} in {' '.join(files)}"

    @command
    def runcmd(self, *args):
        print("run ", " ".join(args))
        subprocess.run(args, cwd=self.tree, check=True)

    @command
    def symlink(self, source, dest):
        target = self.tree_path(dest)
        if os.path.exists(target):
            self.remove(dest)
        print(f"symlink '{source}' -> '{target}'")
        os.symlink(source, target)

    @command
    def systemctl(self, verb, *units):
        assert verb in ('enable', 'disable', 'mask')
        self.mkdir("/run/systemd/system")
        cmd = ['systemctl', '--root', self.tree, '--no-reload', verb]

        for unit in units:
            with contextlib.suppress(subprocess.CalledProcessError):
                args = cmd + [unit]
                self.runcmd(*args)


def brace_expand(s):
    if not ('{' in s and ',' in s and '}' in s):
        return [s]

    result = []
    right = s.find('}')
    left = s[:right].rfind('{')
    prefix, choices, suffix = s[:left], s[left+1:right], s[right+1:]
    for choice in choices.split(','):
        result.extend(brace_expand(prefix+choice+suffix))

    return result


def brace_expand_line(line):
    return [after for before in line for after in brace_expand(before)]


def render_template(path, args):
    """Render a template at `path` with arguments `args`"""

    with open(path, "r") as f:
        data = f.read()

    tlp = mako.template.Template(text=data, filename=path)
    txt = tlp.render(**args)

    lines = map(lambda l: l.strip(), txt.splitlines())
    lines = filter(lambda l: l and not l.startswith("#"), lines)
    commands = map(shlex.split, lines)
    commands = map(brace_expand_line, commands)

    result = list(commands)
    return result