#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Program to allow users to fuzz test Hyper-V drivers
# by interfacing with Hyper-V debugfs attributes.
# Current test methods available:
#       1. delay testing
#
# Current file/directory structure of Hyper-V debugfs:
#       /sys/kernel/debug/hyperv/UUID
#       /sys/kernel/debug/hyperv/UUID/<test-state filename>
#       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
#
# author: Branden Bonaby <brandonbonaby94@gmail.com>
Packit 135353
Packit 135353
import os
Packit 135353
import cmd
Packit 135353
import argparse
Packit 135353
import glob
Packit 135353
from argparse import RawDescriptionHelpFormatter
Packit 135353
from argparse import RawTextHelpFormatter
Packit 135353
from enum import Enum
Packit 135353
Packit 135353
# Do not change unless you also change the debugfs attributes
# in /drivers/hv/debugfs.c. All fuzz testing
# attributes will start with "fuzz_test".
Packit 135353
Packit 135353
# debugfs path for hyperv must exist before proceeding
Packit 135353
# Root of the Hyper-V debugfs tree; every device directory lives below it.
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
# Nothing below can work without this directory, so stop immediately.
if not os.path.isdir(debugfs_hyperv_path):
        print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
        # SystemExit is raised directly: the exit() helper is injected by
        # the site module and is not guaranteed to be defined.
        raise SystemExit(-1)
Packit 135353
Packit 135353
class dev_state(Enum):
        """Values written to the debugfs test files to switch testing."""
        # Testing disabled.
        off = 0
        # Testing enabled.
        on = 1
Packit 135353
Packit 135353
# File names that correspond to the files created in
# /drivers/hv/debugfs.c
Packit 135353
class f_names(Enum):
        """Names of the per-device debugfs files used by this tool."""
        # Holds the on/off testing state of a device.
        state_f = "fuzz_test_state"
        # Holds the ring buffer interrupt delay.
        buff_f = "fuzz_test_buffer_interrupt_delay"
        # Holds the ring buffer message delay.
        mess_f = "fuzz_test_message_delay"
Packit 135353
Packit 135353
# Both single_actions and all_actions are used
# for error checking and to allow some subparser
# names to be abbreviated. Do not abbreviate the
# test method names, as it will become less intuitive
# what the user can do. If you do decide to
# abbreviate a test method name, make sure the main
# function reflects this change.
Packit 135353
Packit 135353
# Options and sub-commands that act on every device; a path must not be
# supplied together with any of them.
all_actions = [
        "disable_all",
        "D",
        "enable_all",
        "view_all",
        "V",
]
Packit 135353
Packit 135353
# Options and sub-commands that act on one device; each of them requires
# a path to that device.
single_actions = [
        "disable_single",
        "d",
        "enable_single",
        "view_single",
        "v",
]
Packit 135353
Packit 135353
def main():
        """Parse the command line and run the requested action.

        Builds the device/file map from debugfs, validates the combination
        of sub-command and path, switches testing on for a single device
        when asked to, and finally dispatches on the sub-command.
        Raises SystemExit(-1) when no sub-command was given.
        """
        file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
        args = parse_args()
        if not args.action:
                print("Error, no options selected...exiting")
                # Raised directly; the exit() helper comes from the site
                # module and is not guaranteed to be defined.
                raise SystemExit(-1)

        # Names of every option that was actually supplied, plus the
        # sub-command itself; the validation helper works on this set.
        arg_set = {k for (k, v) in vars(args).items() if v and k != "action"}
        arg_set.add(args.action)

        path = args.path if "path" in arg_set else None
        # Keys of the file map carry no trailing slash.
        if path and path.endswith("/"):
                path = path[:-1]
        validate_args_path(path, arg_set, file_map)

        if path and "enable_single" in arg_set:
                state_path = locate_state(path, file_map)
                set_test_state(state_path, dev_state.on.value, args.quiet)

        # Use subparsers as the key for different actions
        if "delay" in arg_set:
                validate_delay_values(args.delay_time)
                if args.enable_all:
                        set_delay_all_devices(file_map, args.delay_time,
                                              args.quiet)
                else:
                        set_delay_values(path, file_map, args.delay_time,
                                         args.quiet)
        elif "disable_all" in arg_set or "D" in arg_set:
                disable_all_testing(file_map)
        elif "disable_single" in arg_set or "d" in arg_set:
                disable_testing_single_device(path, file_map)
        elif "view_all" in arg_set or "V" in arg_set:
                get_all_devices_test_status(file_map)
        elif "view_single" in arg_set or "v" in arg_set:
                get_device_test_values(path, file_map)
