Blame src/tests/jsonwalker.py

Packit fd8b60
import sys
Packit fd8b60
import json
Packit fd8b60
from collections import defaultdict
Packit fd8b60
from optparse import OptionParser
Packit fd8b60
Packit fd8b60
class Parser(object):
Packit fd8b60
    DEFAULTS = {int:0,
Packit fd8b60
                str:'',
Packit fd8b60
                list:[]}
Packit fd8b60
Packit fd8b60
    def __init__(self, defconf=None):
Packit fd8b60
        self.defaults = None
Packit fd8b60
        if defconf is not None:
Packit fd8b60
            self.defaults = self.flatten(defconf)
Packit fd8b60
Packit fd8b60
    def run(self, logs, verbose=None):
Packit fd8b60
        result = self.parse(logs)
Packit fd8b60
        if len(result) != len(self.defaults):
Packit fd8b60
            diff = set(self.defaults.keys()).difference(result.keys())
Packit fd8b60
            print('Test failed.')
Packit fd8b60
            print('The following attributes were not set:')
Packit fd8b60
            for it in diff:
Packit fd8b60
                print(it)
Packit fd8b60
            sys.exit(1)
Packit fd8b60
Packit fd8b60
    def flatten(self, defaults):
Packit fd8b60
        """
Packit fd8b60
        Flattens pathes to attributes.
Packit fd8b60
Packit fd8b60
        Parameters
Packit fd8b60
        ----------
Packit fd8b60
        defaults : a dictionaries populated with default values
Packit fd8b60
Packit fd8b60
        Returns :
Packit fd8b60
        dict : with flattened attributes
Packit fd8b60
        """
Packit fd8b60
        result = dict()
Packit fd8b60
        for path,value in self._walk(defaults):
Packit fd8b60
            if path in result:
Packit fd8b60
                print('Warning: attribute path %s already exists' % path)
Packit fd8b60
            result[path] = value
Packit fd8b60
Packit fd8b60
        return result
Packit fd8b60
Packit fd8b60
    def parse(self, logs):
Packit fd8b60
        result = defaultdict(list)
Packit fd8b60
        for msg in logs:
Packit fd8b60
            # each message is treated as a dictionary of dictionaries
Packit fd8b60
            for a,v in self._walk(msg):
Packit fd8b60
                # see if path is registered in defaults
Packit fd8b60
                if a in self.defaults:
Packit fd8b60
                    dv = self.defaults.get(a)
Packit fd8b60
                    if dv is None:
Packit fd8b60
                        # determine default value by type
Packit fd8b60
                        if v is not None:
Packit fd8b60
                            dv = self.DEFAULTS[type(v)]
Packit fd8b60
                        else:
Packit fd8b60
                            print('Warning: attribute %s is set to None' % a)
Packit fd8b60
                            continue
Packit fd8b60
                    # by now we have default value
Packit fd8b60
                    if v != dv:
Packit fd8b60
                        # test passed
Packit fd8b60
                        result[a].append(v)
Packit fd8b60
        return result
Packit fd8b60
Packit fd8b60
    def _walk(self, adict):
Packit fd8b60
        """
Packit fd8b60
        Generator that works through dictionary.
Packit fd8b60
        """
Packit fd8b60
        for a,v in adict.items():
Packit fd8b60
            if isinstance(v,dict):
Packit fd8b60
                for (attrpath,u) in self._walk(v):
Packit fd8b60
                    yield (a+'.'+attrpath,u)
Packit fd8b60
            else:
Packit fd8b60
                yield (a,v)
Packit fd8b60
Packit fd8b60
Packit fd8b60
if __name__ == '__main__':
Packit fd8b60
Packit fd8b60
    parser = OptionParser()
Packit fd8b60
    parser.add_option("-i", "--logfile", dest="filename",
Packit fd8b60
                  help="input log file in json fmt", metavar="FILE")
Packit fd8b60
    parser.add_option("-d", "--defaults", dest="defaults",
Packit fd8b60
                  help="dictionary with defaults", metavar="FILE")
Packit fd8b60
Packit fd8b60
    (options, args) = parser.parse_args()
Packit fd8b60
    if options.filename is not None:
Packit fd8b60
        with open(options.filename, 'r') as f:
Packit fd8b60
            content = list()
Packit fd8b60
            for l in f:
Packit fd8b60
                content.append(json.loads(l.rstrip()))
Packit fd8b60
        f.close()
Packit fd8b60
    else:
Packit fd8b60
        print('Input file in JSON format is required')
Packit fd8b60
        exit()
Packit fd8b60
Packit fd8b60
    defaults = None
Packit fd8b60
    if options.defaults is not None:
Packit fd8b60
        with open(options.defaults, 'r') as f:
Packit fd8b60
            defaults = json.load(f)
Packit fd8b60
Packit fd8b60
    # run test
Packit fd8b60
    p = Parser(defaults)
Packit fd8b60
    p.run(content)
Packit fd8b60
    exit()