| |
| """ Regression tests for Pacemaker's fencer |
| """ |
| |
| |
| from __future__ import print_function, unicode_literals, absolute_import, division |
| |
| __copyright__ = "Copyright 2012-2019 the Pacemaker project contributors" |
| __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY" |
| |
| import io |
| import os |
| import re |
| import sys |
| import subprocess |
| import shlex |
| import time |
| import tempfile |
| import signal |
| |
| |
| |
# Locations supplied through "@...@" placeholders.
# NOTE(review): presumably substituted by the build system when this file is
# generated from a ".in" template -- confirm.
BUILD_DIR = "@abs_top_builddir@"
SCHEMA_DIR = "@CRM_SCHEMA_DIRECTORY@"
# First entry of the module search path; for a script run directly, Python
# sets this to the directory containing the script.
TEST_DIR = sys.path[0]

# corosync configuration template describing a single node on 127.0.0.1.
# The two "%s" placeholders are, in order, the node name and the log file path.
AUTOGEN_COROSYNC_TEMPLATE = """
totem {
version: 2
cluster_name: cts-fencing
crypto_cipher: none
crypto_hash: none
transport: udp
}

nodelist {
node {
nodeid: 1
name: %s
ring0_addr: 127.0.0.1
}
}

logging {
debug: off
to_syslog: no
to_stderr: no
to_logfile: yes
logfile: %s
}
"""
| |
| |
class CrmExit(object):
    """ Symbolic names for the process exit status codes used by these tests """

    # Low-numbered status codes
    OK = 0
    ERROR = 1
    INVALID_PARAM = 2
    UNIMPLEMENT_FEATURE = 3
    INSUFFICIENT_PRIV = 4
    NOT_INSTALLED = 5
    NOT_CONFIGURED = 6
    NOT_RUNNING = 7
    # NOTE(review): 64-78 look like the BSD sysexits.h values -- confirm
    USAGE = 64
    DATAERR = 65
    NOINPUT = 66
    NOUSER = 67
    NOHOST = 68
    UNAVAILABLE = 69
    SOFTWARE = 70
    OSERR = 71
    OSFILE = 72
    CANTCREAT = 73
    IOERR = 74
    TEMPFAIL = 75
    PROTOCOL = 76
    NOPERM = 77
    CONFIG = 78
    # NOTE(review): 100 and up are presumably Pacemaker-specific -- confirm
    FATAL = 100
    PANIC = 101
    DISCONNECT = 102
    SOLO = 103
    DIGEST = 104
    NOSUCH = 105
    QUORUM = 106
    UNSAFE = 107
    EXISTS = 108
    MULTIPLE = 109
    OLD = 110
    TIMEOUT = 124
    MAX = 255
| |
| |
def update_path():
    """ Put the directories holding the programs under test at the front of PATH """

    search_path = os.environ['PATH']
    in_source_tree = os.path.exists("%s/cts-fencing.in" % TEST_DIR)

    if not in_source_tree:
        print("Running tests from the install tree: @CRM_DAEMON_DIR@ (not %s)" % TEST_DIR)
        search_path = "@CRM_DAEMON_DIR@:%s" % (search_path)
    else:
        print("Running tests from the source tree: %s (%s)" % (BUILD_DIR, TEST_DIR))

        # Each entry is prepended in turn, so the last one listed ends up first
        for prepend_fmt in ("%s/daemons/fenced:%s", "%s/tools:%s", "%s/cts:%s"):
            search_path = prepend_fmt % (BUILD_DIR, search_path)

    print('Using PATH="{}"'.format(search_path))
    os.environ['PATH'] = search_path
| |
| |
def find_validator(rng_file):
    """ Return an xmllint command (as an argument list) that validates standard
    input against rng_file, or None if /usr/bin/xmllint is not executable """

    if not os.access("/usr/bin/xmllint", os.X_OK):
        return None

    return ["xmllint", "--relaxng", rng_file, "-"]
| |
| |
def rng_directory():
    """ Return the directory to look in for RNG schemas

    The PCMK_schema_directory environment variable wins if set; otherwise
    "xml" is used when cts-fencing.in exists in TEST_DIR, else SCHEMA_DIR.
    """

    override = os.environ.get("PCMK_schema_directory")
    if override is not None:
        return override

    if os.path.exists("%s/cts-fencing.in" % TEST_DIR):
        return "xml"

    return SCHEMA_DIR
| |
| |
def pipe_communicate(pipes, stdout=True, stderr=False, stdin=None):
    """ Wrapper to get text output from pipes regardless of Python version

    Arguments:
    pipes  -- Popen-like object; only its communicate() method is used
    stdout -- if true, include the captured standard output in the result
    stderr -- if true, append the captured standard error to the result
    stdin  -- optional text to feed to the process's standard input
              (an empty string is treated the same as no input)

    Returns the requested streams concatenated as text (stdout first).
    A requested stream that was not captured, i.e. for which communicate()
    returned None, contributes nothing instead of raising.
    """

    if stdin:
        if sys.version_info < (3,):
            pipe_outputs = pipes.communicate(input=stdin)
        else:
            pipe_outputs = pipes.communicate(input=stdin.encode())
    else:
        pipe_outputs = pipes.communicate()

    # Pair each requested stream's data with the stream whose encoding is
    # used to decode it
    requested = []
    if stdout:
        requested.append((pipe_outputs[0], sys.stdout))
    if stderr:
        requested.append((pipe_outputs[1], sys.stderr))

    output = ""
    for data, stream in requested:
        if data is None:
            # The stream was not piped, so there is nothing to add
            continue

        if sys.version_info >= (3,):
            # The stream may have been replaced by an object without a usable
            # encoding; undecodable bytes must not abort the test run either
            encoding = getattr(stream, "encoding", None) or "utf-8"
            data = data.decode(encoding, "replace")

        output = output + data
    return output
| |
| |
def output_from_command(command):
    """ Execute command and return its standard output

    Arguments:
    command -- command line as a single string (split with shlex, no shell)

    Returns the output split on newlines as a list of strings. The list is
    never empty; output ending in a newline yields a final empty string.
    """

    test = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE)
    # pipe_communicate() calls communicate(), which both drains the pipe and
    # waits for the process to exit. Calling wait() first could deadlock once
    # the child fills the pipe buffer.
    return pipe_communicate(test).split("\n")
| |
| |
def localname():
    """ Return the uname of the local host

    Runs "uname -n" and returns the first line of its output, or "localhost"
    if that line is empty.
    """

    lines = output_from_command("uname -n")
    # output_from_command() always returns at least one element (possibly an
    # empty string), so the fallback must test the text, not the list
    our_uname = lines[0] if lines else ""
    return our_uname or "localhost"
