Blame tests/py/nft-test.py

Packit c5a612
#!/usr/bin/env python
Packit c5a612
#
Packit c5a612
# (C) 2014 by Ana Rey Botello <anarey@gmail.com>
Packit c5a612
#
Packit c5a612
# Based on iptables-test.py:
Packit c5a612
# (C) 2012 by Pablo Neira Ayuso <pablo@netfilter.org>
Packit c5a612
#
Packit c5a612
# This program is free software; you can redistribute it and/or modify
Packit c5a612
# it under the terms of the GNU General Public License as published by
Packit c5a612
# the Free Software Foundation; either version 2 of the License, or
Packit c5a612
# (at your option) any later version.
Packit c5a612
#
Packit c5a612
# Thanks to the Outreach Program for Women (OPW) for sponsoring this test
Packit c5a612
# infrastructure.
Packit c5a612
Packit c5a612
from __future__ import print_function
Packit c5a612
import sys
Packit c5a612
import os
Packit c5a612
import argparse
Packit c5a612
import signal
Packit c5a612
import json
Packit c5a612
import traceback
Packit c5a612
import tempfile
Packit c5a612
Packit c5a612
TESTS_PATH = os.path.dirname(os.path.abspath(__file__))
Packit c5a612
sys.path.insert(0, os.path.join(TESTS_PATH, '../../py/'))
Packit c5a612
os.environ['TZ'] = 'UTC-2'
Packit c5a612
Packit c5a612
from nftables import Nftables
Packit c5a612
Packit c5a612
TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"]
Packit c5a612
LOGFILE = "/tmp/nftables-test.log"
Packit c5a612
log_file = None
Packit c5a612
table_list = []
Packit c5a612
chain_list = []
Packit c5a612
all_set = dict()
Packit c5a612
obj_list = []
Packit c5a612
signal_received = 0
Packit c5a612
Packit c5a612
Packit c5a612
class Colors:
Packit c5a612
    if sys.stdout.isatty():
Packit c5a612
        HEADER = '\033[95m'
Packit c5a612
        GREEN = '\033[92m'
Packit c5a612
        YELLOW = '\033[93m'
Packit c5a612
        RED = '\033[91m'
Packit c5a612
        ENDC = '\033[0m'
Packit c5a612
    else:
Packit c5a612
        HEADER = ''
Packit c5a612
        GREEN = ''
Packit c5a612
        YELLOW = ''
Packit c5a612
        RED = ''
Packit c5a612
        ENDC = ''
Packit c5a612
Packit c5a612
Packit c5a612
class Chain:
Packit c5a612
    """Class that represents a chain"""
Packit c5a612
Packit c5a612
    def __init__(self, name, config, lineno):
Packit c5a612
        self.name = name
Packit c5a612
        self.config = config
Packit c5a612
        self.lineno = lineno
Packit c5a612
Packit c5a612
    def __eq__(self, other):
Packit c5a612
        return self.__dict__ == other.__dict__
Packit c5a612
Packit c5a612
    def __str__(self):
Packit c5a612
        return "%s" % self.name
Packit c5a612
Packit c5a612
Packit c5a612
class Table:
Packit c5a612
    """Class that represents a table"""
Packit c5a612
Packit c5a612
    def __init__(self, family, name, chains):
Packit c5a612
        self.family = family
Packit c5a612
        self.name = name
Packit c5a612
        self.chains = chains
Packit c5a612
Packit c5a612
    def __eq__(self, other):
Packit c5a612
        return self.__dict__ == other.__dict__
Packit c5a612
Packit c5a612
    def __str__(self):
Packit c5a612
        return "%s %s" % (self.family, self.name)
Packit c5a612
Packit c5a612
Packit c5a612
class Set:
Packit c5a612
    """Class that represents a set"""
Packit c5a612
Packit c5a612
    def __init__(self, family, table, name, type, timeout, flags):
Packit c5a612
        self.family = family
Packit c5a612
        self.table = table
Packit c5a612
        self.name = name
Packit c5a612
        self.type = type
Packit c5a612
        self.timeout = timeout
Packit c5a612
        self.flags = flags
Packit c5a612
Packit c5a612
    def __eq__(self, other):
Packit c5a612
        return self.__dict__ == other.__dict__
Packit c5a612
Packit c5a612
Packit c5a612
class Obj:
Packit c5a612
    """Class that represents an object"""
Packit c5a612
Packit c5a612
    def __init__(self, table, family, name, type, spcf):
Packit c5a612
        self.table = table
Packit c5a612
        self.family = family
Packit c5a612
        self.name = name
Packit c5a612
        self.type = type
Packit c5a612
        self.spcf = spcf
Packit c5a612
Packit c5a612
    def __eq__(self, other):
Packit c5a612
        return self.__dict__ == other.__dict__
Packit c5a612
Packit c5a612
Packit c5a612
def print_msg(reason, errstr, filename=None, lineno=None, color=None):
Packit c5a612
    '''
Packit c5a612
    Prints a message with nice colors, indicating file and line number.
Packit c5a612
    '''
Packit c5a612
    color_errstr = "%s%s%s" % (color, errstr, Colors.ENDC)
Packit c5a612
    if filename and lineno:
Packit c5a612
        sys.stderr.write("%s: %s line %d: %s\n" %
Packit c5a612
                         (filename, color_errstr, lineno + 1, reason))
Packit c5a612
    else:
Packit c5a612
        sys.stderr.write("%s %s\n" % (color_errstr, reason))
Packit c5a612
    sys.stderr.flush() # So that the message stay in the right place.
Packit c5a612
Packit c5a612
Packit c5a612
def print_error(reason, filename=None, lineno=None):
Packit c5a612
    print_msg(reason, "ERROR:", filename, lineno, Colors.RED)
Packit c5a612
Packit c5a612
Packit c5a612
def print_warning(reason, filename=None, lineno=None):
Packit c5a612
    print_msg(reason, "WARNING:", filename, lineno, Colors.YELLOW)
Packit c5a612
Packit c5a612
def print_info(reason, filename=None, lineno=None):
Packit c5a612
    print_msg(reason, "INFO:", filename, lineno, Colors.GREEN)
Packit c5a612
Packit c5a612
def color_differences(rule, other, color):
Packit c5a612
    rlen = len(rule)
Packit c5a612
    olen = len(other)
Packit c5a612
    out = ""
Packit c5a612
    i = 0
Packit c5a612
Packit c5a612
    # find equal part at start
Packit c5a612
    for i in range(rlen):
Packit c5a612
        if i >= olen or rule[i] != other[i]:
Packit c5a612
            break
Packit c5a612
    if i > 0:
Packit c5a612
        out += rule[:i]
Packit c5a612
        rule = rule[i:]
Packit c5a612
        other = other[i:]
Packit c5a612
        rlen = len(rule)
Packit c5a612
        olen = len(other)
Packit c5a612
Packit c5a612
    # find equal part at end
Packit c5a612
    for i in range(1, rlen + 1):
Packit c5a612
        if i > olen or rule[rlen - i] != other[olen - i]:
