Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

"""Tests of the built-in user data handlers."""

import copy
import errno
import os
import shutil
import tempfile
from textwrap import dedent


from cloudinit.tests.helpers import (
    FilesystemMockingTestCase, CiTestCase, mock, skipUnlessJinja)

from cloudinit import handlers
from cloudinit import helpers
from cloudinit import subp
from cloudinit import util

from cloudinit.handlers.cloud_config import CloudConfigPartHandler
from cloudinit.handlers.jinja_template import (
    JinjaTemplatePartHandler, convert_jinja_instance_data,
    render_jinja_payload)
from cloudinit.handlers.shell_script import ShellScriptPartHandler
from cloudinit.handlers.upstart_job import UpstartJobPartHandler

from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)


class TestUpstartJobPartHandler(FilesystemMockingTestCase):

    mpath = 'cloudinit.handlers.upstart_job.'

    def test_upstart_frequency_no_out(self):
        c_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, c_root)
        up_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, up_root)
        paths = helpers.Paths({
            'cloud_dir': c_root,
            'upstart_dir': up_root,
        })
        h = UpstartJobPartHandler(paths)
        # No files should be written out when
        # the frequency is ! per-instance
        h.handle_part('', handlers.CONTENT_START,
                      None, None, None)
        h.handle_part('blah', 'text/upstart-job',
                      'test.conf', 'blah', frequency=PER_ALWAYS)
        h.handle_part('', handlers.CONTENT_END,
                      None, None, None)
        self.assertEqual(0, len(os.listdir(up_root)))

    def test_upstart_frequency_single(self):
        # files should be written out when frequency is ! per-instance
        new_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, new_root)

        self.patchOS(new_root)
        self.patchUtils(new_root)
        paths = helpers.Paths({
            'upstart_dir': "/etc/upstart",
        })

        util.ensure_dir("/run")
        util.ensure_dir("/etc/upstart")

        with mock.patch(self.mpath + 'SUITABLE_UPSTART', return_value=True):
            with mock.patch.object(subp, 'subp') as m_subp:
                h = UpstartJobPartHandler(paths)
                h.handle_part('', handlers.CONTENT_START,
                              None, None, None)
                h.handle_part('blah', 'text/upstart-job',
                              'test.conf', 'blah', frequency=PER_INSTANCE)
                h.handle_part('', handlers.CONTENT_END,
                              None, None, None)

        self.assertEqual(len(os.listdir('/etc/upstart')), 1)

        m_subp.assert_called_once_with(
            ['initctl', 'reload-configuration'], capture=False)


