Blame tests/unittests/test_merging.py

Packit Service a04d08
# This file is part of cloud-init. See LICENSE file for license information.
Packit Service a04d08
Packit Service a04d08
from cloudinit.tests import helpers
Packit Service a04d08
Packit Service a04d08
from cloudinit.handlers import cloud_config
Packit Service a04d08
from cloudinit.handlers import (CONTENT_START, CONTENT_END)
Packit Service a04d08
Packit Service a04d08
from cloudinit import helpers as c_helpers
Packit Service a04d08
from cloudinit import util
Packit Service a04d08
Packit Service a04d08
import collections
Packit Service a04d08
import glob
Packit Service a04d08
import os
Packit Service a04d08
import random
Packit Service a04d08
import re
Packit Service a04d08
import string
Packit Service a04d08
Packit Service a04d08
SOURCE_PAT = "source*.*yaml"
Packit Service a04d08
EXPECTED_PAT = "expected%s.yaml"
Packit Service 9bfd13
TYPES = [dict, str, list, tuple, None, int]
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def _old_mergedict(src, cand):
Packit Service a04d08
    """
Packit Service a04d08
    Merge values from C{cand} into C{src}.
Packit Service a04d08
    If C{src} has a key C{cand} will not override.
Packit Service a04d08
    Nested dictionaries are merged recursively.
Packit Service a04d08
    """
Packit Service a04d08
    if isinstance(src, dict) and isinstance(cand, dict):
Packit Service a04d08
        for (k, v) in cand.items():
Packit Service a04d08
            if k not in src:
Packit Service a04d08
                src[k] = v
Packit Service a04d08
            else:
Packit Service a04d08
                src[k] = _old_mergedict(src[k], v)
Packit Service a04d08
    return src
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def _old_mergemanydict(*args):
Packit Service a04d08
    out = {}
Packit Service a04d08
    for a in args:
Packit Service a04d08
        out = _old_mergedict(out, a)
Packit Service a04d08
    return out
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def _random_str(rand):
Packit Service a04d08
    base = ''
Packit Service a04d08
    for _i in range(rand.randint(1, 2 ** 8)):
Packit Service a04d08
        base += rand.choice(string.ascii_letters + string.digits)
Packit Service a04d08
    return base
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class _NoMoreException(Exception):
Packit Service a04d08
    pass
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def _make_dict(current_depth, max_depth, rand):
Packit Service a04d08
    if current_depth >= max_depth:
Packit Service a04d08
        raise _NoMoreException()
Packit Service a04d08
    if current_depth == 0:
Packit Service a04d08
        t = dict
Packit Service a04d08
    else:
Packit Service a04d08
        t = rand.choice(TYPES)
Packit Service a04d08
    base = None
Packit Service a04d08
    if t in [None]:
Packit Service a04d08
        return base
Packit Service a04d08
    if t in [dict, list, tuple]:
Packit Service a04d08
        if t in [dict]:
Packit Service a04d08
            amount = rand.randint(0, 5)
Packit Service a04d08
            keys = [_random_str(rand) for _i in range(0, amount)]
Packit Service a04d08
            base = {}
Packit Service a04d08
            for k in keys:
Packit Service a04d08
                try:
Packit Service a04d08
                    base[k] = _make_dict(current_depth + 1, max_depth, rand)
Packit Service a04d08
                except _NoMoreException:
Packit Service a04d08
                    pass
Packit Service a04d08
        elif t in [list, tuple]:
Packit Service a04d08
            base = []
Packit Service a04d08
            amount = rand.randint(0, 5)
Packit Service a04d08
            for _i in range(0, amount):
Packit Service a04d08
                try:
Packit Service a04d08
                    base.append(_make_dict(current_depth + 1, max_depth, rand))
Packit Service a04d08
                except _NoMoreException:
Packit Service a04d08
                    pass
Packit Service a04d08
            if t in [tuple]:
Packit Service a04d08
                base = tuple(base)
Packit Service 9bfd13
    elif t in [int]:
Packit Service a04d08
        base = rand.randint(0, 2 ** 8)
Packit Service a04d08
    elif t in [str]:
Packit Service a04d08
        base = _random_str(rand)
Packit Service a04d08
    return base
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def make_dict(max_depth, seed=None):
Packit Service a04d08
    max_depth = max(1, max_depth)
