Blame tests/unittests/test_data.py

Packit Service a04d08
# This file is part of cloud-init. See LICENSE file for license information.
Packit Service a04d08
Packit Service a04d08
"""Tests for handling of userdata within cloud init."""
Packit Service a04d08
Packit Service a04d08
import gzip
Packit Service a04d08
import logging
Packit Service a04d08
import os
Packit Service 9bfd13
from io import BytesIO, StringIO
Packit Service 9bfd13
from unittest import mock
Packit Service a04d08
Packit Service a04d08
from email import encoders
Packit Service a04d08
from email.mime.application import MIMEApplication
Packit Service a04d08
from email.mime.base import MIMEBase
Packit Service a04d08
from email.mime.multipart import MIMEMultipart
Packit Service a04d08
Packit Service a04d08
import httpretty
Packit Service a04d08
Packit Service a04d08
from cloudinit import handlers
Packit Service a04d08
from cloudinit import helpers as c_helpers
Packit Service a04d08
from cloudinit import log
Packit Service a04d08
from cloudinit.settings import (PER_INSTANCE)
Packit Service a04d08
from cloudinit import sources
Packit Service a04d08
from cloudinit import stages
Packit Service a04d08
from cloudinit import user_data as ud
Packit Service a04d08
from cloudinit import safeyaml
Packit Service a04d08
from cloudinit import util
Packit Service a04d08
Packit Service a04d08
from cloudinit.tests import helpers
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
# Instance id placed in the metadata of the fake datasource used below.
INSTANCE_ID = "i-testing"
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class FakeDataSource(sources.DataSource):
    """In-memory datasource that hands back the raw data it was given.

    ``userdata`` and ``vendordata`` are stored untouched as
    ``userdata_raw`` and ``vendordata_raw``; the metadata only carries the
    fixed ``INSTANCE_ID``.
    """

    def __init__(self, userdata=None, vendordata=None):
        super(FakeDataSource, self).__init__({}, None, None)
        self.metadata = {'instance-id': INSTANCE_ID}
        self.userdata_raw, self.vendordata_raw = userdata, vendordata
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def count_messages(root):
    """Return how many parts yielded by ``root.walk()`` are not skippable."""
    return sum(1 for part in root.walk() if not ud.is_skippable(part))
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
def gzip_text(text):
    """Return ``text`` gzip-compressed, as the bytes of an in-memory stream.

    ``text`` is passed through ``util.encode_text`` before being written to
    the gzip stream.
    """
    contents = BytesIO()
    # The context manager closes the gzip stream (which writes the trailer)
    # even if encoding or writing raises. close() already flushes pending
    # data, so no separate flush() call is needed.
    with gzip.GzipFile(fileobj=contents, mode='wb') as f:
        f.write(util.encode_text(text))
    return contents.getvalue()
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
# FIXME: these tests probably should not be checking log output; it is an
# odd thing for them to assert on.
Packit Service a04d08
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
Packit Service a04d08
Packit Service a04d08
    def setUp(self):
        """Run the parent setUp and clear the log-capture attributes."""
        super(TestConsumeUserData, self).setUp()
        # Filled in by capture_log(); tearDown() checks them again.
        self._log = self._log_file = self._log_handler = None
Packit Service a04d08
Packit Service a04d08
    def tearDown(self):
        """Remove the capture handler (if any), then run the base tearDown."""
        logger, handler = self._log, self._log_handler
        if handler and logger:
            logger.removeHandler(handler)
        helpers.FilesystemMockingTestCase.tearDown(self)
Packit Service a04d08
Packit Service a04d08
    def _patchIn(self, root):
        """Call ``patchOS`` and then ``patchUtils`` with ``root``."""
        for apply_patch in (self.patchOS, self.patchUtils):
            apply_patch(root)
Packit Service a04d08
Packit Service a04d08
    def capture_log(self, lvl=logging.DEBUG):
        """Log into a fresh StringIO via a new handler; return the StringIO.

        ``lvl`` is the level set on the handler. The handler and the logger
        are kept on the instance so that tearDown can detach them.
        """
        buf = StringIO()
        handler = logging.StreamHandler(buf)
        self._log_handler = handler
        handler.setLevel(lvl)
        logger = log.getLogger()
        self._log = logger
        logger.addHandler(handler)
        return buf
Packit Service a04d08
Packit Service a04d08
    def test_simple_jsonp(self):
Packit Service a04d08
        blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "qux" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "qux2" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
Packit Service a04d08
        ci = stages.Init()
Packit Service a04d08
        ci.datasource = FakeDataSource(blob)
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        ci.fetch()
Packit Service a04d08
        ci.consume_data()
Packit Service a04d08
        cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
Packit Service a04d08
        cc = util.load_yaml(cc_contents)