| |
| |
def killall(process):
    """ Kill all instances of a process

    Arguments:
    process -- process name handed to "killall -9 -q"

    The command's exit status and output are ignored.
    """

    cmd = shlex.split("killall -9 -q %s" % process)
    test = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # communicate() drains and closes the pipe as well as waiting for exit;
    # a bare wait() on a piped child can block and leaves the pipe open
    test.communicate()
| |
| |
class TestError(Exception):
    """ Common ancestor of the exceptions defined in this module """
| |
| |
class ExitCodeError(TestError):
    """ Exception raised when command exit status is unexpected

    Attributes:
    exit_code -- the exit status the command actually returned
    """

    def __init__(self, exit_code):
        # Forward to the base class so that args is populated; the default
        # repr, pickling and copying all rely on it
        super(ExitCodeError, self).__init__(exit_code)
        self.exit_code = exit_code

    def __str__(self):
        return repr(self.exit_code)
| |
| |
class OutputNotFoundError(TestError):
    """ Exception raised when command output does not contain wanted string

    Attributes:
    output -- the command output that was searched
    """

    def __init__(self, output):
        # Forward to the base class so that args is populated; the default
        # repr, pickling and copying all rely on it
        super(OutputNotFoundError, self).__init__(output)
        self.output = output

    def __str__(self):
        return repr(self.output)
| |
| |
class OutputFoundError(TestError):
    """ Exception raised when command output contains unwanted string

    Attributes:
    output -- the command output that was searched
    """

    def __init__(self, output):
        # Forward to the base class so that args is populated; the default
        # repr, pickling and copying all rely on it
        super(OutputFoundError, self).__init__(output)
        self.output = output

    def __str__(self):
        return repr(self.output)
| |
| |
class XmlValidationError(TestError):
    """ Exception raised when xmllint fails

    Attributes:
    output -- the text produced by the validator
    """

    def __init__(self, output):
        # Forward to the base class so that args is populated; the default
        # repr, pickling and copying all rely on it
        super(XmlValidationError, self).__init__(output)
        self.output = output

    def __str__(self):
        return repr(self.output)