Packit 135353
Packit 135353
# Get the state location
Packit 135353
def locate_state(device, file_map):
        """Return the path of the test-state file of a device.

        device   -- key of the device in file_map
        file_map -- mapping device -> {file name: file path}

        Raises KeyError when the device or its state file is not mapped.
        """
        device_files = file_map[device]
        return device_files[f_names.state_f.value]
Packit 135353
Packit 135353
# Validate delay values to make sure they are acceptable for
# enabling delays on a device
Packit 135353
def validate_delay_values(delay):
        """Abort unless the two delay values form an acceptable request.

        delay -- pair [buffer interrupt delay, message delay]. Each entry
                 must be -1 or satisfy 0 < value <= 1000, and the two
                 entries must not both be -1.

        Prints an error and raises SystemExit(-1) on a bad request;
        returns None otherwise.
        """
        if delay[0] == -1 and delay[1] == -1:
                print("\nError, At least 1 value must be greater than 0")
                # Raised directly; the exit() helper comes from the site
                # module and is not guaranteed to be defined.
                raise SystemExit(-1)
        if any(value < -1 or value == 0 or value > 1000 for value in delay):
                print("\nError, Values must be  equal to -1 "
                      "or be > 0 and <= 1000")
                raise SystemExit(-1)
Packit 135353
Packit 135353
# Validate argument path
Packit 135353
def validate_args_path(path, arg_set, file_map):
        """Abort when the path does not fit the selected options.

        path     -- device path given with -p, or None
        arg_set  -- names of the supplied options plus the sub-command
        file_map -- mapping device -> {file name: file path}

        A single-device action needs a path that is a key of file_map;
        an all-device action must not be given a path. Prints an error
        and raises SystemExit(-1) when either rule is broken.
        """
        wants_single = any(item in arg_set for item in single_actions)
        wants_all = any(item in arg_set for item in all_actions)

        if not path and wants_single:
                print("Error, path (-p) REQUIRED for the specified option. "
                      "Use (-h) to check usage.")
                # Raised directly; the exit() helper comes from the site
                # module and is not guaranteed to be defined.
                raise SystemExit(-1)
        elif path and wants_all:
                print("Error, path (-p) NOT REQUIRED for the specified option. "
                      "Use (-h) to check usage." )
                raise SystemExit(-1)
        elif path not in file_map and wants_single:
                print("Error, path '{}' not a valid vmbus device".format(path))
                raise SystemExit(-1)
Packit 135353
Packit 135353
# display Testing status of single device
Packit 135353
def get_device_test_values(path, file_map):
        """Print "<file name> = <value>" for every mapped file of a device.

        path     -- key of the device in file_map
        file_map -- mapping device -> {file name: file path}
        """
        for name, file_location in file_map[path].items():
                print(name + " = " + str(read_test_files(file_location)))
Packit 135353
Packit 135353
# Create a map of the vmbus devices and their associated files
# [key = device, value = [key = filename, value = file path]]
Packit 135353
def recursive_file_lookup(path, file_map, root=None):
        """Fill file_map with the files found below path and return it.

        path     -- directory to scan
        file_map -- mapping device -> {file name: file path}, updated in
                    place
        root     -- top of the debugfs tree; defaults to
                    debugfs_hyperv_path

        A file is stored under its parent directory when that parent
        sits directly below root, otherwise under its grandparent.
        Sub-directories are scanned recursively.
        """
        if root is None:
                root = debugfs_hyperv_path

        # Match only the entries inside path. Appending '**/*' to the bare
        # path also matched sibling directories sharing the same prefix.
        pattern = os.path.join(glob.escape(path), "*")
        for f_path in glob.iglob(pattern):
                if os.path.isfile(f_path):
                        grandparent = f_path.rsplit("/", 2)[0]
                        if grandparent == root:
                                directory = f_path.rsplit("/", 1)[0]
                        else:
                                directory = grandparent
                        f_name = f_path.rsplit("/", 1)[-1]
                        file_map.setdefault(directory, {})[f_name] = f_path
                elif os.path.isdir(f_path):
                        recursive_file_lookup(f_path, file_map, root)
        return file_map
