#!/usr/bin/python3

import argparse
import subprocess
import json
import os
import sys
import tempfile


def get_subprocess_stdout(*args, **kwargs):
    """Run a command with subprocess.run() and return its captured stdout.

    All positional and keyword arguments are forwarded to subprocess.run().
    stdout is always captured through a pipe, so callers must not pass their
    own ``stdout`` argument.

    Returns:
        The captured stdout: ``str`` when the caller requested text mode
        (for example ``encoding="utf-8"``), ``bytes`` otherwise.

    Raises:
        SystemExit: with status 1 when the command exits with a non-zero
            status; the captured output is copied to stderr first.
    """
    sp = subprocess.run(*args, **kwargs, stdout=subprocess.PIPE)
    if sp.returncode != 0:
        captured = sp.stdout
        # Without text mode the pipe yields bytes, which the text-mode
        # sys.stderr cannot write directly.
        if isinstance(captured, bytes):
            captured = captured.decode("utf-8", errors="replace")
        sys.stderr.write(captured)
        sys.exit(1)

    return sp.stdout


def run_osbuild(manifest, store, output, export):
    """Build *manifest* with osbuild, printing the build log only on failure.

    The manifest is serialized to JSON and passed to osbuild on stdin.
    osbuild's stdout and stderr are redirected into a temporary log file so
    that a successful build stays quiet.

    Args:
        manifest: JSON-serializable manifest passed to osbuild.
        store: value for osbuild's ``--store`` option.
        output: value for osbuild's ``--output-directory`` option.
        export: value for osbuild's ``--export`` option.

    Raises:
        subprocess.CalledProcessError: when osbuild exits with a non-zero
            status (``check=True``). Any other exception raised while
            running osbuild is propagated as well, after the log is printed.
    """
    with tempfile.TemporaryFile(dir="/tmp", prefix="osbuild-test-case-generator-", suffix=".log") as log:
        try:
            subprocess.run(["osbuild",
                            "--store", store,
                            "--output-directory", output,
                            "--checkpoint", "build",
                            "--export", export,
                            "-"],
                           stdout=log,
                           stderr=subprocess.STDOUT,
                           check=True,
                           encoding="utf-8",
                           input=json.dumps(manifest))
        except BaseException:
            # Deliberately catch everything (including interrupts): the log
            # is the only record of what osbuild did, and the exception is
            # re-raised unchanged below.
            log.seek(0)
            # The temporary file is opened in binary mode, so decode before
            # printing; otherwise the log is shown as a b'...' repr.
            print(log.read().decode("utf-8", errors="replace"))
            raise


class TestCaseGenerator:
    '''
    This class generates a json test case. It accepts a test_case_request as input to the constructor:

    {
        "boot": {
            "type": "qemu"
        },
        "compose-request": {
            "distro": "fedora-30",
            "arch": "x86_64",
            "image-type": "qcow2",
            "filename": "disk.qcow2",
            "blueprint": {}
        }
    }

    It then outputs a json test case from the get_test_case() method.
    The request dictionary is extended in place and is itself the returned
    test case.
    '''

    def __init__(self, test_case_request):
        # Kept by reference: get_test_case() adds its results to this dict.
        self.test_case = test_case_request

    @staticmethod
    def _export_name(manifest):
        '''
        Return the name to pass to osbuild's --export option for manifest.

        Manifests without a "version" key are treated as version "1", which
        exports "assembler"; version "2" exports the last pipeline's name.
        Any other version prints an error to stderr and exits with status 1.
        '''
        version = manifest.get("version", "1")
        if version == "1":
            return "assembler"
        if version == "2":
            return manifest["pipelines"][-1]["name"]
        print(f"Unknown manifest format version {version}", file=sys.stderr)
        sys.exit(1)

    def get_test_case(self, no_image_info, store):
        '''
        Fill in and return the test case.

        Runs ./cmd/osbuild-pipeline twice with the compose request on stdin
        and stores the parsed output under "manifest" and "rpmmd". Unless
        no_image_info is true, the manifest is then built with osbuild in a
        temporary directory created inside store, and the parsed output of
        tools/image-info for the resulting image is stored under
        "image-info".
        '''
        compose_request = json.dumps(self.test_case["compose-request"])

        pipeline_command = ["go", "run", "./cmd/osbuild-pipeline", "-"]
        self.test_case["manifest"] = json.loads(get_subprocess_stdout(pipeline_command, input=compose_request, encoding="utf-8"))

        pipeline_command = ["go", "run", "./cmd/osbuild-pipeline", "-rpmmd", "-"]
        self.test_case["rpmmd"] = json.loads(get_subprocess_stdout(pipeline_command, input=compose_request, encoding="utf-8"))

        if not no_image_info:
            manifest = self.test_case["manifest"]
            # Validate the manifest version before creating any directory.
            export = self._export_name(manifest)
            with tempfile.TemporaryDirectory(dir=store, prefix="test-case-output-") as output:
                run_osbuild(manifest, store, output, export)
                image_file = os.path.join(output, export, self.test_case["compose-request"]["filename"])
                image_info = get_subprocess_stdout(["tools/image-info", image_file], encoding="utf-8")
                self.test_case["image-info"] = json.loads(image_info)

        return self.test_case


