#! /usr/bin/python
# SPDX-License-Identifier: CC0-1.0

import argparse
import errno
import io
import itertools
import os
import re
import shutil
import struct
import sys
import tempfile
from subprocess import PIPE, Popen, STDOUT

# Python 3 shims
# Each import is attempted in its Python 3 spelling first; on failure the
# Python 2 equivalent (the builtin reduce, itertools.izip_longest) is used.
try:
    from functools import reduce
except ImportError:
    pass
try:
    from itertools import zip_longest as izip_longest
except ImportError:
    from itertools import izip_longest

# revs:
# [ { "path", "cpuid", "pf", "rev", "date" } ]

# artifacts:
#  * content summary (per-file)
#   * overlay summary (per-fms/pf)
#  * changelog (per-file?)
#  * discrepancies (per-fms/pf)

# Verbosity threshold shared by the log_* helpers: a message is written to
# stderr only when log_level >= the level passed to the helper.
log_level = 0
# Default consulted by mc_rev() when its "date" argument is None: if true,
# the microcode date is appended to the printed revision.
print_date = False

def log_status(msg, level=0):
    """
    Write msg, unprefixed and newline-terminated, to stderr.

    The message is suppressed when level is above the module-wide log_level.
    """
    if level > log_level:
        return

    line = msg + "\n"
    sys.stderr.write(line)

def log_info(msg, level=2):
    """
    Write msg to stderr with an "INFO: " prefix and a trailing newline.

    The message is suppressed when level is above the module-wide log_level.
    """
    if level > log_level:
        return

    line = "INFO: " + msg + "\n"
    sys.stderr.write(line)

def log_warn(msg, level=1):
    """
    Write msg to stderr with a "WARNING: " prefix and a trailing newline.

    The message is suppressed when level is above the module-wide log_level.
    """
    if level > log_level:
        return

    line = "WARNING: " + msg + "\n"
    sys.stderr.write(line)

def log_error(msg, level=-1):
    """
    Write msg to stderr with an "ERROR: " prefix and a trailing newline.

    The message is suppressed when level is above the module-wide log_level;
    with the default level of -1 it is shown at the default verbosity of 0.
    """
    if level > log_level:
        return

    line = "ERROR: " + msg + "\n"
    sys.stderr.write(line)

def remove_prefix(text, prefix):
    """
    Strip a leading directory prefix from text.

    prefix is either a single string or an iterable of strings that are
    tried in order.  Each candidate is treated as a directory: a trailing
    os.sep is added if missing, so only whole path components match.  The
    remainder after the first matching candidate is returned; text is
    returned unchanged if none matches.
    """
    candidates = [prefix] if isinstance(prefix, str) else prefix

    for cand in candidates:
        if not cand.endswith(os.sep):
            cand = cand + os.sep
        if text.startswith(cand):
            return text[len(cand):]

    return text

def file_walk(args, yield_dirs=False):
    """
    Generate (relative path, actual path) pairs for the given paths.

    args: iterable of file or directory paths.
    yield_dirs: when true, directories are yielded as well (the top-level
                directory itself with an empty relative path).

    A directory is walked recursively and each entry is yielded with its
    path relative to that directory.  An existing non-directory path is
    yielded once with an empty relative path.

    Raises IOError(ENOENT) when a path does not exist.
    """
    for content in args:
        if os.path.isdir(content):
            if yield_dirs:
                yield ("", content)
            for root, dirs, files in os.walk(content):
                if yield_dirs:
                    for f in dirs:
                        p = os.path.join(root, f)
                        yield (remove_prefix(p, content), p)
                for f in files:
                    p = os.path.join(root, f)
                    yield (remove_prefix(p, content), p)
        elif os.path.exists(content):
            yield ("", content)
        else:
            # Only a path that is neither a directory nor an existing file
            # is an error.
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), content)

def cpuid_fname(c):
    """
    Format a CPUID signature as a "family-model-stepping" file name.

    Each component is printed as two lower-case hex digits.  The extended
    family (bits 27:20) is added to the base family (bits 11:8), while the
    extended model (bits 19:16) is concatenated with the base model
    (bits 7:4) as its high nibble.
    """
    family = ((c >> 20) & 0xff) + ((c >> 8) & 0xf)
    model = ((c >> 12) & 0xf0) + ((c >> 4) & 0xf)
    stepping = c & 0xf

    return "%02x-%02x-%02x" % (family, model, stepping)

def _open_ucode_image(ap, is_text):
    """
    Return (binary file object, size in bytes) for the microcode file ap.

    For the text-based format (is_text true) the comma-separated hex words
    are packed as little-endian 32-bit values into an in-memory buffer;
    lines starting with "/" are skipped.  Otherwise the file itself is
    opened in binary mode.
    """
    if not is_text:
        sz = os.stat(ap).st_size
        return open(ap, "rb"), sz

    data = io.BytesIO()
    with open(ap, "r") as f:
        for line in f:
            if line.startswith("/"):
                continue
            for val in line.split(","):
                val = val.strip()
                if not val:
                    continue
                data.write(struct.pack("<I", int(val, 16)))
    sz = data.tell()
    data.seek(0, os.SEEK_SET)

    return data, sz


