#!/usr/bin/env python3
import argparse
import hashlib
import json
import os
def hash_file(fd):
    """Return the SHA-256 digest of everything readable from *fd*.

    The descriptor is consumed from its current offset until ``os.read``
    returns an empty buffer. It is neither rewound nor closed here; that
    remains the caller's responsibility.

    Args:
        fd: An open, readable file descriptor.

    Returns:
        The hexadecimal digest prefixed with ``"sha256:"``.
    """
    BLOCK_SIZE = 4096
    digest = hashlib.sha256()
    # os.read() yields b"" at end of file, which serves as the iter() sentinel.
    for chunk in iter(lambda: os.read(fd, BLOCK_SIZE), b""):
        digest.update(chunk)
    return f"sha256:{digest.hexdigest()}"
def stat_diff(stat1, stat2, path, differences):
    """Record mode, uid and gid differences between two stat results.

    Every differing attribute is stored in ``differences[path]`` as
    ``[value_in_tree1, value_in_tree2]`` under the keys ``"mode"``, ``"uid"``
    and ``"gid"``. Ownership differences are recorded even when the mode
    differs too, so a mode change never hides a uid/gid change.

    Args:
        stat1: Object exposing ``st_mode``, ``st_uid`` and ``st_gid`` for the
            entry in the first tree.
        stat2: The same for the entry in the second tree.
        path: Report key under which differences are recorded.
        differences: Dict mapping paths to per-property difference dicts;
            mutated in place.

    Returns:
        ``False`` when ``st_mode`` differs (callers use this to skip further
        comparison of the entry), ``True`` otherwise.
    """
    changed = {}
    for key, attr in (("mode", "st_mode"), ("uid", "st_uid"), ("gid", "st_gid")):
        before, after = getattr(stat1, attr), getattr(stat2, attr)
        if before != after:
            changed[key] = [before, after]

    if changed:
        # Merge into any properties already recorded for this path.
        differences.setdefault(path, {}).update(changed)

    return "mode" not in changed
def selinux_diff(path1, path2, path, differences):
    """Record a difference in the ``security.selinux`` xattr of two entries.

    The labels are read without following symlinks. Both labels are stripped
    of trailing/leading newline and NUL characters *before* they are compared,
    so two labels that differ only in such padding are treated as equal
    rather than being reported as a difference with identical displayed
    values.

    Args:
        path1: File system path of the entry in the first tree.
        path2: File system path of the entry in the second tree.
        path: Report key under which a difference is recorded.
        differences: Dict mapping paths to per-property difference dicts;
            mutated in place.

    Returns:
        ``False`` when the labels differ, ``True`` when they match or when
        either label could not be read (best effort: any ``OSError`` from
        ``os.getxattr`` is treated as "nothing to compare").
    """
    try:
        raw1 = os.getxattr(path1, b"security.selinux", follow_symlinks=False)
        raw2 = os.getxattr(path2, b"security.selinux", follow_symlinks=False)
    except OSError:
        return True

    # Compare the same normalized form that ends up in the report.
    label1 = raw1.decode().strip('\n\0')
    label2 = raw2.decode().strip('\n\0')
    if label1 == label2:
        return True

    props = differences.setdefault(path, {})
    props["selinux"] = [label1, label2]
    return False
def content_diff(name, dir_fd1, dir_fd2, path, differences):
    """Record a content difference for the file *name* in both directories.

    The file is opened read-only relative to each directory descriptor and
    hashed with ``hash_file``. If the digests differ they are stored as
    ``differences[path]["content"] = [digest1, digest2]``. If either file
    cannot be opened, nothing is recorded.

    Args:
        name: Entry name, resolved relative to each directory descriptor.
        dir_fd1: Directory descriptor of the first tree.
        dir_fd2: Directory descriptor of the second tree.
        path: Report key under which a difference is recorded.
        differences: Dict mapping paths to per-property difference dicts;
            mutated in place.
    """
    opened = []
    try:
        for parent_fd in (dir_fd1, dir_fd2):
            opened.append(os.open(name, flags=os.O_RDONLY, dir_fd=parent_fd))
    except OSError:
        # Best effort: release whatever was opened and skip the comparison.
        for fd in opened:
            os.close(fd)
        return

    try:
        digests = [hash_file(fd) for fd in opened]
        if digests[0] != digests[1]:
            props = differences.setdefault(path, {})
            props["content"] = digests
    finally:
        for fd in opened:
            os.close(fd)