| |
| |
class Test(object):
    """ Executor for a single test

    A test starts its own pacemaker-fenced, runs an ordered list of commands,
    stops the daemon, and finally checks the daemon's log for patterns that
    must (or must not) appear.
    """

    def __init__(self, name, description, verbose=0, with_cpg=0):
        """ Create a test

        Arguments:
        name        -- identifier used in result messages
        description -- human-readable summary of the test
        verbose     -- if true, print commands and their output, and start
                       the daemon with -V
        with_cpg    -- if true, start the daemon with "-c" (and mark the test
                       as corosync-enabled) instead of "-s"
        """

        self.name = name
        self.description = description
        self.cmds = []
        self.verbose = verbose

        self.result_txt = ""
        self.cmd_tool_output = ""
        self.result_exitcode = CrmExit.OK

        if with_cpg:
            self.stonith_options = "-c"
            self.enable_corosync = 1
        else:
            self.stonith_options = "-s"
            self.enable_corosync = 0

        self.stonith_process = None
        self.stonith_output = ""
        self.stonith_patterns = []
        self.negative_stonith_patterns = []

        self.executed = 0

    def __new_cmd(self, cmd, args, exitcode, stdout_match="", no_wait=0, stdout_negative_match="", kill=None, validate=True):
        """ Add a command to be executed as part of this test

        Arguments:
        cmd                   -- program to run
        args                  -- its arguments, as a single string
        exitcode              -- exit status the command is expected to return
        stdout_match          -- regular expression the output must match ("" = no check)
        no_wait               -- if nonzero, start the command without waiting for it
        stdout_negative_match -- regular expression the output must not match ("" = no check)
        kill                  -- optional command line to start right after cmd
        validate              -- if true, validate the output with xmllint when available
        """

        self.cmds.append(
            {
                "cmd" : cmd,
                "kill" : kill,
                "args" : args,
                "expected_exitcode" : exitcode,
                "stdout_match" : stdout_match,
                "stdout_negative_match" : stdout_negative_match,
                "no_wait" : no_wait,
                "validate" : validate,
            }
        )

    def start_environment(self):
        """ Prepare the host for executing a test """

        killall("pacemakerd")
        killall("pacemaker-fenced")

        # Build the option string locally so that running the same test more
        # than once does not keep appending -V to self.stonith_options
        options = self.stonith_options
        if self.verbose:
            options = options + " -V"
            print("Starting pacemaker-fenced with %s" % options)

        if os.path.exists("/tmp/stonith-regression.log"):
            os.remove('/tmp/stonith-regression.log')

        cmd = "pacemaker-fenced %s -l /tmp/stonith-regression.log" % options
        self.stonith_process = subprocess.Popen(shlex.split(cmd))

        # Fixed pause after launching the daemon, before any command is run
        time.sleep(1)

    def clean_environment(self):
        """ Clean up the host after executing a test

        Stops the daemon (recording a failure if it had already exited),
        loads its log into self.stonith_output and removes the log file.
        """

        if self.stonith_process:
            if self.stonith_process.poll() is None:
                self.stonith_process.terminate()
                self.stonith_process.wait()
            else:
                # The daemon exited by itself. A negative return code -N means
                # it was terminated by signal N, so look up the signal's name;
                # anything else is reported as a plain return code.
                return_code = {
                    getattr(signal, _signame): _signame
                    for _signame in dir(signal)
                    if _signame.startswith('SIG') and not _signame.startswith("SIG_")
                }.get(-self.stonith_process.returncode, "RET=%d" % (self.stonith_process.returncode))
                msg = "FAILURE - '%s' failed. pacemaker-fenced abnormally exited during test (%s)."
                self.result_txt = msg % (self.name, return_code)
                self.result_exitcode = CrmExit.ERROR

        self.stonith_output = ""
        self.stonith_process = None

        # The log may be missing if the daemon never got as far as creating
        # it; treat that as empty output rather than aborting the whole run
        log_exists = os.path.exists('/tmp/stonith-regression.log')
        if log_exists:
            # ISO-8859-1 maps every byte value to a character, so decoding
            # cannot fail whatever the daemon wrote
            with io.open('/tmp/stonith-regression.log', 'rt', encoding="ISO-8859-1") as logfile:
                self.stonith_output = logfile.read()

        if self.verbose:
            print("Daemon Output Start")
            print(self.stonith_output)
            print("Daemon Output End")

        if log_exists:
            os.remove('/tmp/stonith-regression.log')

    def add_stonith_log_pattern(self, pattern):
        """ Add a log pattern to expect from this test """

        self.stonith_patterns.append(pattern)

    def add_stonith_neg_log_pattern(self, pattern):
        """ Add a log pattern that should not occur with this test """

        self.negative_stonith_patterns.append(pattern)

    def add_cmd(self, cmd, args, validate=True):
        """ Add a simple command to be executed as part of this test """

        self.__new_cmd(cmd, args, CrmExit.OK, "", validate=validate)

    def add_cmd_no_wait(self, cmd, args):
        """ Add a simple command to be executed (without waiting) as part of this test """

        self.__new_cmd(cmd, args, CrmExit.OK, "", 1)

    def add_cmd_check_stdout(self, cmd, args, match, no_match=""):
        """ Add a simple command with expected output to be executed as part of this test """

        self.__new_cmd(cmd, args, CrmExit.OK, match, 0, no_match)

    def add_expected_fail_cmd(self, cmd, args, exitcode=CrmExit.ERROR):
        """ Add a command to be executed as part of this test and expected to fail """

        self.__new_cmd(cmd, args, exitcode, "")

    def get_exitcode(self):
        """ Return the exit status of the last test execution """

        return self.result_exitcode

    def print_result(self, filler):
        """ Print the result of the last test execution, prefixed with filler """

        print("%s%s" % (filler, self.result_txt))

    def run_cmd(self, args):
        """ Execute a command as part of this test

        Arguments:
        args -- command description as stored by the add_* methods

        Returns CrmExit.OK immediately for a no-wait command, otherwise None.

        Raises:
        ExitCodeError       -- exit status differs from the expected one
        OutputNotFoundError -- output does not match "stdout_match"
        OutputFoundError    -- output matches "stdout_negative_match"
        XmlValidationError  -- the validator rejected the output
        """

        cmd = shlex.split(args['args'])
        cmd.insert(0, args['cmd'])

        if self.verbose:
            print("\n\nRunning: "+" ".join(cmd))
        test = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if args['kill']:
            if self.verbose:
                print("Also running: "+args['kill'])
            subprocess.Popen(shlex.split(args['kill']))

        if args['no_wait'] != 0:
            return CrmExit.OK

        # communicate() (inside pipe_communicate) drains both pipes while
        # waiting for the command to exit; waiting first could deadlock once
        # the command fills a pipe buffer
        output = pipe_communicate(test, stderr=True)
        if self.verbose:
            print(output)

        if test.returncode != args['expected_exitcode']:
            raise ExitCodeError(test.returncode)

        if (args['stdout_match'] != "" and
                re.search(args['stdout_match'], output) is None):
            raise OutputNotFoundError(output)

        if (args['stdout_negative_match'] != "" and
                re.search(args['stdout_negative_match'], output) is not None):
            raise OutputFoundError(output)

        if args['validate']:
            rng_file = rng_directory() + "/api/api-result.rng"

            cmd = find_validator(rng_file)
            if not cmd:
                # No validator available, so validation is skipped
                return

            if self.verbose:
                print("\nRunning: "+" ".join(cmd))

            validator = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output = pipe_communicate(validator, stderr=True, stdin=output)

            if self.verbose:
                print(output)

            if validator.returncode != 0:
                raise XmlValidationError(output)

    def count_negative_matches(self, outline):
        """ Return 1 if a line matches patterns that shouldn't have occurred """

        count = 0
        for pattern in self.negative_stonith_patterns:
            if pattern in outline:
                count = 1
                if self.verbose:
                    print("This pattern should not have matched = '%s'" % (pattern))
        return count

    def match_stonith_patterns(self):
        """ Check test output for expected patterns

        Every expected pattern must be found on a log line of its own (one
        line satisfies at most one pattern), and no line may contain a
        negative pattern. On failure the result text and exit code are set.
        """

        if len(self.stonith_patterns) == 0 and len(self.negative_stonith_patterns) == 0:
            return

        # Work on a copy so the registered patterns survive for a later run
        pats = list(self.stonith_patterns)
        total_patterns = len(pats)
        negative_matches = 0

        for line in self.stonith_output.split("\n"):
            negative_matches = negative_matches + self.count_negative_matches(line)
            for index, pat in enumerate(pats):
                if pat in line:
                    # Safe to delete while iterating, as the loop ends here
                    del pats[index]
                    break

        if len(pats) > 0 or negative_matches:
            if self.verbose:
                for pat in pats:
                    print("Pattern Not Matched = '%s'" % pat)

            msg = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches."
            self.result_txt = msg % (self.name, len(pats), total_patterns, negative_matches)
            self.result_exitcode = CrmExit.ERROR

    def set_error(self, step, cmd):
        """ Record failure of this test at the given step """

        msg = "FAILURE - '%s' failed at step %d. Command: %s %s"
        self.result_txt = msg % (self.name, step, cmd['cmd'], cmd['args'])
        self.result_exitcode = CrmExit.ERROR

    def run(self):
        """ Execute this test.

        Runs the commands in order and stops at the first one that fails.
        Always returns 0; use get_exitcode() for the outcome.
        """

        self.start_environment()

        if self.verbose:
            print("\n--- START TEST - %s" % self.name)

        self.result_txt = "SUCCESS - '%s'" % (self.name)
        self.result_exitcode = CrmExit.OK
        for i, cmd in enumerate(self.cmds, 1):
            try:
                self.run_cmd(cmd)
            except ExitCodeError as e:
                print("Step %d FAILED - command returned %s, expected %d" % (i, e, cmd['expected_exitcode']))
                self.set_error(i, cmd)
                break
            except OutputNotFoundError as e:
                print("Step %d FAILED - '%s' was not found in command output: %s" % (i, cmd['stdout_match'], e))
                self.set_error(i, cmd)
                break
            except OutputFoundError as e:
                print("Step %d FAILED - '%s' was found in command output: %s" % (i, cmd['stdout_negative_match'], e))
                self.set_error(i, cmd)
                break
            except XmlValidationError as e:
                # Without this handler a validation failure would escape and
                # abort the run with the daemon still up
                print("Step %d FAILED - xmllint failed: %s" % (i, e))
                self.set_error(i, cmd)
                break
            if self.verbose:
                print("Step %d SUCCESS" % (i))
        self.clean_environment()

        # Log patterns are only worth checking if nothing has failed so far
        if self.result_exitcode == CrmExit.OK:
            self.match_stonith_patterns()

        print(self.result_txt)
        if self.verbose:
            print("--- END TEST - %s\n" % self.name)

        self.executed = 1
        return 0
| |
| class Tests(object): |
| """ Collection of all fencing regression tests """ |
| |
| def __init__(self, verbose=0): |
| self.tests = [] |
| self.verbose = verbose |
| self.autogen_corosync_cfg = not os.path.exists("/etc/corosync/corosync.conf") |
| |
def new_test(self, name, description, with_cpg=0):
    """ Create a named test, add it to this collection and return it """

    created = Test(name, description, self.verbose, with_cpg)
    self.tests.append(created)
    return created
