Blame iptables-test.py

Packit 7b22a4
#!/usr/bin/env python
Packit 7b22a4
#
Packit 7b22a4
# (C) 2012-2013 by Pablo Neira Ayuso <pablo@netfilter.org>
Packit 7b22a4
#
Packit 7b22a4
# This program is free software; you can redistribute it and/or modify
Packit 7b22a4
# it under the terms of the GNU General Public License as published by
Packit 7b22a4
# the Free Software Foundation; either version 2 of the License, or
Packit 7b22a4
# (at your option) any later version.
Packit 7b22a4
#
Packit 7b22a4
# This software has been sponsored by Sophos Astaro <http://www.sophos.com>
Packit 7b22a4
#
Packit 7b22a4
Packit 7b22a4
from __future__ import print_function
Packit 7b22a4
import sys
Packit 7b22a4
import os
Packit 7b22a4
import subprocess
Packit 7b22a4
import argparse
Packit 7b22a4
Packit 7b22a4
IPTABLES = "iptables"
Packit 7b22a4
IP6TABLES = "ip6tables"
Packit 7b22a4
ARPTABLES = "arptables"
Packit 7b22a4
EBTABLES = "ebtables"
Packit 7b22a4
Packit 7b22a4
IPTABLES_SAVE = "iptables-save"
Packit 7b22a4
IP6TABLES_SAVE = "ip6tables-save"
Packit 7b22a4
ARPTABLES_SAVE = "arptables-save"
Packit 7b22a4
EBTABLES_SAVE = "ebtables-save"
Packit 7b22a4
#IPTABLES_SAVE = ['xtables-save','-4']
Packit 7b22a4
#IP6TABLES_SAVE = ['xtables-save','-6']
Packit 7b22a4
Packit 7b22a4
EXTENSIONS_PATH = "extensions"
Packit 7b22a4
LOGFILE="/tmp/iptables-test.log"
Packit 7b22a4
log_file = None
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
class Colors:
Packit 7b22a4
    HEADER = '\033[95m'
Packit 7b22a4
    BLUE = '\033[94m'
Packit 7b22a4
    GREEN = '\033[92m'
Packit 7b22a4
    YELLOW = '\033[93m'
Packit 7b22a4
    RED = '\033[91m'
Packit 7b22a4
    ENDC = '\033[0m'
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
def print_error(reason, filename=None, lineno=None):
Packit 7b22a4
    '''
Packit 7b22a4
    Prints an error with nice colors, indicating file and line number.
Packit 7b22a4
    '''
Packit 7b22a4
    print(filename + ": " + Colors.RED + "ERROR" +
Packit 7b22a4
        Colors.ENDC + ": line %d (%s)" % (lineno, reason))
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
def delete_rule(iptables, rule, filename, lineno):
Packit 7b22a4
    '''
Packit 7b22a4
    Removes an iptables rule
Packit 7b22a4
    '''
Packit 7b22a4
    cmd = iptables + " -D " + rule
Packit 7b22a4
    ret = execute_cmd(cmd, filename, lineno)
Packit 7b22a4
    if ret == 1:
Packit 7b22a4
        reason = "cannot delete: " + iptables + " -I " + rule
Packit 7b22a4
        print_error(reason, filename, lineno)
Packit 7b22a4
        return -1
Packit 7b22a4
Packit 7b22a4
    return 0
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
def run_test(iptables, rule, rule_save, res, filename, lineno, netns):
Packit 7b22a4
    '''
Packit 7b22a4
    Executes an unit test. Returns the output of delete_rule().
Packit 7b22a4
Packit 7b22a4
    Parameters:
Packit 7b22a4
    :param  iptables: string with the iptables command to execute
Packit 7b22a4
    :param rule: string with iptables arguments for the rule to test
Packit 7b22a4
    :param rule_save: string to find the rule in the output of iptables -save
Packit 7b22a4
    :param res: expected result of the rule. Valid values: "OK", "FAIL"
Packit 7b22a4
    :param filename: name of the file tested (used for print_error purposes)
Packit 7b22a4
    :param lineno: line number being tested (used for print_error purposes)
Packit 7b22a4
    '''