def symlink_diff(name, dir_fd1, dir_fd2, path, differences):
    """Record a difference between the targets of symlink *name*.

    The link is read relative to each directory descriptor. If the targets
    differ they are stored as ``differences[path]["symlink"]``. If either
    link cannot be read, nothing is recorded.

    Args:
        name: Entry name, resolved relative to each directory descriptor.
        dir_fd1: Directory descriptor of the first tree.
        dir_fd2: Directory descriptor of the second tree.
        path: Report key under which a difference is recorded.
        differences: Dict mapping paths to per-property difference dicts;
            mutated in place.
    """
    try:
        targets = [os.readlink(name, dir_fd=parent_fd) for parent_fd in (dir_fd1, dir_fd2)]
    except OSError:
        return

    if targets[0] == targets[1]:
        return

    props = differences.setdefault(path, {})
    props["symlink"] = [os.fsdecode(target) for target in targets]
def _list_subtree(name, dir_fd, path, target_list):
    """Append every entry below directory *name* (relative to *dir_fd*) to *target_list*.

    Best effort: if the directory cannot be opened, nothing is appended. The
    child descriptor is always closed, even if listing raises.
    """
    try:
        child_fd = os.open(name, os.O_DIRECTORY, dir_fd=dir_fd)
    except OSError:
        return
    try:
        list_dir(child_fd, path, target_list)
    finally:
        os.close(child_fd)


def diff_aux(dir_fd1, dir_fd2, path, report):
    """Recursively compare the directories behind two descriptors.

    Entries only in the first directory go to ``report["deleted_files"]``,
    entries only in the second to ``report["added_files"]``; for directories
    the whole subtree is listed too. Entries present in both are compared by
    SELinux label and mode/uid/gid. Unless the modes differ, they are then
    compared by symlink target, file content, or recursively for directories.
    Differences land in ``report["differences"]``.

    Args:
        dir_fd1: Open directory descriptor of the first tree.
        dir_fd2: Open directory descriptor of the second tree.
        path: Report path corresponding to both descriptors.
        report: Dict with ``"added_files"``, ``"deleted_files"`` (lists) and
            ``"differences"`` (dict); mutated in place.
    """
    entries1 = set()
    with os.scandir(dir_fd1) as it:
        for dirent in it:
            entry_path = os.path.join(path, dirent.name)
            try:
                stat2 = os.stat(dirent.name, dir_fd=dir_fd2, follow_symlinks=False)
            except FileNotFoundError:
                report["deleted_files"].append(entry_path)
                if dirent.is_dir(follow_symlinks=False):
                    _list_subtree(dirent.name, dir_fd1, entry_path, report["deleted_files"])
                continue

            entries1.add(dirent.name)
            stat1 = dirent.stat(follow_symlinks=False)

            # xattrs are read by path, so address the entries through procfs.
            selinux_diff(os.path.join(f"/proc/self/fd/{dir_fd1}", dirent.name),
                         os.path.join(f"/proc/self/fd/{dir_fd2}", dirent.name),
                         entry_path,
                         report["differences"])

            # A mode mismatch means further comparison is skipped.
            if not stat_diff(stat1, stat2, entry_path, report["differences"]):
                continue

            if dirent.is_symlink():
                symlink_diff(dirent.name, dir_fd1, dir_fd2, entry_path, report["differences"])
            elif dirent.is_file(follow_symlinks=False):
                content_diff(dirent.name, dir_fd1, dir_fd2, entry_path, report["differences"])
            elif dirent.is_dir(follow_symlinks=False):
                try:
                    child_fd1 = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd1)
                except OSError:
                    continue
                try:
                    try:
                        child_fd2 = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd2)
                    except OSError:
                        continue
                    try:
                        diff_aux(child_fd1, child_fd2, entry_path, report)
                    finally:
                        os.close(child_fd2)
                finally:
                    # Runs on every exit path, so the descriptor never leaks.
                    os.close(child_fd1)

    with os.scandir(dir_fd2) as it:
        for dirent in it:
            if dirent.name in entries1:
                continue
            entry_path = os.path.join(path, dirent.name)
            report["added_files"].append(entry_path)
            if dirent.is_dir(follow_symlinks=False):
                _list_subtree(dirent.name, dir_fd2, entry_path, report["added_files"])