| |
| def print_list(self): |
| """ List all registered tests """ |
| |
| print("\n==== %d TESTS FOUND ====" % (len(self.tests))) |
| print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION")) |
| print("%35s - %s" % ("--------------------", "--------------------")) |
| for test in self.tests: |
| print("%35s - %s" % (test.name, test.description)) |
| print("==== END OF LIST ====\n") |
| |
def start_corosync(self):
    """ Start the corosync process """

    if self.verbose:
        print("Starting corosync")

    # Only the exit of the launched "corosync" command is awaited; its piped
    # standard output is never read.
    test = subprocess.Popen("corosync", stdout=subprocess.PIPE)
    test.wait()
    # NOTE(review): fixed 10-second pause, presumably to let corosync finish
    # starting before any test runs -- confirm.
    time.sleep(10)
| |
| def run_single(self, name): |
| """ Run a single named test """ |
| |
| for test in self.tests: |
| if test.name == name: |
| test.run() |
| break |
| |
| def run_tests_matching(self, pattern): |
| """ Run all tests whose name matches a pattern """ |
| |
| for test in self.tests: |
| if test.name.count(pattern) != 0: |
| test.run() |
| |
| def run_cpg_only(self): |
| """ Run all corosync-enabled tests """ |
| |
| for test in self.tests: |
| if test.enable_corosync: |
| test.run() |
| |
| def run_no_cpg(self): |
| """ Run all standalone tests """ |
| |
| for test in self.tests: |
| if not test.enable_corosync: |
| test.run() |
| |
| def run_tests(self): |
| """ Run all tests """ |
| |
| for test in self.tests: |
| test.run() |
| |
def exit(self):
    """ Terminate the program: status ERROR if any executed test failed, OK otherwise """

    any_failed = any(
        entry.executed != 0 and entry.get_exitcode() != CrmExit.OK
        for entry in self.tests
    )
    if any_failed:
        sys.exit(CrmExit.ERROR)

    sys.exit(CrmExit.OK)
| |
def print_results(self):
    """ Print each failed test's result followed by pass/fail totals;
    tests that were never executed are ignored """

    passed = 0
    failed = 0
    print("\n\n======= FINAL RESULTS ==========")
    print("\n--- FAILURE RESULTS:")
    for entry in self.tests:
        if entry.executed == 0:
            continue

        if entry.get_exitcode() == CrmExit.OK:
            passed += 1
            continue

        failed += 1
        entry.print_result(" ")

    if not failed:
        print(" None")

    print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (passed, failed))