Packit Service a04d08
        self.assertEqual(2, len(cc))
Packit Service a04d08
        self.assertEqual('qux', cc['baz'])
Packit Service a04d08
        self.assertEqual('qux2', cc['bar'])
Packit Service a04d08
Packit Service a04d08
    def test_simple_jsonp_vendor_and_user(self):
Packit Service a04d08
        # test that user-data wins over vendor
Packit Service a04d08
        user_blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "qux" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "qux2" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
        vendor_blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "quxA" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "quxB" },
Packit Service a04d08
     { "op": "add", "path": "/foo", "value": "quxC" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        initer = stages.Init()
Packit Service a04d08
        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
Packit Service a04d08
        initer.read_cfg()
Packit Service a04d08
        initer.initialize()
Packit Service a04d08
        initer.fetch()
Packit Service a04d08
        initer.instancify()
Packit Service a04d08
        initer.update()
Packit Service a04d08
        initer.cloudify().run('consume_data',
Packit Service a04d08
                              initer.consume_data,
Packit Service a04d08
                              args=[PER_INSTANCE],
Packit Service a04d08
                              freq=PER_INSTANCE)
Packit Service a04d08
        mods = stages.Modules(initer)
Packit Service a04d08
        (_which_ran, _failures) = mods.run_section('cloud_init_modules')
Packit Service a04d08
        cfg = mods.cfg
Packit Service a04d08
        self.assertIn('vendor_data', cfg)
Packit Service a04d08
        self.assertEqual('qux', cfg['baz'])
Packit Service a04d08
        self.assertEqual('qux2', cfg['bar'])
Packit Service a04d08
        self.assertEqual('quxC', cfg['foo'])
Packit Service a04d08
Packit Service a04d08
    def test_simple_jsonp_no_vendor_consumed(self):
Packit Service a04d08
        # make sure that vendor data is not consumed
Packit Service a04d08
        user_blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "qux" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "qux2" },
Packit Service a04d08
     { "op": "add", "path": "/vendor_data", "value": {"enabled": "false"}}
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
        vendor_blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "quxA" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "quxB" },