class TestJinjaTemplatePartHandler(CiTestCase):

    with_logs = True

    mpath = 'cloudinit.handlers.jinja_template.'

    def setUp(self):
        super(TestJinjaTemplatePartHandler, self).setUp()
        self.tmp = self.tmp_dir()
        self.run_dir = os.path.join(self.tmp, 'run_dir')
        util.ensure_dir(self.run_dir)
        self.paths = helpers.Paths({
            'cloud_dir': self.tmp, 'run_dir': self.run_dir})

    def test_jinja_template_part_handler_defaults(self):
        """On init, paths are saved and subhandler types are empty."""
        h = JinjaTemplatePartHandler(self.paths)
        self.assertEqual(['## template: jinja'], h.prefixes)
        self.assertEqual(3, h.handler_version)
        self.assertEqual(self.paths, h.paths)
        self.assertEqual({}, h.sub_handlers)

    def test_jinja_template_part_handler_looks_up_sub_handler_types(self):
        """When sub_handlers are passed, init lists types of subhandlers."""
        script_handler = ShellScriptPartHandler(self.paths)
        cloudconfig_handler = CloudConfigPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler, cloudconfig_handler])
        self.assertCountEqual(
            ['text/cloud-config', 'text/cloud-config-jsonp',
             'text/x-shellscript'],
            h.sub_handlers)

    def test_jinja_template_part_handler_looks_up_subhandler_types(self):
        """When sub_handlers are passed, init lists types of subhandlers."""
        script_handler = ShellScriptPartHandler(self.paths)
        cloudconfig_handler = CloudConfigPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler, cloudconfig_handler])
        self.assertCountEqual(
            ['text/cloud-config', 'text/cloud-config-jsonp',
             'text/x-shellscript'],
            h.sub_handlers)

    def test_jinja_template_handle_noop_on_content_signals(self):
        """Perform no part handling when content type is CONTENT_SIGNALS."""
        script_handler = ShellScriptPartHandler(self.paths)

        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler])
        with mock.patch.object(script_handler, 'handle_part') as m_handle_part:
            h.handle_part(
                data='data', ctype=handlers.CONTENT_START, filename='part-1',
                payload='## template: jinja\n#!/bin/bash\necho himom',
                frequency='freq', headers='headers')
        m_handle_part.assert_not_called()

    @skipUnlessJinja()
    def test_jinja_template_handle_subhandler_v2_with_clean_payload(self):
        """Call version 2 subhandler.handle_part with stripped payload."""
        script_handler = ShellScriptPartHandler(self.paths)
        self.assertEqual(2, script_handler.handler_version)

        # Create required instance-data.json file
        instance_json = os.path.join(self.run_dir, 'instance-data.json')
        instance_data = {'topkey': 'echo himom'}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler])
        with mock.patch.object(script_handler, 'handle_part') as m_part:
            # ctype with leading '!' not in handlers.CONTENT_SIGNALS
            h.handle_part(
                data='data', ctype="!" + handlers.CONTENT_START,
                filename='part01',
                payload='## template: jinja   \t \n#!/bin/bash\n{{ topkey }}',
                frequency='freq', headers='headers')
        m_part.assert_called_once_with(
            'data', '!__begin__', 'part01', '#!/bin/bash\necho himom', 'freq')

    @skipUnlessJinja()
    def test_jinja_template_handle_subhandler_v3_with_clean_payload(self):
        """Call version 3 subhandler.handle_part with stripped payload."""
        cloudcfg_handler = CloudConfigPartHandler(self.paths)
        self.assertEqual(3, cloudcfg_handler.handler_version)

        # Create required instance-data.json file
        instance_json = os.path.join(self.run_dir, 'instance-data.json')
        instance_data = {'topkey': {'sub': 'runcmd: [echo hi]'}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[cloudcfg_handler])
        with mock.patch.object(cloudcfg_handler, 'handle_part') as m_part:
            # ctype with leading '!' not in handlers.CONTENT_SIGNALS
            h.handle_part(
                data='data', ctype="!" + handlers.CONTENT_END,
                filename='part01',
                payload='## template: jinja\n#cloud-config\n{{ topkey.sub }}',
                frequency='freq', headers='headers')
        m_part.assert_called_once_with(
            'data', '!__end__', 'part01', '#cloud-config\nruncmd: [echo hi]',
            'freq', 'headers')

    def test_jinja_template_handle_errors_on_missing_instance_data_json(self):
        """If instance-data is absent, raise an error from handle_part."""
        script_handler = ShellScriptPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler])
        with self.assertRaises(RuntimeError) as context_manager:
            h.handle_part(
                data='data', ctype="!" + handlers.CONTENT_START,
                filename='part01',
                payload='## template: jinja  \n#!/bin/bash\necho himom',
                frequency='freq', headers='headers')
        script_file = os.path.join(script_handler.script_dir, 'part01')
        self.assertEqual(
            'Cannot render jinja template vars. Instance data not yet present'
            ' at {}/instance-data.json'.format(
                self.run_dir), str(context_manager.exception))
        self.assertFalse(
            os.path.exists(script_file),
            'Unexpected file created %s' % script_file)

    def test_jinja_template_handle_errors_on_unreadable_instance_data(self):
        """If instance-data is unreadable, raise an error from handle_part."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, 'instance-data.json')
        util.write_file(instance_json, util.json_dumps({}))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler])
        with mock.patch(self.mpath + 'load_file') as m_load:
            with self.assertRaises(RuntimeError) as context_manager:
                m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
                h.handle_part(
                    data='data', ctype="!" + handlers.CONTENT_START,
                    filename='part01',
                    payload='## template: jinja  \n#!/bin/bash\necho himom',
                    frequency='freq', headers='headers')
        script_file = os.path.join(script_handler.script_dir, 'part01')
        self.assertEqual(
            'Cannot render jinja template vars. No read permission on'
            " '{rdir}/instance-data.json'. Try sudo".format(rdir=self.run_dir),
            str(context_manager.exception))
        self.assertFalse(
            os.path.exists(script_file),
            'Unexpected file created %s' % script_file)

    @skipUnlessJinja()
    def test_jinja_template_handle_renders_jinja_content(self):
        """When present, render jinja variables from instance-data.json."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, 'instance-data.json')
        instance_data = {'topkey': {'subkey': 'echo himom'}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler])
        h.handle_part(
            data='data', ctype="!" + handlers.CONTENT_START,
            filename='part01',
            payload=(
                '## template: jinja  \n'
                '#!/bin/bash\n'
                '{{ topkey.subkey|default("nosubkey") }}'),
            frequency='freq', headers='headers')
        script_file = os.path.join(script_handler.script_dir, 'part01')
        self.assertNotIn(
            'Instance data not yet present at {}/instance-data.json'.format(
                self.run_dir),
            self.logs.getvalue())
        self.assertEqual(
            '#!/bin/bash\necho himom', util.load_file(script_file))

    @skipUnlessJinja()
    def test_jinja_template_handle_renders_jinja_content_missing_keys(self):
        """When specified jinja variable is undefined, log a warning."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, 'instance-data.json')
        instance_data = {'topkey': {'subkey': 'echo himom'}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler])
        h.handle_part(
            data='data', ctype="!" + handlers.CONTENT_START,
            filename='part01',
            payload='## template: jinja  \n#!/bin/bash\n{{ goodtry }}',
            frequency='freq', headers='headers')
        script_file = os.path.join(script_handler.script_dir, 'part01')
        self.assertTrue(
            os.path.exists(script_file),
            'Missing expected file %s' % script_file)
        self.assertIn(
            "WARNING: Could not render jinja template variables in file"
            " 'part01': 'goodtry'\n",
            self.logs.getvalue())


class TestConvertJinjaInstanceData(CiTestCase):

    def test_convert_instance_data_hyphens_to_underscores(self):
        """Replace hyphenated keys with underscores in instance-data."""
        data = {'hyphenated-key': 'hyphenated-val',
                'underscore_delim_key': 'underscore_delimited_val'}
        expected_data = {'hyphenated_key': 'hyphenated-val',
                         'underscore_delim_key': 'underscore_delimited_val'}
        self.assertEqual(
            expected_data,
            convert_jinja_instance_data(data=data))

    def test_convert_instance_data_promotes_versioned_keys_to_top_level(self):
        """Any versioned keys are promoted as top-level keys

        This provides any cloud-init standardized keys up at a top-level to
        allow ease of reference for users. Intsead of v1.availability_zone,
        the name availability_zone can be used in templates.
        """
        data = {'ds': {'dskey1': 1, 'dskey2': 2},
                'v1': {'v1key1': 'v1.1'},
                'v2': {'v2key1': 'v2.1'}}
        expected_data = copy.deepcopy(data)
        expected_data.update({'v1key1': 'v1.1', 'v2key1': 'v2.1'})

        converted_data = convert_jinja_instance_data(data=data)
        self.assertCountEqual(
            ['ds', 'v1', 'v2', 'v1key1', 'v2key1'], converted_data.keys())
        self.assertEqual(
            expected_data,
            converted_data)

    def test_convert_instance_data_most_recent_version_of_promoted_keys(self):
        """The most-recent versioned key value is promoted to top-level."""
        data = {'v1': {'key1': 'old v1 key1', 'key2': 'old v1 key2'},
                'v2': {'key1': 'newer v2 key1', 'key3': 'newer v2 key3'},
                'v3': {'key1': 'newest v3 key1'}}
        expected_data = copy.deepcopy(data)
        expected_data.update(
            {'key1': 'newest v3 key1', 'key2': 'old v1 key2',
             'key3': 'newer v2 key3'})

        converted_data = convert_jinja_instance_data(data=data)
        self.assertEqual(
            expected_data,
            converted_data)

    def test_convert_instance_data_decodes_decode_paths(self):
        """Any decode_paths provided are decoded by convert_instance_data."""
        data = {'key1': {'subkey1': 'aGkgbW9t'}, 'key2': 'aGkgZGFk'}
        expected_data = copy.deepcopy(data)
        expected_data['key1']['subkey1'] = 'hi mom'

        converted_data = convert_jinja_instance_data(
            data=data, decode_paths=('key1/subkey1',))
        self.assertEqual(
            expected_data,
            converted_data)


class TestRenderJinjaPayload(CiTestCase):

    with_logs = True

    @skipUnlessJinja()
    def test_render_jinja_payload_logs_jinja_vars_on_debug(self):
        """When debug is True, log jinja varables available."""
        payload = (
            '## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}')
        instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'}
        expected_log = dedent("""\
            DEBUG: Converted jinja variables
            {
             "hostname": "foo",
             "instance_id": "iid",
             "v1": {
              "hostname": "foo"
             }
            }
            """)
        self.assertEqual(
            render_jinja_payload(
                payload=payload, payload_fn='myfile',
                instance_data=instance_data, debug=True),
            '#!/bin/sh\necho hi from foo')
        self.assertEqual(expected_log, self.logs.getvalue())

    @skipUnlessJinja()
    def test_render_jinja_payload_replaces_missing_variables_and_warns(self):
        """Warn on missing jinja variables and replace the absent variable."""
        payload = (
            '## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}')
        instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'}
        self.assertEqual(
            render_jinja_payload(
                payload=payload, payload_fn='myfile',
                instance_data=instance_data),
            '#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE')
        expected_log = (
            'WARNING: Could not render jinja template variables in file'
            " 'myfile': 'NOTHERE'")
        self.assertIn(expected_log, self.logs.getvalue())

# vi: ts=4 expandtab