| |
| def build_api_sanity_tests(self): |
| """ Register tests to verify basic API usage """ |
| |
| verbose_arg = "" |
| if self.verbose: |
| verbose_arg = "-V" |
| |
| test = self.new_test("standalone_low_level_api_test", "Sanity test client api in standalone mode.") |
| test.add_cmd("cts-fence-helper", "-t %s" % (verbose_arg), validate=False) |
| |
| test = self.new_test("cpg_low_level_api_test", "Sanity test client api using mainloop and cpg.", 1) |
| test.add_cmd("cts-fence-helper", "-m %s" % (verbose_arg), validate=False) |
| |
| def build_custom_timeout_tests(self): |
| """ Register tests to verify custom timeout usage """ |
| |
| |
| test = self.new_test("cpg_custom_timeout_1", |
| "Verify per device timeouts work as expected without using topology.", 1) |
| test.add_cmd('stonith_admin', |
| '--output-as=xml -R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node1 node2 node3"') |
| test.add_cmd('stonith_admin', |
| '--output-as=xml -R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=1"') |
| test.add_cmd('stonith_admin', |
| '--output-as=xml -R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=4"') |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| |
| test.add_stonith_log_pattern("Total timeout set to 10") |
| |
| |
| test = self.new_test("cpg_custom_timeout_2", |
| "Verify per device timeouts work as expected _WITH_ topology.", 1) |
| test.add_cmd('stonith_admin', |
| '--output-as=xml -R false1 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node1 node2 node3"') |
| test.add_cmd('stonith_admin', |
| '--output-as=xml -R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=1"') |
| test.add_cmd('stonith_admin', |
| '--output-as=xml -R false2 -a fence_dummy -o "mode=fail" -o "pcmk_host_list=node3" -o "pcmk_off_timeout=4000"') |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v false2") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| |
| test.add_stonith_log_pattern("Total timeout set to 4006") |
| |
| def build_fence_merge_tests(self): |
| """ Register tests to verify when fence operations should be merged """ |
| |
| |
| test = self.new_test("cpg_custom_merge_single", |
| "Verify overlapping identical fencing operations are merged, no fencing levels used.", 1) |
| test.add_cmd("stonith_admin", "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\" ") |
| test.add_cmd("stonith_admin", "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 10") |
| |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| |
| |
| test = self.new_test("cpg_custom_merge_multiple", |
| "Verify multiple overlapping identical fencing operations are merged", 1) |
| test.add_cmd("stonith_admin", "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"delay=2\" -o \"pcmk_host_list=node3\" ") |
| test.add_cmd("stonith_admin", "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 10") |
| |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| |
| |
| test = self.new_test("cpg_custom_merge_with_topology", |
| "Verify multiple overlapping identical fencing operations are merged with fencing levels.", |
| 1) |
| test.add_cmd("stonith_admin", "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\" ") |
| test.add_cmd("stonith_admin", "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true1") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 10") |
| |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| test.add_stonith_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| test.add_stonith_log_pattern("Operation 'off' targeting node3 on") |
| |
| def build_fence_no_merge_tests(self): |
| """ Register tests to verify when fence operations should not be merged """ |
| |
| test = self.new_test("cpg_custom_no_merge", |
| "Verify differing fencing operations are not merged", 1) |
| test.add_cmd("stonith_admin", "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3 node2\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3 node2\" ") |
| test.add_cmd("stonith_admin", "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3 node2\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true1") |
| test.add_cmd_no_wait("stonith_admin", "--output-as=xml -F node2 -t 10") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 10") |
| test.add_stonith_neg_log_pattern("Merging stonith action 'off' targeting node3 originating from client") |
| |
| def build_standalone_tests(self): |
| """ Register a grab bag of tests that can be executed in standalone or corosync mode """ |
| |
| test_types = [ |
| { |
| "prefix" : "standalone", |
| "use_cpg" : 0, |
| }, |
| { |
| "prefix" : "cpg", |
| "use_cpg" : 1, |
| }, |
| ] |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_fence_multi_device_failure" % test_type["prefix"], |
| "Verify that all devices timeout, a fencing failure is returned.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false3 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| if test_type["use_cpg"] == 1: |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -F node3 -t 2", CrmExit.TIMEOUT) |
| test.add_stonith_log_pattern("Total timeout set to 6") |
| else: |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -F node3 -t 2", CrmExit.ERROR) |
| |
| test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: ") |
| test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: ") |
| test.add_stonith_log_pattern("for host 'node3' with device 'false3' returned: ") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_fence_device_failure_rollover" % test_type["prefix"], |
| "Verify that when one fence device fails for a node, the others are tried.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| |
| if test_type["use_cpg"] == 1: |
| test.add_stonith_log_pattern("Total timeout set to 15") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_fence_missing_agent" % test_type["prefix"], |
| "Verify proper error-handling when using a non-existent fence-agent.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_missing -o \"mode=pass\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node2\"") |
| |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -F node3 -t 5", CrmExit.ERROR) |
| test.add_cmd("stonith_admin", "--output-as=xml -F node2 -t 5") |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_simple" % test_type["prefix"], |
| "Verify all fencing devices at a level are used.", test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v true") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| |
| test.add_stonith_log_pattern("Total timeout set to 5") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0") |
| |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_add_remove" % test_type["prefix"], |
| "Verify fencing occurrs after all topology levels are removed", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v true") |
| test.add_cmd("stonith_admin", "--output-as=xml -d node3 -i 1") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| |
| test.add_stonith_log_pattern("Total timeout set to 5") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0") |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_device_fails" % test_type["prefix"], |
| "Verify if one device in a level fails, the other is tried.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 20") |
| |
| test.add_stonith_log_pattern("Total timeout set to 40") |
| test.add_stonith_log_pattern("for host 'node3' with device 'false' returned: -201") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0") |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_multi_level_fails" % test_type["prefix"], |
| "Verify if one level fails, the next leve is tried.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v true1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v false2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v true3") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v true4") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 3") |
| |
| test.add_stonith_log_pattern("Total timeout set to 18") |
| test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -201") |
| test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: -201") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0") |
| |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_missing_devices" % test_type["prefix"], |
| "Verify topology can continue with missing devices.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v true1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v false2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v true3") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v true4") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_level_removal" % test_type["prefix"], |
| "Verify level removal works.", test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true4 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false2 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v true1") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v false2") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v true3") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 3 -v true4") |
| |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -d node3 -i 2") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 20") |
| |
| test.add_stonith_log_pattern("Total timeout set to 8") |
| test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -201") |
| test.add_stonith_neg_log_pattern("for host 'node3' with device 'false2' returned: ") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0") |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_level_pattern" % test_type["prefix"], |
| "Verify targeting topology by node name pattern works.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| """--output-as=xml -R true -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node1 node2 node3" """) |
| test.add_cmd("stonith_admin", """--output-as=xml -r '@node.*' -i 1 -v true""") |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5") |
| test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_host_list_delimiters" % test_type["prefix"], |
| "Verify commas and semicolons can be used as pcmk_host_list delimiters", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| """--output-as=xml -R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=node1,node2,node3" """) |
| test.add_cmd("stonith_admin", |
| """--output-as=xml -R true2 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=pcmk1;pcmk2;pcmk3" """) |
| test.add_cmd("stonith_admin", "stonith_admin --output-as=xml -F node2 -t 5") |
| test.add_cmd("stonith_admin", "stonith_admin --output-as=xml -F pcmk3 -t 5") |
| test.add_stonith_log_pattern("for host 'node2' with device 'true1' returned: 0") |
| test.add_stonith_log_pattern("for host 'pcmk3' with device 'true2' returned: 0") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_list_devices" % test_type["prefix"], |
| "Verify list of devices that can fence a node is correct", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd_check_stdout("stonith_admin", "--output-as=xml -l node1 -V", "true2", "true1") |
| test.add_cmd_check_stdout("stonith_admin", "--output-as=xml -l node1 -V", "true3", "true1") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_monitor" % test_type["prefix"], |
| "Verify device is reachable", test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -Q true1") |
| test.add_cmd("stonith_admin", "--output-as=xml -Q false1") |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -Q true2", CrmExit.ERROR) |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_monitor_timeout" % test_type["prefix"], |
| "Verify monitor uses duration of timeout period given.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| '--output-as=xml -R true1 -a fence_dummy -o "mode=fail" -o "monitor_mode=fail" -o "pcmk_host_list=node3"') |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -Q true1 -t 5", CrmExit.ERROR) |
| test.add_stonith_log_pattern("Attempt 2 to execute") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_monitor_timeout_max_retries" % test_type["prefix"], |
| "Verify monitor retries until max retry value or timeout is hit.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| '--output-as=xml -R true1 -a fence_dummy -o "mode=fail" -o "monitor_mode=fail" -o "pcmk_host_list=node3"') |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -Q true1 -t 15", CrmExit.ERROR) |
| test.add_stonith_log_pattern("Attempted to execute agent fence_dummy (list) the maximum number of times") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_register" % test_type["prefix"], |
| "Verify devices can be registered and un-registered", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -Q true1") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -D true1") |
| |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -Q true1", CrmExit.ERROR) |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_reboot" % test_type["prefix"], |
| "Verify devices can be rebooted", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -B node3 -t 5") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -D true1") |
| |
| test.add_expected_fail_cmd("stonith_admin", "--output-as=xml -Q true1", CrmExit.ERROR) |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| test = self.new_test("%s_fence_history" % test_type["prefix"], |
| "Verify last fencing operation is returned.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 -t 5 -V") |
| |
| test.add_cmd_check_stdout("stonith_admin", "--output-as=xml -H node3", 'action="off" target="node3" .* status="success"') |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_dynamic_list_query" % test_type["prefix"], |
| "Verify dynamic list of fencing devices can be retrieved.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1") |
| |
| test.add_cmd_check_stdout("stonith_admin", "--output-as=xml -l fake_port_1", 'count="3"') |
| |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_fence_dynamic_list_query" % test_type["prefix"], |
| "Verify dynamic list of fencing devices can be retrieved.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true2 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true3 -a fence_dummy -o mode=pass -o mock_dynamic_hosts=fake_port_1") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -F fake_port_1 -t 5 -V") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_status_query" % test_type["prefix"], |
| "Verify dynamic list of fencing devices can be retrieved.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_check=status\"") |
| |
| test.add_cmd_check_stdout("stonith_admin", "--output-as=xml -l fake_port_1", 'count="3"') |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_no_reboot_support" % test_type["prefix"], |
| "Verify reboot action defaults to off when no reboot action is advertised by agent.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy_no_reboot -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -B node1 -t 5 -V") |
| test.add_stonith_log_pattern("does not advertise support for 'reboot', performing 'off'") |
| test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)") |
| |
| |
| for test_type in test_types: |
| test = self.new_test("%s_with_reboot_support" % test_type["prefix"], |
| "Verify reboot action can be used when metadata advertises it.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", "--output-as=xml -B node1 -t 5 -V") |
| test.add_stonith_neg_log_pattern("does not advertise support for 'reboot', performing 'off'") |
| test.add_stonith_log_pattern("with device 'true1' returned: 0 (OK)") |
| |
| |
| |
| for test_type in test_types: |
| if test_type["use_cpg"] == 0: |
| continue |
| |
| test = self.new_test("%s_topology_delay" % test_type["prefix"], |
| "Verify requested fencing delay is applied only for the first device in the first level and pcmk_delay_base is added.", |
| test_type["use_cpg"]) |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true1 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\" -o \"pcmk_delay_base=1\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R false1 -a fence_dummy -o \"mode=fail\" -o \"pcmk_host_list=node1 node2 node3\" -o \"pcmk_delay_base=1\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true2 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| test.add_cmd("stonith_admin", |
| "--output-as=xml -R true3 -a fence_dummy -o \"mode=pass\" -o \"pcmk_host_list=node1 node2 node3\"") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v true1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 1 -v false1") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true2") |
| test.add_cmd("stonith_admin", "--output-as=xml -r node3 -i 2 -v true3") |
| |
| test.add_cmd("stonith_admin", "--output-as=xml -F node3 --delay 1") |
| |
| test.add_stonith_log_pattern("Delaying 'off' action targeting node3 on true1 for 2s (timeout=120s, requested_delay=1s, base=1s, max=1s)") |
| test.add_stonith_log_pattern("Delaying 'off' action targeting node3 on false1 for 1s (timeout=120s, requested_delay=0s, base=1s, max=1s)") |
| test.add_stonith_neg_log_pattern("Delaying 'off' action targeting node3 on true2") |
| test.add_stonith_neg_log_pattern("Delaying 'off' action targeting node3 on true3") |
| |
def build_nodeid_tests(self):
    """ Register tests checking whether a corosync node id is passed to the fence agent """

    local_node = localname()
    admin = "stonith_admin"

    # Argument templates shared by the tests below; %s is the node being fenced
    register_args = '--output-as=xml -R true1 -a fence_dummy -o "mode=pass" -o "pcmk_host_list=%s"'
    fence_args = "--output-as=xml -F %s -t 3"
    nodeid_added_msg = "For stonith action (off) for victim %s, adding nodeid"

    # Target is the local node, so the nodeid log message is expected
    supplied = self.new_test("cpg_supply_nodeid",
                             "Verify nodeid is given when fence agent has nodeid as parameter", 1)
    supplied.add_cmd(admin, register_args % local_node)
    supplied.add_cmd(admin, fence_args % local_node)
    supplied.add_stonith_log_pattern("as nodeid with fence action 'off' targeting %s" % local_node)

    # Target is a fixed name ("regr-test") rather than the local node name
    not_supplied = self.new_test("cpg_do_not_supply_nodeid",
                                 "Verify nodeid is _NOT_ given when fence agent does not have nodeid as parameter",
                                 1)
    not_supplied.add_cmd(admin, register_args % "regr-test")
    not_supplied.add_cmd(admin, fence_args % "regr-test")
    not_supplied.add_stonith_neg_log_pattern(nodeid_added_msg % "regr-test")

    # Same commands as the first test, but registered with 0 as the last new_test() argument
    standalone = self.new_test("standalone_do_not_supply_nodeid",
                               "Verify nodeid in metadata parameter list doesn't kill standalone mode",
                               0)
    standalone.add_cmd(admin, register_args % local_node)
    standalone.add_cmd(admin, fence_args % local_node)
    standalone.add_stonith_neg_log_pattern(nodeid_added_msg % local_node)