Packit Service a04d08
     { "op": "add", "path": "/foo", "value": "quxC" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        initer = stages.Init()
Packit Service a04d08
        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
Packit Service a04d08
        initer.read_cfg()
Packit Service a04d08
        initer.initialize()
Packit Service a04d08
        initer.fetch()
Packit Service a04d08
        initer.instancify()
Packit Service a04d08
        initer.update()
Packit Service a04d08
        initer.cloudify().run('consume_data',
Packit Service a04d08
                              initer.consume_data,
Packit Service a04d08
                              args=[PER_INSTANCE],
Packit Service a04d08
                              freq=PER_INSTANCE)
Packit Service a04d08
        mods = stages.Modules(initer)
Packit Service a04d08
        (_which_ran, _failures) = mods.run_section('cloud_init_modules')
Packit Service a04d08
        cfg = mods.cfg
Packit Service a04d08
        self.assertEqual('qux', cfg['baz'])
Packit Service a04d08
        self.assertEqual('qux2', cfg['bar'])
Packit Service a04d08
        self.assertNotIn('foo', cfg)
Packit Service a04d08
Packit Service a04d08
    def test_mixed_cloud_config(self):
Packit Service a04d08
        blob_cc = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
a: b
Packit Service a04d08
c: d
Packit Service a04d08
'''
Packit Service a04d08
        message_cc = MIMEBase("text", "cloud-config")
Packit Service a04d08
        message_cc.set_payload(blob_cc)
Packit Service a04d08
Packit Service a04d08
        blob_jp = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "replace", "path": "/a", "value": "c" },
Packit Service a04d08
     { "op": "remove", "path": "/c" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
Packit Service a04d08
        message_jp = MIMEBase('text', "cloud-config-jsonp")
Packit Service a04d08
        message_jp.set_payload(blob_jp)
Packit Service a04d08
Packit Service a04d08
        message = MIMEMultipart()
Packit Service a04d08
        message.attach(message_cc)
Packit Service a04d08
        message.attach(message_jp)
Packit Service a04d08
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        ci = stages.Init()
Packit Service a04d08
        ci.datasource = FakeDataSource(str(message))
Packit Service a04d08
        ci.fetch()
Packit Service a04d08
        ci.consume_data()
Packit Service a04d08
        cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
Packit Service a04d08
        cc = util.load_yaml(cc_contents)
Packit Service a04d08
        self.assertEqual(1, len(cc))
Packit Service a04d08
        self.assertEqual('c', cc['a'])
Packit Service a04d08
Packit Service 9bfd13
    def test_cloud_config_as_x_shell_script(self):
Packit Service 9bfd13
        blob_cc = '''
Packit Service 9bfd13
#cloud-config
Packit Service 9bfd13
a: b
Packit Service 9bfd13
c: d
Packit Service 9bfd13
'''
Packit Service 9bfd13
        message_cc = MIMEBase("text", "x-shellscript")
Packit Service 9bfd13
        message_cc.set_payload(blob_cc)
Packit Service 9bfd13
Packit Service 9bfd13
        blob_jp = '''
Packit Service 9bfd13
#cloud-config-jsonp
Packit Service 9bfd13
[
Packit Service 9bfd13
     { "op": "replace", "path": "/a", "value": "c" },
Packit Service 9bfd13
     { "op": "remove", "path": "/c" }
Packit Service 9bfd13
]
Packit Service 9bfd13
'''
Packit Service 9bfd13
Packit Service 9bfd13
        message_jp = MIMEBase('text', "cloud-config-jsonp")
Packit Service 9bfd13
        message_jp.set_payload(blob_jp)
Packit Service 9bfd13
Packit Service 9bfd13
        message = MIMEMultipart()
Packit Service 9bfd13
        message.attach(message_cc)
Packit Service 9bfd13
        message.attach(message_jp)
Packit Service 9bfd13
Packit Service 9bfd13
        self.reRoot()
Packit Service 9bfd13
        ci = stages.Init()
Packit Service 9bfd13
        ci.datasource = FakeDataSource(str(message))
Packit Service 9bfd13
        ci.fetch()
Packit Service 9bfd13
        ci.consume_data()
Packit Service 9bfd13
        cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
Packit Service 9bfd13
        cc = util.load_yaml(cc_contents)
Packit Service 9bfd13
        self.assertEqual(1, len(cc))
Packit Service 9bfd13
        self.assertEqual('c', cc['a'])
Packit Service 9bfd13
Packit Service a04d08
    def test_vendor_user_yaml_cloud_config(self):
Packit Service a04d08
        vendor_blob = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
a: b
Packit Service a04d08
name: vendor
Packit Service a04d08
run:
Packit Service a04d08
 - x
Packit Service a04d08
 - y
Packit Service a04d08
'''
Packit Service a04d08
Packit Service a04d08
        user_blob = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
a: c
Packit Service a04d08
vendor_data:
Packit Service a04d08
  enabled: True
Packit Service a04d08
  prefix: /bin/true
Packit Service a04d08
name: user
Packit Service a04d08
run:
Packit Service a04d08
 - z
Packit Service a04d08
'''
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        initer = stages.Init()
Packit Service a04d08
        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
Packit Service a04d08
        initer.read_cfg()
Packit Service a04d08
        initer.initialize()
Packit Service a04d08
        initer.fetch()
Packit Service a04d08
        initer.instancify()
Packit Service a04d08
        initer.update()
Packit Service a04d08
        initer.cloudify().run('consume_data',
Packit Service a04d08
                              initer.consume_data,
Packit Service a04d08
                              args=[PER_INSTANCE],
Packit Service a04d08
                              freq=PER_INSTANCE)
Packit Service a04d08
        mods = stages.Modules(initer)
Packit Service a04d08
        (_which_ran, _failures) = mods.run_section('cloud_init_modules')
Packit Service a04d08
        cfg = mods.cfg
Packit Service a04d08
        self.assertIn('vendor_data', cfg)
Packit Service a04d08
        self.assertEqual('c', cfg['a'])
Packit Service a04d08
        self.assertEqual('user', cfg['name'])
Packit Service a04d08
        self.assertNotIn('x', cfg['run'])
Packit Service a04d08
        self.assertNotIn('y', cfg['run'])
Packit Service a04d08
        self.assertIn('z', cfg['run'])
Packit Service a04d08
Packit Service a04d08
    def test_vendordata_script(self):
Packit Service a04d08
        vendor_blob = '''
Packit Service a04d08
#!/bin/bash
Packit Service a04d08
echo "test"
Packit Service a04d08
'''
Packit Service a04d08
Packit Service a04d08
        user_blob = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
vendor_data:
Packit Service a04d08
  enabled: True
Packit Service a04d08
  prefix: /bin/true
Packit Service a04d08
'''
Packit Service a04d08
        new_root = self.reRoot()
Packit Service a04d08
        initer = stages.Init()
Packit Service a04d08
        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
Packit Service a04d08
        initer.read_cfg()
Packit Service a04d08
        initer.initialize()
Packit Service a04d08
        initer.fetch()
Packit Service a04d08
        initer.instancify()
Packit Service a04d08
        initer.update()
Packit Service a04d08
        initer.cloudify().run('consume_data',
Packit Service a04d08
                              initer.consume_data,
Packit Service a04d08
                              args=[PER_INSTANCE],
Packit Service a04d08
                              freq=PER_INSTANCE)
Packit Service a04d08
        mods = stages.Modules(initer)
Packit Service a04d08
        (_which_ran, _failures) = mods.run_section('cloud_init_modules')
Packit Service a04d08
        vendor_script = initer.paths.get_ipath_cur('vendor_scripts')
Packit Service a04d08
        vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script)
