#!/usr/bin/env python3

import argparse
import hashlib
import json
import os


def hash_file(fd):
    """Return the SHA-256 digest of the data readable from a file descriptor.

    The descriptor is read from its current offset until ``os.read`` returns
    an empty result; it is neither rewound nor closed here.

    Args:
        fd: An open, readable OS-level file descriptor.

    Returns:
        The hex digest prefixed with ``"sha256:"``.
    """
    chunk_size = 4096
    digest = hashlib.sha256()
    # os.read() yields b"" at end of file, which is the sentinel ending the loop.
    for chunk in iter(lambda: os.read(fd, chunk_size), b""):
        digest.update(chunk)
    return "sha256:" + digest.hexdigest()


def stat_diff(stat1, stat2, path, differences):
    """Record mode, uid and gid differences between two stat results.

    Every differing attribute is stored under ``differences[path]`` as a
    two-element list ``[value_in_tree1, value_in_tree2]``, keyed by
    ``"mode"``, ``"uid"`` or ``"gid"``.  All three attributes are always
    checked, so an ownership change is reported even when the mode changed
    as well.

    Args:
        stat1: Object exposing ``st_mode``, ``st_uid`` and ``st_gid`` for the
            entry in the first tree.
        stat2: The same for the entry in the second tree.
        path: Key under which the differences are recorded.
        differences: Dict mapping paths to per-property difference dicts;
            updated in place.

    Returns:
        ``False`` if ``st_mode`` differs, ``True`` otherwise.  uid/gid
        differences do not affect the return value.
    """
    same_mode = stat1.st_mode == stat2.st_mode
    if not same_mode:
        differences.setdefault(path, {})["mode"] = [stat1.st_mode, stat2.st_mode]
    if stat1.st_uid != stat2.st_uid:
        differences.setdefault(path, {})["uid"] = [stat1.st_uid, stat2.st_uid]
    if stat1.st_gid != stat2.st_gid:
        differences.setdefault(path, {})["gid"] = [stat1.st_gid, stat2.st_gid]
    return same_mode


def selinux_diff(path1, path2, path, differences):
    """Record a difference in the ``security.selinux`` xattr of two files.

    The attribute is read without following a symlink in the final path
    component.  Trailing and leading NUL/newline characters are removed
    *before* the comparison, so two labels that differ only in their
    terminator are treated as equal rather than being reported as a
    difference between two identical-looking values.

    Args:
        path1: Path of the entry in the first tree.
        path2: Path of the entry in the second tree.
        path: Key under which a difference is recorded.
        differences: Dict mapping paths to per-property difference dicts;
            updated in place with a ``"selinux"`` entry on mismatch.

    Returns:
        ``False`` if both labels were read and differ, ``True`` otherwise.
    """
    try:
        label1 = os.getxattr(path1, b"security.selinux", follow_symlinks=False).decode()
        label2 = os.getxattr(path2, b"security.selinux", follow_symlinks=False).decode()
    except OSError:
        # Best effort: if either label cannot be read, nothing is reported.
        # NOTE(review): this also hides the case where only one side has a
        # label at all.
        return True
    label1 = label1.strip('\n\0')
    label2 = label2.strip('\n\0')
    if label1 != label2:
        differences.setdefault(path, {})["selinux"] = [label1, label2]
        return False
    return True


def content_diff(name, dir_fd1, dir_fd2, path, differences):
    """Compare the contents of the file ``name`` in two directories.

    The file is opened read-only relative to each directory descriptor and
    hashed with ``hash_file``.  If the digests differ they are stored as
    ``differences[path]["content"] = [digest1, digest2]``.  If either file
    cannot be opened, nothing is recorded.

    Args:
        name: File name relative to both directory descriptors.
        dir_fd1: Directory descriptor of the first tree.
        dir_fd2: Directory descriptor of the second tree.
        path: Key under which a difference is recorded.
        differences: Dict mapping paths to per-property difference dicts;
            updated in place.
    """
    opened = []
    try:
        for parent_fd in (dir_fd1, dir_fd2):
            try:
                opened.append(os.open(name, flags=os.O_RDONLY, dir_fd=parent_fd))
            except OSError:
                # Unreadable on either side: skip the comparison entirely.
                return
        digests = [hash_file(fd) for fd in opened]
        if digests[0] != digests[1]:
            differences.setdefault(path, {})["content"] = digests
    finally:
        # Close whatever was successfully opened, in opening order.
        for fd in opened:
            os.close(fd)