| |
def build_unfence_tests(self):
    """ Register tests that verify unfencing of the local node """

    node = localname()
    admin = "stonith_admin"
    auto_agent = "fence_dummy_auto_unfence"

    # Argument templates: (device, agent, mode, host list), (target, level, device),
    # (target, timeout) and (device) respectively
    register = '--output-as=xml -R %s -a %s -o "mode=%s" -o "pcmk_host_list=%s"'
    add_level = "--output-as=xml -r %s -i %d -v %s"
    unfence = "--output-as=xml -U %s -t %d"
    device_ok = "with device '%s' returned: 0 (OK)"

    # Host list used by the topology tests: the local node plus "node3"
    shared_hosts = "%s node3" % node

    # Both auto-unfence devices pass
    test = self.new_test("cpg_unfence_required_1",
                         "Verify require unfencing on all devices when automatic=true in agent's metadata",
                         1)
    for device in ("true1", "true2"):
        test.add_cmd(admin, register % (device, auto_agent, "pass", node))
    test.add_cmd(admin, unfence % (node, 3))
    for device in ("true1", "true2"):
        test.add_stonith_log_pattern(device_ok % device)

    # Second auto-unfence device fails, so the unfence command must fail
    test = self.new_test("cpg_unfence_required_2",
                         "Verify require unfencing on all devices when automatic=true in agent's metadata",
                         1)
    for device, mode in (("true1", "pass"), ("true2", "fail")):
        test.add_cmd(admin, register % (device, auto_agent, mode, node))
    test.add_expected_fail_cmd(admin, unfence % (node, 6), CrmExit.ERROR)

    # Auto-unfence devices placed in two different topology levels
    test = self.new_test("cpg_unfence_required_3",
                         "Verify require unfencing on all devices even when at different topology levels",
                         1)
    for device in ("true1", "true2"):
        test.add_cmd(admin, register % (device, auto_agent, "pass", shared_hosts))
    for level, device in ((1, "true1"), (2, "true2")):
        test.add_cmd(admin, add_level % (node, level, device))
    test.add_cmd(admin, unfence % (node, 3))
    for device in ("true1", "true2"):
        test.add_stonith_log_pattern(device_ok % device)

    # Passing auto-unfence devices mixed with failing plain devices across four levels
    test = self.new_test("cpg_unfence_required_4",
                         "Verify all required devices are executed even with topology levels fail.",
                         1)
    for device in ("true1", "true2", "true3", "true4"):
        test.add_cmd(admin, register % (device, auto_agent, "pass", shared_hosts))
    for device in ("false1", "false2", "false3", "false4"):
        test.add_cmd(admin, register % (device, "fence_dummy", "fail", shared_hosts))
    topology = (
        (1, "true1"),
        (1, "false1"),
        (2, "false2"),
        (2, "true2"),
        (2, "false3"),
        (2, "true3"),
        (3, "false4"),
        (4, "true4"),
    )
    for level, device in topology:
        test.add_cmd(admin, add_level % (node, level, device))
    test.add_cmd(admin, unfence % (node, 3))
    for device in ("true1", "true2", "true3", "true4"):
        test.add_stonith_log_pattern(device_ok % device)