Packit Service a04d08
        self.assertTrue(os.path.exists(vendor_script_fns))
Packit Service a04d08
Packit Service a04d08
    def test_merging_cloud_config(self):
Packit Service a04d08
        blob = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
a: b
Packit Service a04d08
e: f
Packit Service a04d08
run:
Packit Service a04d08
 - b
Packit Service a04d08
 - c
Packit Service a04d08
'''
Packit Service a04d08
        message1 = MIMEBase("text", "cloud-config")
Packit Service a04d08
        message1.set_payload(blob)
Packit Service a04d08
Packit Service a04d08
        blob2 = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
a: e
Packit Service a04d08
e: g
Packit Service a04d08
run:
Packit Service a04d08
 - stuff
Packit Service a04d08
 - morestuff
Packit Service a04d08
'''
Packit Service a04d08
        message2 = MIMEBase("text", "cloud-config")
Packit Service a04d08
        message2['X-Merge-Type'] = ('dict(recurse_array,'
Packit Service a04d08
                                    'recurse_str)+list(append)+str(append)')
Packit Service a04d08
        message2.set_payload(blob2)
Packit Service a04d08
Packit Service a04d08
        blob3 = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
e:
Packit Service a04d08
 - 1
Packit Service a04d08
 - 2
Packit Service a04d08
 - 3
Packit Service a04d08
p: 1
Packit Service a04d08
'''
Packit Service a04d08
        message3 = MIMEBase("text", "cloud-config")
Packit Service a04d08
        message3.set_payload(blob3)
Packit Service a04d08
Packit Service a04d08
        messages = [message1, message2, message3]
Packit Service a04d08
Packit Service a04d08
        paths = c_helpers.Paths({}, ds=FakeDataSource(''))
Packit Service a04d08
        cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
Packit Service a04d08
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
Packit Service a04d08
                              None)
Packit Service a04d08
        for i, m in enumerate(messages):
Packit Service a04d08
            headers = dict(m)
Packit Service a04d08
            fn = "part-%s" % (i + 1)
Packit Service a04d08
            payload = m.get_payload(decode=True)
Packit Service a04d08
            cloud_cfg.handle_part(None, headers['Content-Type'],
Packit Service a04d08
                                  fn, payload, None, headers)
Packit Service a04d08
        cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
Packit Service a04d08
                              None)
Packit Service a04d08
        contents = util.load_file(paths.get_ipath('cloud_config'))
Packit Service a04d08
        contents = util.load_yaml(contents)
Packit Service a04d08
        self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
Packit Service a04d08
        self.assertEqual(contents['a'], 'be')
Packit Service a04d08
        self.assertEqual(contents['e'], [1, 2, 3])
Packit Service a04d08
        self.assertEqual(contents['p'], 1)
Packit Service a04d08
Packit Service a04d08
    def test_unhandled_type_warning(self):
        """Raw text with no magic header is ignored, with a warning."""
        self.reRoot()
        init = stages.Init()
        init.datasource = FakeDataSource("arbitrary text\n")

        with mock.patch('cloudinit.util.write_file') as write_file:
            captured = self.capture_log(logging.WARNING)
            init.fetch()
            init.consume_data()
            warning = (
                "Unhandled non-multipart (text/x-not-multipart) userdata:")
            self.assertIn(warning, captured.getvalue())

        # The only write is the empty cloud_config file.
        cfg_path = init.paths.get_ipath("cloud_config")
        write_file.assert_called_once_with(cfg_path, "", 0o600)
Packit Service a04d08
Packit Service a04d08
    def test_mime_gzip_compressed(self):
Packit Service a04d08
        """Tests that individual message gzip encoding works."""
Packit Service a04d08
Packit Service a04d08
        def gzip_part(text):
Packit Service a04d08
            return MIMEApplication(gzip_text(text), 'gzip')
Packit Service a04d08
Packit Service a04d08
        base_content1 = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
a: 2
Packit Service a04d08
'''
Packit Service a04d08
Packit Service a04d08
        base_content2 = '''
Packit Service a04d08
#cloud-config
Packit Service a04d08
b: 3
Packit Service a04d08
c: 4
Packit Service a04d08
'''
Packit Service a04d08
Packit Service a04d08
        message = MIMEMultipart('test')
Packit Service a04d08
        message.attach(gzip_part(base_content1))