def read_revs_dir(path, src=None, ret=None):
    """
    Collect information about the microcode updates found under path.

    path: directory (or file) to scan using file_walk().
    src: value stored in the "src" field of each entry; path if not set.
    ret: list to append the entries to; a new list is created if None.

    Only files whose base name matches the "xx-xx-0x" pattern or
    "microcode*.dat" are processed.  One dict is appended per update header
    and one more per extended signature, with the keys "path", "src",
    "cpuid", "pf", "rev", "date", "offs", "cksum", "data_size" and
    "total_size" (plus "ext_offs" and "ext_cksum" for extended signatures).

    Errors raised while processing a file are logged and the file is
    skipped; entries already collected from it are kept.

    Returns the list of entries.
    """
    if ret is None:
        ret = []

    ucode_re = re.compile('[0-9a-f]{2}-[0-9a-f]{2}-0[0-9a-f]$')
    ucode_dat_re = re.compile(r'microcode.*\.dat$')

    for rp, ap in file_walk([path, ]):
        rp_fname = os.path.basename(rp)
        is_text = bool(ucode_dat_re.match(rp_fname))
        if not ucode_re.match(rp_fname) and not is_text:
            continue

        try:
            image, sz = _open_ucode_image(ap, is_text)

            with image as f:
                log_info("Processing %s" % ap)
                offs = 0
                while offs < sz:
                    f.seek(offs, os.SEEK_SET)
                    # 48-byte update header, parsed as little-endian to
                    # match the packing used for the text format.  Fields
                    # used below: [1] revision (signed), [2] date,
                    # [3] CPUID signature, [4] checksum, [6] platform ID
                    # mask, [7] data size, [8] total size.
                    hdr = struct.unpack("<IiIIIIIIIIII", f.read(48))
                    ret.append({"path": rp, "src": src or path,
                                "cpuid": hdr[3], "pf": hdr[6], "rev": hdr[1],
                                "date": hdr[2], "offs": offs, "cksum": hdr[4],
                                "data_size": hdr[7], "total_size": hdr[8]})

                    # Anything beyond header + data is the extended
                    # signature table: a 20-byte table header followed by
                    # 12-byte (signature, platform ID mask, checksum)
                    # records.
                    if hdr[8] and hdr[8] - hdr[7] > 48:
                        f.seek(hdr[7], os.SEEK_CUR)
                        ext_tbl = struct.unpack("<IIIII", f.read(20))
                        log_status("Found %u extended signatures for %s:%#x" %
                                   (ext_tbl[0], rp, offs), level=1)

                        cur_offs = offs + hdr[7] + 48 + 20
                        ext_sig_cnt = 0
                        # Read no more records than the table announces
                        # and never past the end of this update.
                        while cur_offs < offs + hdr[8] \
                                and ext_sig_cnt < ext_tbl[0]:
                            ext_sig = struct.unpack("<III", f.read(12))
                            ret.append({"path": rp, "src": src or path,
                                        "cpuid": ext_sig[0], "pf": ext_sig[1],
                                        "rev": hdr[1], "date": hdr[2],
                                        "offs": offs, "ext_offs": cur_offs,
                                        "cksum": hdr[4],
                                        "ext_cksum": ext_sig[2],
                                        "data_size": hdr[7],
                                        "total_size": hdr[8]})
                            log_status(("Got ext sig %#x/%#x for " +
                                        "%s:%#x:%#x/%#x") %
                                       (ext_sig[0], ext_sig[1], rp, offs,
                                        hdr[3], hdr[6]), level=2)

                            cur_offs += 12
                            ext_sig_cnt += 1

                    # A zero total size stands for the 2048-byte default.
                    offs += hdr[8] or 2048
        except Exception as e:
            # NOTE(review): the level argument was cut off in the source;
            # level=1 is assumed - confirm.
            log_error("a problem occurred while processing %s: %s" % (ap, e),
                      level=1)

    return ret

def read_revs_rpm(path, ret=None):
    """
    Collect microcode information from the files packed in an RPM.

    path: path to the RPM file.
    ret: list to append the entries to; a new list is created if None.

    The microcode files are extracted with "rpm2cpio | cpio" into a
    temporary directory, which is scanned with read_revs_dir() and removed
    afterwards.  If either tool exits with a non-zero status nothing is
    added.

    Returns the list of entries.
    """
    if ret is None:
        ret = []

    dir_tmp = tempfile.mkdtemp()

    try:
        # NOTE(review): the level argument was cut off in the source;
        # level=1 is assumed - confirm.
        log_status("Trying to extract files from RPM \"%s\"..." % path,
                   level=1)

        rpm2cpio = Popen(args=["rpm2cpio", path], stdout=PIPE, stderr=PIPE)
        cpio = Popen(args=["cpio", "-idmv", "*??-??-??", "*microcode*.dat"],
                     cwd=dir_tmp, stdin=rpm2cpio.stdout,
                     stdout=PIPE, stderr=STDOUT)
        out, cpio_stderr = cpio.communicate()
        rpm2cpio_out, rpm2cpio_err = rpm2cpio.communicate()

        rpm2cpio_ret = rpm2cpio.returncode
        cpio_ret = cpio.returncode

        log_info("rpm2cpio exit code: %d, cpio exit code: %d" %
                 (rpm2cpio_ret, cpio_ret))
        if rpm2cpio_err:
            log_info("rpm2cpio stderr:\n%s" % rpm2cpio_err, level=3)
        if out:
            log_info("cpio output:\n%s" % out, level=3)
        if cpio_stderr:
            log_info("cpio stderr:\n%s" % cpio_stderr, level=3)

        if rpm2cpio_ret == 0 and cpio_ret == 0:
            # Keyword arguments keep the RPM path as the source and
            # accumulate into the caller's list.
            ret = read_revs_dir(dir_tmp, src=path, ret=ret)
    finally:
        # Remove the extraction directory even when a tool is missing or
        # the scan fails.
        shutil.rmtree(dir_tmp, ignore_errors=True)

    return ret