| |
def build_unfence_on_target_tests(self):
    """ Register tests that verify unfencing that runs on the target """

    node = localname()
    admin = "stonith_admin"

    # Argument templates: (device, host list), (target, level, device) and (target)
    register = '--output-as=xml -R %s -a fence_dummy -o "mode=pass" -o "pcmk_host_list=%s"'
    add_level = "--output-as=xml -r %s -i %d -v %s"
    unfence = "--output-as=xml -U %s -t 3"

    # Log message every test in this group expects to see
    on_target_msg = "(on) to be executed on the target node"

    # Unfence the local node with a single device
    test = self.new_test("cpg_unfence_on_target_1",
                         "Verify unfencing with on_target = true", 1)
    test.add_cmd(admin, register % ("true1", node))
    test.add_cmd(admin, unfence % node)
    test.add_stonith_log_pattern(on_target_msg)

    # Unfence a made-up node name; the command is expected to fail
    test = self.new_test("cpg_unfence_on_target_2",
                         "Verify failure unfencing with on_target = true",
                         1)
    test.add_cmd(admin, register % ("true1", "%s node_fake_1234" % node))
    test.add_expected_fail_cmd(admin, unfence % "node_fake_1234", CrmExit.ERROR)
    test.add_stonith_log_pattern(on_target_msg)

    # Unfence the local node through a two-level topology
    test = self.new_test("cpg_unfence_on_target_3",
                         "Verify unfencing with on_target = true using topology",
                         1)
    for device in ("true1", "true2"):
        test.add_cmd(admin, register % (device, "%s node3" % node))
    for level, device in ((1, "true1"), (2, "true2")):
        test.add_cmd(admin, add_level % (node, level, device))
    test.add_cmd(admin, unfence % node)
    test.add_stonith_log_pattern(on_target_msg)

    # Unfence a made-up node name through a two-level topology; expected to fail
    test = self.new_test("cpg_unfence_on_target_4",
                         "Verify unfencing failure with on_target = true using topology",
                         1)
    for device in ("true1", "true2"):
        test.add_cmd(admin, register % (device, "%s node_fake" % node))
    for level, device in ((1, "true1"), (2, "true2")):
        test.add_cmd(admin, add_level % ("node_fake", level, device))
    test.add_expected_fail_cmd(admin, unfence % "node_fake", CrmExit.ERROR)
    test.add_stonith_log_pattern(on_target_msg)
| |
def build_remap_tests(self):
    """ Register tests that verify topology reboots are remapped to "off" then "on" """

    admin = "stonith_admin"

    # Command arguments; the trailing space in "register" is part of the argument string
    register = '--output-as=xml -R %s -a %s -o "mode=%s" -o "pcmk_host_list=node_fake" '
    reboot_node = "--output-as=xml -B node_fake -t 5"

    # Log messages that recur across the tests
    remapping = "Remapping multiple-device reboot targeting node_fake"
    remapped_off = "Remapped 'off' targeting node_fake complete, remapping to 'on'"
    undoing = "Undoing remap of reboot targeting node_fake"
    off_using = "perform 'off' action targeting node_fake using '%s'"
    reboot_using = "perform 'reboot' action targeting node_fake using '%s'"
    on_ignored = "Ignoring %s 'on' failure (no capable peers) targeting node_fake"

    # One level holding two devices, each with its own off/reboot timeouts
    simple = self.new_test("cpg_remap_simple",
                           "Verify sequential topology reboot is remapped to all-off-then-all-on", 1)
    for device, off_timeout, reboot_timeout in (("true1", 1, 10), ("true2", 2, 20)):
        timeouts = '-o "pcmk_off_timeout=%d" -o "pcmk_reboot_timeout=%d" ' % (off_timeout, reboot_timeout)
        simple.add_cmd(admin, register % (device, "fence_dummy", "pass") + timeouts)
    simple.add_cmd(admin, "--output-as=xml -r node_fake -i 1 -v true1 -v true2")
    simple.add_cmd(admin, reboot_node)
    expected = [
        remapping,
        "Total timeout set to 3 for peer's fencing targeting node_fake",
        off_using % "true1",
        off_using % "true2",
        remapped_off,
        on_ignored % "true1",
        on_ignored % "true2",
        undoing,
    ]
    for message in expected:
        simple.add_stonith_log_pattern(message)

    # Same topology with the auto-unfence agent: no 'on' action may be attempted
    automatic = self.new_test("cpg_remap_automatic",
                              "Verify remapped topology reboot skips automatic 'on'", 1)
    for device in ("true1", "true2"):
        automatic.add_cmd(admin, register % (device, "fence_dummy_auto_unfence", "pass"))
    automatic.add_cmd(admin, "--output-as=xml -r node_fake -i 1 -v true1 -v true2")
    automatic.add_cmd(admin, reboot_node)
    expected = [
        remapping,
        off_using % "true1",
        off_using % "true2",
        remapped_off,
        undoing,
    ]
    for message in expected:
        automatic.add_stonith_log_pattern(message)
    for message in ("perform 'on' action targeting node_fake using", "'on' failure"):
        automatic.add_stonith_neg_log_pattern(message)

    # Single failing device in level 1, remapped two-device level 2
    complex_1 = self.new_test("cpg_remap_complex_1",
                              "Verify remapped topology reboot in second level works if non-remapped first level fails",
                              1)
    for device, mode in (("false1", "fail"), ("true1", "pass"), ("true2", "pass")):
        complex_1.add_cmd(admin, register % (device, "fence_dummy", mode))
    complex_1.add_cmd(admin, "--output-as=xml -r node_fake -i 1 -v false1")
    complex_1.add_cmd(admin, "--output-as=xml -r node_fake -i 2 -v true1 -v true2")
    complex_1.add_cmd(admin, reboot_node)
    expected = [
        reboot_using % "false1",
        remapping,
        off_using % "true1",
        off_using % "true2",
        remapped_off,
        on_ignored % "true1",
        on_ignored % "true2",
        undoing,
    ]
    for message in expected:
        complex_1.add_stonith_log_pattern(message)

    # Level 2 contains a failing device, so fencing must move on to level 3
    complex_2 = self.new_test("cpg_remap_complex_2",
                              "Verify remapped topology reboot failure in second level proceeds to third level",
                              1)
    devices = (
        ("false1", "fail"),
        ("false2", "fail"),
        ("true1", "pass"),
        ("true2", "pass"),
        ("true3", "pass"),
    )
    for device, mode in devices:
        complex_2.add_cmd(admin, register % (device, "fence_dummy", mode))
    complex_2.add_cmd(admin, "--output-as=xml -r node_fake -i 1 -v false1")
    complex_2.add_cmd(admin, "--output-as=xml -r node_fake -i 2 -v true1 -v false2 -v true3")
    complex_2.add_cmd(admin, "--output-as=xml -r node_fake -i 3 -v true2")
    complex_2.add_cmd(admin, reboot_node)
    expected = [
        reboot_using % "false1",
        remapping,
        off_using % "true1",
        off_using % "false2",
        "Attempted to execute agent fence_dummy (off) the maximum number of times",
        undoing,
        reboot_using % "true2",
    ]
    for message in expected:
        complex_2.add_stonith_log_pattern(message)
    complex_2.add_stonith_neg_log_pattern("node_fake with true3")