Packit Service a04d08
        message.attach(gzip_part(base_content2))
Packit Service a04d08
        ci = stages.Init()
Packit Service a04d08
        ci.datasource = FakeDataSource(str(message))
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        ci.fetch()
Packit Service a04d08
        ci.consume_data()
Packit Service a04d08
        contents = util.load_file(ci.paths.get_ipath("cloud_config"))
Packit Service a04d08
        contents = util.load_yaml(contents)
Packit Service a04d08
        self.assertTrue(isinstance(contents, dict))
Packit Service a04d08
        self.assertEqual(3, len(contents))
Packit Service a04d08
        self.assertEqual(2, contents['a'])
Packit Service a04d08
        self.assertEqual(3, contents['b'])
Packit Service a04d08
        self.assertEqual(4, contents['c'])
Packit Service a04d08
Packit Service a04d08
    def test_mime_text_plain(self):
        """A text/plain mime message is ignored, with a warning."""
        self.reRoot()
        init = stages.Init()
        message = MIMEBase("text", "plain")
        message.set_payload("Just text")
        raw = message.as_string().encode()
        init.datasource = FakeDataSource(raw)

        with mock.patch('cloudinit.util.write_file') as write_file:
            captured = self.capture_log(logging.WARNING)
            init.fetch()
            init.consume_data()
            warning = "Unhandled unknown content-type (text/plain)"
            self.assertIn(warning, captured.getvalue())

        # The only write is the empty cloud_config file.
        cfg_path = init.paths.get_ipath("cloud_config")
        write_file.assert_called_once_with(cfg_path, "", 0o600)
Packit Service a04d08
Packit Service a04d08
    def test_shellscript(self):
        """Raw text that starts with #!/bin/sh is handled as a script."""
        self.reRoot()
        init = stages.Init()
        script = "#!/bin/sh\necho hello\n"
        init.datasource = FakeDataSource(script)

        scripts_dir = init.paths.get_ipath_cur("scripts")
        outpath = os.path.join(scripts_dir, "part-001")

        with mock.patch('cloudinit.util.write_file') as write_file:
            captured = self.capture_log(logging.WARNING)
            init.fetch()
            init.consume_data()
            # Nothing may have been logged at WARNING level or above.
            self.assertEqual("", captured.getvalue())

        expected_calls = [
            mock.call(outpath, script, 0o700),
            mock.call(init.paths.get_ipath("cloud_config"), "", 0o600),
        ]
        write_file.assert_has_calls(expected_calls)
Packit Service a04d08
Packit Service a04d08
    def test_mime_text_x_shellscript(self):
        """A text/x-shellscript mime message is handled as a script."""
        self.reRoot()
        init = stages.Init()
        script = "#!/bin/sh\necho hello\n"
        message = MIMEBase("text", "x-shellscript")
        message.set_payload(script)
        init.datasource = FakeDataSource(message.as_string())

        scripts_dir = init.paths.get_ipath_cur("scripts")
        outpath = os.path.join(scripts_dir, "part-001")

        with mock.patch('cloudinit.util.write_file') as write_file:
            captured = self.capture_log(logging.WARNING)
            init.fetch()
            init.consume_data()
            # Nothing may have been logged at WARNING level or above.
            self.assertEqual("", captured.getvalue())

        expected_calls = [
            mock.call(outpath, script, 0o700),
            mock.call(init.paths.get_ipath("cloud_config"), "", 0o600),
        ]
        write_file.assert_has_calls(expected_calls)
Packit Service a04d08
Packit Service a04d08
    def test_mime_text_plain_shell(self):
        """A text/plain part that starts with #!/bin/sh is a script."""
        self.reRoot()
        init = stages.Init()
        script = "#!/bin/sh\necho hello\n"
        message = MIMEBase("text", "plain")
        message.set_payload(script)
        init.datasource = FakeDataSource(message.as_string())

        scripts_dir = init.paths.get_ipath_cur("scripts")
        outpath = os.path.join(scripts_dir, "part-001")

        with mock.patch('cloudinit.util.write_file') as write_file:
            captured = self.capture_log(logging.WARNING)
            init.fetch()
            init.consume_data()
            # Nothing may have been logged at WARNING level or above.
            self.assertEqual("", captured.getvalue())

        expected_calls = [
            mock.call(outpath, script, 0o700),
            mock.call(init.paths.get_ipath("cloud_config"), "", 0o600),
        ]
        write_file.assert_has_calls(expected_calls)