def read_revs(path, ret=None):
    """
    Collect microcode information from a directory or an RPM file.

    path: a directory (scanned with read_revs_dir()) or anything else
          (handled as an RPM by read_revs_rpm()).
    ret: list to append the entries to; a new list is created if None.

    Returns the list of entries.
    """
    if ret is None:
        ret = []

    if os.path.isdir(path):
        # ret is passed by keyword: the second positional parameter of
        # read_revs_dir() is "src", not "ret".
        return read_revs_dir(path, ret=ret)

    return read_revs_rpm(path, ret)

def gen_mc_map(mc_data, merge=False, merge_path=False):
    """
    Converts an array of microcode file information to a map with path/sig/pf
    as a key.

    merge: whether to leave only the newest mc variant in the map or leave all
           possible variants.
    merge_path: whether to use None instead of the file path in the key.

    Each value is a dict that maps a single platform ID bit (1, 2, 4, ...)
    set in the entry's "pf" mask to the list of matching entries.
    """
    res = dict()

    for mc in mc_data:
        key = (None if merge_path else mc["path"], mc["cpuid"], mc["pf"])

        if key not in res:
            res[key] = dict()
        per_pid = res[key]

        cur_pf = mc["pf"]
        pid = 1
        while cur_pf > 0:
            # When merging, skip the entry if one with the same or a newer
            # revision is already recorded for this platform ID bit.
            if cur_pf & 1 and not (merge and pid in per_pid
                                   and per_pid[pid][0]["rev"] >= mc["rev"]):
                if pid not in per_pid or merge:
                    per_pid[pid] = []
                per_pid[pid].append(mc)

            # Integer shifts keep the mask an int on Python 3 as well.
            cur_pf >>= 1
            pid <<= 1

    return res

def gen_fn_map(mc_data, merge=False, merge_path=False):
    """
    Group microcode entries by their (path, cpuid, pf) key.

    merge: keep only the entry with the highest revision for each key
           instead of all of them.
    merge_path: use None instead of the file path in the key.

    A warning is logged when a key occurs more than once.

    Returns a dict that maps each key to a list of entries.
    """
    res = dict()

    for mc in mc_data:
        key = (None if merge_path else mc["path"], mc["cpuid"], mc["pf"])
        if key in res:
            log_warn("Duplicate path/cpuid/pf: %s/%#x/%#x" % key)
        else:
            res[key] = []

        if merge and len(res[key]):
            # Merging: keep the single newest entry.
            if mc["rev"] > res[key][0]["rev"]:
                res[key][0] = mc
        else:
            res[key].append(mc)

    return res

def revcmp(a, b):
    """
    cmp-style comparator for microcode entries, newest revision first.

    Returns a positive number when b has the higher "rev", a negative one
    when a has, and zero when they are equal.
    """
    rev_a = a["rev"]
    rev_b = b["rev"]

    return rev_b - rev_a

class ChangeLogEntry:
    """Kinds of changelog entries; used as the first item of each tuple."""
    ADDED = 0
    REMOVED = 1
    UPDATED = 2
    # Used by mc_cmp() and print_changelog(); takes the free value
    # between UPDATED and OTHER.
    DOWNGRADED = 3
    OTHER = 4

def mc_stripped_path(mc):
    """
    Return the "path" of the microcode entry mc without a well-known
    leading installation directory.

    The prefixes are tried in order with remove_prefix(), so the more
    specific ones come first.
    """
    # NOTE(review): only the first entry is visible in the source; the
    # rest of the tuple was cut off and the remaining entries are a
    # reconstruction - confirm against the package layout.
    paths = ("usr/share/microcode_ctl/ucode_with_caveats/intel",
             "usr/share/microcode_ctl/ucode_with_caveats",
             "usr/share/microcode_ctl",
             "lib/firmware",
             "etc/firmware",
             )

    return remove_prefix(mc["path"], paths)