Packit 135353
Packit 135353
# display Testing state of devices
Packit 135353
def get_all_devices_test_status(file_map):
        """Print whether testing is ON or OFF for every device in file_map.

        file_map -- mapping device -> {file name: file path}

        The printed device name is the sixth '/'-separated field of the
        device key.
        """
        for device in file_map:
                state = get_test_state(locate_state(device, file_map))
                # Compare by value; identity against a literal is not a
                # reliable integer test.
                if state == 1:
                        print("Testing = ON for: {}"
                              .format(device.split("/")[5]))
                else:
                        print("Testing = OFF for: {}"
                              .format(device.split("/")[5]))
Packit 135353
Packit 135353
# read the vmbus device files, path must be absolute path before calling
Packit 135353
def read_test_files(path):
        """Return the integer stored on the first line of the file at path.

        path -- path of the file to read

        Prints an error and raises SystemExit(-1) when the file cannot be
        read or its first line is not an integer.
        """
        try:
                with open(path, "r") as f:
                        file_value = f.readline().strip()
                return int(file_value)
        except IOError as e:
                # Use the attributes rather than unpacking e.args, whose
                # length is not guaranteed to be two.
                print("I/O error({0}): {1} on file {2}"
                      .format(e.errno, e.strerror, path))
                # Raised directly; the exit() helper comes from the site
                # module and is not guaranteed to be defined.
                raise SystemExit(-1)
        except ValueError:
                print("Element to int conversion error in: \n{}".format(path))
                raise SystemExit(-1)
Packit 135353
Packit 135353
# writing to vmbus device files, path must be absolute path before calling
Packit 135353
def write_test_files(path, value):
        """Write value, formatted as text, to the file at path.

        path  -- path of the file to write
        value -- value to store; written via "{}".format(value)

        Prints an error and raises SystemExit(-1) when the file cannot be
        written.
        """
        try:
                with open(path, "w") as f:
                        f.write("{}".format(value))
        except IOError as e:
                # Use the attributes rather than unpacking e.args, whose
                # length is not guaranteed to be two.
                print("I/O error({0}): {1} on file {2}"
                      .format(e.errno, e.strerror, path))
                # Raised directly; the exit() helper comes from the site
                # module and is not guaranteed to be defined.
                raise SystemExit(-1)
Packit 135353
Packit 135353
# set testing state of device
Packit 135353
def set_test_state(state_path, state_value, quiet):
        """Write a new testing state and report what the file now holds.

        state_path  -- path of the device's test-state file
        state_value -- value to write
        quiet       -- when true, nothing is printed

        The state is read back after the write, so the report reflects
        the file's content rather than the requested value.
        """
        write_test_files(state_path, state_value)
        # Compare by value; identity against a literal is not a reliable
        # integer test.
        testing_on = get_test_state(state_path) == 1
        if quiet:
                return
        if testing_on:
                print("Testing = ON for device: {}"
                      .format(state_path.split("/")[5]))
        else:
                print("Testing = OFF for device: {}"
                      .format(state_path.split("/")[5]))
Packit 135353
Packit 135353
# get testing state of device
Packit 135353
def get_test_state(state_path):
        """Return the integer held by the test-state file at state_path.

        state == 1 - test = ON
        state == 0 - test = OFF
        """
        return read_test_files(state_path)
