#!/usr/bin/python3
# pylint: disable=line-too-long
"""
generate-all-test-cases
Script to generate all image test cases based on distro x arch x image-type
matrix read from `distro-arch-imagetype-map.json` or passed file. One can
filter the matrix just to a subset using `--distro`, `--arch` or
`--image-types` arguments.
The script is intended to be run from the osbuild-composer sources directory
root, for which the image test cases should be (re)generated.
Example (builds rhel-8 qcow2 images on aarch64 s390x ppc64le):
tools/test-case-generators/generate-all-test-cases \
--output=test/data/manifests \
--image-x86_64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2 \
--image-ppc64le ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2 \
--image-aarch64 ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2 \
--image-s390x ~/Downloads/Images/Fedora-Cloud-Base-33-1.2.s390x.qcow2 \
--arch aarch64 s390x ppc64le \
--distro rhel-8 \
--image-types qcow2
The script spins up an ephemeral QEMU VM (called Runner) per each required
architecture. The CWD (sources dir root) is attached to the Runner using
virtfs as readonly and later mounted into /mnt/sources on the Runner.
The 'output' directory is also attached to the Runner using virtfs as r/w
and later mounted into /mnt/output on the Runner. The next execution on
Runners is as follows:
- Wait for the runner to be configured using cloud-init.
- includes installing osbuild, osbuild-composer and golang
- Create /mnt/sources and /mnt/output and mount appropriate devices
- in /mnt/sources execute tools/test-case-generators/generate-test-cases
for each requested distro and image type combination on the particular
architecture. Output manifest is written into /mnt/output
One can use e.q. Fedora cloud qcow2 images:
x86_64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/x86_64/images/Fedora-Cloud-Base-33-1.2.x86_64.qcow2
aarch64: https://download.fedoraproject.org/pub/fedora/linux/releases/33/Cloud/aarch64/images/Fedora-Cloud-Base-33-1.2.aarch64.qcow2
ppc64le: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/ppc64le/images/Fedora-Cloud-Base-33-1.2.ppc64le.qcow2
s390x: https://download.fedoraproject.org/pub/fedora-secondary/releases/33/Cloud/s390x/images/Fedora-Cloud-Base-33-1.2.s390x.qcow2
aarch64 special note:
make sure to have the *edk2-aarch64* package installed, which provides UEFI
builds for QEMU and AARCH64 (/usr/share/edk2/aarch64/QEMU_EFI.fd)
https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU
Images need to have enough disk space to be able to build images using
osbuild. You can resize them using 'qemu-img resize <image> 20G' command.
Known issues:
- The tool does not work with RHEL qcow2 images, becuase the "9p" filesystem
is not supported on RHEL.
HW requirements:
- The x86_64 VM uses 1 CPU and 1GB of RAM
- The aarch64, s390x and ppc64le VMs each uses 2CPU and 2GB of RAM
- Unless filtered using `--arch` option, the script starts 4 VMs in parallel
Tested with:
- Fedora 32 (x86_64) and QEMU version 4.2.1
Not tested:
- installation of newer 'osbuild-composer' or 'osbuild' packages from the
local 'osbuild' repository, which is configured by cloud-init. Not sure
how dnf will behave if there are packages for multiple architectures.
"""
import argparse
import subprocess
import json
import os
import tempfile
import shutil
import time
import socket
import contextlib
import multiprocessing
import logging
import yaml
import paramiko
# setup logging
log = logging.getLogger("generate-all-test-cases")
log.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s [%(levelname)s] - %(processName)s: %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(formatter)
log.addHandler(sh)
# suppress all errors logged by paramiko
paramiko.util.log_to_file(os.devnull)
class RunnerMountPoint:
"""
Data structure to represent basic data used by Runners to attach host
directory as virtfs to the guest and then to mount it.
"""
def __init__(self, src_host, dst_guest, mount_tag, readonly):
self.src_host = src_host
self.dst_guest = dst_guest
self.readonly = readonly
self.mount_tag = mount_tag
@staticmethod
def get_default_runner_mount_points(output_dir, sources_dir=None):
"""
Returns a list of default mount points used by Runners when generating
image test cases.
"""
sources_dir = os.getcwd() if sources_dir is None else sources_dir
mount_points = [
RunnerMountPoint(sources_dir, "/mnt/sources", "sources", True),
RunnerMountPoint(output_dir, "/mnt/output", "output", False)
]
return mount_points
class BaseRunner(contextlib.AbstractContextManager):
"""
Base class representing a QEMU VM runner, which is used for generating image
test case definitions.
Each architecture-specific runner should inherit from this class and define
QEMU_BIN, QEMU_CMD class variable. These will be used to successfully boot
VM for the given architecture.
"""
# name of the QEMU binary to use for running the VM
QEMU_BIN = None
# the actual command to use for running QEMU VM
QEMU_CMD = None
def __init__(self, image, user, passwd, cdrom_iso=None, mount_points=None):
self._check_qemu_bin()
# path to image to run
self.image = image
# path to cdrom iso to attach (for cloud-init)
self.cdrom_iso = cdrom_iso
# host directories to share with the VM as virtfs devices
self.mount_points = mount_points if mount_points else list()
# Popen object of the qemu process
self.vm = None
self.vm_ready = False
# following values are set after the VM is terminated
self.vm_return_code = None
self.vm_stdout = None
self.vm_stderr = None
# credentials used to SSH to the VM
self.vm_user = user
self.vm_pass = passwd
# port on host to forward the guest's SSH port (22) to
self.host_fwd_ssh_port = None
def _check_qemu_bin(self):
"""
Checks whether QEMU binary used for the particular runner is present
on the system.
"""
try:
subprocess.check_call([self.QEMU_BIN, "--version"])
except subprocess.CalledProcessError as _:
raise RuntimeError("QEMU binary {} not found".format(self.QEMU_BIN))
def _get_qemu_cdrom_option(self):
"""
Get the appropriate options for attaching CDROM device to the VM, if
the path to ISO has been provided.
This method may be reimplemented by architecture specific runner class
if needed. Returns a list of strings to be appended to the QEMU command.
"""
if self.cdrom_iso:
return ["-cdrom", self.cdrom_iso]
return list()
def _get_qemu_boot_image_option(self):
"""
Get the appropriate options for specifying the image to boot from.
This method may be reimplemented by architecture specific runner class
if needed.
Returns a list of strings to be appended to the QEMU command.
"""
return [self.image]
def _get_qemu_ssh_fwd_option(self):
"""
Get the appropriate options for forwarding guest's port 22 to host's
random available port.
"""
# get a random free TCP port. This should work in majority of cases
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(('localhost', 0))
self.host_fwd_ssh_port = sock.getsockname()[1]
return ["-net", "user,hostfwd=tcp::{}-:22".format(self.host_fwd_ssh_port)]
def _run_qemu_cmd(self, qemu_cmd):
"""
Assembles the QEMU command to run and executes using subprocess.
"""
# handle CDROM
qemu_cmd.extend(self._get_qemu_cdrom_option())
# handle mount points
for mount_point in self.mount_points:
src_host = mount_point.src_host
tag = mount_point.mount_tag
readonly = ",readonly" if mount_point.readonly else ""
qemu_cmd.extend(["-virtfs", f"local,path={src_host},mount_tag={tag},security_model=mapped-xattr{readonly}"])
# handle boot image
qemu_cmd.extend(self._get_qemu_boot_image_option())
# handle forwarding of guest's SSH port to host
qemu_cmd.extend(self._get_qemu_ssh_fwd_option())
log.debug("Starting VM using command: '%s'", " ".join(qemu_cmd))
self.vm = subprocess.Popen(
qemu_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
def start(self):
"""
Starts the QEMU process running the VM
"""
if not self.QEMU_CMD:
raise NotImplementedError("The way to spin up QEMU VM is not implemented")
# don't start the qemu process if there is already one running
if self.vm is None:
self._run_qemu_cmd(list(self.QEMU_CMD))
log.info(
"Runner started. You can SSH to it once it has been configured:" +\
"'ssh %s@localhost -p %d' using password: '%s'",
self.vm_user,
self.host_fwd_ssh_port,
self.vm_pass
)
def stop(self):
"""
Stops the QEMU process running the VM
"""
if self.vm:
self.vm.terminate()
try:
# give the process some time to terminate
self.vm.wait(timeout=15)
except subprocess.TimeoutExpired as _:
self.vm.kill()
self.vm.wait(timeout=15)
if self.vm.stdout:
self.vm_stdout = self.vm.stdout.read().decode()
if self.vm.stderr:
self.vm_stderr = self.vm.stderr.read().decode()
self.vm_return_code = self.vm.returncode
if self.vm_return_code == 0:
log.debug("%s process ended with return code %d\n\n" + \
"stdout:\n%s\nstderr:\n%s", self.QEMU_BIN,
self.vm_return_code, self.vm_stdout, self.vm_stderr)
else:
log.error("%s process ended with return code %d\n\n" + \
"stdout:\n%s\nstderr:\n%s", self.QEMU_BIN,
self.vm_return_code, self.vm_stdout, self.vm_stderr)
self.vm = None
self.vm_ready = False
def run_command(self, command):
"""
Runs a given command on the VM over ssh in a blocking fashion.
Calling this method before is_ready() returned True has undefined
behavior.
Returns stdin, stdout, stderr from the run command.
"""
ssh = paramiko.SSHClient()
# don't ask / fail on unknown remote host fingerprint, just accept any
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect("localhost", self.host_fwd_ssh_port, self.vm_user, self.vm_pass)
ssh_tansport = ssh.get_transport()
channel = ssh_tansport.open_session()
# don't log commands when the vm is not yet ready for use
if self.vm_ready:
log.debug("Running on VM: '%s'", command)
channel.exec_command(command)
stdout = ""
stderr = ""
# wait for the command to finish
while True:
while channel.recv_ready():
stdout += channel.recv(1024).decode()
while channel.recv_stderr_ready():
stderr += channel.recv_stderr(1024).decode()
if channel.exit_status_ready():
break
time.sleep(0.01)
returncode = channel.recv_exit_status()
except Exception as e:
# don't log errors when vm is not ready yet, because there are many errors
if self.vm_ready:
log.error("Running command over ssh failed: %s", str(e))
raise e
finally:
# closes the underlying transport
ssh.close()
return stdout, stderr, returncode
def run_command_check_call(self, command):
"""
Runs a command on the VM over ssh in a similar fashion as subprocess.check_call()
"""
_, _, ret = self.run_command(command)
if ret != 0:
raise subprocess.CalledProcessError(ret, command)
def wait_until_ready(self, timeout=None):
"""
Waits for the VM to be ready for use (cloud-init configuration finished).
If timeout is provided
"""
now = time.time()
while not self.is_ready():
if timeout is not None and time.time() > (now + timeout):
raise subprocess.TimeoutExpired("wait_until_ready()", timeout)
time.sleep(15)
def is_ready(self):
"""
Returns True if the VM is ready to be used.
VM is ready after the cloud-init setup is finished.
"""
if self.vm_ready:
return True
# check if the runner didn't terminate unexpectedly before being ready
try:
if self.vm:
self.vm.wait(1)
except subprocess.TimeoutExpired as _:
# process still running
pass
else:
# process not running, call .stop() to log stdout, stderr and retcode
self.stop()
qemu_bin = self.QEMU_BIN
raise RuntimeError(f"'{qemu_bin}' process ended before being ready to use")
try:
# cloud-init touches /var/lib/cloud/instance/boot-finished after it finishes
self.run_command_check_call("ls /var/lib/cloud/instance/boot-finished")
except (paramiko.ChannelException,
paramiko.ssh_exception.NoValidConnectionsError,
paramiko.ssh_exception.SSHException,
EOFError,
socket.timeout,
subprocess.CalledProcessError) as _:
# ignore all reasonable paramiko exceptions, this is useful when the VM is still stating up
pass
else:
log.debug("VM is ready for use")
self.vm_ready = True
return self.vm_ready
def mount_mount_points(self):
"""
This method mounts the needed mount points on the VM.
It should be called only after is_vm_ready() returned True. Otherwise it will fail.
"""
for mount_point in self.mount_points:
dst_guest = mount_point.dst_guest
mount_tag = mount_point.mount_tag
self.run_command_check_call(f"sudo mkdir {dst_guest}")
#! FIXME: "9p" filesystem is not supported on RHEL!
out, err, ret = self.run_command(f"sudo mount -t 9p -o trans=virtio {mount_tag} {dst_guest} -oversion=9p2000.L")
if ret != 0:
log.error("Mounting '%s' to '%s' failed with retcode: %d\nstdout: %s\nstderr: %s", mount_tag, dst_guest,
ret, out, err)
raise subprocess.CalledProcessError(
ret,
f"sudo mount -t 9p -o trans=virtio {mount_tag} {dst_guest} -oversion=9p2000.L")
def __enter__(self):
self.start()
return self
def __exit__(self, *exc_details):
self.stop()
@staticmethod
def prepare_cloud_init_cdrom(userdata, workdir):
"""
Generates a CDROM ISO used as a data source for cloud-init.
Returns path to the generated CDROM ISO image.
"""
iso_path = os.path.join(workdir, "cloudinit.iso")
cidatadir = os.path.join(workdir, "cidata")
user_data_path = os.path.join(cidatadir, "user-data")
meta_data_path = os.path.join(cidatadir, "meta-data")
os.mkdir(cidatadir)
if os.path.isdir(userdata):
with open(user_data_path, "w") as f:
script_dir = os.path.dirname(__file__)
subprocess.check_call(
[os.path.abspath(f"{script_dir}/../gen-user-data"), userdata], stdout=f)
else:
shutil.copy(userdata, user_data_path)
with open(meta_data_path, "w") as f:
f.write("instance-id: nocloud\nlocal-hostname: vm\n")
sysname = os.uname().sysname
log.debug("Generating CDROM ISO image for cloud-init user data: %s", iso_path)
if sysname == "Linux":
subprocess.check_call(
[
"genisoimage",
"-input-charset", "utf-8",
"-output", iso_path,
"-volid", "cidata",
"-joliet",
"-rock",
"-quiet",
"-graft-points",
user_data_path,
meta_data_path
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
elif sysname == "Darwin":
subprocess.check_call(
[
"hdiutil",
"makehybrid",
"-iso",
"-joliet",
"-o", iso_path,
"{cidatadir}"
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
else:
raise NotImplementedError(f"Unsupported system '{sysname}' for generating cdrom iso")
return iso_path
class X86_64Runner(BaseRunner):
"""
VM Runner for x86_64 architecture
"""
QEMU_BIN = "qemu-system-x86_64"
QEMU_CMD = [
QEMU_BIN,
"-M", "accel=kvm:hvf",
"-m", "1024",
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-cpu", "max",
"-net", "nic,model=virtio",
]
class Ppc64Runner(BaseRunner):
"""
VM Runner for ppc64le architecture
"""
QEMU_BIN = "qemu-system-ppc64"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-net", "nic,model=virtio",
]
class Aarch64Runner(BaseRunner):
"""
VM Runner for aarch64 architecture
"""
# aarch64 requires UEFI build for QEMU
# https://rwmj.wordpress.com/2015/02/27/how-to-boot-a-fedora-21-aarch64-uefi-guest-on-x86_64/
# https://fedoraproject.org/wiki/Architectures/AArch64/Install_with_QEMU
QEMU_BIN = "qemu-system-aarch64"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-pci,rng=rng0",
"-snapshot",
"-monitor", "none",
"-machine", "virt",
"-cpu", "cortex-a57",
"-bios", "/usr/share/edk2/aarch64/QEMU_EFI.fd", # provided by 'edk2-aarch64' Fedora package
"-net", "nic,model=virtio",
]
class S390xRunner(BaseRunner):
"""
VM Runner for s390x architecture
"""
QEMU_BIN = "qemu-system-s390x"
QEMU_CMD = [
QEMU_BIN,
"-m", "2048", # RAM
"-smp", "2", # CPUs
"-machine", "s390-ccw-virtio",
# disable msa5-base to suppress errors:
# qemu-system-s390x: warning: 'msa5-base' requires 'kimd-sha-512'
# qemu-system-s390x: warning: 'msa5-base' requires 'klmd-sha-512'
"-cpu", "max,msa5-base=no",
"-object", "rng-random,filename=/dev/urandom,id=rng0",
"-device", "virtio-rng-ccw,rng=rng0",
"-monitor", "none",
"-snapshot",
"-net", "nic,model=virtio",
]
def _get_qemu_cdrom_option(self):
"""
Get the appropriate options for attaching CDROM device to the VM, if the path to ISO has been provided.
s390x tries to boot from the CDROM if attached the way as BaseRunner does it.
"""
if self.cdrom_iso:
iso_path = self.cdrom_iso
return list(["-drive", f"file={iso_path},media=cdrom"])
else:
return list()
def _get_qemu_boot_image_option(self):
"""
Get the appropriate options for specifying the image to boot from.
s390x needs to have an explicit 'bootindex' specified.
https://qemu.readthedocs.io/en/latest/system/s390x/bootdevices.html
"""
image_path = self.image
return [
"-drive", f"if=none,id=dr1,file={image_path}",
"-device", "virtio-blk,drive=dr1,bootindex=1"
]
class TestCaseMatrixGenerator(contextlib.AbstractContextManager):
"""
Class representing generation of all test cases based on provided test
cases matrix.
The class should be used as a context manager to ensure that cleanup
of all resources is done (mainly VMs and processes running them).
VM for each architecture is run in a separate process to ensure that
generation is done in parallel.
"""
ARCH_RUNNER_MAP = {
"x86_64": X86_64Runner,
"aarch64": Aarch64Runner,
"ppc64le": Ppc64Runner,
"s390x": S390xRunner
}
def __init__(self, images, ci_userdata, arch_gen_matrix, output, keep_image_info):
"""
'images' is a dict of qcow2 image paths for each supported architecture,
that should be used for VMs:
{
"arch1": "<image path>",
"arch2": "<image path>",
...
}
'ci_userdata' is path to file / directory containing cloud-init user-data used
for generating CDROM ISO image, that is attached to each VM as a cloud-init data source.
'arch_get_matrix' is a dict of requested distro-image_type matrix per architecture:
{
"arch1": {
"distro1": [
"image-type1",
"image-type2"
],
"distro2": [
"image-type2",
"image-type3"
]
},
"arch2": {
"distro2": [
"image-type2"
]
},
...
}
'output' is a directory path, where the generated test case manifests should be stored.
'keep_image_info' specifies whether to pass the '--keep-image-info' option to the 'generate-test-cases' script.
"""
self._processes = list()
self.images = images
self.ci_userdata = ci_userdata
self.arch_gen_matrix = arch_gen_matrix
self.output = output
self.keep_image_info = keep_image_info
# check that we have image for each needed architecture
for arch in self.arch_gen_matrix.keys():
if self.images.get(arch, None) is None:
raise RuntimeError(f"architecture '{arch}' is in requested test matrix, but no image was provided")
@staticmethod
def runner_function(arch, runner_cls, image, user, passwd, cdrom_iso, generation_matrix, output, keep_image_info):
"""
Generate test cases using VM with appropriate architecture.
'generation_matrix' is expected to be already architecture-specific
dict of 'distro' x 'image-type' matrix.
{
"fedora-32": [
"qcow2",
"vmdk"
],
"rhel-84": [
"qcow2",
"tar"
],
...
}
"""
mount_points = RunnerMountPoint.get_default_runner_mount_points(output)
go_tls_timeout_retries = 3
# spin up appropriate VM represented by 'runner'
with runner_cls(image, user, passwd, cdrom_iso, mount_points=mount_points) as runner:
log.info("Waiting for the '%s' runner to be configured by cloud-init", arch)
runner.wait_until_ready()
runner.mount_mount_points()
# don't use /var/tmp for osbuild's store directory to prevent systemd from possibly
# removing some of the downloaded RPMs due to "ageing"
guest_osbuild_store_dir = "/root/osbuild-store"
runner.run_command_check_call(f"sudo mkdir {guest_osbuild_store_dir}")
# Log installed versions of important RPMs
rpm_versions, _, _ = runner.run_command("rpm -q osbuild osbuild-composer")
log.info("Installed packages: %s", " ".join(rpm_versions.split("\n")))
for distro, img_type_list in generation_matrix.items():
for image_type in img_type_list:
log.info("Generating test case for '%s' '%s' image on '%s'", distro, image_type, arch)
# is the image with customizations?
if image_type.endswith("-customize"):
with_customizations = True
image_type = image_type.rstrip("-customize")
else:
with_customizations = False
gen_test_cases_cmd = "cd /mnt/sources; sudo tools/test-case-generators/generate-test-cases" + \
f" --distro {distro} --arch {arch} --image-types {image_type}" + \
f" --store {guest_osbuild_store_dir} --output /mnt/output/"
if with_customizations:
gen_test_cases_cmd += " --with-customizations"
if keep_image_info:
gen_test_cases_cmd += " --keep-image-info"
# allow fixed number of retries if the command fails for a specific reason
for i in range(1, go_tls_timeout_retries+1):
if i > 1:
log.info("Retrying image test case generation (%d of %d)", i, go_tls_timeout_retries)
stdout, stderr, retcode = runner.run_command(gen_test_cases_cmd)
# clean up the osbuild-store dir
runner.run_command_check_call(f"sudo rm -rf {guest_osbuild_store_dir}/*")
if retcode != 0:
log.error("'%s' retcode: %d\nstdout: %s\nstderr: %s", gen_test_cases_cmd, retcode,
stdout, stderr)
# Retry the command, if there was an error due to TLS handshake timeout
# This is happening on all runners using other than host's arch from time to time.
if stderr.find("net/http: TLS handshake timeout") != -1:
continue
else:
log.info("Generating test case for %s-%s-%s - SUCCEEDED", distro, arch, image_type)
# don't retry if the process ended successfully or if there was a different error
break
log.info("'%s' runner finished its work", arch)
# TODO: Possibly remove after testing / fine tuning the script
log.info("Waiting for 1 hour, before terminating the runner (CTRL + c will terminate all VMs)")
time.sleep(3600)
runner.stop()
def generate(self):
"""
Generates all test cases based on provided data
"""
# use the same CDROM ISO image for all VMs
with tempfile.TemporaryDirectory(prefix="osbuild-composer-test-gen-") as tmpdir:
cdrom_iso = BaseRunner.prepare_cloud_init_cdrom(self.ci_userdata, tmpdir)
# Load user / password from the cloud-init user-data
if os.path.isdir(self.ci_userdata):
user_data_path = os.path.join(self.ci_userdata, "user-data.yml")
else:
user_data_path = self.ci_userdata
with open(user_data_path, "r") as ud:
user_data = yaml.safe_load(ud)
vm_user = user_data["user"]
vm_pass = user_data["password"]
# Start a separate runner VM for each required architecture
for arch, generation_matrix in self.arch_gen_matrix.items():
process = multiprocessing.Process(
target=self.runner_function,
args=(arch, self.ARCH_RUNNER_MAP[arch], self.images[arch], vm_user, vm_pass, cdrom_iso,
generation_matrix, self.output, self.keep_image_info))
self._processes.append(process)
process.start()
log.info("Started '%s' runner - %s", arch, process.name)
# wait for all processes to finish
log.info("Waiting for all runner processes to finish")
for process in self._processes:
process.join()
self._processes.clear()
def cleanup(self):
"""
Terminates all running processes of VM runners.
"""
# ensure that all processes running VMs are stopped
for process in self._processes:
process.terminate()
process.join(5)
# kill the process if it didn't terminate yet
if process.exitcode is None:
process.kill()
process.close()
self._processes.clear()
def __exit__(self, *exc_details):
self.cleanup()
def get_args():
"""
Returns ArgumentParser instance specific to this script.
"""
parser = argparse.ArgumentParser(description="(re)generate image all test cases")
parser.add_argument(
"--image-x86_64",
help="Path to x86_64 image to use for QEMU VM",
required=False
)
parser.add_argument(
"--image-ppc64le",
help="Path to ppc64le image to use for QEMU VM",
required=False
)
parser.add_argument(
"--image-aarch64",
help="Path to aarch64 image to use for QEMU VM",
required=False
)
parser.add_argument(
"--image-s390x",
help="Path to s390x image to use for QEMU VM",
required=False
)
parser.add_argument(
"--distro",
help="Filters the matrix for generation only to specified distro",
nargs='*',
required=False
)
parser.add_argument(
"--arch",
help="Filters the matrix for generation only to specified architecture",
nargs='*',
required=False
)
parser.add_argument(
"--image-types",
help="Filters the matrix for generation only to specified image types",
nargs='*',
required=False
)
parser.add_argument(
"--keep-image-info",
action='store_true',
help="Skip image info (re)generation, but keep the one found in the existing test case"
)
parser.add_argument(
"--output",
metavar="OUTPUT_DIRECTORY",
type=os.path.abspath,
help="Path to the output directory, where to store resulting manifests for image test cases",
required=True
)
parser.add_argument(
"--gen-matrix-file",
help="Path to JSON file from which to read the test case generation matrix (distro x arch x image type)." + \
" If not provided, '<script_location_dir>/distro-arch-imagetype-map.json' is read.",
type=os.path.abspath
)
parser.add_argument(
"--ci-userdata",
help="Path to a file/directory with cloud-init user-data, which should be used to configure runner VMs",
type=os.path.abspath
)
parser.add_argument(
"-d", "--debug",
action='store_true',
default=False,
help="turn on debug logging"
)
return parser.parse_args()
# pylint: disable=too-many-arguments,too-many-locals
def main(vm_images, distros, arches, image_types, ci_userdata, gen_matrix_file, output, keep_image_info):
if not os.path.isdir(output):
raise RuntimeError(f"output directory {output} does not exist")
script_dir = os.path.dirname(__file__)
gen_matrix_path = gen_matrix_file if gen_matrix_file else f"{script_dir}/distro-arch-imagetype-map.json"
log.info("Loading generation matrix from file: '%s'", gen_matrix_path)
with open(gen_matrix_path, "r") as gen_matrix_json:
gen_matrix_dict = json.load(gen_matrix_json)
# Filter generation matrix based on passed arguments
for distro in list(gen_matrix_dict.keys()):
# filter the distros list
if distros and distro not in distros:
del gen_matrix_dict[distro]
continue
for arch in list(gen_matrix_dict[distro].keys()):
# filter the arches list of a distro
if arches and arch not in arches:
del gen_matrix_dict[distro][arch]
continue
# filter the image types of a distro and arch
if image_types:
gen_matrix_dict[distro][arch] = list(filter(lambda x: x in image_types, gen_matrix_dict[distro][arch]))
# delete the whole arch if there is no image type left after filtering
if len(gen_matrix_dict[distro][arch]) == 0:
del gen_matrix_dict[distro][arch]
log.debug("gen_matrix_dict:\n%s", json.dumps(gen_matrix_dict, indent=2, sort_keys=True))
# Construct per-architecture matrix dictionary of distro x image type
arch_gen_matrix_dict = dict()
for distro, arches in gen_matrix_dict.items():
for arch, image_types in arches.items():
try:
arch_dict = arch_gen_matrix_dict[arch]
except KeyError as _:
arch_dict = arch_gen_matrix_dict[arch] = dict()
arch_dict[distro] = image_types.copy()
log.debug("arch_gen_matrix_dict:\n%s", json.dumps(arch_gen_matrix_dict, indent=2, sort_keys=True))
ci_userdata_path = ci_userdata if ci_userdata else os.path.abspath(f"{script_dir}/../deploy/gen-test-data")
log.debug("Using cloud-init user-data from '%s'", ci_userdata_path)
with TestCaseMatrixGenerator(vm_images, ci_userdata_path, arch_gen_matrix_dict, output, keep_image_info) as generator:
generator.generate()
if __name__ == '__main__':
args = get_args()
if args.debug:
log.setLevel(logging.DEBUG)
vm_images = {
"x86_64": args.image_x86_64,
"aarch64": args.image_aarch64,
"ppc64le": args.image_ppc64le,
"s390x": args.image_s390x
}
try:
main(
vm_images,
args.distro,
args.arch,
args.image_types,
args.ci_userdata,
args.gen_matrix_file,
args.output,
args.keep_image_info
)
except KeyboardInterrupt as _:
log.info("Interrupted by user")