def generate_test_case(test_type, distro, arch, output_format, test_case_request, keep_image_info, store, output):
    """Generate one test case and write it as JSON into the output directory.

    The file is named ``<distro>-<arch>-<output_format>-<test_type>.json``,
    with dashes inside the distro and output format replaced by underscores.

    Args:
        test_type: suffix of the file name (the callers in this file pass
            "boot" or "customize").
        distro: distribution name, used for the file name.
        arch: architecture name, used for the file name.
        output_format: output format name, used for the file name and the
            progress message.
        test_case_request: request dictionary handed to TestCaseGenerator.
        keep_image_info: when true, image-info is not regenerated; instead
            the "image-info" entry of an already existing test case file is
            carried over, if there is one.
        store: osbuild store directory, passed on to the generator.
        output: directory the test case file is written to.
    """
    print(f"generating test case for {output_format}")
    generator = TestCaseGenerator(test_case_request)
    test_case = generator.get_test_case(keep_image_info, store)
    name = f"{distro.replace('-', '_')}-{arch}-{output_format.replace('-', '_')}-{test_type}.json"
    file_name = os.path.join(output, name)
    if keep_image_info:
        # Best effort: a missing, unreadable or malformed previous test case
        # simply means there is no image-info to carry over.
        try:
            with open(file_name, 'r') as case_file:
                old_test_case = json.load(case_file)
        except (OSError, ValueError):
            old_test_case = None
        if isinstance(old_test_case, dict):
            image_info = old_test_case.get("image-info")
            if image_info:
                test_case["image-info"] = image_info
    with open(file_name, 'w') as case_file:
        json.dump(test_case, case_file, indent=2)
        case_file.write("\n")


# Blueprint used by main() for the "customize" test case, which is generated
# when --with-customizations is given (qcow2 only). It sets packages, package
# groups, hostname, kernel arguments, ssh keys, users, groups, timezone,
# locale and services.
CUSTOMIZATIONS_BLUEPRINT =  {
    "packages": [
        {
            "name": "bash",
            "version": "*"
        }
    ],
    "groups": [
        {
            "name": "core"
        }
    ],
    "customizations": {
        "hostname": "my-host",
        "kernel": {
            "append": "debug"
        },
        "sshkey": [
            {
                "user": "user1",
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC61wMCjOSHwbVb4VfVyl5sn497qW4PsdQ7Ty7aD6wDNZ/QjjULkDV/yW5WjDlDQ7UqFH0Sr7vywjqDizUAqK7zM5FsUKsUXWHWwg/ehKg8j9xKcMv11AkFoUoujtfAujnKODkk58XSA9whPr7qcw3vPrmog680pnMSzf9LC7J6kXfs6lkoKfBh9VnlxusCrw2yg0qI1fHAZBLPx7mW6+me71QZsS6sVz8v8KXyrXsKTdnF50FjzHcK9HXDBtSJS5wA3fkcRYymJe0o6WMWNdgSRVpoSiWaHHmFgdMUJaYoCfhXzyl7LtNb3Q+Sveg+tJK7JaRXBLMUllOlJ6ll5Hod root@localhost"
            }
        ],
        "user": [
            {
                "name": "user2",
                "description": "description 2",
                "password": "$6$BhyxFBgrEFh0VrPJ$MllG8auiU26x2pmzL4.1maHzPHrA.4gTdCvlATFp8HJU9UPee4zCS9BVl2HOzKaUYD/zEm8r/OF05F2icWB0K/",
                "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC61wMCjOSHwbVb4VfVyl5sn497qW4PsdQ7Ty7aD6wDNZ/QjjULkDV/yW5WjDlDQ7UqFH0Sr7vywjqDizUAqK7zM5FsUKsUXWHWwg/ehKg8j9xKcMv11AkFoUoujtfAujnKODkk58XSA9whPr7qcw3vPrmog680pnMSzf9LC7J6kXfs6lkoKfBh9VnlxusCrw2yg0qI1fHAZBLPx7mW6+me71QZsS6sVz8v8KXyrXsKTdnF50FjzHcK9HXDBtSJS5wA3fkcRYymJe0o6WMWNdgSRVpoSiWaHHmFgdMUJaYoCfhXzyl7LtNb3Q+Sveg+tJK7JaRXBLMUllOlJ6ll5Hod root@localhost",
                "home": "/home/home2",
                "shell": "/bin/sh",
                "groups": [
                    "group1"
                ],
                "uid": 1020,
                "gid": 1050,
            }
        ],
        "group": [
            {
                "name": "group1",
                "gid": 1030
            },
            {
                "name": "group2",
                "gid": 1050
            }
        ],
        "timezone": {
            "timezone": "Europe/London",
            "ntpservers": [
                "time.example.com"
            ]
        },
        "locale": {
            "languages": [
                "el_CY.UTF-8"
            ],
            "keyboard": "dvorak"
        },
        # NOTE(review): the firewall customization below is commented out, so
        # it is not part of the blueprint; the reason is not recorded in this
        # file -- confirm before re-enabling.
#       "firewall": {
#           "ports": [
#               "25:tcp"
#           ],
#           "services": {
#               "enabled": [
#                   "cockpit"
#               ],
#               "disabled": [
#                   "ssh"
#               ]
#           }
#       },
        "services": {
            "enabled": [
                "sshd.socket"
            ],
            "disabled": [
                "bluetooth.service"
            ]
        }
    }
}