Packit Service a04d08
Packit Service a04d08
    def test_mime_application_octet_stream(self):
        """An application/octet-stream part is ignored, with a warning."""
        self.reRoot()
        init = stages.Init()
        message = MIMEBase("application", "octet-stream")
        message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
        encoders.encode_base64(message)
        raw = message.as_string().encode()
        init.datasource = FakeDataSource(raw)

        with mock.patch('cloudinit.util.write_file') as write_file:
            captured = self.capture_log(logging.WARNING)
            init.fetch()
            init.consume_data()
            warning = (
                "Unhandled unknown content-type (application/octet-stream)")
            self.assertIn(warning, captured.getvalue())

        # The only write is the empty cloud_config file.
        cfg_path = init.paths.get_ipath("cloud_config")
        write_file.assert_called_once_with(cfg_path, "", 0o600)
Packit Service a04d08
Packit Service a04d08
    def test_cloud_config_archive(self):
        """Cloud-config entries of an archive reach the cloud_config file."""
        non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
        entries = [
            {'content': '#cloud-config\npassword: gocubs\n'},
            {'content': '#cloud-config\nlocale: chicago\n'},
            {'content': non_decodable},
        ]
        header = b'#cloud-config-archive\n'
        message = header + safeyaml.dumps(entries).encode()

        self.reRoot()
        init = stages.Init()
        init.datasource = FakeDataSource(message)

        written = {}

        def fsstore(filename, content, mode=0o0644, omode="wb"):
            # Stand-in for util.write_file: keep the content in memory.
            written[filename] = content

        # Consuming the user-data should write the 'cloud_config' file,
        # which then holds the yaml from the archive.
        with mock.patch('cloudinit.util.write_file') as write_file:
            write_file.side_effect = fsstore
            init.fetch()
            init.consume_data()

        cfg = util.load_yaml(written[init.paths.get_ipath("cloud_config")])
        self.assertEqual(cfg.get('password'), 'gocubs')
        self.assertEqual(cfg.get('locale'), 'chicago')
Packit Service a04d08
Packit Service a04d08
    @mock.patch('cloudinit.util.read_conf_with_confd')
Packit Service a04d08
    def test_dont_allow_user_data(self, mock_cfg):
Packit Service a04d08
        mock_cfg.return_value = {"allow_userdata": False}
Packit Service a04d08
Packit Service a04d08
        # test that user-data is ignored but vendor-data is kept
Packit Service a04d08
        user_blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "qux" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "qux2" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
        vendor_blob = '''
Packit Service a04d08
#cloud-config-jsonp
Packit Service a04d08
[
Packit Service a04d08
     { "op": "add", "path": "/baz", "value": "quxA" },
Packit Service a04d08
     { "op": "add", "path": "/bar", "value": "quxB" },
Packit Service a04d08
     { "op": "add", "path": "/foo", "value": "quxC" }
Packit Service a04d08
]
Packit Service a04d08
'''
Packit Service a04d08
        self.reRoot()
Packit Service a04d08
        initer = stages.Init()
Packit Service a04d08
        initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
Packit Service a04d08
        initer.read_cfg()
Packit Service a04d08
        initer.initialize()
Packit Service a04d08
        initer.fetch()
Packit Service a04d08
        initer.instancify()
Packit Service a04d08
        initer.update()
Packit Service a04d08
        initer.cloudify().run('consume_data',
Packit Service a04d08
                              initer.consume_data,
Packit Service a04d08
                              args=[PER_INSTANCE],
Packit Service a04d08
                              freq=PER_INSTANCE)
Packit Service a04d08
        mods = stages.Modules(initer)
Packit Service a04d08
        (_which_ran, _failures) = mods.run_section('cloud_init_modules')
Packit Service a04d08
        cfg = mods.cfg
Packit Service a04d08
        self.assertIn('vendor_data', cfg)
Packit Service a04d08
        self.assertEqual('quxA', cfg['baz'])
Packit Service a04d08
        self.assertEqual('quxB', cfg['bar'])