class mcnm:
    """
    Naming modes accepted by get_mc_cnames().

    MCNM_FAMILIES and MCNM_MODELS are bit flags that get_mc_cnames() tests
    with "&".
    """
    # NOTE(review): the class body is missing from the source.  The names
    # are the ones referenced elsewhere in the file; the numeric values
    # and MCNM_FAMILIES_MODELS are a reconstruction - confirm.
    MCNM_ABBREV = 0
    MCNM_FAMILIES = 1
    MCNM_MODELS = 2
    MCNM_FAMILIES_MODELS = MCNM_FAMILIES | MCNM_MODELS
    MCNM_CODENAME = 4


def get_mc_cnames(mc, cmap, mode=mcnm.MCNM_ABBREV):
    """
    Return a printable description of the CPUs a microcode entry is for.

    mc: microcode entry dict or a (path, cpuid, pf) key tuple.
    cmap: code name map as returned by read_codenames_file().
    mode: one of the mcnm constants.  The families/models modes return
          the "families" and/or "models" fields of the matching code name
          records.  The other modes return code names (abbreviated in
          MCNM_ABBREV mode where an abbreviation is available) followed by
          their variants and steppings.

    Returns a comma-separated string, or None when there is nothing to
    report (no map, unknown signature, no record matching the platform ID
    mask).
    """
    if not isinstance(mc, dict):
        mc = mc_from_mc_key(mc)
    sig = mc["cpuid"]
    pf = mc["pf"]
    res = []

    if not cmap:
        return None
    if sig not in cmap:
        log_info("No codename information for sig %#x" % sig)
        return None

    cnames = cmap[sig]

    if mode in (mcnm.MCNM_FAMILIES, mcnm.MCNM_MODELS,
                mcnm.MCNM_FAMILIES_MODELS):
        for c in cnames:
            if not (pf & c["pf_mask"]):
                continue
            for m, f in ((mcnm.MCNM_FAMILIES, "families"),
                         (mcnm.MCNM_MODELS, "models")):
                if m & mode == 0:
                    continue
                if f not in c or not c[f]:
                    log_info("No %s for sig %#x in %r" % (f, sig, c))
                    continue

                res.append(c[f])

        return ", ".join(res) or None

    steppings = dict()
    suffices = dict()
    for c in cnames:
        # A zero platform ID mask in mc matches every record.
        if pf and not (pf & c["pf_mask"]):
            continue

        if mode == mcnm.MCNM_ABBREV and "abbrev" in c and c["abbrev"]:
            cname = c["abbrev"]
        else:
            cname = c["codename"]

        if cname not in suffices:
            suffices[cname] = set()
        if "variant" in c and c["variant"]:
            suffices[cname] |= set(c["variant"])

        if cname not in steppings:
            steppings[cname] = set()
        if c["stepping"]:
            steppings[cname] |= set(c["stepping"])

    for cname in sorted(steppings.keys()):
        cname_str = cname
        if len(suffices[cname]):
            cname_str += "-" + "/".join(sorted(suffices[cname]))
        if len(steppings[cname]):
            cname_str += " " + "/".join(sorted(steppings[cname]))
        res.append(cname_str)

    return ", ".join(res) or None

def mc_from_mc_key(k):
    """
    Turn a (path, cpuid, pf) key tuple into a dict with those field names.

    Items are paired positionally; surplus items on either side are
    dropped.
    """
    field_names = ("path", "cpuid", "pf")

    return {name: value for name, value in zip(field_names, k)}

def mc_path(mc, pf_sfx=True, midword=None, cmap=None):
    """
    Return a printable name for a microcode entry.

    mc: microcode entry dict or a (path, cpuid, pf) key tuple.
    pf_sfx: whether to append the platform ID mask as a "/0x.." suffix.
    midword: optional word added after the name and code names.
    cmap: code name map; when given, the code names are added in
          parentheses.

    If the entry has no path, or its stripped path is the default
    "intel-ucode/<family-model-stepping>" location, the file name alone is
    used; otherwise the CPUID-based name is used and the stripped path is
    appended as "(in <path>)".
    """
    if not isinstance(mc, dict):
        mc = mc_from_mc_key(mc)
    path = mc_stripped_path(mc) if mc["path"] is not None else None
    cpuid_fn = cpuid_fname(mc["cpuid"])
    fname = os.path.basename(mc["path"] or cpuid_fn)
    midword = "" if midword is None else " " + midword
    cname = get_mc_cnames(mc, cmap)
    cname_str = " (" + cname + ")" if cname else ""

    if pf_sfx:
        sfx = "/0x%02x" % mc["pf"]
    else:
        sfx = ""

    if not path or path == os.path.join("intel-ucode", cpuid_fn):
        return "%s%s%s%s" % (fname, sfx, cname_str, midword)
    else:
        return "%s%s%s%s (in %s)" % (cpuid_fn, sfx, cname_str, midword, path)

def gen_changelog_file(old, new):
    """
    Placeholder for a per-file changelog generator; does nothing and
    returns None.
    """
    pass