def symlink_diff(name, dir_fd1, dir_fd2, path, differences):
    """Compare the targets of the symlink ``name`` in two directories.

    If the targets differ they are stored as
    ``differences[path]["symlink"] = [target1, target2]``.  If either link
    cannot be read, nothing is recorded.

    Args:
        name: Link name relative to both directory descriptors.
        dir_fd1: Directory descriptor of the first tree.
        dir_fd2: Directory descriptor of the second tree.
        path: Key under which a difference is recorded.
        differences: Dict mapping paths to per-property difference dicts;
            updated in place.
    """
    try:
        first, second = [os.readlink(name, dir_fd=fd) for fd in (dir_fd1, dir_fd2)]
    except OSError:
        return
    if first == second:
        return
    entry = differences.setdefault(path, {})
    entry["symlink"] = [os.fsdecode(first), os.fsdecode(second)]


def _record_subtree(dirent, dir_fd, entry_path, target_list):
    """Append ``entry_path`` and, for a directory, everything below it."""
    target_list.append(entry_path)
    if not dirent.is_dir(follow_symlinks=False):
        return
    try:
        child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
    except OSError:
        # Best effort: a directory that cannot be opened is listed alone.
        return
    try:
        list_dir(child_fd, entry_path, target_list)
    finally:
        os.close(child_fd)


def diff_aux(dir_fd1, dir_fd2, path, report):
    """Recursively compare two open directories and fill in ``report``.

    Entries present only under ``dir_fd1`` are appended (with their whole
    subtree) to ``report["deleted_files"]``; entries present only under
    ``dir_fd2`` go to ``report["added_files"]``.  Entries present in both are
    compared for SELinux label, mode/uid/gid and then, depending on their
    type in the first tree, symlink target, file content, or recursively as
    directories.  When ``stat_diff`` returns ``False`` for an entry, the
    type-specific comparison is skipped.

    Args:
        dir_fd1: Directory descriptor of the first tree.
        dir_fd2: Directory descriptor of the second tree.
        path: Report path corresponding to the two directories.
        report: Dict with ``"added_files"``, ``"deleted_files"`` (lists) and
            ``"differences"`` (dict); updated in place.
    """
    differences = report["differences"]
    entries1 = set()
    with os.scandir(dir_fd1) as it:
        for dirent in it:
            entry_path = os.path.join(path, dirent.name)
            try:
                stat2 = os.stat(dirent.name, dir_fd=dir_fd2, follow_symlinks=False)
            except FileNotFoundError:
                _record_subtree(dirent, dir_fd1, entry_path, report["deleted_files"])
                continue
            # Remember the name so the second pass does not report it as added.
            entries1.add(dirent.name)
            stat1 = dirent.stat(follow_symlinks=False)
            selinux_diff(os.path.join(f"/proc/self/fd/{dir_fd1}", dirent.name),
                         os.path.join(f"/proc/self/fd/{dir_fd2}", dirent.name),
                         entry_path,
                         differences)
            if not stat_diff(stat1, stat2, entry_path, differences):
                continue
            if dirent.is_symlink():
                symlink_diff(dirent.name, dir_fd1, dir_fd2, entry_path, differences)
            elif dirent.is_file(follow_symlinks=False):
                content_diff(dirent.name, dir_fd1, dir_fd2, entry_path, differences)
            elif dirent.is_dir(follow_symlinks=False):
                try:
                    child_fd1 = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd1)
                except OSError:
                    continue
                # The finally blocks release both descriptors even if the
                # recursive comparison raises.
                try:
                    try:
                        child_fd2 = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd2)
                    except OSError:
                        continue
                    try:
                        diff_aux(child_fd1, child_fd2, entry_path, report)
                    finally:
                        os.close(child_fd2)
                finally:
                    os.close(child_fd1)
    with os.scandir(dir_fd2) as it:
        for dirent in it:
            if dirent.name not in entries1:
                _record_subtree(dirent,
                                dir_fd2,
                                os.path.join(path, dirent.name),
                                report["added_files"])