Packit Service a04d08
        self.assertEqual('quxC', cfg['foo'])
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase):
    """Exercise ``#include`` user-data with URLs registered in httpretty."""

    def setUp(self):
        # Both bases are initialised explicitly, TestConsumeUserData first.
        TestConsumeUserData.setUp(self)
        helpers.HttprettyTestCase.setUp(self)

    def tearDown(self):
        # Bases are torn down in the same order they were set up.
        TestConsumeUserData.tearDown(self)
        helpers.HttprettyTestCase.tearDown(self)

    def _init_with_userdata(self, userdata):
        """Call reRoot() and return a stages.Init using a FakeDataSource."""
        self.reRoot()
        init = stages.Init()
        init.datasource = FakeDataSource(userdata)
        return init

    def _read_cloud_config(self, init):
        """Load the instance "cloud_config" file of *init* and parse it."""
        contents = util.load_file(init.paths.get_ipath("cloud_config"))
        return util.load_yaml(contents)

    @mock.patch('cloudinit.url_helper.time.sleep')
    def test_include(self, mock_sleep):
        """An #include URL that is served ends up in cloud_config."""
        url = 'http://hostname/path'
        payload = '#cloud-config\nincluded: true\n'
        httpretty.register_uri(httpretty.GET, url, payload)

        init = self._init_with_userdata('#include\n%s\n' % url)
        init.fetch()
        init.consume_data()

        merged = self._read_cloud_config(init)
        self.assertTrue(merged.get('included'))

    @mock.patch('cloudinit.url_helper.time.sleep')
    def test_include_bad_url(self, mock_sleep):
        """A 403 on an #include URL raises and leaves no cloud_config file."""
        forbidden_url = 'http://bad/forbidden'
        forbidden_payload = '#cloud-config\nbad: true\n'
        httpretty.register_uri(
            httpretty.GET, forbidden_url, forbidden_payload, status=403)

        good_url = 'http://hostname/path'
        good_payload = '#cloud-config\nincluded: true\n'
        httpretty.register_uri(httpretty.GET, good_url, good_payload)

        init = self._init_with_userdata(
            '#include\n%s\n%s' % (forbidden_url, good_url))
        init.fetch()
        with self.assertRaises(Exception) as raised:
            init.consume_data()
        self.assertIn('403', str(raised.exception))

        # Nothing should have been written once consume_data() failed.
        with self.assertRaises(FileNotFoundError):
            util.load_file(init.paths.get_ipath("cloud_config"))

    # The second patch supplies an explicit replacement value, so only the
    # sleep mock is injected as an argument.
    @mock.patch('cloudinit.url_helper.time.sleep')
    @mock.patch(
        "cloudinit.user_data.features.ERROR_ON_USER_DATA_FAILURE", False
    )
    def test_include_bad_url_no_fail(self, mock_sleep):
        """With failures disabled a 403 is logged and good URLs still load."""
        forbidden_url = 'http://bad/forbidden'
        forbidden_payload = '#cloud-config\nbad: true\n'
        httpretty.register_uri(
            httpretty.GET, forbidden_url, forbidden_payload, status=403)

        good_url = 'http://hostname/path'
        good_payload = '#cloud-config\nincluded: true\n'
        httpretty.register_uri(httpretty.GET, good_url, good_payload)

        init = self._init_with_userdata(
            '#include\n%s\n%s' % (forbidden_url, good_url))
        captured = self.capture_log(logging.WARNING)
        init.fetch()
        init.consume_data()

        expected = "403 Client Error: Forbidden for url: %s" % forbidden_url
        self.assertIn(expected, captured.getvalue())

        merged = self._read_cloud_config(init)
        self.assertIsNone(merged.get('bad'))
        self.assertTrue(merged.get('included'))
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class TestUDProcess(helpers.ResourceUsingTestCase):
    """Check UserDataProcessor.process() on bytes, str and gzip_text input."""

    def test_bytes_in_userdata(self):
        """A bytes cloud-config payload is counted as one message."""
        msg = b'#cloud-config\napt_update: True\n'
        ud_proc = ud.UserDataProcessor(self.getCloudPaths())
        message = ud_proc.process(msg)
        # assertEqual reports the actual count on failure, which
        # assertTrue(count == 1) would hide behind "False is not true".
        self.assertEqual(1, count_messages(message))

    def test_string_in_userdata(self):
        """A str cloud-config payload is counted as one message."""
        msg = '#cloud-config\napt_update: True\n'

        ud_proc = ud.UserDataProcessor(self.getCloudPaths())
        message = ud_proc.process(msg)
        self.assertEqual(1, count_messages(message))

    def test_compressed_in_userdata(self):
        """A payload passed through gzip_text() is counted as one message."""
        msg = gzip_text('#cloud-config\napt_update: True\n')

        ud_proc = ud.UserDataProcessor(self.getCloudPaths())
        message = ud_proc.process(msg)
        self.assertEqual(1, count_messages(message))
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class TestConvertString(helpers.TestCase):
    """Round-trip checks for ud.convert_string()."""

    def test_handles_binary_non_utf8_decodable(self):
        """Bytes containing a non-ASCII sequence come back unchanged."""
        # NOTE(review): b'\xc3\x84' is valid UTF-8 (U+00C4), so despite the
        # test name this covers non-ASCII bytes, not undecodable ones;
        # confirm the intended input.
        raw = b'#!/bin/bash\necho \xc3\x84\n'
        converted = ud.convert_string(raw)
        decoded = converted.get_payload(decode=True)
        self.assertEqual(raw, decoded)

    def test_handles_binary_utf8_decodable(self):
        """Plain ASCII bytes come back unchanged."""
        raw = b'\x32\x32'
        converted = ud.convert_string(raw)
        decoded = converted.get_payload(decode=True)
        self.assertEqual(raw, decoded)

    def test_handle_headers(self):
        """A bare text string becomes the undecoded payload."""
        body = "hi mom"
        converted = ud.convert_string(body)
        payload = converted.get_payload(decode=False)
        self.assertEqual(body, payload)

    def test_handle_mime_parts(self):
        """A serialized MIME part is parsed back with its payload intact."""
        part = MIMEBase("text", "plain")
        part.set_payload("Just text")
        serialized = str(part)
        converted = ud.convert_string(serialized)
        payload = converted.get_payload(decode=False)
        self.assertEqual("Just text", payload)