| |
def setup_environment(self, use_corosync):
    """ Prepare the host before executing any tests

    Arguments:
    use_corosync -- if true, corosync is (re)started by this method; when
                    self.autogen_corosync_cfg is also set, a corosync
                    configuration is generated first

    "cts-support install" is run in every case.
    """

    if use_corosync:
        if self.autogen_corosync_cfg:
            # mkstemp() is used only to obtain a unique log file name (stored
            # for later cleanup); the open descriptor itself is not needed
            (handle, self.autogen_corosync_log) = tempfile.mkstemp(prefix="cts-fencing-",
                                                                   suffix=".corosync.log")
            os.close(handle)

            # Write the configuration through a context manager so the file is
            # closed (and flushed) even if formatting or writing fails
            with io.open("/etc/corosync/corosync.conf", "w") as corosync_cfg:
                corosync_cfg.write(AUTOGEN_COROSYNC_TEMPLATE % (localname(), self.autogen_corosync_log))

        # Get rid of any corosync we did not start ourselves, then start our own
        # (presumably killall() terminates processes by name -- see its definition)
        killall("corosync")
        self.start_corosync()

    subprocess.call(["cts-support", "install"])
| |
def cleanup_environment(self, use_corosync):
    """ Clean up the host after executing desired tests

    Arguments:
    use_corosync -- if true, corosync is stopped and, when the corosync
                    configuration was generated by this script
                    (self.autogen_corosync_cfg), the generated configuration
                    and log file are removed

    "cts-support uninstall" is run in every case.
    """

    if use_corosync:
        killall("corosync")

        if self.autogen_corosync_cfg:
            if self.verbose:
                print("Corosync output")
                # Stream the log line by line; the context manager closes the
                # file even if printing is interrupted
                with io.open(self.autogen_corosync_log, 'rt') as logfile:
                    for line in logfile:
                        print(line.strip())

            # Remove the files created for the generated configuration
            os.remove(self.autogen_corosync_log)
            os.remove("/etc/corosync/corosync.conf")

    subprocess.call(["cts-support", "uninstall"])
| |
class TestOptions(object):
    """ Option handler

    Parsed settings are kept in the "options" dictionary, keyed by the long
    option name without the leading dashes.
    """

    def __init__(self):
        self.options = {}
        self.options['list-tests'] = 0
        self.options['run-all'] = 1
        self.options['run-only'] = ""
        self.options['run-only-pattern'] = ""
        self.options['verbose'] = 0
        self.options['invalid-arg'] = ""
        self.options['cpg-only'] = 0
        self.options['no-cpg'] = 0
        self.options['show-usage'] = 0

    def build_options(self, argv):
        """ Set options based on command-line arguments

        Arguments:
        argv -- full argument list, with the program name at index 0

        Unrecognized arguments are ignored. The argument following --run-only
        or --run-only-pattern is always taken as that option's value. If such
        an option is the last argument (so it has no value), its name is
        stored in options['invalid-arg'] and options['show-usage'] is set,
        instead of failing with an IndexError.
        """

        # Options that simply switch a setting on
        flag_options = {
            "-h": 'show-usage', "--help": 'show-usage',
            "-l": 'list-tests', "--list-tests": 'list-tests',
            "-V": 'verbose', "--verbose": 'verbose',
            "-n": 'no-cpg', "--no-cpg": 'no-cpg',
            "-c": 'cpg-only', "--cpg-only": 'cpg-only',
        }

        # Options that consume the next argument as their value
        value_options = {
            "-r": 'run-only', "--run-only": 'run-only',
            "-p": 'run-only-pattern', "--run-only-pattern": 'run-only-pattern',
        }

        remaining = iter(argv[1:])
        for arg in remaining:
            if arg in flag_options:
                self.options[flag_options[arg]] = 1
            elif arg in value_options:
                # Pull the value off the same iterator so the loop skips it
                value = next(remaining, None)
                if value is None:
                    self.options['invalid-arg'] = arg
                    self.options['show-usage'] = 1
                else:
                    self.options[value_options[arg]] = value

    def show_usage(self):
        """ Show command usage """

        print("usage: " + sys.argv[0] + " [options]")
        print("If no options are provided, all tests will run")
        print("Options:")
        print("\t [--help | -h] Show usage")
        print("\t [--list-tests | -l] Print out all registered tests.")
        print("\t [--cpg-only | -c] Only run tests that require corosync.")
        print("\t [--no-cpg | -n] Only run tests that do not require corosync")
        print("\t [--run-only | -r 'testname'] Run a specific test")
        print("\t [--verbose | -V] Verbose output")
        print("\t [--run-only-pattern | -p 'string'] Run only tests containing the string value")
        print("\n\tExample: Run only the test 'start_stop'")
        print("\t\t " + sys.argv[0] + " --run-only start_stop")
        print("\n\tExample: Run only the tests with the string 'systemd' present in them")
        print("\t\t " + sys.argv[0] + " --run-only-pattern systemd")
| |
| |
def main(argv):
    """ Run fencing regression tests as specified by arguments

    Arguments:
    argv -- full command line, with the program name at index 0

    Exits via sys.exit() after listing tests or showing usage; otherwise the
    selected tests are run and the process ends through tests.exit().
    """

    update_path()

    opts = TestOptions()
    opts.build_options(argv)

    use_corosync = 1

    # All tests are registered up front so that --list-tests can show them
    tests = Tests(opts.options['verbose'])
    tests.build_standalone_tests()
    tests.build_custom_timeout_tests()
    tests.build_api_sanity_tests()
    tests.build_fence_merge_tests()
    tests.build_fence_no_merge_tests()
    tests.build_unfence_tests()
    tests.build_unfence_on_target_tests()
    tests.build_nodeid_tests()
    tests.build_remap_tests()

    if opts.options['list-tests']:
        tests.print_list()
        sys.exit(CrmExit.OK)
    elif opts.options['show-usage']:
        opts.show_usage()
        sys.exit(CrmExit.OK)

    print("Starting ...")

    if opts.options['no-cpg']:
        use_corosync = 0

    tests.setup_environment(use_corosync)

    # Whatever happens while running tests (including an interrupt), undo the
    # changes setup_environment() made to the host
    try:
        if opts.options['run-only-pattern'] != "":
            tests.run_tests_matching(opts.options['run-only-pattern'])
        elif opts.options['run-only'] != "":
            tests.run_single(opts.options['run-only'])
        elif opts.options['no-cpg']:
            tests.run_no_cpg()
        elif opts.options['cpg-only']:
            tests.run_cpg_only()
        else:
            tests.run_tests()

        tests.print_results()
    finally:
        tests.cleanup_environment(use_corosync)

    tests.exit()
| |
| |
# Run the regression tests only when executed as a script, not when imported
if __name__ == "__main__":
    main(sys.argv)