def main(distro, arch, image_types, keep_image_info, store, output, with_customizations):
    """Generate the requested test cases for one distro/arch combination.

    Reads tools/test-case-generators/format-request-map.json and
    tools/test-case-generators/repos.json relative to the current working
    directory.

    Args:
        distro: distribution name; key into repos.json and into each
            request's optional "overrides" mapping.
        arch: architecture name; key into repos.json below the distro.
        image_types: image types to generate test cases for.
        keep_image_info: passed on to generate_test_case().
        store: osbuild store directory, passed on to generate_test_case().
        output: output directory, passed on to generate_test_case().
        with_customizations: when true, generate a single "customize" test
            case for qcow2 using CUSTOMIZATIONS_BLUEPRINT instead of the
            "boot" test cases.

    Raises:
        SystemExit: with status 1 when customizations are requested for
            anything other than exactly the qcow2 image type.
        KeyError: when repos.json has no entry for the distro/arch.
    """
    with open("tools/test-case-generators/format-request-map.json") as format_request_json:
        format_request_dict = json.load(format_request_json)
    with open("tools/test-case-generators/repos.json") as repos_json:
        repos_dict = json.load(repos_json)

    # Apply all customizations from the CUSTOMIZATIONS_BLUEPRINT dictionary
    if with_customizations:
        # Comparing the whole list also rejects an empty image_types instead
        # of failing with an IndexError.
        if list(image_types) != ["qcow2"]:
            print("Customizations are only available for qcow2 image type", file=sys.stderr)
            sys.exit(1)

        test_case_request = {
            "compose-request": {
                "distro": distro,
                "arch": arch,
                "repositories": repos_dict[distro][arch],
                "image-type": "qcow2",
                "filename": "disk.qcow2",
                "blueprint": CUSTOMIZATIONS_BLUEPRINT,
            }
        }
        generate_test_case("customize", distro, arch, "qcow2", test_case_request, keep_image_info, store, output)
        return

    for output_format, test_case_request in format_request_dict.items():
        # Work on a copy so the loaded request map is not modified.
        compose_request = dict(test_case_request["compose-request"])
        if compose_request["image-type"] not in image_types:
            continue
        compose_request["distro"] = distro
        compose_request["arch"] = arch
        compose_request["repositories"] = repos_dict[distro][arch]

        # "overrides" is optional; distro-specific entries replace the
        # matching compose-request keys.
        overrides = test_case_request.get("overrides") or {}
        compose_request.update(overrides.get(distro, {}))

        # Everything except the "overrides" entry is part of the test case.
        filtered_request = {key: value for key, value in test_case_request.items() if key != "overrides"}
        filtered_request["compose-request"] = compose_request

        generate_test_case("boot", distro, arch, output_format, filtered_request, keep_image_info, store, output)


if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    cli = argparse.ArgumentParser(description="Generate test cases")
    cli.add_argument("--distro", required=True,
                     help="distribution for test cases")
    cli.add_argument("--arch", required=True,
                     help="architecture for test cases")
    cli.add_argument("--image-types", required=True, nargs="+",
                     help="image types for test cases")
    cli.add_argument("--keep-image-info", action="store_true",
                     help="skip image info (re)generation, but keep the one found in the existing test case")
    # Both directory options are converted to absolute paths while parsing.
    cli.add_argument("--store", required=True, metavar="STORE_DIRECTORY", type=os.path.abspath,
                     help="path to the osbuild store")
    cli.add_argument("--output", required=True, metavar="OUTPUT_DIRECTORY", type=os.path.abspath,
                     help="path to the output directory")
    cli.add_argument("--with-customizations", action="store_true",
                     help="apply all currently supported customizations to the image (qcow2 only)")
    options = cli.parse_args()

    main(
        options.distro,
        options.arch,
        options.image_types,
        options.keep_image_info,
        options.store,
        options.output,
        options.with_customizations,
    )
    sys.exit()