Packit 135353
Packit 135353
# Write a delay of 1 - 1000 microseconds into a single device using the
# fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay
# debugfs attributes
Packit 135353
def set_delay_values(device, file_map, delay_length, quiet):
        """Write the requested delays to one device and report them.

        device       -- key of the device in file_map
        file_map     -- mapping device -> {file name: file path}
        delay_length -- pair [buffer interrupt delay, message delay]; an
                        entry outside 0..1000 (such as -1) leaves the
                        corresponding file untouched
        quiet        -- when true, the resulting values are not printed

        Raises KeyError when the device or one of its delay files is not
        mapped.
        """
        # Looked up before the try so both names are always bound in the
        # handler below.
        interrupt = file_map[device][f_names.buff_f.value]
        message = file_map[device][f_names.mess_f.value]

        try:
                # delay[0]- buffer interrupt delay, delay[1]- message delay
                if 0 <= delay_length[0] <= 1000:
                        write_test_files(interrupt, delay_length[0])
                if 0 <= delay_length[1] <= 1000:
                        write_test_files(message, delay_length[1])
                if not quiet:
                        print("Buffer delay testing = {} for: {}"
                              .format(read_test_files(interrupt),
                                      interrupt.split("/")[5]))
                        print("Message delay testing = {} for: {}"
                              .format(read_test_files(message),
                                      message.split("/")[5]))
        except IOError as e:
                # Use the attributes rather than unpacking e.args, whose
                # length is not guaranteed to be two.
                print("I/O error({0}): {1} on files {2}{3}"
                      .format(e.errno, e.strerror, interrupt, message))
                # Raised directly; the exit() helper comes from the site
                # module and is not guaranteed to be defined.
                raise SystemExit(-1)
Packit 135353
Packit 135353
# enabling delay testing on all devices
Packit 135353
def set_delay_all_devices(file_map, delay, quiet):
        """Switch testing on and apply the delays for every mapped device.

        file_map -- mapping device -> {file name: file path}
        delay    -- pair [buffer interrupt delay, message delay]
        quiet    -- forwarded to the per-device helpers to silence output
        """
        for device in file_map:
                state_path = locate_state(device, file_map)
                set_test_state(state_path, dev_state.on.value, quiet)
                set_delay_values(device, file_map, delay, quiet)
Packit 135353
Packit 135353
# disable all testing on a SINGLE device.
Packit 135353
def disable_testing_single_device(device, file_map):
        """Write the "off" value to every mapped file of one device.

        device   -- key of the device in file_map
        file_map -- mapping device -> {file name: file path}

        Prints a confirmation naming the last path component of device.
        """
        for file_location in file_map[device].values():
                write_test_files(file_location, dev_state.off.value)
        print("ALL testing now OFF for {}".format(device.split("/")[-1]))
Packit 135353
Packit 135353
# disable all testing on ALL devices
Packit 135353
def disable_all_testing(file_map):
        """Disable all testing on every device found in file_map.

        file_map -- mapping device -> {file name: file path}
        """
        for device in file_map:
                disable_testing_single_device(device, file_map)