Packit Service a04d08
Packit Service a04d08
Packit Service a04d08
class TestFetchBaseConfig(helpers.TestCase):
    """Precedence checks for stages.fetch_base_config()."""

    def _fetch_patched(self, patches):
        """Call fetch_base_config with names in cloudinit.stages patched."""
        return helpers.wrap_and_call(
            'cloudinit.stages', patches, stages.fetch_base_config)

    def test_only_builtin_gets_builtin(self):
        """With an empty runtime config the result is the builtin config."""
        patches = {
            'util.read_conf_with_confd': None,
            'util.read_conf_from_cmdline': None,
            'read_runtime_config': {'return_value': {}},
        }
        fetched = self._fetch_patched(patches)
        self.assertEqual(util.get_builtin_cfg(), fetched)

    def test_conf_d_overrides_defaults(self):
        """A key supplied by conf.d replaces the builtin value."""
        expected = util.get_builtin_cfg()
        key = sorted(expected)[0]
        value = 'test'
        patches = {
            'util.read_conf_with_confd': {'return_value': {key: value}},
            'util.read_conf_from_cmdline': None,
            'read_runtime_config': {'return_value': {}},
        }
        fetched = self._fetch_patched(patches)
        self.assertEqual(fetched.get(key), value)
        # Everything except the overridden key must still match builtin.
        expected[key] = value
        self.assertEqual(fetched, expected)

    def test_cmdline_overrides_defaults(self):
        """A key supplied on the cmdline replaces the builtin value."""
        expected = util.get_builtin_cfg()
        key = sorted(expected)[0]
        value = 'test'
        from_cmdline = {key: value}
        patches = {
            'util.read_conf_from_cmdline': {'return_value': from_cmdline},
            'util.read_conf_with_confd': None,
            'read_runtime_config': None,
        }
        fetched = self._fetch_patched(patches)
        self.assertEqual(fetched.get(key), value)
        # Everything except the overridden key must still match builtin.
        expected[key] = value
        self.assertEqual(fetched, expected)

    def test_cmdline_overrides_confd_runtime_and_defaults(self):
        """Cmdline values win over conf.d, runtime and builtin values."""
        from_builtin = {'key1': 'value0', 'key3': 'other2'}
        from_confd = {'key1': 'value1', 'key2': 'other1'}
        from_cmdline = {'key3': 'other3', 'key2': 'other2'}
        from_runtime = {'key3': 'runtime3'}
        patches = {
            'util.read_conf_with_confd': {'return_value': from_confd},
            'util.get_builtin_cfg': {'return_value': from_builtin},
            'read_runtime_config': {'return_value': from_runtime},
            'util.read_conf_from_cmdline': {'return_value': from_cmdline},
        }
        fetched = self._fetch_patched(patches)
        expected = {'key1': 'value1', 'key2': 'other2', 'key3': 'other3'}
        self.assertEqual(fetched, expected)

    def test_order_precedence_is_builtin_system_runtime_cmdline(self):
        """Later sources win: builtin, then conf.d, runtime and cmdline."""
        from_builtin = {'key1': 'builtin0', 'key3': 'builtin3'}
        from_confd = {
            'key1': 'confd1', 'key2': 'confd2', 'keyconfd1': 'kconfd1'}
        from_runtime = {'key1': 'runtime1', 'key2': 'runtime2'}
        from_cmdline = {'key1': 'cmdline1'}
        patches = {
            'util.read_conf_with_confd': {'return_value': from_confd},
            'util.get_builtin_cfg': {'return_value': from_builtin},
            'util.read_conf_from_cmdline': {'return_value': from_cmdline},
            'read_runtime_config': {'return_value': from_runtime},
        }
        fetched = self._fetch_patched(patches)
        expected = {
            'key1': 'cmdline1',
            'key2': 'runtime2',
            'key3': 'builtin3',
            'keyconfd1': 'kconfd1',
        }
        self.assertEqual(fetched, expected)
Packit Service a04d08
Packit Service a04d08
# vi: ts=4 expandtab