def mc_cmp(old_mc, new_mc):
    """
    Compare two lists of microcode entries that share the same key.

    Revisions present in both lists are ignored.  If exactly one revision
    remains on each side, a single UPDATED or DOWNGRADED entry is produced;
    otherwise every remaining new revision is reported as ADDED and every
    remaining old one as REMOVED.

    Returns a list of (ChangeLogEntry kind, old entry or None,
    new entry or None) tuples.
    """
    res = []

    old_mc_revs = [x["rev"] for x in old_mc]
    new_mc_revs = [x["rev"] for x in new_mc]
    common = set(old_mc_revs) & set(new_mc_revs)
    old_rev_list = [x for x in sorted(old_mc_revs) if x not in common]
    new_rev_list = [x for x in sorted(new_mc_revs) if x not in common]

    if len(old_rev_list) != 1 or len(new_rev_list) != 1:
        for i in new_mc:
            if i["rev"] in new_rev_list:
                res.append((ChangeLogEntry.ADDED, None, i))
        for i in old_mc:
            if i["rev"] in old_rev_list:
                res.append((ChangeLogEntry.REMOVED, i, None))
    else:
        # One-to-one change: pick the first entry on each side that
        # carries the differing revision.
        old = next(x for x in old_mc if x["rev"] == old_rev_list[0])
        new = next(x for x in new_mc if x["rev"] == new_rev_list[0])
        if new["rev"] > old["rev"]:
            res.append((ChangeLogEntry.UPDATED, old, new))
        elif new["rev"] < old["rev"]:
            res.append((ChangeLogEntry.DOWNGRADED, old, new))

    return res

def gen_changelog(old, new):
    """
    Generate changelog entries describing the difference between two sets
    of microcode entries.

    old, new: lists of microcode entries as returned by read_revs().

    Entries are grouped by (path, cpuid, pf).  A key that exists only in
    new is matched against the keys that exist only in old: if both have
    the same printable path, CPUID and platform ID mask, the pair is
    treated as the same file and compared with mc_cmp().  Remaining
    new-only keys are reported as added, remaining old-only keys as
    removed, and keys present on both sides are compared with mc_cmp().

    Returns a list of (ChangeLogEntry kind, old entry or None,
    new entry or None) tuples.
    """
    res = []

    old_map = gen_fn_map(old)
    new_map = gen_fn_map(new)

    old_files = set(old_map.keys())
    new_files = set(new_map.keys())

    both = old_files & new_files
    added = new_files - old_files
    removed = old_files - new_files

    # sorted() returns copies, so the sets can be modified inside the loops.
    for f in sorted(added):
        p = mc_path(new_map[f][0])
        for old_f in sorted(removed):
            old_p = mc_path(old_map[old_f][0])
            if p == old_p and f[1] == old_f[1] and f[2] == old_f[2]:
                log_info("Matched %s (%s and %s)" %
                         (p, old_map[old_f][0]["path"], new_map[f][0]["path"]))

                # A matched pair must not be reported again as an
                # addition plus a removal below.
                added.remove(f)
                removed.remove(old_f)

                res += mc_cmp(old_map[old_f], new_map[f])
                break

    for f in sorted(added):
        for i in new_map[f]:
            res.append((ChangeLogEntry.ADDED, None, i))
    for f in sorted(removed):
        for i in old_map[f]:
            res.append((ChangeLogEntry.REMOVED, i, None))
    for f in sorted(both):
        res += mc_cmp(old_map[f], new_map[f])

    return res

def mc_date(mc):
    """
    Format a microcode date as a "yyyy-mm-dd" string.

    mc is either the raw date value or an entry dict with a "date" field.
    The low 16 bits are printed as the year, the top bits (24 and up) as
    the month and bits 23:16 as the day, each in hexadecimal.
    """
    raw = mc["date"] if isinstance(mc, dict) else mc

    year = raw & 0xffff
    month = raw >> 24
    day = (raw >> 16) & 0xff

    return "%04x-%02x-%02x" % (year, month, day)

def mc_rev(mc, date=None):
    """
    Return the revision of the microcode entry mc as a hex string.

    While revision is signed for comparison purposes, historically
    it is printed as unsigned,  Oh well.

    date: whether to append the microcode date in parentheses; when None,
          the module-wide print_date setting is used.
    """
    global print_date

    if mc["rev"] < 0:
        # Show a negative revision as its unsigned 32-bit value.
        rev = 2**32 + mc["rev"]
    else:
        rev = mc["rev"]

    if date if date is not None else print_date:
        return "%#x (%s)" % (rev, mc_date(mc))
    else:
        return "%#x" % rev

def _clog_sort_key(entry):
    """Sort key for a changelog tuple: kind, then path, CPUID, pf, rev."""
    kind, old, new = entry
    mc = old if old is not None else new

    return (kind, mc["path"] or "", mc["cpuid"], mc["pf"], mc["rev"])


