|
Packit Service |
a04d08 |
# This file is part of cloud-init. See LICENSE file for license information.
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
"""Tests for handling of userdata within cloud init."""
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
import gzip
|
|
Packit Service |
a04d08 |
import logging
|
|
Packit Service |
a04d08 |
import os
|
|
Packit Service |
9bfd13 |
from io import BytesIO, StringIO
|
|
Packit Service |
9bfd13 |
from unittest import mock
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
from email import encoders
|
|
Packit Service |
a04d08 |
from email.mime.application import MIMEApplication
|
|
Packit Service |
a04d08 |
from email.mime.base import MIMEBase
|
|
Packit Service |
a04d08 |
from email.mime.multipart import MIMEMultipart
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
import httpretty
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
from cloudinit import handlers
|
|
Packit Service |
a04d08 |
from cloudinit import helpers as c_helpers
|
|
Packit Service |
a04d08 |
from cloudinit import log
|
|
Packit Service |
a04d08 |
from cloudinit.settings import (PER_INSTANCE)
|
|
Packit Service |
a04d08 |
from cloudinit import sources
|
|
Packit Service |
a04d08 |
from cloudinit import stages
|
|
Packit Service |
a04d08 |
from cloudinit import user_data as ud
|
|
Packit Service |
a04d08 |
from cloudinit import safeyaml
|
|
Packit Service |
a04d08 |
from cloudinit import util
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
from cloudinit.tests import helpers
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
INSTANCE_ID = "i-testing"
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class FakeDataSource(sources.DataSource):
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def __init__(self, userdata=None, vendordata=None):
|
|
Packit Service |
a04d08 |
sources.DataSource.__init__(self, {}, None, None)
|
|
Packit Service |
a04d08 |
self.metadata = {'instance-id': INSTANCE_ID}
|
|
Packit Service |
a04d08 |
self.userdata_raw = userdata
|
|
Packit Service |
a04d08 |
self.vendordata_raw = vendordata
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def count_messages(root):
|
|
Packit Service |
a04d08 |
am = 0
|
|
Packit Service |
a04d08 |
for m in root.walk():
|
|
Packit Service |
a04d08 |
if ud.is_skippable(m):
|
|
Packit Service |
a04d08 |
continue
|
|
Packit Service |
a04d08 |
am += 1
|
|
Packit Service |
a04d08 |
return am
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def gzip_text(text):
|
|
Packit Service |
a04d08 |
contents = BytesIO()
|
|
Packit Service |
a04d08 |
f = gzip.GzipFile(fileobj=contents, mode='wb')
|
|
Packit Service |
a04d08 |
f.write(util.encode_text(text))
|
|
Packit Service |
a04d08 |
f.flush()
|
|
Packit Service |
a04d08 |
f.close()
|
|
Packit Service |
a04d08 |
return contents.getvalue()
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# FIXME: these tests shouldn't be checking log output??
|
|
Packit Service |
a04d08 |
# Weirddddd...
|
|
Packit Service |
a04d08 |
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def setUp(self):
|
|
Packit Service |
a04d08 |
super(TestConsumeUserData, self).setUp()
|
|
Packit Service |
a04d08 |
self._log = None
|
|
Packit Service |
a04d08 |
self._log_file = None
|
|
Packit Service |
a04d08 |
self._log_handler = None
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def tearDown(self):
|
|
Packit Service |
a04d08 |
if self._log_handler and self._log:
|
|
Packit Service |
a04d08 |
self._log.removeHandler(self._log_handler)
|
|
Packit Service |
a04d08 |
helpers.FilesystemMockingTestCase.tearDown(self)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _patchIn(self, root):
|
|
Packit Service |
a04d08 |
self.patchOS(root)
|
|
Packit Service |
a04d08 |
self.patchUtils(root)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def capture_log(self, lvl=logging.DEBUG):
|
|
Packit Service |
a04d08 |
log_file = StringIO()
|
|
Packit Service |
a04d08 |
self._log_handler = logging.StreamHandler(log_file)
|
|
Packit Service |
a04d08 |
self._log_handler.setLevel(lvl)
|
|
Packit Service |
a04d08 |
self._log = log.getLogger()
|
|
Packit Service |
a04d08 |
self._log.addHandler(self._log_handler)
|
|
Packit Service |
a04d08 |
return log_file
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_simple_jsonp(self):
|
|
Packit Service |
a04d08 |
blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "qux" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "qux2" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(blob)
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
a04d08 |
cc = util.load_yaml(cc_contents)
|
|
Packit Service |
a04d08 |
self.assertEqual(2, len(cc))
|
|
Packit Service |
a04d08 |
self.assertEqual('qux', cc['baz'])
|
|
Packit Service |
a04d08 |
self.assertEqual('qux2', cc['bar'])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_simple_jsonp_vendor_and_user(self):
|
|
Packit Service |
a04d08 |
# test that user-data wins over vendor
|
|
Packit Service |
a04d08 |
user_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "qux" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "qux2" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
vendor_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "quxA" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "quxB" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/foo", "value": "quxC" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
initer = stages.Init()
|
|
Packit Service |
a04d08 |
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
|
|
Packit Service |
a04d08 |
initer.read_cfg()
|
|
Packit Service |
a04d08 |
initer.initialize()
|
|
Packit Service |
a04d08 |
initer.fetch()
|
|
Packit Service |
a04d08 |
initer.instancify()
|
|
Packit Service |
a04d08 |
initer.update()
|
|
Packit Service |
a04d08 |
initer.cloudify().run('consume_data',
|
|
Packit Service |
a04d08 |
initer.consume_data,
|
|
Packit Service |
a04d08 |
args=[PER_INSTANCE],
|
|
Packit Service |
a04d08 |
freq=PER_INSTANCE)
|
|
Packit Service |
a04d08 |
mods = stages.Modules(initer)
|
|
Packit Service |
a04d08 |
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
|
|
Packit Service |
a04d08 |
cfg = mods.cfg
|
|
Packit Service |
a04d08 |
self.assertIn('vendor_data', cfg)
|
|
Packit Service |
a04d08 |
self.assertEqual('qux', cfg['baz'])
|
|
Packit Service |
a04d08 |
self.assertEqual('qux2', cfg['bar'])
|
|
Packit Service |
a04d08 |
self.assertEqual('quxC', cfg['foo'])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_simple_jsonp_no_vendor_consumed(self):
|
|
Packit Service |
a04d08 |
# make sure that vendor data is not consumed
|
|
Packit Service |
a04d08 |
user_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "qux" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "qux2" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/vendor_data", "value": {"enabled": "false"}}
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
vendor_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "quxA" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "quxB" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/foo", "value": "quxC" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
initer = stages.Init()
|
|
Packit Service |
a04d08 |
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
|
|
Packit Service |
a04d08 |
initer.read_cfg()
|
|
Packit Service |
a04d08 |
initer.initialize()
|
|
Packit Service |
a04d08 |
initer.fetch()
|
|
Packit Service |
a04d08 |
initer.instancify()
|
|
Packit Service |
a04d08 |
initer.update()
|
|
Packit Service |
a04d08 |
initer.cloudify().run('consume_data',
|
|
Packit Service |
a04d08 |
initer.consume_data,
|
|
Packit Service |
a04d08 |
args=[PER_INSTANCE],
|
|
Packit Service |
a04d08 |
freq=PER_INSTANCE)
|
|
Packit Service |
a04d08 |
mods = stages.Modules(initer)
|
|
Packit Service |
a04d08 |
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
|
|
Packit Service |
a04d08 |
cfg = mods.cfg
|
|
Packit Service |
a04d08 |
self.assertEqual('qux', cfg['baz'])
|
|
Packit Service |
a04d08 |
self.assertEqual('qux2', cfg['bar'])
|
|
Packit Service |
a04d08 |
self.assertNotIn('foo', cfg)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_mixed_cloud_config(self):
|
|
Packit Service |
a04d08 |
blob_cc = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
a: b
|
|
Packit Service |
a04d08 |
c: d
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
message_cc = MIMEBase("text", "cloud-config")
|
|
Packit Service |
a04d08 |
message_cc.set_payload(blob_cc)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
blob_jp = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "replace", "path": "/a", "value": "c" },
|
|
Packit Service |
a04d08 |
{ "op": "remove", "path": "/c" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
message_jp = MIMEBase('text', "cloud-config-jsonp")
|
|
Packit Service |
a04d08 |
message_jp.set_payload(blob_jp)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
message = MIMEMultipart()
|
|
Packit Service |
a04d08 |
message.attach(message_cc)
|
|
Packit Service |
a04d08 |
message.attach(message_jp)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(str(message))
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
a04d08 |
cc = util.load_yaml(cc_contents)
|
|
Packit Service |
a04d08 |
self.assertEqual(1, len(cc))
|
|
Packit Service |
a04d08 |
self.assertEqual('c', cc['a'])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
9bfd13 |
def test_cloud_config_as_x_shell_script(self):
|
|
Packit Service |
9bfd13 |
blob_cc = '''
|
|
Packit Service |
9bfd13 |
#cloud-config
|
|
Packit Service |
9bfd13 |
a: b
|
|
Packit Service |
9bfd13 |
c: d
|
|
Packit Service |
9bfd13 |
'''
|
|
Packit Service |
9bfd13 |
message_cc = MIMEBase("text", "x-shellscript")
|
|
Packit Service |
9bfd13 |
message_cc.set_payload(blob_cc)
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
blob_jp = '''
|
|
Packit Service |
9bfd13 |
#cloud-config-jsonp
|
|
Packit Service |
9bfd13 |
[
|
|
Packit Service |
9bfd13 |
{ "op": "replace", "path": "/a", "value": "c" },
|
|
Packit Service |
9bfd13 |
{ "op": "remove", "path": "/c" }
|
|
Packit Service |
9bfd13 |
]
|
|
Packit Service |
9bfd13 |
'''
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
message_jp = MIMEBase('text', "cloud-config-jsonp")
|
|
Packit Service |
9bfd13 |
message_jp.set_payload(blob_jp)
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
message = MIMEMultipart()
|
|
Packit Service |
9bfd13 |
message.attach(message_cc)
|
|
Packit Service |
9bfd13 |
message.attach(message_jp)
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
self.reRoot()
|
|
Packit Service |
9bfd13 |
ci = stages.Init()
|
|
Packit Service |
9bfd13 |
ci.datasource = FakeDataSource(str(message))
|
|
Packit Service |
9bfd13 |
ci.fetch()
|
|
Packit Service |
9bfd13 |
ci.consume_data()
|
|
Packit Service |
9bfd13 |
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
9bfd13 |
cc = util.load_yaml(cc_contents)
|
|
Packit Service |
9bfd13 |
self.assertEqual(1, len(cc))
|
|
Packit Service |
9bfd13 |
self.assertEqual('c', cc['a'])
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
a04d08 |
def test_vendor_user_yaml_cloud_config(self):
|
|
Packit Service |
a04d08 |
vendor_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
a: b
|
|
Packit Service |
a04d08 |
name: vendor
|
|
Packit Service |
a04d08 |
run:
|
|
Packit Service |
a04d08 |
- x
|
|
Packit Service |
a04d08 |
- y
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
user_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
a: c
|
|
Packit Service |
a04d08 |
vendor_data:
|
|
Packit Service |
a04d08 |
enabled: True
|
|
Packit Service |
a04d08 |
prefix: /bin/true
|
|
Packit Service |
a04d08 |
name: user
|
|
Packit Service |
a04d08 |
run:
|
|
Packit Service |
a04d08 |
- z
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
initer = stages.Init()
|
|
Packit Service |
a04d08 |
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
|
|
Packit Service |
a04d08 |
initer.read_cfg()
|
|
Packit Service |
a04d08 |
initer.initialize()
|
|
Packit Service |
a04d08 |
initer.fetch()
|
|
Packit Service |
a04d08 |
initer.instancify()
|
|
Packit Service |
a04d08 |
initer.update()
|
|
Packit Service |
a04d08 |
initer.cloudify().run('consume_data',
|
|
Packit Service |
a04d08 |
initer.consume_data,
|
|
Packit Service |
a04d08 |
args=[PER_INSTANCE],
|
|
Packit Service |
a04d08 |
freq=PER_INSTANCE)
|
|
Packit Service |
a04d08 |
mods = stages.Modules(initer)
|
|
Packit Service |
a04d08 |
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
|
|
Packit Service |
a04d08 |
cfg = mods.cfg
|
|
Packit Service |
a04d08 |
self.assertIn('vendor_data', cfg)
|
|
Packit Service |
a04d08 |
self.assertEqual('c', cfg['a'])
|
|
Packit Service |
a04d08 |
self.assertEqual('user', cfg['name'])
|
|
Packit Service |
a04d08 |
self.assertNotIn('x', cfg['run'])
|
|
Packit Service |
a04d08 |
self.assertNotIn('y', cfg['run'])
|
|
Packit Service |
a04d08 |
self.assertIn('z', cfg['run'])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_vendordata_script(self):
|
|
Packit Service |
a04d08 |
vendor_blob = '''
|
|
Packit Service |
a04d08 |
#!/bin/bash
|
|
Packit Service |
a04d08 |
echo "test"
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
user_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
vendor_data:
|
|
Packit Service |
a04d08 |
enabled: True
|
|
Packit Service |
a04d08 |
prefix: /bin/true
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
new_root = self.reRoot()
|
|
Packit Service |
a04d08 |
initer = stages.Init()
|
|
Packit Service |
a04d08 |
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
|
|
Packit Service |
a04d08 |
initer.read_cfg()
|
|
Packit Service |
a04d08 |
initer.initialize()
|
|
Packit Service |
a04d08 |
initer.fetch()
|
|
Packit Service |
a04d08 |
initer.instancify()
|
|
Packit Service |
a04d08 |
initer.update()
|
|
Packit Service |
a04d08 |
initer.cloudify().run('consume_data',
|
|
Packit Service |
a04d08 |
initer.consume_data,
|
|
Packit Service |
a04d08 |
args=[PER_INSTANCE],
|
|
Packit Service |
a04d08 |
freq=PER_INSTANCE)
|
|
Packit Service |
a04d08 |
mods = stages.Modules(initer)
|
|
Packit Service |
a04d08 |
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
|
|
Packit Service |
a04d08 |
vendor_script = initer.paths.get_ipath_cur('vendor_scripts')
|
|
Packit Service |
a04d08 |
vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script)
|
|
Packit Service |
a04d08 |
self.assertTrue(os.path.exists(vendor_script_fns))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_merging_cloud_config(self):
|
|
Packit Service |
a04d08 |
blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
a: b
|
|
Packit Service |
a04d08 |
e: f
|
|
Packit Service |
a04d08 |
run:
|
|
Packit Service |
a04d08 |
- b
|
|
Packit Service |
a04d08 |
- c
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
message1 = MIMEBase("text", "cloud-config")
|
|
Packit Service |
a04d08 |
message1.set_payload(blob)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
blob2 = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
a: e
|
|
Packit Service |
a04d08 |
e: g
|
|
Packit Service |
a04d08 |
run:
|
|
Packit Service |
a04d08 |
- stuff
|
|
Packit Service |
a04d08 |
- morestuff
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
message2 = MIMEBase("text", "cloud-config")
|
|
Packit Service |
a04d08 |
message2['X-Merge-Type'] = ('dict(recurse_array,'
|
|
Packit Service |
a04d08 |
'recurse_str)+list(append)+str(append)')
|
|
Packit Service |
a04d08 |
message2.set_payload(blob2)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
blob3 = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
e:
|
|
Packit Service |
a04d08 |
- 1
|
|
Packit Service |
a04d08 |
- 2
|
|
Packit Service |
a04d08 |
- 3
|
|
Packit Service |
a04d08 |
p: 1
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
message3 = MIMEBase("text", "cloud-config")
|
|
Packit Service |
a04d08 |
message3.set_payload(blob3)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
messages = [message1, message2, message3]
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
|
|
Packit Service |
a04d08 |
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
|
|
Packit Service |
a04d08 |
None)
|
|
Packit Service |
a04d08 |
for i, m in enumerate(messages):
|
|
Packit Service |
a04d08 |
headers = dict(m)
|
|
Packit Service |
a04d08 |
fn = "part-%s" % (i + 1)
|
|
Packit Service |
a04d08 |
payload = m.get_payload(decode=True)
|
|
Packit Service |
a04d08 |
cloud_cfg.handle_part(None, headers['Content-Type'],
|
|
Packit Service |
a04d08 |
fn, payload, None, headers)
|
|
Packit Service |
a04d08 |
cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
|
|
Packit Service |
a04d08 |
None)
|
|
Packit Service |
a04d08 |
contents = util.load_file(paths.get_ipath('cloud_config'))
|
|
Packit Service |
a04d08 |
contents = util.load_yaml(contents)
|
|
Packit Service |
a04d08 |
self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
|
|
Packit Service |
a04d08 |
self.assertEqual(contents['a'], 'be')
|
|
Packit Service |
a04d08 |
self.assertEqual(contents['e'], [1, 2, 3])
|
|
Packit Service |
a04d08 |
self.assertEqual(contents['p'], 1)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_unhandled_type_warning(self):
|
|
Packit Service |
a04d08 |
"""Raw text without magic is ignored but shows warning."""
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
data = "arbitrary text\n"
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(data)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
self.assertIn(
|
|
Packit Service |
a04d08 |
"Unhandled non-multipart (text/x-not-multipart) userdata:",
|
|
Packit Service |
a04d08 |
log_file.getvalue())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
mockobj.assert_called_once_with(
|
|
Packit Service |
a04d08 |
ci.paths.get_ipath("cloud_config"), "", 0o600)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_mime_gzip_compressed(self):
|
|
Packit Service |
a04d08 |
"""Tests that individual message gzip encoding works."""
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def gzip_part(text):
|
|
Packit Service |
a04d08 |
return MIMEApplication(gzip_text(text), 'gzip')
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
base_content1 = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
a: 2
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
base_content2 = '''
|
|
Packit Service |
a04d08 |
#cloud-config
|
|
Packit Service |
a04d08 |
b: 3
|
|
Packit Service |
a04d08 |
c: 4
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
message = MIMEMultipart('test')
|
|
Packit Service |
a04d08 |
message.attach(gzip_part(base_content1))
|
|
Packit Service |
a04d08 |
message.attach(gzip_part(base_content2))
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(str(message))
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
contents = util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
a04d08 |
contents = util.load_yaml(contents)
|
|
Packit Service |
a04d08 |
self.assertTrue(isinstance(contents, dict))
|
|
Packit Service |
a04d08 |
self.assertEqual(3, len(contents))
|
|
Packit Service |
a04d08 |
self.assertEqual(2, contents['a'])
|
|
Packit Service |
a04d08 |
self.assertEqual(3, contents['b'])
|
|
Packit Service |
a04d08 |
self.assertEqual(4, contents['c'])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_mime_text_plain(self):
|
|
Packit Service |
a04d08 |
"""Mime message of type text/plain is ignored but shows warning."""
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
message = MIMEBase("text", "plain")
|
|
Packit Service |
a04d08 |
message.set_payload("Just text")
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(message.as_string().encode())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
self.assertIn(
|
|
Packit Service |
a04d08 |
"Unhandled unknown content-type (text/plain)",
|
|
Packit Service |
a04d08 |
log_file.getvalue())
|
|
Packit Service |
a04d08 |
mockobj.assert_called_once_with(
|
|
Packit Service |
a04d08 |
ci.paths.get_ipath("cloud_config"), "", 0o600)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_shellscript(self):
|
|
Packit Service |
a04d08 |
"""Raw text starting #!/bin/sh is treated as script."""
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
script = "#!/bin/sh\necho hello\n"
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(script)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
self.assertEqual("", log_file.getvalue())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
mockobj.assert_has_calls([
|
|
Packit Service |
a04d08 |
mock.call(outpath, script, 0o700),
|
|
Packit Service |
a04d08 |
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_mime_text_x_shellscript(self):
|
|
Packit Service |
a04d08 |
"""Mime message of type text/x-shellscript is treated as script."""
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
script = "#!/bin/sh\necho hello\n"
|
|
Packit Service |
a04d08 |
message = MIMEBase("text", "x-shellscript")
|
|
Packit Service |
a04d08 |
message.set_payload(script)
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(message.as_string())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
self.assertEqual("", log_file.getvalue())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
mockobj.assert_has_calls([
|
|
Packit Service |
a04d08 |
mock.call(outpath, script, 0o700),
|
|
Packit Service |
a04d08 |
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_mime_text_plain_shell(self):
|
|
Packit Service |
a04d08 |
"""Mime type text/plain starting #!/bin/sh is treated as script."""
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
script = "#!/bin/sh\necho hello\n"
|
|
Packit Service |
a04d08 |
message = MIMEBase("text", "plain")
|
|
Packit Service |
a04d08 |
message.set_payload(script)
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(message.as_string())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
self.assertEqual("", log_file.getvalue())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
mockobj.assert_has_calls([
|
|
Packit Service |
a04d08 |
mock.call(outpath, script, 0o700),
|
|
Packit Service |
a04d08 |
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_mime_application_octet_stream(self):
|
|
Packit Service |
a04d08 |
"""Mime type application/octet-stream is ignored but shows warning."""
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
message = MIMEBase("application", "octet-stream")
|
|
Packit Service |
a04d08 |
message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
|
|
Packit Service |
a04d08 |
encoders.encode_base64(message)
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(message.as_string().encode())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
self.assertIn(
|
|
Packit Service |
a04d08 |
"Unhandled unknown content-type (application/octet-stream)",
|
|
Packit Service |
a04d08 |
log_file.getvalue())
|
|
Packit Service |
a04d08 |
mockobj.assert_called_once_with(
|
|
Packit Service |
a04d08 |
ci.paths.get_ipath("cloud_config"), "", 0o600)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_cloud_config_archive(self):
|
|
Packit Service |
a04d08 |
non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
|
|
Packit Service |
a04d08 |
data = [{'content': '#cloud-config\npassword: gocubs\n'},
|
|
Packit Service |
a04d08 |
{'content': '#cloud-config\nlocale: chicago\n'},
|
|
Packit Service |
a04d08 |
{'content': non_decodable}]
|
|
Packit Service |
a04d08 |
message = b'#cloud-config-archive\n' + safeyaml.dumps(data).encode()
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(message)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
fs = {}
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def fsstore(filename, content, mode=0o0644, omode="wb"):
|
|
Packit Service |
a04d08 |
fs[filename] = content
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# consuming the user-data provided should write 'cloud_config' file
|
|
Packit Service |
a04d08 |
# which will have our yaml in it.
|
|
Packit Service |
a04d08 |
with mock.patch('cloudinit.util.write_file') as mockobj:
|
|
Packit Service |
a04d08 |
mockobj.side_effect = fsstore
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
|
|
Packit Service |
a04d08 |
self.assertEqual(cfg.get('password'), 'gocubs')
|
|
Packit Service |
a04d08 |
self.assertEqual(cfg.get('locale'), 'chicago')
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
@mock.patch('cloudinit.util.read_conf_with_confd')
|
|
Packit Service |
a04d08 |
def test_dont_allow_user_data(self, mock_cfg):
|
|
Packit Service |
a04d08 |
mock_cfg.return_value = {"allow_userdata": False}
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# test that user-data is ignored but vendor-data is kept
|
|
Packit Service |
a04d08 |
user_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "qux" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "qux2" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
vendor_blob = '''
|
|
Packit Service |
a04d08 |
#cloud-config-jsonp
|
|
Packit Service |
a04d08 |
[
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/baz", "value": "quxA" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/bar", "value": "quxB" },
|
|
Packit Service |
a04d08 |
{ "op": "add", "path": "/foo", "value": "quxC" }
|
|
Packit Service |
a04d08 |
]
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
initer = stages.Init()
|
|
Packit Service |
a04d08 |
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
|
|
Packit Service |
a04d08 |
initer.read_cfg()
|
|
Packit Service |
a04d08 |
initer.initialize()
|
|
Packit Service |
a04d08 |
initer.fetch()
|
|
Packit Service |
a04d08 |
initer.instancify()
|
|
Packit Service |
a04d08 |
initer.update()
|
|
Packit Service |
a04d08 |
initer.cloudify().run('consume_data',
|
|
Packit Service |
a04d08 |
initer.consume_data,
|
|
Packit Service |
a04d08 |
args=[PER_INSTANCE],
|
|
Packit Service |
a04d08 |
freq=PER_INSTANCE)
|
|
Packit Service |
a04d08 |
mods = stages.Modules(initer)
|
|
Packit Service |
a04d08 |
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
|
|
Packit Service |
a04d08 |
cfg = mods.cfg
|
|
Packit Service |
a04d08 |
self.assertIn('vendor_data', cfg)
|
|
Packit Service |
a04d08 |
self.assertEqual('quxA', cfg['baz'])
|
|
Packit Service |
a04d08 |
self.assertEqual('quxB', cfg['bar'])
|
|
Packit Service |
a04d08 |
self.assertEqual('quxC', cfg['foo'])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestConsumeUserDataHttp(TestConsumeUserData, helpers.HttprettyTestCase):
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def setUp(self):
|
|
Packit Service |
a04d08 |
TestConsumeUserData.setUp(self)
|
|
Packit Service |
a04d08 |
helpers.HttprettyTestCase.setUp(self)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def tearDown(self):
|
|
Packit Service |
a04d08 |
TestConsumeUserData.tearDown(self)
|
|
Packit Service |
a04d08 |
helpers.HttprettyTestCase.tearDown(self)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
@mock.patch('cloudinit.url_helper.time.sleep')
|
|
Packit Service |
a04d08 |
def test_include(self, mock_sleep):
|
|
Packit Service |
a04d08 |
"""Test #include."""
|
|
Packit Service |
a04d08 |
included_url = 'http://hostname/path'
|
|
Packit Service |
a04d08 |
included_data = '#cloud-config\nincluded: true\n'
|
|
Packit Service |
a04d08 |
httpretty.register_uri(httpretty.GET, included_url, included_data)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
blob = '#include\n%s\n' % included_url
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(blob)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
a04d08 |
cc = util.load_yaml(cc_contents)
|
|
Packit Service |
a04d08 |
self.assertTrue(cc.get('included'))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
@mock.patch('cloudinit.url_helper.time.sleep')
|
|
Packit Service |
a04d08 |
def test_include_bad_url(self, mock_sleep):
|
|
Packit Service |
a04d08 |
"""Test #include with a bad URL."""
|
|
Packit Service |
a04d08 |
bad_url = 'http://bad/forbidden'
|
|
Packit Service |
a04d08 |
bad_data = '#cloud-config\nbad: true\n'
|
|
Packit Service |
a04d08 |
httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
included_url = 'http://hostname/path'
|
|
Packit Service |
a04d08 |
included_data = '#cloud-config\nincluded: true\n'
|
|
Packit Service |
a04d08 |
httpretty.register_uri(httpretty.GET, included_url, included_data)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
blob = '#include\n%s\n%s' % (bad_url, included_url)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
self.reRoot()
|
|
Packit Service |
a04d08 |
ci = stages.Init()
|
|
Packit Service |
a04d08 |
ci.datasource = FakeDataSource(blob)
|
|
Packit Service |
9bfd13 |
ci.fetch()
|
|
Packit Service |
9bfd13 |
with self.assertRaises(Exception) as context:
|
|
Packit Service |
9bfd13 |
ci.consume_data()
|
|
Packit Service |
9bfd13 |
self.assertIn('403', str(context.exception))
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
with self.assertRaises(FileNotFoundError):
|
|
Packit Service |
9bfd13 |
util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
@mock.patch('cloudinit.url_helper.time.sleep')
|
|
Packit Service |
9bfd13 |
@mock.patch(
|
|
Packit Service |
9bfd13 |
"cloudinit.user_data.features.ERROR_ON_USER_DATA_FAILURE", False
|
|
Packit Service |
9bfd13 |
)
|
|
Packit Service |
9bfd13 |
def test_include_bad_url_no_fail(self, mock_sleep):
|
|
Packit Service |
9bfd13 |
"""Test #include with a bad URL and failure disabled"""
|
|
Packit Service |
9bfd13 |
bad_url = 'http://bad/forbidden'
|
|
Packit Service |
9bfd13 |
bad_data = '#cloud-config\nbad: true\n'
|
|
Packit Service |
9bfd13 |
httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403)
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
included_url = 'http://hostname/path'
|
|
Packit Service |
9bfd13 |
included_data = '#cloud-config\nincluded: true\n'
|
|
Packit Service |
9bfd13 |
httpretty.register_uri(httpretty.GET, included_url, included_data)
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
blob = '#include\n%s\n%s' % (bad_url, included_url)
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
self.reRoot()
|
|
Packit Service |
9bfd13 |
ci = stages.Init()
|
|
Packit Service |
9bfd13 |
ci.datasource = FakeDataSource(blob)
|
|
Packit Service |
a04d08 |
log_file = self.capture_log(logging.WARNING)
|
|
Packit Service |
a04d08 |
ci.fetch()
|
|
Packit Service |
a04d08 |
ci.consume_data()
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
self.assertIn("403 Client Error: Forbidden for url: %s" % bad_url,
|
|
Packit Service |
a04d08 |
log_file.getvalue())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
|
|
Packit Service |
a04d08 |
cc = util.load_yaml(cc_contents)
|
|
Packit Service |
a04d08 |
self.assertIsNone(cc.get('bad'))
|
|
Packit Service |
a04d08 |
self.assertTrue(cc.get('included'))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestUDProcess(helpers.ResourceUsingTestCase):
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_bytes_in_userdata(self):
|
|
Packit Service |
a04d08 |
msg = b'#cloud-config\napt_update: True\n'
|
|
Packit Service |
a04d08 |
ud_proc = ud.UserDataProcessor(self.getCloudPaths())
|
|
Packit Service |
a04d08 |
message = ud_proc.process(msg)
|
|
Packit Service |
a04d08 |
self.assertTrue(count_messages(message) == 1)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_string_in_userdata(self):
|
|
Packit Service |
a04d08 |
msg = '#cloud-config\napt_update: True\n'
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
ud_proc = ud.UserDataProcessor(self.getCloudPaths())
|
|
Packit Service |
a04d08 |
message = ud_proc.process(msg)
|
|
Packit Service |
a04d08 |
self.assertTrue(count_messages(message) == 1)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_compressed_in_userdata(self):
|
|
Packit Service |
a04d08 |
msg = gzip_text('#cloud-config\napt_update: True\n')
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
ud_proc = ud.UserDataProcessor(self.getCloudPaths())
|
|
Packit Service |
a04d08 |
message = ud_proc.process(msg)
|
|
Packit Service |
a04d08 |
self.assertTrue(count_messages(message) == 1)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestConvertString(helpers.TestCase):
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_handles_binary_non_utf8_decodable(self):
|
|
Packit Service |
a04d08 |
"""Printable unicode (not utf8-decodable) is safely converted."""
|
|
Packit Service |
a04d08 |
blob = b'#!/bin/bash\necho \xc3\x84\n'
|
|
Packit Service |
a04d08 |
msg = ud.convert_string(blob)
|
|
Packit Service |
a04d08 |
self.assertEqual(blob, msg.get_payload(decode=True))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_handles_binary_utf8_decodable(self):
|
|
Packit Service |
a04d08 |
blob = b'\x32\x32'
|
|
Packit Service |
a04d08 |
msg = ud.convert_string(blob)
|
|
Packit Service |
a04d08 |
self.assertEqual(blob, msg.get_payload(decode=True))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_handle_headers(self):
|
|
Packit Service |
a04d08 |
text = "hi mom"
|
|
Packit Service |
a04d08 |
msg = ud.convert_string(text)
|
|
Packit Service |
a04d08 |
self.assertEqual(text, msg.get_payload(decode=False))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_handle_mime_parts(self):
|
|
Packit Service |
a04d08 |
"""Mime parts are properly returned as a mime message."""
|
|
Packit Service |
a04d08 |
message = MIMEBase("text", "plain")
|
|
Packit Service |
a04d08 |
message.set_payload("Just text")
|
|
Packit Service |
a04d08 |
msg = ud.convert_string(str(message))
|
|
Packit Service |
a04d08 |
self.assertEqual("Just text", msg.get_payload(decode=False))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestFetchBaseConfig(helpers.TestCase):
|
|
Packit Service |
a04d08 |
def test_only_builtin_gets_builtin(self):
|
|
Packit Service |
a04d08 |
ret = helpers.wrap_and_call(
|
|
Packit Service |
a04d08 |
'cloudinit.stages',
|
|
Packit Service |
a04d08 |
{'util.read_conf_with_confd': None,
|
|
Packit Service |
a04d08 |
'util.read_conf_from_cmdline': None,
|
|
Packit Service |
a04d08 |
'read_runtime_config': {'return_value': {}}},
|
|
Packit Service |
a04d08 |
stages.fetch_base_config)
|
|
Packit Service |
a04d08 |
self.assertEqual(util.get_builtin_cfg(), ret)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_conf_d_overrides_defaults(self):
|
|
Packit Service |
a04d08 |
builtin = util.get_builtin_cfg()
|
|
Packit Service |
a04d08 |
test_key = sorted(builtin)[0]
|
|
Packit Service |
a04d08 |
test_value = 'test'
|
|
Packit Service |
a04d08 |
ret = helpers.wrap_and_call(
|
|
Packit Service |
a04d08 |
'cloudinit.stages',
|
|
Packit Service |
a04d08 |
{'util.read_conf_with_confd':
|
|
Packit Service |
a04d08 |
{'return_value': {test_key: test_value}},
|
|
Packit Service |
a04d08 |
'util.read_conf_from_cmdline': None,
|
|
Packit Service |
a04d08 |
'read_runtime_config': {'return_value': {}}},
|
|
Packit Service |
a04d08 |
stages.fetch_base_config)
|
|
Packit Service |
a04d08 |
self.assertEqual(ret.get(test_key), test_value)
|
|
Packit Service |
a04d08 |
builtin[test_key] = test_value
|
|
Packit Service |
a04d08 |
self.assertEqual(ret, builtin)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_cmdline_overrides_defaults(self):
|
|
Packit Service |
a04d08 |
builtin = util.get_builtin_cfg()
|
|
Packit Service |
a04d08 |
test_key = sorted(builtin)[0]
|
|
Packit Service |
a04d08 |
test_value = 'test'
|
|
Packit Service |
a04d08 |
cmdline = {test_key: test_value}
|
|
Packit Service |
a04d08 |
ret = helpers.wrap_and_call(
|
|
Packit Service |
a04d08 |
'cloudinit.stages',
|
|
Packit Service |
a04d08 |
{'util.read_conf_from_cmdline': {'return_value': cmdline},
|
|
Packit Service |
a04d08 |
'util.read_conf_with_confd': None,
|
|
Packit Service |
a04d08 |
'read_runtime_config': None},
|
|
Packit Service |
a04d08 |
stages.fetch_base_config)
|
|
Packit Service |
a04d08 |
self.assertEqual(ret.get(test_key), test_value)
|
|
Packit Service |
a04d08 |
builtin[test_key] = test_value
|
|
Packit Service |
a04d08 |
self.assertEqual(ret, builtin)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_cmdline_overrides_confd_runtime_and_defaults(self):
|
|
Packit Service |
a04d08 |
builtin = {'key1': 'value0', 'key3': 'other2'}
|
|
Packit Service |
a04d08 |
conf_d = {'key1': 'value1', 'key2': 'other1'}
|
|
Packit Service |
a04d08 |
cmdline = {'key3': 'other3', 'key2': 'other2'}
|
|
Packit Service |
a04d08 |
runtime = {'key3': 'runtime3'}
|
|
Packit Service |
a04d08 |
ret = helpers.wrap_and_call(
|
|
Packit Service |
a04d08 |
'cloudinit.stages',
|
|
Packit Service |
a04d08 |
{'util.read_conf_with_confd': {'return_value': conf_d},
|
|
Packit Service |
a04d08 |
'util.get_builtin_cfg': {'return_value': builtin},
|
|
Packit Service |
a04d08 |
'read_runtime_config': {'return_value': runtime},
|
|
Packit Service |
a04d08 |
'util.read_conf_from_cmdline': {'return_value': cmdline}},
|
|
Packit Service |
a04d08 |
stages.fetch_base_config)
|
|
Packit Service |
a04d08 |
self.assertEqual(ret, {'key1': 'value1', 'key2': 'other2',
|
|
Packit Service |
a04d08 |
'key3': 'other3'})
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_order_precedence_is_builtin_system_runtime_cmdline(self):
|
|
Packit Service |
a04d08 |
builtin = {'key1': 'builtin0', 'key3': 'builtin3'}
|
|
Packit Service |
a04d08 |
conf_d = {'key1': 'confd1', 'key2': 'confd2', 'keyconfd1': 'kconfd1'}
|
|
Packit Service |
a04d08 |
runtime = {'key1': 'runtime1', 'key2': 'runtime2'}
|
|
Packit Service |
a04d08 |
cmdline = {'key1': 'cmdline1'}
|
|
Packit Service |
a04d08 |
ret = helpers.wrap_and_call(
|
|
Packit Service |
a04d08 |
'cloudinit.stages',
|
|
Packit Service |
a04d08 |
{'util.read_conf_with_confd': {'return_value': conf_d},
|
|
Packit Service |
a04d08 |
'util.get_builtin_cfg': {'return_value': builtin},
|
|
Packit Service |
a04d08 |
'util.read_conf_from_cmdline': {'return_value': cmdline},
|
|
Packit Service |
a04d08 |
'read_runtime_config': {'return_value': runtime},
|
|
Packit Service |
a04d08 |
},
|
|
Packit Service |
a04d08 |
stages.fetch_base_config)
|
|
Packit Service |
a04d08 |
self.assertEqual(ret, {'key1': 'cmdline1', 'key2': 'runtime2',
|
|
Packit Service |
a04d08 |
'key3': 'builtin3', 'keyconfd1': 'kconfd1'})
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# vi: ts=4 expandtab
|