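# NOTE: the original first line ("Blob Blame History Raw") was navigation
# residue from a web source viewer, not part of the script.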
import sys
import json
from collections import defaultdict
from optparse import OptionParser

class Parser(object):
    """
    Checks that registered attributes get set to non-default values.

    Attributes are registered through a (possibly nested) dictionary of
    default values. Nested dictionaries are addressed by dotted paths:
    the value in ``{'a': {'b': 1}}`` has the path ``'a.b'``.
    """

    # Default assumed for an attribute whose registered default is None,
    # selected by the exact type of the value found in the logs.
    DEFAULTS = {int:0,
                str:'',
                list:[]}

    def __init__(self, defconf=None):
        """
        Parameters
        ----------
        defconf : dictionary populated with default values, or None to
                  register no attributes at all
        """
        self.defaults = None
        if defconf is not None:
            self.defaults = self.flatten(defconf)

    def run(self, logs, verbose=None):
        """
        Parses the logs and terminates the process with exit status 1 if
        any registered attribute was never set to a non-default value.

        Parameters
        ----------
        logs : iterable of dictionaries, one per log message
        verbose : currently unused; kept for interface compatibility

        Returns :
        None : when every registered attribute was set
        """
        result = self.parse(logs)
        # No defaults registered means there is nothing that can be missing.
        registered = self.defaults if self.defaults is not None else {}
        # Sorted so that the report is the same on every run.
        missing = sorted(set(registered).difference(result), key=str)
        if missing:
            print('Test failed.')
            print('The following attributes were not set:')
            for path in missing:
                print(path)
            sys.exit(1)

    def flatten(self, defaults):
        """
        Flattens paths to attributes.

        Parameters
        ----------
        defaults : a dictionary populated with default values

        Returns :
        dict : with flattened attributes, keyed by dotted path
        """
        result = dict()
        for path, value in self._walk(defaults):
            # A literal key 'a.b' collides with key 'b' nested under 'a';
            # the value walked last wins.
            if path in result:
                print('Warning: attribute path %s already exists' % path)
            result[path] = value

        return result

    def parse(self, logs):
        """
        Collects the non-default values that the logs assign to the
        registered attributes.

        Parameters
        ----------
        logs : iterable of dictionaries, one per log message

        Returns :
        defaultdict(list) : maps each attribute path to the list of
                            non-default values seen for it, in log order
        """
        result = defaultdict(list)
        registered = self.defaults if self.defaults is not None else {}
        for msg in logs:
            # each message is treated as a dictionary of dictionaries
            for path, value in self._walk(msg):
                # only paths registered in defaults are of interest
                if path not in registered:
                    continue
                default = registered[path]
                if default is None:
                    # no explicit default: determine it by the value's type
                    if value is None:
                        print('Warning: attribute %s is set to None' % path)
                        continue
                    default = self._default_for(value)
                if value != default:
                    # test passed
                    result[path].append(value)
        return result

    def _default_for(self, value):
        """
        Returns the default value assumed for the type of ``value``.

        Types listed in DEFAULTS use the table; any other type falls back
        to its no-argument constructor (``bool() == False``,
        ``float() == 0.0``). None is returned when the type cannot be
        constructed that way, so that any such value counts as set.
        """
        kind = type(value)
        if kind in self.DEFAULTS:
            return self.DEFAULTS[kind]
        try:
            return kind()
        except TypeError:
            return None

    def _walk(self, adict):
        """
        Generator that works through dictionary.

        Yields (path, value) pairs for every non-dictionary value, where
        path joins the keys leading to the value with '.'.
        """
        for key, value in adict.items():
            if isinstance(value, dict):
                for (subpath, leaf) in self._walk(value):
                    yield (key + '.' + subpath, leaf)
            else:
                yield (key, value)


if __name__ == '__main__':
    # Command-line entry point: reads a log file holding one JSON document
    # per line, optionally a JSON file of defaults, and runs the check.

    parser = OptionParser()
    parser.add_option("-i", "--logfile", dest="filename",
                  help="input log file in json fmt", metavar="FILE")
    parser.add_option("-d", "--defaults", dest="defaults",
                  help="dictionary with defaults", metavar="FILE")

    (options, args) = parser.parse_args()
    if options.filename is None:
        print('Input file in JSON format is required')
        # A missing required argument is a failure, so exit non-zero.
        sys.exit(1)

    content = list()
    with open(options.filename, 'r') as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. a trailing newline) carry no message.
            if not line:
                continue
            content.append(json.loads(line))

    defaults = None
    if options.defaults is not None:
        with open(options.defaults, 'r') as f:
            defaults = json.load(f)

    # run test; Parser.run exits with status 1 on failure
    p = Parser(defaults)
    p.run(content)
    sys.exit(0)