def print_changelog(clog, cmap, args):
    """
    Print changelog entries, one line per entry, ordered by kind.

    clog: list of (ChangeLogEntry kind, old entry, new entry) tuples as
          returned by gen_changelog().
    cmap: code name map passed on to mc_path().
    args: parsed command line arguments (not used at the moment).
    """
    # An explicit key is required: the tuples contain dicts and None,
    # which cannot be ordered on Python 3.
    for e, old, new in sorted(clog, key=_clog_sort_key):
        if e == ChangeLogEntry.ADDED:
            print("Addition of %s at revision %s" %
                  (mc_path(new, midword="microcode", cmap=cmap), mc_rev(new)))
        elif e == ChangeLogEntry.REMOVED:
            print("Removal of %s at revision %s" %
                  (mc_path(old, midword="microcode", cmap=cmap), mc_rev(old)))
        elif e == ChangeLogEntry.UPDATED:
            print("Update of %s from revision %s up to %s" %
                  (mc_path(old, midword="microcode", cmap=cmap),
                   mc_rev(old), mc_rev(new)))
        elif e == ChangeLogEntry.DOWNGRADED:
            print("Downgrade of %s from revision %s down to %s" %
                  (mc_path(old, midword="microcode", cmap=cmap),
                   mc_rev(old), mc_rev(new)))
        elif e == ChangeLogEntry.OTHER:
            print("Other change in %s:" % old["path"])
            print("  old: %#x/%#x: rev %s (offs %#x)" %
                  (old["cpuid"], old["pf"], mc_rev(old), old["offs"]))
            print("  new: %#x/%#x: rev %s (offs %#x)" %
                  (new["cpuid"], new["pf"], mc_rev(new), new["offs"]))

class TableStyles:
    """Output styles understood by print_table()."""
    TS_CSV, TS_FANCY = 0, 1

def print_line(line, column_sz):
    """
    Print one table row with its cells padded to the column widths.

    line: list of cell values; cells missing at the end are left empty.
    column_sz: list of column widths.

    Cells are separated with " | ".
    """
    padded = list(line) + [""] * (len(column_sz) - len(line))
    cells = [str(x).ljust(column_sz[i]) for i, x in enumerate(padded)]

    # NOTE(review): the end of the original expression is cut off in the
    # source; stripping the trailing padding is an assumption - confirm.
    print(" | ".join(cells).rstrip())

def print_table(items, header=None, style=TableStyles.TS_CSV):
    """
    Print a table in the requested style.

    items: list of rows, each a list of cells.
    header: optional list of header rows; used by the TS_FANCY style.
    style: one of the TableStyles constants.

    TS_CSV prints each row of items as a single delimited line.  TS_FANCY
    pads every column to the width of its widest cell and separates the
    header rows from the items with a dashed line.
    """
    if header is None:
        header = []

    if style == TableStyles.TS_CSV:
        for i in items:
            # NOTE(review): the loop body is missing from the source; a
            # semicolon-separated line is an assumption - confirm.
            print(";".join([str(x) for x in i]))
    elif style == TableStyles.TS_FANCY:
        # Width of each column over all rows.  Starting from an empty
        # list keeps this working when there are no rows at all.
        column_sz = []
        for row in itertools.chain(header, items):
            widths = [len(str(x)) for x in row]
            column_sz = [max(pair) for pair in
                         izip_longest(column_sz, widths, fillvalue=0)]

        for i in header:
            print_line(i, column_sz)
        if header:
            print("-+-".join(["-" * x for x in column_sz]))
        for i in items:
            print_line(i, column_sz)

def print_summary(revs, cmap, args):
    """
    Print a table that describes every microcode entry in revs.

    revs: list of microcode entries as returned by read_revs().
    cmap: code name map as returned by read_codenames_file().
    args: parsed command line arguments; "abbrev", "header" and "models"
          are used.
    """
    m = gen_fn_map(revs)
    cnames_mode = mcnm.MCNM_ABBREV if args.abbrev else mcnm.MCNM_CODENAME

    header = []
    if args.header:
        header.append(["Path", "Offset", "Ext. Offset", "CPUID",
                       "Platform ID Mask", "Revision", "Date", "Checksum",
                       "Codenames"] +
                      (["Models"] if args.models else []))
    tbl = []
    for k in sorted(m.keys()):
        for mc in m[k]:
            # NOTE(review): the "Path" and "Date" cells and the mode of
            # the models column are missing from the source and are
            # reconstructed to match the header above - confirm.
            tbl.append([mc_stripped_path(mc),
                        "0x%x" % mc["offs"],
                        "0x%x" % mc["ext_offs"] if "ext_offs" in mc else "-",
                        "0x%05x" % mc["cpuid"],
                        "0x%02x" % mc["pf"],
                        mc_rev(mc, date=False),
                        mc_date(mc),
                        "0x%08x" % mc["cksum"],
                        get_mc_cnames(mc, cmap, cnames_mode) or ""] +
                       ([get_mc_cnames(mc, cmap,
                                       mcnm.MCNM_FAMILIES |
                                       mcnm.MCNM_MODELS) or ""]
                        if args.models else []))

    print_table(tbl, header, style=TableStyles.TS_FANCY)