Packit 135353
Packit 135353
def parse_args():
        """Build the command-line parser and return the parsed arguments.

        The chosen sub-command (or its alias, as typed) is stored in the
        "action" attribute of the returned namespace. Sub-commands are
        delay, disable_all (D), disable_single (d), view_all (V) and
        view_single (v).
        """
        parser = argparse.ArgumentParser(
                prog="vmbus_testing",
                usage="\n"
                "%(prog)s [delay]   [-h] [-e|-E] -t [-p]\n"
                "%(prog)s [view_all       | V]      [-h]\n"
                "%(prog)s [disable_all    | D]      [-h]\n"
                "%(prog)s [disable_single | d]      [-h|-p]\n"
                "%(prog)s [view_single    | v]      [-h|-p]\n"
                "%(prog)s --version\n",
                description="\nUse lsvmbus to get vmbus device type "
                "information.\n" "\nThe debugfs root path is "
                "/sys/kernel/debug/hyperv",
                formatter_class=RawDescriptionHelpFormatter)
        subparsers = parser.add_subparsers(dest="action")
        parser.add_argument("--version", action="version",
                            version='%(prog)s 0.1.0')
        parser.add_argument("-q", "--quiet", action="store_true",
                            help="silence none important test messages."
                            " This will only work when enabling testing"
                            " on a device.")

        # Use the path parser to hold the --path attribute so it can
        # be shared between subparsers. Also do the same for the state
        # parser, as all testing methods will use --enable_all and
        # enable_single.
        path_parser = argparse.ArgumentParser(add_help=False)
        path_parser.add_argument("-p", "--path", metavar="",
                                 help="Debugfs path to a vmbus device. The path "
                                 "must be the absolute path to the device.")

        state_parser = argparse.ArgumentParser(add_help=False)
        state_group = state_parser.add_mutually_exclusive_group(required=True)
        state_group.add_argument("-E", "--enable_all", action="store_const",
                                 const="enable_all",
                                 help="Enable the specified test type "
                                 "on ALL vmbus devices.")
        state_group.add_argument("-e", "--enable_single",
                                 action="store_const",
                                 const="enable_single",
                                 help="Enable the specified test type on a "
                                 "SINGLE vmbus device.")

        parser_delay = subparsers.add_parser(
                "delay",
                parents=[state_parser, path_parser],
                help="Delay the ring buffer interrupt or the "
                "ring buffer message reads in microseconds.",
                prog="vmbus_testing",
                usage="%(prog)s [-h]\n"
                "%(prog)s -E -t [value] [value]\n"
                "%(prog)s -e -t [value] [value] -p",
                description="Delay the ring buffer interrupt for "
                "vmbus devices, or delay the ring buffer message "
                "reads for vmbus devices (both in microseconds). This "
                "is only on the host to guest channel.")
        parser_delay.add_argument(
                "-t", "--delay_time", metavar="", nargs=2,
                type=check_range, default=[0, 0], required=True,
                help="Set [buffer] & [message] delay time. "
                "Value constraints: -1 == value "
                "or 0 < value <= 1000.\n"
                "Use -1 to keep the previous value for that delay "
                "type, or a value > 0 <= 1000 to change the delay "
                "time.")

        # The remaining sub-parsers need no further configuration, so
        # their return values are not kept.
        subparsers.add_parser(
                "disable_all",
                aliases=['D'], prog="vmbus_testing",
                usage="%(prog)s [disable_all | D] -h\n"
                "%(prog)s [disable_all | D]\n",
                help="Disable ALL testing on ALL vmbus devices.",
                description="Disable ALL testing on ALL vmbus "
                "devices.")
        subparsers.add_parser(
                "disable_single",
                aliases=['d'],
                parents=[path_parser], prog="vmbus_testing",
                usage="%(prog)s [disable_single | d] -h\n"
                "%(prog)s [disable_single | d] -p\n",
                help="Disable ALL testing on a SINGLE vmbus device.",
                description="Disable ALL testing on a SINGLE vmbus "
                "device.")
        subparsers.add_parser(
                "view_all", aliases=['V'],
                help="View the test state for ALL vmbus devices.",
                prog="vmbus_testing",
                usage="%(prog)s [view_all | V] -h\n"
                "%(prog)s [view_all | V]\n",
                description="This shows the test state for ALL the "
                "vmbus devices.")
        subparsers.add_parser(
                "view_single",
                aliases=['v'], parents=[path_parser],
                help="View the test values for a SINGLE vmbus "
                "device.",
                description="This shows the test values for a SINGLE "
                "vmbus device.", prog="vmbus_testing",
                usage="%(prog)s [view_single | v] -h\n"
                "%(prog)s [view_single | v] -p")

        return parser.parse_args()
Packit 135353
Packit 135353
# value checking for range checking input in parser
Packit 135353
def check_range(arg1):
        """Convert a command-line delay argument to int and range-check it.

        arg1 -- the argument text handed over by argparse

        Returns the integer when it is -1 or satisfies 0 < value <= 1000.
        Raises argparse.ArgumentTypeError when the text is not an integer
        or the value is outside that range.
        """
        try:
                val = int(arg1)
        except ValueError as err:
                raise argparse.ArgumentTypeError(str(err)) from err
        # 0 is rejected as well: the message below promises
        # "-1 or 0 < value <= 1000", which the previous test did not enforce.
        if val < -1 or val == 0 or val > 1000:
                message = ("\n\nvalue must be -1 or  0 < value <= 1000. "
                           "Value program received: {}\n").format(val)
                raise argparse.ArgumentTypeError(message)
        return val
Packit 135353
Packit 135353
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
        main()