def diff(dir_fd1, dir_fd2, report):
    """Compare two directory trees, starting with the roots themselves.

    The SELinux label and the mode/uid/gid of both root directories are
    compared first and reported under the path ``"/"``. The contents are then
    compared recursively via ``diff_aux``.

    Args:
        dir_fd1: Open directory descriptor of the first tree's root.
        dir_fd2: Open directory descriptor of the second tree's root.
        report: Dict with ``"added_files"``, ``"deleted_files"`` and
            ``"differences"``; mutated in place.
    """
    root = "/"
    root_stats = [
        os.stat(".", dir_fd=root_fd, follow_symlinks=False)
        for root_fd in (dir_fd1, dir_fd2)
    ]

    selinux_diff(
        f"/proc/self/fd/{dir_fd1}",
        f"/proc/self/fd/{dir_fd2}",
        root,
        report["differences"],
    )
    stat_diff(root_stats[0], root_stats[1], root, report["differences"])
    diff_aux(dir_fd1, dir_fd2, root, report)
def list_dir(dir_fd, path, target_list):
    """Recursively append the paths of all entries below *dir_fd* to *target_list*.

    Symlinks are listed but never followed. Subdirectories that cannot be
    opened are listed themselves, but their contents are skipped (best
    effort). Each child descriptor is closed even if the recursive listing
    raises.

    Args:
        dir_fd: Open directory descriptor to list.
        path: Report path corresponding to *dir_fd*; prefixed to entry names.
        target_list: List that receives the entry paths; mutated in place.
    """
    with os.scandir(dir_fd) as it:
        for dirent in it:
            entry_path = os.path.join(path, dirent.name)
            target_list.append(entry_path)
            if not dirent.is_dir(follow_symlinks=False):
                continue
            try:
                child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
            except OSError:
                continue
            try:
                list_dir(child_fd, entry_path, target_list)
            finally:
                os.close(child_fd)
def main():
    """Parse the two directory arguments, compare the trees, print a JSON report.

    The report has the keys ``"added_files"``, ``"deleted_files"`` and
    ``"differences"`` and is written to standard output with two-space
    indentation.
    """
    parser = argparse.ArgumentParser(description="Recursively compare file system trees")
    positionals = (
        ("dir1", "DIRECTORY1", "first directory to compare"),
        ("dir2", "DIRECTORY2", "second directory to compare"),
    )
    for dest, metavar, help_text in positionals:
        parser.add_argument(dest, metavar=metavar, help=help_text)
    args = parser.parse_args()

    report = {"added_files": [], "deleted_files": [], "differences": {}}

    dir_fd1 = os.open(args.dir1, os.O_DIRECTORY)
    dir_fd2 = os.open(args.dir2, os.O_DIRECTORY)
    diff(dir_fd1, dir_fd2, report)
    os.close(dir_fd2)
    os.close(dir_fd1)

    print(json.dumps(report, indent=2))
# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()