def diff(dir_fd1, dir_fd2, report):
    """Compare two directory trees, starting with the root directories.

    The roots' SELinux labels and mode/uid/gid are compared and recorded
    under the path ``"/"``; ``diff_aux`` then walks the contents.

    Args:
        dir_fd1: Directory descriptor of the first tree's root.
        dir_fd2: Directory descriptor of the second tree's root.
        report: Dict with ``"added_files"``, ``"deleted_files"`` and
            ``"differences"``; updated in place.
    """
    stat1 = os.stat(".", dir_fd=dir_fd1, follow_symlinks=False)
    stat2 = os.stat(".", dir_fd=dir_fd2, follow_symlinks=False)
    # /proc/self/fd/N is itself a symbolic link and selinux_diff() reads the
    # xattr without following a final symlink, so a bare /proc/self/fd/N would
    # query the procfs link rather than the directory.  The trailing "."
    # component makes the path resolve to the directory itself.
    root1 = os.path.join(f"/proc/self/fd/{dir_fd1}", ".")
    root2 = os.path.join(f"/proc/self/fd/{dir_fd2}", ".")
    selinux_diff(root1, root2, "/", report["differences"])
    stat_diff(stat1, stat2, "/", report["differences"])
    diff_aux(dir_fd1, dir_fd2, "/", report)


def list_dir(dir_fd, path, target_list):
    """Append the report path of every entry below a directory to a list.

    Each entry is appended before its children.  Subdirectories are
    descended into; symlinks to directories are not followed.  A
    subdirectory that cannot be opened is listed but not descended into.

    Args:
        dir_fd: Open directory descriptor to list.
        path: Report path corresponding to ``dir_fd``; entry names are
            joined onto it.
        target_list: List that receives the paths; updated in place.
    """
    with os.scandir(dir_fd) as it:
        for dirent in it:
            p = os.path.join(path, dirent.name)
            target_list.append(p)
            if dirent.is_dir(follow_symlinks=False):
                try:
                    child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
                except OSError:
                    continue
                # Close the descriptor even if listing the subtree raises.
                try:
                    list_dir(child_fd, p, target_list)
                finally:
                    os.close(child_fd)


def main():
    """Parse the command line, compare the two trees and print a JSON report.

    The report is a JSON object with the keys ``added_files``,
    ``deleted_files`` and ``differences``, printed to standard output.
    """
    parser = argparse.ArgumentParser(description="Recursively compare file system trees")
    parser.add_argument("dir1", metavar="DIRECTORY1",
                        help="first directory to compare")
    parser.add_argument("dir2", metavar="DIRECTORY2",
                        help="second directory to compare")
    args = parser.parse_args()

    report = {
        "added_files": [],
        "deleted_files": [],
        "differences": {},
    }

    # Nested try/finally so each descriptor is released even if opening the
    # second directory or the comparison itself raises.
    dir_fd1 = os.open(args.dir1, os.O_DIRECTORY)
    try:
        dir_fd2 = os.open(args.dir2, os.O_DIRECTORY)
        try:
            diff(dir_fd1, dir_fd2, report)
        finally:
            os.close(dir_fd2)
    finally:
        os.close(dir_fd1)

    print(json.dumps(report, indent=2))


# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()