Packit 7b22a4
    ret = 0
Packit 7b22a4
Packit 7b22a4
    cmd = iptables + " -A " + rule
Packit 7b22a4
    if netns:
Packit 7b22a4
            cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + cmd
Packit 7b22a4
Packit 7b22a4
    ret = execute_cmd(cmd, filename, lineno)
Packit 7b22a4
Packit 7b22a4
    #
Packit 7b22a4
    # report failed test
Packit 7b22a4
    #
Packit 7b22a4
    if ret:
Packit 7b22a4
        if res == "OK":
Packit 7b22a4
            reason = "cannot load: " + cmd
Packit 7b22a4
            print_error(reason, filename, lineno)
Packit 7b22a4
            return -1
Packit 7b22a4
        else:
Packit 7b22a4
            # do not report this error
Packit 7b22a4
            return 0
Packit 7b22a4
    else:
Packit 7b22a4
        if res == "FAIL":
Packit 7b22a4
            reason = "should fail: " + cmd
Packit 7b22a4
            print_error(reason, filename, lineno)
Packit 7b22a4
            delete_rule(iptables, rule, filename, lineno)
Packit 7b22a4
            return -1
Packit 7b22a4
Packit 7b22a4
    matching = 0
Packit 7b22a4
    splitted = iptables.split(" ")
Packit 7b22a4
    if len(splitted) == 2:
Packit 7b22a4
        if splitted[1] == '-4':
Packit 7b22a4
            command = IPTABLES_SAVE
Packit 7b22a4
        elif splitted[1] == '-6':
Packit 7b22a4
            command = IP6TABLES_SAVE
Packit 7b22a4
    elif len(splitted) == 1:
Packit 7b22a4
        if splitted[0] == IPTABLES:
Packit 7b22a4
            command = IPTABLES_SAVE
Packit 7b22a4
        elif splitted[0] == IP6TABLES:
Packit 7b22a4
            command = IP6TABLES_SAVE
Packit 7b22a4
        elif splitted[0] == ARPTABLES:
Packit 7b22a4
            command = ARPTABLES_SAVE
Packit 7b22a4
        elif splitted[0] == EBTABLES:
Packit 7b22a4
            command = EBTABLES_SAVE
Packit 7b22a4
Packit Service d4a05d
    command = EXECUTEABLE + " " + command
Packit 7b22a4
Packit 7b22a4
    if netns:
Packit 7b22a4
            command = "ip netns exec ____iptables-container-test " + command
Packit 7b22a4
Packit 7b22a4
    args = splitted[1:]