def read_codenames_file(path):
    """
    Read the code names file and return a map from CPUID signature to a
    list of code name records.

    Supports two formats: new and old
     * old: tab-separated. Field order:
       Segment, (unused), Codename, (dash-separated) Stepping,
       Platform ID mask, CPUID, (unused) Update link, (unused) Specs link
     * new: semicolon-separated; support comments.  Distinguished
       by the first line that starts with octothorp.  Field order:
       Segment, Unused, Codename, Stepping, Platform ID mask, CPUID,
       Abbreviation, Variant(s), Families, Models

    In each record "sig" and "pf_mask" are parsed as hexadecimal integers,
    and "stepping" and "variant" are split on commas into lists.

    Any error is logged; the records read up to that point are returned.
    """
    old_fields = ["segment", "_", "codename", "stepping", "pf_mask", "sig",
                  "_update", "_specs"]
    new_fields = ["segment", "_", "codename", "stepping", "pf_mask", "sig",
                  "abbrev", "variant", "families", "models"]
    new_fmt = False
    field_names = old_fields

    res = dict()

    try:
        with open(path, "r") as f:
            for line in f:
                line = line.strip()
                if len(line) == 0:
                    continue
                if line[0] == '#':
                    # A comment line switches to the new format for the
                    # rest of the file.
                    new_fmt = True
                    field_names = new_fields
                    continue

                fields = line.split(";" if new_fmt else "\t",
                                    1 + len(field_names))
                fields = dict(zip(field_names, fields))
                if "sig" not in fields:
                    log_warn("Skipping %r (from \"%s\")" % (fields, line))
                    continue

                sig = fields["sig"] = int(fields["sig"], 16)
                fields["pf_mask"] = int(fields["pf_mask"], 16)
                fields["stepping"] = fields["stepping"].split(",")
                if "variant" in fields:
                    if fields["variant"]:
                        fields["variant"] = fields["variant"].split(",")
                    else:
                        fields["variant"] = []

                if sig not in res:
                    res[sig] = list()
                res[sig].append(fields)
    except Exception as e:
        log_error("a problem occurred while reading code names: %s" % e)

    return res

def print_discrepancies(rev_map, deps, cmap, args):
    """
    Print a table that compares microcode revisions between sources.

    rev_map: dict "name": revs
    deps: list of tuples (name, parent/None)
    cmap: code name map as returned by read_codenames_file().
    args: parsed command line arguments; "header", "print_vs", "models",
          "print_filter" and "min_date" are used.

    The table has one row per microcode key and one column per item of
    deps.  A revision that is absent from the item's parent is marked
    with "(*)".
    """
    # NOTE(review): several statements of this function are missing from
    # the source (the appends to header1, header2, out and tbl); they are
    # reconstructed from the variables that are otherwise unused - confirm.
    sigs = set()

    for p, r in rev_map.items():
        sigs |= set(r.keys())

    header = []
    if args.header:
        header1 = ["sig"]
        header2 = [""]
        for p, n, d in deps:
            header1.append(n)
            add = ""
            if d:
                # Look up the display name of the parent.
                for pd, nd, dd in deps:
                    if pd == d:
                        add = "(vs. %s)" % nd
                        break
            header2.append(add)
        if args.models:
            header1.append("Model names")
            header2.append("")
        header = [header1] + ([header2] if args.print_vs else [])

    tbl = []
    for s in sorted(sigs):
        out = [mc_path(s)]
        print_out = not args.print_filter
        # Whether the row passes the minimum date filter.
        date_ok = args.min_date is None

        for p, n, d in deps:
            cur = dict([(x["rev"], x) for x in rev_map[p][s]]) \
                  if s in rev_map[p] else {}
            v = "/".join([mc_rev(y) for x, y in sorted(cur.items())]) \
                if cur else "-"
            if d is not None:
                prev = [x["rev"] for x in rev_map[d][s]] if s in rev_map[d] \
                        else []
                if [x for x in cur if x not in prev]:
                    v += " (*)"
                    print_out = True
            if args.min_date is not None and s in rev_map[p]:
                for x in rev_map[p][s]:
                    date_ok |= mc_date(x) > args.min_date
            out.append(v)

        if print_out and date_ok:
            if args.models:
                out.append(get_mc_cnames(s, cmap) or "")
            tbl.append(out)

    print_table(tbl, header, style=TableStyles.TS_FANCY)

def cmd_summary(args):
    """
    "summary" command: read every path in args.filelist, then print the
    summary table.  Returns 0.
    """
    collected = []
    for location in args.filelist:
        collected = read_revs(location, ret=collected)

    cmap = read_codenames_file(args.codenames)
    print_summary(collected, cmap, args)

    return 0

def cmd_changelog(args):
    """
    "changelog" command: print the changes between the first (base) and
    the second (updated) path in args.filelist.  Returns 0.
    """
    cmap = read_codenames_file(args.codenames)

    old_revs = read_revs(args.filelist[0])
    new_revs = read_revs(args.filelist[1])

    changes = gen_changelog(old_revs, new_revs)
    print_changelog(changes, cmap, args)

    return 0