Packit c5a612
            i -= 1
Packit c5a612
            break
Packit c5a612
    if rlen > i:
Packit c5a612
        out += color + rule[:rlen - i] + Colors.ENDC
Packit c5a612
        rule = rule[rlen - i:]
Packit c5a612
Packit c5a612
    out += rule
Packit c5a612
    return out
Packit c5a612
Packit c5a612
def print_differences_warning(filename, lineno, rule1, rule2, cmd):
Packit c5a612
    colored_rule1 = color_differences(rule1, rule2, Colors.YELLOW)
Packit c5a612
    colored_rule2 = color_differences(rule2, rule1, Colors.YELLOW)
Packit c5a612
    reason = "'%s': '%s' mismatches '%s'" % (cmd, colored_rule1, colored_rule2)
Packit c5a612
    print_warning(reason, filename, lineno)
Packit c5a612
Packit c5a612
Packit c5a612
def print_differences_error(filename, lineno, cmd):
Packit c5a612
    reason = "'%s': Listing is broken." % cmd
Packit c5a612
    print_error(reason, filename, lineno)
Packit c5a612
Packit c5a612
Packit c5a612
def table_exist(table, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Exists a table.
Packit c5a612
    '''
Packit c5a612
    cmd = "list table %s" % table
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    return True if (ret == 0) else False
Packit c5a612
Packit c5a612
Packit c5a612
def table_flush(table, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Flush a table.
Packit c5a612
    '''
Packit c5a612
    cmd = "flush table %s" % table
Packit c5a612
    execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    return cmd
Packit c5a612
Packit c5a612
Packit c5a612
def table_create(table, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Adds a table.
Packit c5a612
    '''
Packit c5a612
    # We check if table exists.
Packit c5a612
    if table_exist(table, filename, lineno):
Packit c5a612
        reason = "Table %s already exists" % table
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    table_list.append(table)
Packit c5a612
Packit c5a612
    # We add a new table
Packit c5a612
    cmd = "add table %s" % table
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    if ret != 0:
Packit c5a612
        reason = "Cannot " + cmd
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        table_list.remove(table)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    # We check if table was added correctly.
Packit c5a612
    if not table_exist(table, filename, lineno):
Packit c5a612
        table_list.remove(table)
Packit c5a612
        reason = "I have just added the table %s " \
Packit c5a612
                 "but it does not exist. Giving up!" % table
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    for table_chain in table.chains:
Packit c5a612
        chain = chain_get_by_name(table_chain)
Packit c5a612
        if chain is None:
Packit c5a612
            reason = "The chain %s requested by table %s " \
Packit c5a612
                     "does not exist." % (table_chain, table)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
        else:
Packit c5a612
            chain_create(chain, table, filename)
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def table_delete(table, filename=None, lineno=None):
Packit c5a612
    '''
Packit c5a612
    Deletes a table.
Packit c5a612
    '''
Packit c5a612
    if not table_exist(table, filename, lineno):
Packit c5a612
        reason = "Table %s does not exist but I added it before." % table
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    cmd = "delete table %s" % table
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
    if ret != 0:
Packit c5a612
        reason = "%s: I cannot delete table %s. Giving up!" % (cmd, table)
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    if table_exist(table, filename, lineno):
Packit c5a612
        reason = "I have just deleted the table %s " \
Packit c5a612
                 "but it still exists." % table
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def chain_exist(chain, table, filename):
Packit c5a612
    '''
Packit c5a612
    Checks a chain
Packit c5a612
    '''
Packit c5a612
    cmd = "list chain %s %s" % (table, chain)
Packit c5a612
    ret = execute_cmd(cmd, filename, chain.lineno)
Packit c5a612
Packit c5a612
    return True if (ret == 0) else False
Packit c5a612
Packit c5a612
Packit c5a612
def chain_create(chain, table, filename):
Packit c5a612
    '''
Packit c5a612
    Adds a chain
Packit c5a612
    '''
Packit c5a612
    if chain_exist(chain, table, filename):
Packit c5a612
        reason = "This chain '%s' exists in %s. I cannot create " \
Packit c5a612
                 "two chains with same name." % (chain, table)
Packit c5a612
        print_error(reason, filename, chain.lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    cmd = "add chain %s %s" % (table, chain)
Packit c5a612
    if chain.config:
Packit c5a612
        cmd += " { %s; }" % chain.config
Packit c5a612
Packit c5a612
    ret = execute_cmd(cmd, filename, chain.lineno)
Packit c5a612
    if ret != 0:
Packit c5a612
        reason = "I cannot create the chain '%s'" % chain
Packit c5a612
        print_error(reason, filename, chain.lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    if not chain_exist(chain, table, filename):
Packit c5a612
        reason = "I have added the chain '%s' " \
Packit c5a612
                 "but it does not exist in %s" % (chain, table)
Packit c5a612
        print_error(reason, filename, chain.lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def chain_delete(chain, table, filename=None, lineno=None):
Packit c5a612
    '''
Packit c5a612
    Flushes and deletes a chain.
Packit c5a612
    '''
Packit c5a612
    if not chain_exist(chain, table, filename):
Packit c5a612
        reason = "The chain %s does not exist in %s. " \
Packit c5a612
                 "I cannot delete it." % (chain, table)
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    cmd = "flush chain %s %s" % (table, chain)
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
    if ret != 0:
Packit c5a612
        reason = "I cannot " + cmd
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    cmd = "delete chain %s %s" % (table, chain)
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
    if ret != 0:
Packit c5a612
        reason = "I cannot " + cmd
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    if chain_exist(chain, table, filename):
Packit c5a612
        reason = "The chain %s exists in %s. " \
Packit c5a612
                 "I cannot delete this chain" % (chain, table)
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def chain_get_by_name(name):
Packit c5a612
    for chain in chain_list:
Packit c5a612
        if chain.name == name:
Packit c5a612
            break
Packit c5a612
    else:
Packit c5a612
        chain = None
Packit c5a612
Packit c5a612
    return chain
Packit c5a612
Packit c5a612
Packit c5a612
def set_add(s, test_result, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Adds a set.
Packit c5a612
    '''
Packit c5a612
    if not table_list:
Packit c5a612
        reason = "Missing table to add rule"
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    for table in table_list:
Packit c5a612
        s.table = table.name
Packit c5a612
        s.family = table.family
Packit c5a612
        if _set_exist(s, filename, lineno):
Packit c5a612
            reason = "Set %s already exists in %s" % (s.name, table)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        flags = s.flags
Packit c5a612
        if flags != "":
Packit c5a612
            flags = "flags %s; " % flags
Packit c5a612
Packit c5a612
        cmd = "add set %s %s { type %s;%s %s}" % (table, s.name, s.type, s.timeout, flags)
Packit c5a612
        ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
        if (ret == 0 and test_result == "fail") or \
Packit c5a612
                (ret != 0 and test_result == "ok"):
Packit c5a612
            reason = "%s: I cannot add the set %s" % (cmd, s.name)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        if not _set_exist(s, filename, lineno):
Packit c5a612
            reason = "I have just added the set %s to " \
Packit c5a612
                     "the table %s but it does not exist" % (s.name, table)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def set_add_elements(set_element, set_name, state, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Adds elements to the set.
Packit c5a612
    '''
Packit c5a612
    if not table_list:
Packit c5a612
        reason = "Missing table to add rules"
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    for table in table_list:
Packit c5a612
        # Check if set exists.
Packit c5a612
        if (not set_exist(set_name, table, filename, lineno) or
Packit c5a612
                    set_name not in all_set) and state == "ok":
Packit c5a612
            reason = "I cannot add an element to the set %s " \
Packit c5a612
                     "since it does not exist." % set_name
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        element = ", ".join(set_element)
Packit c5a612
        cmd = "add element %s %s { %s }" % (table, set_name, element)
Packit c5a612
        ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
        if (state == "fail" and ret == 0) or (state == "ok" and ret != 0):
Packit c5a612
            test_state = "This rule should have failed."
Packit c5a612
            reason = cmd + ": " + test_state
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        # Add element into all_set.
Packit c5a612
        if ret == 0 and state == "ok":
Packit c5a612
            for e in set_element:
Packit c5a612
                all_set[set_name].add(e)
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def set_delete_elements(set_element, set_name, table, filename=None,
Packit c5a612
                        lineno=None):
Packit c5a612
    '''
Packit c5a612
    Deletes elements in a set.
Packit c5a612
    '''
Packit c5a612
    for element in set_element:
Packit c5a612
        cmd = "delete element %s %s { %s }" % (table, set_name, element)
Packit c5a612
        ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
        if ret != 0:
Packit c5a612
            reason = "I cannot delete element %s " \
Packit c5a612
                     "from the set %s" % (element, set_name)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def set_delete(table, filename=None, lineno=None):
Packit c5a612
    '''
Packit c5a612
    Deletes set and its content.
Packit c5a612
    '''
Packit c5a612
    for set_name in all_set.keys():
Packit c5a612
        # Check if exists the set
Packit c5a612
        if not set_exist(set_name, table, filename, lineno):
Packit c5a612
            reason = "The set %s does not exist, " \
Packit c5a612
                     "I cannot delete it" % set_name
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        # We delete all elements in the set
Packit c5a612
        set_delete_elements(all_set[set_name], set_name, table, filename,
Packit c5a612
                            lineno)
Packit c5a612
Packit c5a612
        # We delete the set.
Packit c5a612
        cmd = "delete set %s %s" % (table, set_name)
Packit c5a612
        ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
        # Check if the set still exists after I deleted it.
Packit c5a612
        if ret != 0 or set_exist(set_name, table, filename, lineno):
Packit c5a612
            reason = "Cannot remove the set " + set_name
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def set_exist(set_name, table, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Check if the set exists.
Packit c5a612
    '''
Packit c5a612
    cmd = "list set %s %s" % (table, set_name)
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    return True if (ret == 0) else False
Packit c5a612
Packit c5a612
Packit c5a612
def _set_exist(s, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Check if the set exists.
Packit c5a612
    '''
Packit c5a612
    cmd = "list set %s %s %s" % (s.family, s.table, s.name)
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    return True if (ret == 0) else False
Packit c5a612
Packit c5a612
Packit c5a612
def set_check_element(rule1, rule2):
Packit c5a612
    '''
Packit c5a612
    Check if element exists in anonymous sets.
Packit c5a612
    '''
Packit c5a612
    pos1 = rule1.find("{")
Packit c5a612
    pos2 = rule2.find("{")
Packit c5a612
Packit c5a612
    if (rule1[:pos1] != rule2[:pos2]):
Packit c5a612
        return False
Packit c5a612
Packit c5a612
    end1 = rule1.find("}")
Packit c5a612
    end2 = rule2.find("}")
Packit c5a612
Packit c5a612
    if (pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1):
Packit c5a612
        list1 = (rule1[pos1 + 1:end1].replace(" ", "")).split(",")
Packit c5a612
        list2 = (rule2[pos2 + 1:end2].replace(" ", "")).split(",")
Packit c5a612
        list1.sort()
Packit c5a612
        list2.sort()
Packit c5a612
        if list1 != list2:
Packit c5a612
            return False
Packit c5a612
Packit c5a612
        return rule1[end1:] == rule2[end2:]
Packit c5a612
Packit c5a612
    return False
Packit c5a612
Packit c5a612
Packit c5a612
def obj_add(o, test_result, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Adds an object.
Packit c5a612
    '''
Packit c5a612
    if not table_list:
Packit c5a612
        reason = "Missing table to add rule"
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
    for table in table_list:
Packit c5a612
        o.table = table.name
Packit c5a612
        o.family = table.family
Packit c5a612
        obj_handle = o.type + " " + o.name
Packit c5a612
        if _obj_exist(o, filename, lineno):
Packit c5a612
            reason = "The %s already exists in %s" % (obj_handle, table)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        cmd = "add %s %s %s %s" % (o.type, table, o.name, o.spcf)
Packit c5a612
        ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
        if (ret == 0 and test_result == "fail") or \
Packit c5a612
                (ret != 0 and test_result == "ok"):
Packit c5a612
            reason = "%s: I cannot add the %s" % (cmd, obj_handle)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        exist = _obj_exist(o, filename, lineno)
Packit c5a612
Packit c5a612
        if exist:
Packit c5a612
            if test_result == "ok":
Packit c5a612
                 return 0
Packit c5a612
            reason = "I added the %s to the table %s " \
Packit c5a612
                     "but it should have failed" % (obj_handle, table)
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        if test_result == "fail":
Packit c5a612
            return 0
Packit c5a612
Packit c5a612
        reason = "I have just added the %s to " \
Packit c5a612
                 "the table %s but it does not exist" % (obj_handle, table)
Packit c5a612
        print_error(reason, filename, lineno)
Packit c5a612
        return -1
Packit c5a612
Packit c5a612
def obj_delete(table, filename=None, lineno=None):
Packit c5a612
    '''
Packit c5a612
    Deletes object.
Packit c5a612
    '''
Packit c5a612
    for o in obj_list:
Packit c5a612
        obj_handle = o.type + " " + o.name
Packit c5a612
        # Check if exists the obj
Packit c5a612
        if not obj_exist(o, table, filename, lineno):
Packit c5a612
            reason = "The %s does not exist, I cannot delete it" % obj_handle
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
        # We delete the object.
Packit c5a612
        cmd = "delete %s %s %s" % (o.type, table, o.name)
Packit c5a612
        ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
        # Check if the object still exists after I deleted it.
Packit c5a612
        if ret != 0 or obj_exist(o, table, filename, lineno):
Packit c5a612
            reason = "Cannot remove the " + obj_handle
Packit c5a612
            print_error(reason, filename, lineno)
Packit c5a612
            return -1
Packit c5a612
Packit c5a612
    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def obj_exist(o, table, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Check if the object exists.
Packit c5a612
    '''
Packit c5a612
    cmd = "list %s %s %s" % (o.type, table, o.name)
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    return True if (ret == 0) else False
Packit c5a612
Packit c5a612
Packit c5a612
def _obj_exist(o, filename, lineno):
Packit c5a612
    '''
Packit c5a612
    Check if the object exists.
Packit c5a612
    '''
Packit c5a612
    cmd = "list %s %s %s %s" % (o.type, o.family, o.table, o.name)
Packit c5a612
    ret = execute_cmd(cmd, filename, lineno)
Packit c5a612
Packit c5a612
    return True if (ret == 0) else False
Packit c5a612
Packit c5a612
Packit c5a612
def output_clean(pre_output, chain):
Packit c5a612
    pos_chain = pre_output.find(chain.name)
Packit c5a612
    if pos_chain == -1:
Packit c5a612
        return ""
Packit c5a612
    output_intermediate = pre_output[pos_chain:]
Packit c5a612
    brace_start = output_intermediate.find("{")
Packit c5a612
    brace_end = output_intermediate.find("}")
Packit c5a612
    pre_rule = output_intermediate[brace_start:brace_end]
Packit c5a612
    if pre_rule[1:].find("{") > -1:  # this rule has a set.
Packit c5a612
        set = pre_rule[1:].replace("\t", "").replace("\n", "").strip()
Packit c5a612
        set = set.split(";")[2].strip() + "}"
Packit c5a612
        remainder = output_clean(chain.name + " {;;" + output_intermediate[brace_end+1:], chain)
Packit c5a612
        if len(remainder) <= 0:
Packit c5a612
            return set
Packit c5a612
        return set + " " + remainder
Packit c5a612
    else:
Packit c5a612
        rule = pre_rule.split(";")[2].replace("\t", "").replace("\n", "").\
Packit c5a612
            strip()
Packit c5a612
    if len(rule) < 0:
Packit c5a612
        return ""
Packit c5a612
    return rule
Packit c5a612
Packit c5a612
Packit c5a612
def payload_check_elems_to_set(elems):
Packit c5a612
    newset = set()
Packit c5a612
Packit c5a612
    for n, line in enumerate(elems.split('[end]')):
Packit c5a612
        e = line.strip()
Packit c5a612
        if e in newset:
Packit c5a612
            print_error("duplicate", e, n)
Packit c5a612
            return newset
Packit c5a612
Packit c5a612
        newset.add(e)
Packit c5a612
Packit c5a612
    return newset
Packit c5a612
Packit c5a612
Packit c5a612
def payload_check_set_elems(want, got):
Packit c5a612
    if want.find('element') < 0 or want.find('[end]') < 0:
Packit c5a612
        return 0
Packit c5a612
Packit c5a612
    if got.find('element') < 0 or got.find('[end]') < 0:
Packit c5a612
        return 0
Packit c5a612
Packit c5a612
    set_want = payload_check_elems_to_set(want)
Packit c5a612
    set_got = payload_check_elems_to_set(got)
Packit c5a612
Packit c5a612
    return set_want == set_got
Packit c5a612
Packit c5a612
Packit c5a612
def payload_check(payload_buffer, file, cmd):
Packit c5a612
    file.seek(0, 0)
Packit c5a612
    i = 0
Packit c5a612
Packit c5a612
    if not payload_buffer:
Packit c5a612
        return False
Packit c5a612
Packit c5a612
    for lineno, want_line in enumerate(payload_buffer):
Packit c5a612
        line = file.readline()
Packit c5a612
Packit c5a612
        if want_line == line:
Packit c5a612
            i += 1
Packit c5a612
            continue
Packit c5a612
Packit c5a612
        if want_line.find('[') < 0 and line.find('[') < 0:
Packit c5a612
            continue
Packit c5a612
        if want_line.find(']') < 0 and line.find(']') < 0:
Packit c5a612
            continue
Packit c5a612
Packit c5a612
        if payload_check_set_elems(want_line, line):
Packit c5a612
            continue
Packit c5a612
Packit c5a612
        print_differences_warning(file.name, lineno, want_line.strip(),
Packit c5a612
                                  line.strip(), cmd)
Packit c5a612
        return 0
Packit c5a612
Packit c5a612
    return i > 0
Packit c5a612
Packit c5a612
Packit c5a612
def json_dump_normalize(json_string, human_readable = False):
Packit c5a612
    json_obj = json.loads(json_string)
Packit c5a612
Packit c5a612
    if human_readable:
Packit c5a612
        return json.dumps(json_obj, sort_keys = True,
Packit c5a612
                          indent = 4, separators = (',', ': '))
Packit c5a612
    else:
Packit c5a612
        return json.dumps(json_obj, sort_keys = True)
Packit c5a612
Packit c5a612
def json_validate(json_string):
Packit c5a612
    json_obj = json.loads(json_string)
Packit c5a612
    try:
Packit c5a612
        nftables.json_validate(json_obj)
Packit c5a612
    except Exception:
Packit c5a612
        print_error("schema validation failed for input '%s'" % json_string)
Packit c5a612
        print_error(traceback.format_exc())
Packit c5a612
Packit c5a612
def rule_add(rule, filename, lineno, force_all_family_option, filename_path):
    '''
    Adds a rule to every chain of every known table and checks the outcome.

    For each table/chain pair the table is flushed, the rule is added with
    netlink debugging enabled, and the result is compared against the
    expected state, the expected payload and the ruleset listing. When the
    JSON option is enabled the same checks are repeated using the JSON
    representation of the rule.

    :param rule: test line split on ';': rule[0] is the rule text, rule[1]
                 the expected state ("ok" or "fail") and the optional
                 rule[2] the expected listing output
    :param filename: name of the test file (used for messages)
    :param lineno: line number being tested (used for messages)
    :param force_all_family_option: when false, return on the first failure
                                    instead of testing the remaining chains
    :param filename_path: path prefix of the test file; ".payload", ".json"
                          and related suffixes are appended to it
    :return: list [ret, warning, error, unit_tests]
    '''
    # TODO Check if a rule is added correctly.
    ret = warning = error = unit_tests = 0

    if not table_list or not chain_list:
        reason = "Missing table or chain to add rule."
        print_error(reason, filename, lineno)
        return [-1, warning, error, unit_tests]

    if rule[1].strip() == "ok":
        # Look up the generic expected payload; a missing or unreadable
        # file is only reported later, in the per-table lookup below.
        payload_expected = None
        try:
            payload_log = open("%s.payload" % filename_path)
            payload_expected = payload_find_expected(payload_log, rule[0])
        except:
            payload_log = None

        if enable_json_option:
            # JSON form of the rule used as input
            try:
                json_log = open("%s.json" % filename_path)
                json_input = json_find_expected(json_log, rule[0])
            except:
                json_input = None

            if not json_input:
                print_error("did not find JSON equivalent for rule '%s'"
                            % rule[0])
            else:
                try:
                    json_input = json_dump_normalize(json_input)
                except ValueError:
                    reason = "Invalid JSON syntax in rule: %s" % json_input
                    print_error(reason)
                    return [-1, warning, error, unit_tests]

            # JSON form expected in the listing, if it differs from the input
            try:
                json_log = open("%s.json.output" % filename_path)
                json_expected = json_find_expected(json_log, rule[0])
            except:
                # will use json_input for comparison
                json_expected = None

            if json_expected:
                try:
                    json_expected = json_dump_normalize(json_expected)
                except ValueError:
                    reason = "Invalid JSON syntax in expected output: %s" % json_expected
                    print_error(reason)
                    return [-1, warning, error, unit_tests]

    for table in table_list:
        if rule[1].strip() == "ok":
            # A family specific payload file takes precedence over the
            # generic one found above.
            table_payload_expected = None
            try:
                payload_log = open("%s.payload.%s" % (filename_path, table.family))
                table_payload_expected = payload_find_expected(payload_log, rule[0])
            except:
                if not payload_log:
                    print_error("did not find any payload information",
                                filename_path)
                elif not payload_expected:
                    print_error("did not find payload information for "
                                "rule '%s'" % rule[0], payload_log.name, 1)
            if not table_payload_expected:
                table_payload_expected = payload_expected

        for table_chain in table.chains:
            chain = chain_get_by_name(table_chain)
            unit_tests += 1
            table_flush(table, filename, lineno)

            # From here on payload_log holds the output captured for this run
            payload_log = tempfile.TemporaryFile(mode="w+")

            # Add rule and check return code
            cmd = "add rule %s %s %s" % (table, chain, rule[0])
            ret = execute_cmd(cmd, filename, lineno, payload_log, debug="netlink")

            state = rule[1].rstrip()
            # NOTE(review): 134 is presumably the exit status of an abort
            # (128 + SIGABRT) - confirm.
            if (ret in [0,134] and state == "fail") or (ret != 0 and state == "ok"):
                if state == "fail":
                    test_state = "This rule should have failed."
                else:
                    test_state = "This rule should not have failed."
                reason = cmd + ": " + test_state
                print_error(reason, filename, lineno)
                ret = -1
                error += 1
                if not force_all_family_option:
                    return [ret, warning, error, unit_tests]

            # The rule was expected to fail and did: nothing more to check
            if state == "fail" and ret != 0:
                ret = 0
                continue

            if ret != 0:
                continue

            # Check for matching payload
            if state == "ok" and not payload_check(table_payload_expected,
                                                   payload_log, cmd):
                error += 1
                # Append the payload actually produced to a ".got" file
                gotf = open("%s.payload.got" % filename_path, 'a')
                payload_log.seek(0, 0)
                gotf.write("# %s\n" % rule[0])
                while True:
                    line = payload_log.readline()
                    if line == "":
                        break
                    gotf.write(line)
                gotf.close()
                print_warning("Wrote payload for rule %s" % rule[0],
                              gotf.name, 1)

            # Check for matching ruleset listing
            numeric_proto_old = nftables.set_numeric_proto_output(True)
            stateless_old = nftables.set_stateless_output(True)
            list_cmd = 'list table %s' % table
            rc, pre_output, err = nftables.cmd(list_cmd)
            nftables.set_numeric_proto_output(numeric_proto_old)
            nftables.set_stateless_output(stateless_old)

            output = pre_output.split(";")
            if len(output) < 2:
                reason = cmd + ": Listing is broken."
                print_error(reason, filename, lineno)
                ret = -1
                error += 1
                if not force_all_family_option:
                    return [ret, warning, error, unit_tests]
                continue

            rule_output = output_clean(pre_output, chain)
            retest_output = False
            # With a third field the listing is compared against it and
            # the listed rule is always replayed afterwards.
            if len(rule) == 3:
                teoric_exit = rule[2]
                retest_output = True
            else:
                teoric_exit = rule[0]

            if rule_output.rstrip() != teoric_exit.rstrip():
                if rule[0].find("{") != -1:  # anonymous sets
                    if not set_check_element(teoric_exit.rstrip(),
                                         rule_output.rstrip()):
                        warning += 1
                        retest_output = True
                        print_differences_warning(filename, lineno,
                                                  teoric_exit.rstrip(),
                                                  rule_output, cmd)
                        if not force_all_family_option:
                            return [ret, warning, error, unit_tests]
                else:
                    # An empty listing counts as an error; when testing
                    # continues it is also counted as a warning below.
                    if len(rule_output) <= 0:
                        error += 1
                        print_differences_error(filename, lineno, cmd)
                        if not force_all_family_option:
                            return [ret, warning, error, unit_tests]

                    warning += 1
                    retest_output = True
                    print_differences_warning(filename, lineno,
                                              teoric_exit.rstrip(),
                                              rule_output, cmd)

                    if not force_all_family_option:
                        return [ret, warning, error, unit_tests]

            if retest_output:
                # Replay the rule as it was listed and check it again
                table_flush(table, filename, lineno)

                # Add rule and check return code
                cmd = "add rule %s %s %s" % (table, chain, rule_output.rstrip())
                ret = execute_cmd(cmd, filename, lineno, payload_log, debug="netlink")

                if ret != 0:
                    test_state = "Replaying rule failed."
                    reason = cmd + ": " + test_state
                    print_warning(reason, filename, lineno)
                    ret = -1
                    error += 1
                    if not force_all_family_option:
                        return [ret, warning, error, unit_tests]
                # Check for matching payload
                elif not payload_check(table_payload_expected,
                                       payload_log, cmd):
                    error += 1

            if not enable_json_option:
                continue

            # Generate JSON equivalent for rule if not found
            if not json_input:
                json_old = nftables.set_json_output(True)
                rc, json_output, err = nftables.cmd(list_cmd)
                nftables.set_json_output(json_old)

                # Take the first rule of the listing, without its handle
                json_output = json.loads(json_output)
                for item in json_output["nftables"]:
                    if "rule" in item:
                        del(item["rule"]["handle"])
                        json_output = item["rule"]
                        break
                json_input = json.dumps(json_output["expr"], sort_keys = True)

                gotf = open("%s.json.got" % filename_path, 'a')
                jdump = json_dump_normalize(json_input, True)
                gotf.write("# %s\n%s\n\n" % (rule[0], jdump))
                gotf.close()
                print_warning("Wrote JSON equivalent for rule %s" % rule[0],
                              gotf.name, 1)

            table_flush(table, filename, lineno)
            payload_log = tempfile.TemporaryFile(mode="w+")

            # Add rule in JSON format
            cmd = json.dumps({ "nftables": [{ "add": { "rule": {
                    "family": table.family,
                    "table": table.name,
                    "chain": chain.name,
                    "expr": json.loads(json_input),
            }}}]})

            if enable_json_schema:
                json_validate(cmd)

            json_old = nftables.set_json_output(True)
            ret = execute_cmd(cmd, filename, lineno, payload_log, debug="netlink")
            nftables.set_json_output(json_old)

            # Note: this failure is reported but not added to the error count
            if ret != 0:
                reason = "Failed to add JSON equivalent rule"
                print_error(reason, filename, lineno)
                continue

            # Check for matching payload
            if not payload_check(table_payload_expected, payload_log, cmd):
                error += 1
                gotf = open("%s.json.payload.got" % filename_path, 'a')
                payload_log.seek(0, 0)
                gotf.write("# %s\n" % rule[0])
                while True:
                    line = payload_log.readline()
                    if line == "":
                        break
                    gotf.write(line)
                gotf.close()
                print_warning("Wrote JSON payload for rule %s" % rule[0],
                              gotf.name, 1)

            # Check for matching ruleset listing
            numeric_proto_old = nftables.set_numeric_proto_output(True)
            stateless_old = nftables.set_stateless_output(True)
            json_old = nftables.set_json_output(True)
            rc, json_output, err = nftables.cmd(list_cmd)
            nftables.set_json_output(json_old)
            nftables.set_numeric_proto_output(numeric_proto_old)
            nftables.set_stateless_output(stateless_old)

            if enable_json_schema:
                json_validate(json_output)

            # Take the first rule of the listing, without its handle
            json_output = json.loads(json_output)
            for item in json_output["nftables"]:
                if "rule" in item:
                    del(item["rule"]["handle"])
                    json_output = item["rule"]
                    break
            json_output = json.dumps(json_output["expr"], sort_keys = True)

            # Without a dedicated expected output the listing has to match
            # the JSON that was fed in.
            if not json_expected and json_output != json_input:
                print_differences_warning(filename, lineno,
                                          json_input, json_output, cmd)
                error += 1
                gotf = open("%s.json.output.got" % filename_path, 'a')
                jdump = json_dump_normalize(json_output, True)
                gotf.write("# %s\n%s\n\n" % (rule[0], jdump))
                gotf.close()
                print_warning("Wrote JSON output for rule %s" % rule[0],
                              gotf.name, 1)
                # prevent further warnings and .got file updates
                json_expected = json_output
            elif json_expected and json_output != json_expected:
                print_differences_warning(filename, lineno,
                                          json_expected, json_output, cmd)
                error += 1

    return [ret, warning, error, unit_tests]
Packit c5a612
Packit c5a612
Packit c5a612
def cleanup_on_exit():
    '''
    Delete everything registered in the global bookkeeping lists.

    For each known table its chains are deleted first, then its sets (if
    any set is registered), its objects (if any object is registered) and
    finally the table itself.
    '''
    for table in table_list:
        for chain_name in table.chains:
            chain_delete(chain_get_by_name(chain_name), table, "", "")

        if all_set:
            set_delete(table)

        if obj_list:
            obj_delete(table)

        table_delete(table)
Packit c5a612
Packit c5a612
Packit c5a612
def signal_handler(signal, frame):
    '''
    Record that a signal arrived by setting the global signal_received flag.

    :param signal: signal number (unused)
    :param frame: interrupted stack frame (unused)
    '''
    global signal_received

    signal_received = 1
Packit c5a612
Packit c5a612
Packit c5a612
def execute_cmd(cmd, filename, lineno, stdout_log=False, debug=False):
    '''
    Runs a command through nftables.cmd() and returns its return code.

    The command is always recorded in the global log_file and echoed to
    stdout when debug_option is set. The command's output goes to
    stdout_log (or log_file when none is given), its error text always
    goes to log_file.

    :param cmd: string with the command to be executed
    :param filename: name of the file tested (not used in this function)
    :param lineno: line number being tested (not used in this function)
    :param stdout_log: write the command output to this file instead of the
                       global log_file
    :param debug: debug flags to set for the duration of the command; the
                  previous flags are restored afterwards
    '''
    global log_file
    print("command: {}".format(cmd), file=log_file)
    if debug_option:
        print(cmd)

    if debug:
        saved_debug = nftables.get_debug()
        nftables.set_debug(debug)

    rc, output, error_text = nftables.cmd(cmd)

    out_stream = stdout_log if stdout_log else log_file
    out_stream.write(output)
    out_stream.flush()

    log_file.write(error_text)
    log_file.flush()

    if debug:
        nftables.set_debug(saved_debug)

    return rc
Packit c5a612
Packit c5a612
Packit c5a612
def print_result(filename, tests, warning, error):
    '''
    Build the one-line summary for a test file.

    :return: "<filename>: <tests> unit tests, <error> error, <warning> warning"
    '''
    pieces = [str(filename), ": ", str(tests), " unit tests, ",
              str(error), " error, ", str(warning), " warning"]
    return "".join(pieces)
Packit c5a612
Packit c5a612
Packit c5a612
def print_result_all(filename, tests, warning, error, unit_tests):
    '''
    Build the one-line summary for a test file, including the total number
    of executed unit tests.

    :return: "<filename>: <tests> unit tests, <unit_tests> total test
             executed, <error> error, <warning> warning" on a single line
    '''
    pieces = [str(filename), ": ", str(tests), " unit tests, ",
              str(unit_tests), " total test executed, ", str(error),
              " error, ", str(warning), " warning"]
    return "".join(pieces)
Packit c5a612
Packit c5a612
Packit c5a612
def table_process(table_line, filename, lineno):
    '''
    Build a Table from a ';' separated description and create it.

    The first two fields are passed to Table as they are, the third one
    is split on ','.

    :param table_line: table description without the leading marker
    :param filename: name of the test file (passed on to table_create)
    :param lineno: line number being processed (passed on to table_create)
    :return: whatever table_create() returns
    '''
    fields = table_line.split(";")
    new_table = Table(fields[0], fields[1], fields[2].split(","))

    return table_create(new_table, filename, lineno)
Packit c5a612
Packit c5a612
Packit c5a612
def chain_process(chain_line, lineno):
    '''
    Register a chain described by a ';' separated line.

    A Chain is built from the first two fields plus the line number and
    appended to the global chain_list.

    :param chain_line: chain description without the leading marker
    :param lineno: line number the chain was declared on
    :return: always 0
    '''
    fields = chain_line.split(";")
    new_chain = Chain(fields[0], fields[1], lineno)
    chain_list.append(new_chain)

    return 0
Packit c5a612
Packit c5a612
Packit c5a612
def set_process(set_line, filename, lineno):
    '''
    Parse a set declaration and add the set.

    set_line[0] is split on single spaces: token 0 is the set name,
    token 2 the set type, optionally extended by ". <type>" pairs. The
    remaining tokens may be "timeout <value>" or "flags <value>"; anything
    else is reported as a bad flag. On success the set name is registered
    in the global all_set with an empty element set.

    :param set_line: sequence holding the declaration and the expected
                     test result
    :param filename: name of the test file (used for messages)
    :param lineno: line number being processed (used for messages)
    :return: return value of set_add()
    '''
    test_result = set_line[1]

    tokens = set_line[0].split(" ")
    set_name = tokens[0]

    # Collect the base type and any concatenated ". <type>" parts
    type_parts = [tokens[2]]
    pos = 3
    while pos < len(tokens) and tokens[pos] == ".":
        type_parts.append(tokens[pos + 1])
        pos += 2
    set_type = " . ".join(type_parts)

    timeout = ""
    if len(tokens) == pos + 2 and tokens[pos] == "timeout":
        timeout = "timeout " + tokens[pos + 1] + ";"
        pos += 2

    set_flags = ""
    if len(tokens) == pos + 2 and tokens[pos] == "flags":
        set_flags = tokens[pos + 1]
    elif len(tokens) != pos:
        print_error(set_name + " bad flag: " + tokens[pos], filename, lineno)

    new_set = Set("", "", set_name, set_type, timeout, set_flags)
    ret = set_add(new_set, test_result, filename, lineno)
    if ret == 0:
        all_set[set_name] = set()

    return ret
Packit c5a612
Packit c5a612
Packit c5a612
def set_element_process(element_line, filename, lineno):
    '''
    Parse a "set elements" line and add the elements to the named set.

    The text before the first space in element_line[0] is the set name,
    the rest is split on ',' to form the element list.

    :param element_line: sequence holding the element description and the
                         expected state
    :param filename: name of the test file (passed on)
    :param lineno: line number being processed (passed on)
    :return: return value of set_add_elements()
    '''
    rule_state = element_line[1]
    description = element_line[0]

    split_at = description.find(" ")
    elements = description[split_at:].split(",")

    return set_add_elements(elements, description[:split_at], rule_state,
                            filename, lineno)
Packit c5a612
Packit c5a612
Packit c5a612
def obj_process(obj_line, filename, lineno):
    '''
    Parse an object declaration and add the object.

    obj_line[0] is split on single spaces: token 0 is the object name and
    token 2 its type. For the type "ct" followed by "helper", "timeout" or
    "expectation" the two words are merged into the type and the fourth
    token is blanked. Everything from the fourth token on becomes the
    object specification. On success the object is appended to the global
    obj_list.

    :param obj_line: sequence holding the declaration and the expected
                     test result
    :param filename: name of the test file (passed on to obj_add)
    :param lineno: line number being processed (passed on to obj_add)
    :return: return value of obj_add()
    '''
    test_result = obj_line[1]

    tokens = obj_line[0].split(" ")
    obj_name = tokens[0]
    obj_type = tokens[2]

    # Two-word conntrack object types
    if obj_type == "ct" and tokens[3] in ("helper", "timeout", "expectation"):
        obj_type = "ct " + tokens[3]
        tokens[3] = ""

    obj_spcf = " ".join(tokens[3:]) if len(tokens) > 3 else ""

    new_obj = Obj("", "", obj_name, obj_type, obj_spcf)
    ret = obj_add(new_obj, test_result, filename, lineno)
    if ret == 0:
        obj_list.append(new_obj)

    return ret
Packit c5a612
Packit c5a612
Packit c5a612
def payload_find_expected(payload_log, rule):
    '''
    Collect the payload lines recorded for the given rule in payload_log

    A section starts at a line "# <rule>" and runs up to and including the
    next whitespace-only line, or up to the end of the file. When the end
    of the file is reached the file is rewound before returning.

    :param payload_log: open file handle of the payload data
    :param rule: nft rule we are going to add
    :return: list of the lines found (empty if the rule is not present)
    '''
    collecting = False
    collected = []

    while True:
        line = payload_log.readline()
        if not line:
            # End of file: rewind for the next lookup
            payload_log.seek(0, 0)
            return collected

        # Header of the wanted rule: start collecting from the next line
        if line.startswith("#") and line.strip()[2:] == rule.strip():
            collecting = True
            continue

        if not collecting:
            continue

        collected.append(line)
        if line.isspace():
            return collected
Packit c5a612
Packit c5a612
Packit c5a612
def json_find_expected(json_log, rule):
    '''
    Collect the JSON text recorded for the given rule in json_log

    A section starts at a line "# <rule>" and runs up to the next
    whitespace-only line or the end of the file. The lines are stripped
    and concatenated. When the end of the file is reached the file is
    rewound before returning.

    :param json_log: open file handle of the json data
    :param rule: nft rule we are going to add
    :return: the concatenated JSON text (empty if the rule is not present)
    '''
    collecting = False
    pieces = []

    while True:
        line = json_log.readline()
        if not line:
            # End of file: rewind for the next lookup
            json_log.seek(0, 0)
            return "".join(pieces)

        # Header of the wanted rule: start collecting from the next line
        if line.startswith("#") and line.strip()[2:] == rule.strip():
            collecting = True
            continue

        if not collecting:
            continue

        pieces.append(line.strip())
        if line.isspace():
            return "".join(pieces)
Packit c5a612
Packit c5a612
Packit c5a612
def run_test_file(filename, force_all_family_option, specific_file):
    '''
    Runs a test file

    Each line is dispatched on its first character: '#' lines are skipped,
    '*' declares a table, ':' a chain, '!' a set, '?' set elements and '%'
    an object. Every other line is treated as a rule test of the form
    "<rule>;<ok|fail>[;<expected output>]". Afterwards the chains, sets
    and tables that were created are deleted and the global bookkeeping
    for tables, chains and sets is reset.

    :param filename: name of the file with the test rules
    :param force_all_family_option: passed on to rule_add(); also selects
                                    the extended summary line
    :param specific_file: when true print a result summary, otherwise only
                          print "OK" if every test passed
    :return: list [tests, passed, total_warning, total_error,
             total_unit_run]
    '''
    filename_path = os.path.join(TESTS_PATH, filename)
    f = open(filename_path)
    tests = passed = total_unit_run = total_warning = total_error = 0

    for lineno, line in enumerate(f):
        sys.stdout.flush()

        # Set by signal_handler(); checked once per line
        if signal_received == 1:
            print("\nSignal received. Cleaning up and Exitting...")
            cleanup_on_exit()
            sys.exit(0)

        if line.isspace():
            continue

        if line[0] == "#":  # Command-line
            continue

        if line[0] == '*':  # Table
            table_line = line.rstrip()[1:]
            ret = table_process(table_line, filename, lineno)
            if ret != 0:
                break
            continue

        if line[0] == ":":  # Chain
            chain_line = line.rstrip()[1:]
            ret = chain_process(chain_line, lineno)
            if ret != 0:
                break
            continue

        if line[0] == "!":  # Adds this set
            set_line = line.rstrip()[1:].split(";")
            ret = set_process(set_line, filename, lineno)
            tests += 1
            if ret == -1:
                continue
            passed += 1
            continue

        if line[0] == "?":  # Adds elements in a set
            element_line = line.rstrip()[1:].split(";")
            ret = set_element_process(element_line, filename, lineno)
            tests += 1
            if ret == -1:
                continue

            passed += 1
            continue

        if line[0] == "%":  # Adds this object
            # With a '}' in the line, everything up to the last '}' is the
            # object and the text after the following character the result;
            # otherwise the line is simply split on ';'.
            brace = line.rfind("}")
            if brace < 0:
                obj_line = line.rstrip()[1:].split(";")
            else:
                obj_line = (line[1:brace+1], line[brace+2:].rstrip())

            ret = obj_process(obj_line, filename, lineno)
            tests += 1
            if ret == -1:
                continue
            passed += 1
            continue

        # Rule
        rule = line.split(';')  # rule[1] Ok or FAIL
        if len(rule) == 1 or len(rule) > 3 or rule[1].rstrip() \
                not in {"ok", "fail"}:
            reason = "Skipping malformed rule test. (%s)" % line.rstrip('\n')
            print_warning(reason, filename, lineno)
            continue

        # Lines starting with '-' run only when need_fix_option is set,
        # and in that mode all other rule lines are skipped.
        if line[0] == "-":  # Run omitted lines
            if need_fix_option:
                rule[0] = rule[0].rstrip()[1:].strip()
            else:
                continue
        elif need_fix_option:
            continue

        result = rule_add(rule, filename, lineno, force_all_family_option,
                          filename_path)
        tests += 1
        # result is [ret, warning, error, unit_tests]
        ret = result[0]
        warning = result[1]
        total_warning += warning
        total_error += result[2]
        total_unit_run += result[3]

        if ret != 0:
            continue

        if warning == 0:  # All ok.
            passed += 1

    # Delete rules, sets, chains and tables
    for table in table_list:
        # We delete chains
        for table_chain in table.chains:
            chain = chain_get_by_name(table_chain)
            chain_delete(chain, table, filename, lineno)

        # We delete sets.
        if all_set:
            ret = set_delete(table, filename, lineno)
            if ret != 0:
                reason = "There is a problem when we delete a set"
                print_error(reason, filename, lineno)

        # We delete tables.
        table_delete(table, filename, lineno)

    if specific_file:
        if force_all_family_option:
            print(print_result_all(filename, tests, total_warning, total_error,
                                   total_unit_run))
        else:
            print(print_result(filename, tests, total_warning, total_error))
    else:
        if tests == passed and tests > 0:
            print(filename + ": " + Colors.GREEN + "OK" + Colors.ENDC)

    f.close()
    # Reset the global state for the next test file
    # NOTE(review): obj_list is not cleared here - confirm this is intended
    del table_list[:]
    del chain_list[:]
    all_set.clear()

    return [tests, passed, total_warning, total_error, total_unit_run]
Packit c5a612
Packit c5a612
Packit c5a612
def main():
    """Entry point of the nft test runner.

    Parses the command line, checks the preconditions (root privileges and
    a built ``src/.libs/libnftables.so``), opens the log file, feeds every
    selected test file to run_test_file() and prints a global summary.

    When no file name is given on the command line, every ``*.t`` file
    found below the TESTS_DIRECTORY sub-directories of TESTS_PATH is run.
    When exactly one file name is given, run_test_file() is called with
    specific_file=True and the global summary is not printed.

    Side effects: sets the module globals debug_option, need_fix_option,
    enable_json_option, enable_json_schema, nftables and log_file, installs
    the SIGINT/SIGTERM handlers and changes the working directory to the
    repository root (TESTS_PATH + "/../..").

    Returns None on every path, including the early exits taken when a
    precondition is not met.
    """
    global debug_option, need_fix_option, enable_json_option, enable_json_schema
    global nftables
    global log_file

    parser = argparse.ArgumentParser(description='Run nft tests')

    parser.add_argument('filenames', nargs='*', metavar='path/to/file.t',
                        help='Run only these tests')

    parser.add_argument('-d', '--debug', action='store_true', dest='debug',
                        help='enable debugging mode')

    parser.add_argument('-e', '--need-fix', action='store_true',
                        dest='need_fix_line', help='run rules that need a fix')

    parser.add_argument('-f', '--force-family', action='store_true',
                        dest='force_all_family',
                        help='keep testing all families on error')

    parser.add_argument('-j', '--enable-json', action='store_true',
                        dest='enable_json',
                        help='test JSON functionality as well')

    parser.add_argument('-s', '--schema', action='store_true',
                        dest='enable_schema',
                        help='verify json input/output against schema')

    parser.add_argument('-v', '--version', action='version',
                        version='1.0',
                        help='Print the version information')

    args = parser.parse_args()
    debug_option = args.debug
    need_fix_option = args.need_fix_line
    force_all_family_option = args.force_all_family
    enable_json_option = args.enable_json
    enable_json_schema = args.enable_schema
    specific_file = False

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    if os.getuid() != 0:
        print("You need to be root to run this, sorry")
        return

    # Change working directory to repository root
    os.chdir(TESTS_PATH + "/../..")

    # Relative to the repository root we have just changed into.
    sofile = 'src/.libs/libnftables.so'
    if not os.path.exists(sofile):
        print("The nftables library does not exist. "
              "You need to build the project.")
        return

    if args.enable_schema and not args.enable_json:
        # Name the option as it is actually spelled on the command line.
        print_error("Option --schema requires option --enable-json")
        return

    nftables = Nftables(sofile=sofile)

    test_files = files_ok = run_total = 0
    tests = warnings = errors = 0

    # Keep the try body limited to the call that can fail with IOError.
    try:
        log_file = open(LOGFILE, 'w')
    except IOError:
        print_error("Cannot open log file %s" % LOGFILE)
        return
    else:
        print_info("Log will be available at %s" % LOGFILE)

    try:
        file_list = []
        if args.filenames:
            file_list = args.filenames
            if len(args.filenames) == 1:
                specific_file = True
        else:
            for directory in TESTS_DIRECTORY:
                path = os.path.join(TESTS_PATH, directory)
                for root, dirs, files in os.walk(path):
                    for f in files:
                        if f.endswith(".t"):
                            # Build the name from the directory actually
                            # being walked so that files in nested
                            # sub-directories keep their real location.
                            # For files directly inside `directory` this
                            # is the same "<directory>/<file>" as before.
                            file_list.append(
                                os.path.relpath(os.path.join(root, f),
                                                TESTS_PATH))

        for filename in file_list:
            # run_test_file() returns
            # [tests, passed, warnings, errors, unit_run].
            result = run_test_file(filename, force_all_family_option,
                                   specific_file)
            file_tests = result[0]
            file_passed = result[1]
            file_warnings = result[2]
            file_errors = result[3]
            file_unit_run = result[4]

            test_files += 1

            if file_warnings == 0 and file_tests == file_passed:
                files_ok += 1
            if file_tests:
                tests += file_tests
                errors += file_errors
                warnings += file_warnings
            if force_all_family_option:
                run_total += file_unit_run
    finally:
        # The log is only written while tests run; do not leak the handle.
        log_file.close()

    if test_files == 0:
        print("No test files to run")
    elif not specific_file:
        print("%d test files, %d files passed, %d unit tests, " % (test_files, files_ok, tests))
        if force_all_family_option:
            print("%d total executed, %d error, %d warning" % (run_total, errors, warnings))
        else:
            print("%d error, %d warning" % (errors, warnings))
Packit c5a612
Packit c5a612
# Run the test driver only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()