Packit Service a04d08
    rand = random.Random(seed)
Packit Service a04d08
    return _make_dict(0, max_depth, rand)
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class TestSimpleRun(helpers.ResourceUsingTestCase):
Packit Service a04d08
    def _load_merge_files(self):
Packit Service a04d08
        merge_root = helpers.resourceLocation('merge_sources')
Packit Service a04d08
        tests = []
Packit Service a04d08
        source_ids = collections.defaultdict(list)
Packit Service a04d08
        expected_files = {}
Packit Service a04d08
        for fn in glob.glob(os.path.join(merge_root, SOURCE_PAT)):
Packit Service a04d08
            base_fn = os.path.basename(fn)
Packit Service a04d08
            file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
Packit Service a04d08
            if not file_id:
Packit Service a04d08
                raise IOError("File %s does not have a numeric identifier"
Packit Service a04d08
                              % (fn))
Packit Service a04d08
            file_id = int(file_id.group(1))
Packit Service a04d08
            source_ids[file_id].append(fn)
Packit Service a04d08
            expected_fn = os.path.join(merge_root, EXPECTED_PAT % (file_id))
Packit Service a04d08
            if not os.path.isfile(expected_fn):
Packit Service a04d08
                raise IOError("No expected file found at %s" % (expected_fn))
Packit Service a04d08
            expected_files[file_id] = expected_fn
Packit Service a04d08
        for i in sorted(source_ids.keys()):
Packit Service a04d08
            source_file_contents = []
Packit Service a04d08
            for fn in sorted(source_ids[i]):
Packit Service a04d08
                source_file_contents.append([fn, util.load_file(fn)])
Packit Service a04d08
            expected = util.load_yaml(util.load_file(expected_files[i]))
Packit Service a04d08
            entry = [source_file_contents, [expected, expected_files[i]]]
Packit Service a04d08
            tests.append(entry)
Packit Service a04d08
        return tests
Packit Service a04d08
Packit Service a04d08
    def test_seed_runs(self):
Packit Service a04d08
        test_dicts = []
Packit Service a04d08
        for i in range(1, 10):
Packit Service a04d08
            base_dicts = []
Packit Service a04d08
            for j in range(1, 10):
Packit Service a04d08
                base_dicts.append(make_dict(5, i * j))
Packit Service a04d08
            test_dicts.append(base_dicts)
Packit Service a04d08
        for test in test_dicts:
Packit Service a04d08
            c = _old_mergemanydict(*test)
Packit Service a04d08
            d = util.mergemanydict(test)
Packit Service a04d08
            self.assertEqual(c, d)
Packit Service a04d08
Packit Service a04d08
    def test_merge_cc_samples(self):
Packit Service a04d08
        tests = self._load_merge_files()
Packit Service a04d08
        paths = c_helpers.Paths({})
Packit Service a04d08
        cc_handler = cloud_config.CloudConfigPartHandler(paths)
Packit Service a04d08
        cc_handler.cloud_fn = None
Packit Service a04d08
        for (payloads, (expected_merge, expected_fn)) in tests:
Packit Service a04d08
            cc_handler.handle_part(None, CONTENT_START, None,
Packit Service a04d08
                                   None, None, None)
Packit Service a04d08
            merging_fns = []
Packit Service a04d08
            for (fn, contents) in payloads:
Packit Service a04d08
                cc_handler.handle_part(None, None, "%s.yaml" % (fn),
Packit Service a04d08
                                       contents, None, {})
Packit Service a04d08
                merging_fns.append(fn)
Packit Service a04d08
            merged_buf = cc_handler.cloud_buf
Packit Service a04d08
            cc_handler.handle_part(None, CONTENT_END, None,
Packit Service a04d08
                                   None, None, None)
Packit Service a04d08
            fail_msg = "Equality failure on checking %s with %s: %s != %s"
Packit Service a04d08
            fail_msg = fail_msg % (expected_fn,
Packit Service a04d08
                                   ",".join(merging_fns), merged_buf,
Packit Service a04d08
                                   expected_merge)
Packit Service a04d08
            self.assertEqual(expected_merge, merged_buf, msg=fail_msg)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merges_dict(self):
Packit Service a04d08
        a = {
Packit Service a04d08
            '1': '2',
Packit Service a04d08
            'b': 'c',
Packit Service a04d08
        }