def cmd_discrepancies(args):
    """
    "discrepancies" command: print the revision comparison table for the
    paths in args.filelist.

    Each path may carry prefixes:
     * "<" prefix (possibly multiple times) to refer to a previous entry
       to compare against
     * "[name]" prefix is a name reference

    Returns 0 on success and 1 if a "<" reference points before the first
    entry.
    """
    codenames_map = read_codenames_file(args.codenames)
    rev_map = dict()
    deps = list()

    for cur, path in enumerate(args.filelist):
        orig_path = path
        name = None
        dep = None
        # Consume the prefixes; stop at the first character that does not
        # start one.
        while True:
            if path.startswith('<'):
                path = path[1:]
                dep = cur - 1 if dep is None else dep - 1
            elif path.startswith('[') and path.find(']') > 0:
                pos = path.find(']')
                name = path[1:pos]
                path = path[pos + 1:]
            else:
                break
        if name is None:
            name = path
        if dep is not None and dep < 0:
            log_error("Incorrect dep reference for '%s' (points to index %d)" %
                      (orig_path, dep))
            return 1
        deps.append((path, name, deps[dep][0] if dep is not None else None))
        # NOTE(review): the end of this call is cut off in the source;
        # merge_path=True is an assumption - confirm.
        rev_map[path] = gen_fn_map(read_revs(path), merge=args.merge,
                                   merge_path=True)

    print_discrepancies(rev_map, deps, codenames_map, args)

    return 0

def parse_cli():
    """
    Parse the command line.

    Defines the "summary", "changelog" and "discrepancies" commands, each
    bound to its cmd_* handler through the "func" attribute, and applies
    the -v/--verbose count to the module-wide log_level.

    Returns the parsed arguments, or None (after printing the usage to
    stderr) if no command was given.
    """
    global log_level

    # NOTE(review): the end of the description is cut off in the source;
    # "parser" is an assumption - confirm.
    root_parser = argparse.ArgumentParser(prog="gen_updates",
                                          description="Intel CPU Microcode " +
                                          "parser")
    root_parser.add_argument("-C", "--codenames", default='codenames',
                             help="Code names file")
    root_parser.add_argument("-v", "--verbose", action="count", default=0,
                             help="Increase output verbosity")

    cmdparsers = root_parser.add_subparsers(title="Commands",
                                            help="main gen_updates commands")

    parser_s = cmdparsers.add_parser("summary",
                                     help="Generate microcode summary")
    parser_s.add_argument("-a", "--abbreviate", action="store_const",
                          dest="abbrev", const=True, default=True,
                          help="Abbreviate code names")
    parser_s.add_argument("-A", "--no-abbreviate", action="store_const",
                          dest="abbrev", const=False,
                          help="Do not abbreviate code names")
    parser_s.add_argument("-m", "--print-models", action="store_const",
                          dest="models", const=True, default=False,
                          help="Print models")
    parser_s.add_argument("-M", "--no-print-models",
                          action="store_const", dest="models",
                          const=False, help="Do not print models")
    parser_s.add_argument("-H", "--no-print-header",
                          action="store_const", dest="header",
                          const=False, default=True,
                          help="Do not print header")
    parser_s.add_argument("filelist", nargs="*", default=[],
                          help="List of RPMs/directories to process")
    # main() dispatches through args.func.
    parser_s.set_defaults(func=cmd_summary)

    parser_c = cmdparsers.add_parser("changelog",
                                     help="Generate changelog")
    parser_c.add_argument("filelist", nargs=2,
                          help="RPMs/directories to compare")
    parser_c.set_defaults(func=cmd_changelog)

    parser_d = cmdparsers.add_parser("discrepancies",
                                     help="Generate discrepancies")
    parser_d.add_argument("-s", "--merge-revs", action="store_const",
                          dest="merge", const=True, default=False,
                          help="Merge revisions that come" +
                               " from different files")
    parser_d.add_argument("-S", "--no-merge-revs", action="store_const",
                          dest="merge", const=False,
                          help="Do not Merge revisions that come" +
                               " from different files")
    parser_d.add_argument("-v", "--print-vs", action="store_const",
                          dest="print_vs", const=True, default=False,
                          help="Print base version ")
    parser_d.add_argument("-V", "--no-print-vs", action="store_const",
                          dest="print_vs", const=False,
                          help="Do not print base version")
    parser_d.add_argument("-m", "--print-models", action="store_const",
                          dest="models", const=True, default=True,
                          help="Print model names")
    parser_d.add_argument("-M", "--no-print-models", action="store_const",
                          dest="models", const=False,
                          help="Do not print model names")
    parser_d.add_argument("-H", "--no-print-header", action="store_const",
                          dest="header", const=False, default=True,
                          help="Do not print header")
    parser_d.add_argument("-a", "--print-all-files", action="store_const",
                          dest="print_filter", const=False, default=True,
                          help="Print all files")
    parser_d.add_argument("-c", "--print-changed-files", action="store_const",
                          dest="print_filter", const=True,
                          help="Print only changed files")
    parser_d.add_argument("-d", "--min-date", action="store",
                          help="Minimum date filter")
    parser_d.add_argument("filelist", nargs='*',
                          help="RPMs/directories to compare")
    parser_d.set_defaults(func=cmd_discrepancies)

    args = root_parser.parse_args()
    if not hasattr(args, "func"):
        # No command was given.
        root_parser.print_usage(sys.stderr)
        return None

    log_level = args.verbose

    return args

def main():
    """
    Entry point: parse the command line and run the selected command.

    Returns the command's return value, or 1 if parse_cli() returned None.
    """
    args = parse_cli()

    return 1 if args is None else args.func(args)

if __name__ == "__main__":