Packit 7b22a4
    proc = subprocess.Popen(command, shell=True,
Packit 7b22a4
                            stdin=subprocess.PIPE,
Packit 7b22a4
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
Packit 7b22a4
    out, err = proc.communicate()
Packit 7b22a4
Packit 7b22a4
    #
Packit 7b22a4
    # check for segfaults
Packit 7b22a4
    #
Packit 7b22a4
    if proc.returncode == -11:
Packit 7b22a4
        reason = "iptables-save segfaults: " + cmd
Packit 7b22a4
        print_error(reason, filename, lineno)
Packit 7b22a4
        delete_rule(iptables, rule, filename, lineno)
Packit 7b22a4
        return -1
Packit 7b22a4
Packit 7b22a4
    # find the rule
Packit 7b22a4
    matching = out.find(rule_save.encode('utf-8'))
Packit 7b22a4
    if matching < 0:
Packit 7b22a4
        reason = "cannot find: " + iptables + " -I " + rule
Packit 7b22a4
        print_error(reason, filename, lineno)
Packit 7b22a4
        delete_rule(iptables, rule, filename, lineno)
Packit 7b22a4
        return -1
Packit 7b22a4
Packit 7b22a4
    # Test "ip netns del NETNS" path with rules in place
Packit 7b22a4
    if netns:
Packit 7b22a4
        return 0
Packit 7b22a4
Packit 7b22a4
    return delete_rule(iptables, rule, filename, lineno)
Packit 7b22a4
Packit 7b22a4
def execute_cmd(cmd, filename, lineno):
Packit 7b22a4
    '''
Packit 7b22a4
    Executes a command, checking for segfaults and returning the command exit
Packit 7b22a4
    code.
Packit 7b22a4
Packit 7b22a4
    :param cmd: string with the command to be executed
Packit 7b22a4
    :param filename: name of the file tested (used for print_error purposes)
Packit 7b22a4
    :param lineno: line number being tested (used for print_error purposes)
Packit 7b22a4
    '''
Packit 7b22a4
    global log_file
Packit 7b22a4
    if cmd.startswith('iptables ') or cmd.startswith('ip6tables ') or cmd.startswith('ebtables ') or cmd.startswith('arptables '):
Packit Service d4a05d
        cmd = EXECUTEABLE + " " + cmd
Packit 7b22a4
Packit 7b22a4
    print("command: {}".format(cmd), file=log_file)
Packit 7b22a4
    ret = subprocess.call(cmd, shell=True, universal_newlines=True,
Packit 7b22a4
        stderr=subprocess.STDOUT, stdout=log_file)
Packit 7b22a4
    log_file.flush()
Packit 7b22a4
Packit 7b22a4
    # generic check for segfaults
Packit 7b22a4
    if ret  == -11:
Packit 7b22a4
        reason = "command segfaults: " + cmd
Packit 7b22a4
        print_error(reason, filename, lineno)
Packit 7b22a4
    return ret
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
def run_test_file(filename, netns):
Packit 7b22a4
    '''
Packit 7b22a4
    Runs a test file
Packit 7b22a4
Packit 7b22a4
    :param filename: name of the file with the test rules
Packit 7b22a4
    '''
Packit 7b22a4
    #
Packit 7b22a4
    # if this is not a test file, skip.
Packit 7b22a4
    #
Packit 7b22a4
    if not filename.endswith(".t"):
Packit 7b22a4
        return 0, 0
Packit 7b22a4
Packit 7b22a4
    if "libipt_" in filename:
Packit 7b22a4
        iptables = IPTABLES
Packit 7b22a4
    elif "libip6t_" in filename:
Packit 7b22a4
        iptables = IP6TABLES
Packit 7b22a4
    elif "libxt_"  in filename:
Packit 7b22a4
        iptables = IPTABLES
Packit 7b22a4
    elif "libarpt_" in filename:
Packit 7b22a4
        # only supported with nf_tables backend
Packit 7b22a4
        if EXECUTEABLE != "xtables-nft-multi":
Packit 7b22a4
           return 0, 0
Packit 7b22a4
        iptables = ARPTABLES
Packit 7b22a4
    elif "libebt_" in filename:
Packit 7b22a4
        # only supported with nf_tables backend
Packit 7b22a4
        if EXECUTEABLE != "xtables-nft-multi":
Packit 7b22a4
           return 0, 0
Packit 7b22a4
        iptables = EBTABLES
Packit 7b22a4
    else:
Packit 7b22a4
        # default to iptables if not known prefix
Packit 7b22a4
        iptables = IPTABLES
Packit 7b22a4
Packit 7b22a4
    f = open(filename)
Packit 7b22a4
Packit 7b22a4
    tests = 0
Packit 7b22a4
    passed = 0
Packit 7b22a4
    table = ""
Packit 7b22a4
    total_test_passed = True
Packit 7b22a4
Packit 7b22a4
    if netns:
Packit 7b22a4
        execute_cmd("ip netns add ____iptables-container-test", filename, 0)
Packit 7b22a4
Packit 7b22a4
    for lineno, line in enumerate(f):
Packit 7b22a4
        if line[0] == "#":
Packit 7b22a4
            continue
Packit 7b22a4
Packit 7b22a4
        if line[0] == ":":
Packit 7b22a4
            chain_array = line.rstrip()[1:].split(",")
Packit 7b22a4
            continue
Packit 7b22a4
Packit 7b22a4
        # external non-iptables invocation, executed as is.
Packit 7b22a4
        if line[0] == "@":
Packit 7b22a4
            external_cmd = line.rstrip()[1:]
Packit 7b22a4
            if netns:
Packit 7b22a4
                external_cmd = "ip netns exec ____iptables-container-test " + external_cmd
Packit 7b22a4
            execute_cmd(external_cmd, filename, lineno)
Packit 7b22a4
            continue
Packit 7b22a4
Packit 7b22a4
        # external iptables invocation, executed as is.
Packit 7b22a4
        if line[0] == "%":
Packit 7b22a4
            external_cmd = line.rstrip()[1:]
Packit 7b22a4
            if netns:
Packit 7b22a4
                external_cmd = "ip netns exec ____iptables-container-test " + EXECUTEABLE + " " + external_cmd
Packit 7b22a4
            execute_cmd(external_cmd, filename, lineno)
Packit 7b22a4
            continue
Packit 7b22a4
Packit 7b22a4
        if line[0] == "*":
Packit 7b22a4
            table = line.rstrip()[1:]
Packit 7b22a4
            continue
Packit 7b22a4
Packit 7b22a4
        if len(chain_array) == 0:
Packit 7b22a4
            print("broken test, missing chain, leaving")
Packit 7b22a4
            sys.exit()
Packit 7b22a4
Packit 7b22a4
        test_passed = True
Packit 7b22a4
        tests += 1
Packit 7b22a4
Packit 7b22a4
        for chain in chain_array:
Packit 7b22a4
            item = line.split(";")
Packit 7b22a4
            if table == "":
Packit 7b22a4
                rule = chain + " " + item[0]
Packit 7b22a4
            else:
Packit 7b22a4
                rule = chain + " -t " + table + " " + item[0]
Packit 7b22a4
Packit 7b22a4
            if item[1] == "=":
Packit 7b22a4
                rule_save = chain + " " + item[0]
Packit 7b22a4
            else:
Packit 7b22a4
                rule_save = chain + " " + item[1]
Packit 7b22a4
Packit 7b22a4
            res = item[2].rstrip()
Packit 7b22a4
            ret = run_test(iptables, rule, rule_save,
Packit 7b22a4
                           res, filename, lineno + 1, netns)
Packit 7b22a4
Packit 7b22a4
            if ret < 0:
Packit 7b22a4
                test_passed = False
Packit 7b22a4
                total_test_passed = False
Packit 7b22a4
                break
Packit 7b22a4
Packit 7b22a4
        if test_passed:
Packit 7b22a4
            passed += 1
Packit 7b22a4
Packit 7b22a4
    if netns:
Packit 7b22a4
        execute_cmd("ip netns del ____iptables-container-test", filename, 0)
Packit 7b22a4
    if total_test_passed:
Packit 7b22a4
        print(filename + ": " + Colors.GREEN + "OK" + Colors.ENDC)
Packit 7b22a4
Packit 7b22a4
    f.close()
Packit 7b22a4
    return tests, passed
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
def show_missing():
Packit 7b22a4
    '''
Packit 7b22a4
    Show the list of missing test files
Packit 7b22a4
    '''
Packit 7b22a4
    file_list = os.listdir(EXTENSIONS_PATH)
Packit 7b22a4
    testfiles = [i for i in file_list if i.endswith('.t')]
Packit 7b22a4
    libfiles = [i for i in file_list
Packit 7b22a4
                if i.startswith('lib') and i.endswith('.c')]
Packit 7b22a4
Packit 7b22a4
    def test_name(x):
Packit 7b22a4
        return x[0:-2] + '.t'
Packit 7b22a4
    missing = [test_name(i) for i in libfiles
Packit 7b22a4
               if not test_name(i) in testfiles]
Packit 7b22a4
Packit 7b22a4
    print('\n'.join(missing))
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
#
Packit 7b22a4
# main
Packit 7b22a4
#
Packit 7b22a4
def main():
Packit 7b22a4
    parser = argparse.ArgumentParser(description='Run iptables tests')
Packit 7b22a4
    parser.add_argument('filename', nargs='?',
Packit 7b22a4
                        metavar='path/to/file.t',
Packit 7b22a4
                        help='Run only this test')
Packit 7b22a4
    parser.add_argument('-H', '--host', action='store_true',
Packit 7b22a4
                        help='Run tests against installed binaries')
Packit 7b22a4
    parser.add_argument('-l', '--legacy', action='store_true',
Packit 7b22a4
                        help='Test iptables-legacy')
Packit 7b22a4
    parser.add_argument('-m', '--missing', action='store_true',
Packit 7b22a4
                        help='Check for missing tests')
Packit 7b22a4
    parser.add_argument('-n', '--nftables', action='store_true',
Packit 7b22a4
                        help='Test iptables-over-nftables')
Packit 7b22a4
    parser.add_argument('-N', '--netns', action='store_true',
Packit 7b22a4
                        help='Test netnamespace path')
Packit 7b22a4
    args = parser.parse_args()
Packit 7b22a4
Packit 7b22a4
    #
Packit 7b22a4
    # show list of missing test files
Packit 7b22a4
    #
Packit 7b22a4
    if args.missing:
Packit 7b22a4
        show_missing()
Packit 7b22a4
        return
Packit 7b22a4
Packit 7b22a4
    global EXECUTEABLE
Packit 7b22a4
    EXECUTEABLE = "xtables-legacy-multi"
Packit 7b22a4
    if args.nftables:
Packit 7b22a4
        EXECUTEABLE = "xtables-nft-multi"
Packit 7b22a4
Packit 7b22a4
    if os.getuid() != 0:
Packit 7b22a4
        print("You need to be root to run this, sorry")
Packit 7b22a4
        return
Packit 7b22a4
Packit 7b22a4
    if not args.host:
Packit 7b22a4
        os.putenv("XTABLES_LIBDIR", os.path.abspath(EXTENSIONS_PATH))
Packit 7b22a4
        os.putenv("PATH", "%s/iptables:%s" % (os.path.abspath(os.path.curdir),
Packit 7b22a4
                                              os.getenv("PATH")))
Packit 7b22a4
Packit 7b22a4
    test_files = 0
Packit 7b22a4
    tests = 0
Packit 7b22a4
    passed = 0
Packit 7b22a4
Packit 7b22a4
    # setup global var log file
Packit 7b22a4
    global log_file
Packit 7b22a4
    try:
Packit 7b22a4
        log_file = open(LOGFILE, 'w')
Packit 7b22a4
    except IOError:
Packit 7b22a4
        print("Couldn't open log file %s" % LOGFILE)
Packit 7b22a4
        return
Packit 7b22a4
Packit 7b22a4
    if args.filename:
Packit 7b22a4
        file_list = [args.filename]
Packit 7b22a4
    else:
Packit 7b22a4
        file_list = [os.path.join(EXTENSIONS_PATH, i)
Packit 7b22a4
                     for i in os.listdir(EXTENSIONS_PATH)
Packit 7b22a4
                     if i.endswith('.t')]
Packit 7b22a4
        file_list.sort()
Packit 7b22a4
Packit 7b22a4
    for filename in file_list:
Packit 7b22a4
        file_tests, file_passed = run_test_file(filename, args.netns)
Packit 7b22a4
        if file_tests:
Packit 7b22a4
            tests += file_tests
Packit 7b22a4
            passed += file_passed
Packit 7b22a4
            test_files += 1
Packit 7b22a4
Packit 7b22a4
    print("%d test files, %d unit tests, %d passed" % (test_files, tests, passed))
Packit 7b22a4
Packit 7b22a4
Packit 7b22a4
if __name__ == '__main__':
Packit 7b22a4
    main()