Blame tests/unittests/test_merging.py

Packit bc9a3a
# This file is part of cloud-init. See LICENSE file for license information.
Packit bc9a3a
Packit bc9a3a
import collections
import copy
import glob
import os
import random
import re
import string

import six

from cloudinit import helpers as c_helpers
from cloudinit import util
from cloudinit.handlers import (CONTENT_START, CONTENT_END)
from cloudinit.handlers import cloud_config
from cloudinit.tests import helpers
Packit bc9a3a
Packit bc9a3a
# Glob pattern used to find the sample merge input files.
SOURCE_PAT = "source*.*yaml"
# Name template of the expected-result file; %s is the numeric group id.
EXPECTED_PAT = "expected%s.yaml"
# Value types the random generator may pick from; None stands for a
# literal None value rather than a type.
TYPES = [dict, str, list, tuple, None] + list(six.integer_types)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def _old_mergedict(src, cand):
    """
    Fold the entries of C{cand} into C{src} and return C{src}.

    Keys already present in C{src} keep their value unless both values are
    dictionaries, in which case those are merged recursively.  When either
    argument is not a dictionary, C{src} is returned untouched.

    Note that C{src} is modified in place.
    """
    if not (isinstance(src, dict) and isinstance(cand, dict)):
        return src
    for key, value in cand.items():
        if key in src:
            # Existing entry: only nested dicts are actually combined.
            src[key] = _old_mergedict(src[key], value)
        else:
            src[key] = value
    return src
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def _old_mergemanydict(*args):
    """
    Merge every positional argument, left to right, into a fresh dict
    using C{_old_mergedict} and return the result.
    """
    merged = {}
    for candidate in args:
        merged = _old_mergedict(merged, candidate)
    return merged
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def _random_str(rand):
    """
    Return a string of 1 to 256 ASCII letters and digits drawn from
    C{rand}, which must offer C{randint} and C{choice}.
    """
    alphabet = string.ascii_letters + string.digits
    # The length is drawn first, then one character per position.
    length = rand.randint(1, 2 ** 8)
    return ''.join(rand.choice(alphabet) for _i in range(length))
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class _NoMoreException(Exception):
    """Raised by _make_dict when the requested depth limit is reached."""
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def _make_dict(current_depth, max_depth, rand):
    """
    Build one random value for nesting level C{current_depth}.

    The top level (depth 0) is always a dict; deeper levels pick a type
    from TYPES using C{rand}.  Containers get 0 to 5 entries, each built
    recursively one level deeper; entries that would exceed C{max_depth}
    are silently dropped.

    Raises C{_NoMoreException} when C{current_depth} has reached
    C{max_depth}.
    """
    if current_depth >= max_depth:
        raise _NoMoreException()

    kind = dict if current_depth == 0 else rand.choice(TYPES)
    if kind is None:
        return None

    next_depth = current_depth + 1

    if kind is dict:
        count = rand.randint(0, 5)
        # All keys are generated before any value is.
        names = [_random_str(rand) for _i in range(count)]
        mapping = {}
        for name in names:
            try:
                mapping[name] = _make_dict(next_depth, max_depth, rand)
            except _NoMoreException:
                continue
        return mapping

    if kind is list or kind is tuple:
        items = []
        count = rand.randint(0, 5)
        for _i in range(count):
            try:
                items.append(_make_dict(next_depth, max_depth, rand))
            except _NoMoreException:
                continue
        return tuple(items) if kind is tuple else items

    if kind in six.integer_types:
        return rand.randint(0, 2 ** 8)

    if kind is str:
        return _random_str(rand)

    return None
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def make_dict(max_depth, seed=None):
    """
    Return a randomly generated nested dict.

    C{max_depth} is raised to at least 1, and C{seed} is handed to
    C{random.Random} so that equal seeds give equal results.
    """
    depth_limit = max(1, max_depth)
    generator = random.Random(seed)
    return _make_dict(0, depth_limit, generator)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestSimpleRun(helpers.ResourceUsingTestCase):
    """Compare util.mergemanydict with the legacy merge helpers of this
    module, and check cloud-config merging against sample files.

    The legacy helpers modify the dictionaries they are given (and nested
    containers they adopt from them), so every compatibility test hands
    them deep copies.  Without that, util.mergemanydict would be called on
    data the legacy code had already merged.
    """

    def _load_merge_files(self):
        """Collect the sample merge inputs together with expected results.

        Files matching SOURCE_PAT in the 'merge_sources' resource
        directory are grouped by the first number of their name
        ("source<id>-<n>.yaml"); each group needs an "expected<id>.yaml"
        file in the same directory.

        Returns a list, ordered by id, of entries shaped like
        [[[fn, contents], ...], [expected, expected_fn]] where the
        sources are sorted by file name.

        Raises IOError when a source file name has no numeric identifier
        or when the expected file of a group is missing.
        """
        merge_root = helpers.resourceLocation('merge_sources')
        tests = []
        source_ids = collections.defaultdict(list)
        expected_files = {}
        for fn in glob.glob(os.path.join(merge_root, SOURCE_PAT)):
            base_fn = os.path.basename(fn)
            id_match = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
            if not id_match:
                raise IOError("File %s does not have a numeric identifier"
                              % (fn))
            # Only the first number selects the group / expected file.
            file_id = int(id_match.group(1))
            source_ids[file_id].append(fn)
            expected_fn = os.path.join(merge_root, EXPECTED_PAT % (file_id))
            if not os.path.isfile(expected_fn):
                raise IOError("No expected file found at %s" % (expected_fn))
            expected_files[file_id] = expected_fn
        for i in sorted(source_ids.keys()):
            source_file_contents = []
            for fn in sorted(source_ids[i]):
                source_file_contents.append([fn, util.load_file(fn)])
            expected = util.load_yaml(util.load_file(expected_files[i]))
            entry = [source_file_contents, [expected, expected_files[i]]]
            tests.append(entry)
        return tests

    def test_seed_runs(self):
        # Nine groups of nine seeded random dicts; both mergers must agree
        # on every group.
        test_dicts = []
        for i in range(1, 10):
            base_dicts = []
            for j in range(1, 10):
                base_dicts.append(make_dict(5, i * j))
            test_dicts.append(base_dicts)
        for test in test_dicts:
            # Deep copy: the legacy merge mutates nested inputs in place.
            c = _old_mergemanydict(*copy.deepcopy(test))
            d = util.mergemanydict(test)
            self.assertEqual(c, d)

    def test_merge_cc_samples(self):
        # Feed each group of sample files through the cloud-config part
        # handler and compare the handler's buffer with the expected yaml.
        tests = self._load_merge_files()
        paths = c_helpers.Paths({})
        cc_handler = cloud_config.CloudConfigPartHandler(paths)
        cc_handler.cloud_fn = None
        for (payloads, (expected_merge, expected_fn)) in tests:
            cc_handler.handle_part(None, CONTENT_START, None,
                                   None, None, None)
            merging_fns = []
            for (fn, contents) in payloads:
                cc_handler.handle_part(None, None, "%s.yaml" % (fn),
                                       contents, None, {})
                merging_fns.append(fn)
            # The buffer is read before CONTENT_END is passed.
            # NOTE(review): presumably the handler clears it at
            # CONTENT_END - confirm against the handler.
            merged_buf = cc_handler.cloud_buf
            cc_handler.handle_part(None, CONTENT_END, None,
                                   None, None, None)
            fail_msg = "Equality failure on checking %s with %s: %s != %s"
            fail_msg = fail_msg % (expected_fn,
                                   ",".join(merging_fns), merged_buf,
                                   expected_merge)
            self.assertEqual(expected_merge, merged_buf, msg=fail_msg)

    def test_compat_merges_dict(self):
        a = {
            '1': '2',
            'b': 'c',
        }
        b = {
            'b': 'e',
        }
        c = _old_mergedict(copy.deepcopy(a), copy.deepcopy(b))
        d = util.mergemanydict([a, b])
        self.assertEqual(c, d)

    def test_compat_merges_dict2(self):
        a = {
            'Blah': 1,
            'Blah2': 2,
            'Blah3': 3,
        }
        b = {
            'Blah': 1,
            'Blah2': 2,
            'Blah3': [1],
        }
        c = _old_mergedict(copy.deepcopy(a), copy.deepcopy(b))
        d = util.mergemanydict([a, b])
        self.assertEqual(c, d)

    def test_compat_merges_list(self):
        a = {'b': [1, 2, 3]}
        b = {'b': [4, 5]}
        c = {'b': [6, 7]}
        e = _old_mergemanydict(*copy.deepcopy([a, b, c]))
        f = util.mergemanydict([a, b, c])
        self.assertEqual(e, f)

    def test_compat_merges_str(self):
        a = {'b': "hi"}
        b = {'b': "howdy"}
        c = {'b': "hallo"}
        e = _old_mergemanydict(*copy.deepcopy([a, b, c]))
        f = util.mergemanydict([a, b, c])
        self.assertEqual(e, f)

    def test_compat_merge_sub_dict(self):
        a = {
            '1': '2',
            'b': {
                'f': 'g',
                'e': 'c',
                'h': 'd',
                'hh': {
                    '1': 2,
                },
            }
        }
        b = {
            'b': {
                'e': 'c',
                'hh': {
                    '3': 4,
                }
            }
        }
        c = _old_mergedict(copy.deepcopy(a), copy.deepcopy(b))
        d = util.mergemanydict([a, b])
        self.assertEqual(c, d)

    def test_compat_merge_sub_dict2(self):
        a = {
            '1': '2',
            'b': {
                'f': 'g',
            }
        }
        b = {
            'b': {
                'e': 'c',
            }
        }
        c = _old_mergedict(copy.deepcopy(a), copy.deepcopy(b))
        d = util.mergemanydict([a, b])
        self.assertEqual(c, d)

    def test_compat_merge_sub_list(self):
        a = {
            '1': '2',
            'b': {
                'f': ['1'],
            }
        }
        b = {
            'b': {
                'f': [],
            }
        }
        c = _old_mergedict(copy.deepcopy(a), copy.deepcopy(b))
        d = util.mergemanydict([a, b])
        self.assertEqual(c, d)
Packit bc9a3a
Packit bc9a3a
# vi: ts=4 expandtab