Packit Service a04d08
        b = {
Packit Service a04d08
            'b': 'e',
Packit Service a04d08
        }
Packit Service a04d08
        c = _old_mergedict(a, b)
Packit Service a04d08
        d = util.mergemanydict([a, b])
Packit Service a04d08
        self.assertEqual(c, d)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merges_dict2(self):
Packit Service a04d08
        a = {
Packit Service a04d08
            'Blah': 1,
Packit Service a04d08
            'Blah2': 2,
Packit Service a04d08
            'Blah3': 3,
Packit Service a04d08
        }
Packit Service a04d08
        b = {
Packit Service a04d08
            'Blah': 1,
Packit Service a04d08
            'Blah2': 2,
Packit Service a04d08
            'Blah3': [1],
Packit Service a04d08
        }
Packit Service a04d08
        c = _old_mergedict(a, b)
Packit Service a04d08
        d = util.mergemanydict([a, b])
Packit Service a04d08
        self.assertEqual(c, d)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merges_list(self):
Packit Service a04d08
        a = {'b': [1, 2, 3]}
Packit Service a04d08
        b = {'b': [4, 5]}
Packit Service a04d08
        c = {'b': [6, 7]}
Packit Service a04d08
        e = _old_mergemanydict(a, b, c)
Packit Service a04d08
        f = util.mergemanydict([a, b, c])
Packit Service a04d08
        self.assertEqual(e, f)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merges_str(self):
Packit Service a04d08
        a = {'b': "hi"}
Packit Service a04d08
        b = {'b': "howdy"}
Packit Service a04d08
        c = {'b': "hallo"}
Packit Service a04d08
        e = _old_mergemanydict(a, b, c)
Packit Service a04d08
        f = util.mergemanydict([a, b, c])
Packit Service a04d08
        self.assertEqual(e, f)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merge_sub_dict(self):
Packit Service a04d08
        a = {
Packit Service a04d08
            '1': '2',
Packit Service a04d08
            'b': {
Packit Service a04d08
                'f': 'g',
Packit Service a04d08
                'e': 'c',
Packit Service a04d08
                'h': 'd',
Packit Service a04d08
                'hh': {
Packit Service a04d08
                    '1': 2,
Packit Service a04d08
                },
Packit Service a04d08
            }
Packit Service a04d08
        }
Packit Service a04d08
        b = {
Packit Service a04d08
            'b': {
Packit Service a04d08
                'e': 'c',
Packit Service a04d08
                'hh': {
Packit Service a04d08
                    '3': 4,
Packit Service a04d08
                }
Packit Service a04d08
            }
Packit Service a04d08
        }
Packit Service a04d08
        c = _old_mergedict(a, b)
Packit Service a04d08
        d = util.mergemanydict([a, b])
Packit Service a04d08
        self.assertEqual(c, d)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merge_sub_dict2(self):
Packit Service a04d08
        a = {
Packit Service a04d08
            '1': '2',
Packit Service a04d08
            'b': {
Packit Service a04d08
                'f': 'g',
Packit Service a04d08
            }
Packit Service a04d08
        }
Packit Service a04d08
        b = {
Packit Service a04d08
            'b': {
Packit Service a04d08
                'e': 'c',
Packit Service a04d08
            }
Packit Service a04d08
        }
Packit Service a04d08
        c = _old_mergedict(a, b)
Packit Service a04d08
        d = util.mergemanydict([a, b])
Packit Service a04d08
        self.assertEqual(c, d)
Packit Service a04d08
Packit Service a04d08
    def test_compat_merge_sub_list(self):
Packit Service a04d08
        a = {
Packit Service a04d08
            '1': '2',
Packit Service a04d08
            'b': {
Packit Service a04d08
                'f': ['1'],
Packit Service a04d08
            }
Packit Service a04d08
        }
Packit Service a04d08
        b = {
Packit Service a04d08
            'b': {
Packit Service a04d08
                'f': [],
Packit Service a04d08
            }
Packit Service a04d08
        }
Packit Service a04d08
        c = _old_mergedict(a, b)
Packit Service a04d08
        d = util.mergemanydict([a, b])
Packit Service a04d08
        self.assertEqual(c, d)
Packit Service a04d08
Packit Service a04d08
# vi: ts=4 expandtab