# This file is part of cloud-init. See LICENSE file for license information.
from copy import copy
import os
import shutil
import tempfile
import yaml
from unittest import mock
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
from cloudinit.tests.helpers import CiTestCase, populate_dir
class TestMAASDataSource(CiTestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
# Make a temp directoy for tests to use.
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such."""
userdata = b'valid01-userdata'
data = {'meta-data/instance-id': 'i-valid01',
'meta-data/local-hostname': 'valid01-hostname',
'user-data': userdata,
'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
my_d = os.path.join(self.tmp, "valid")
populate_dir(my_d, data)
ud, md, vd = DataSourceMAAS.read_maas_seed_dir(my_d)
self.assertEqual(userdata, ud)
for key in ('instance-id', 'local-hostname'):
self.assertEqual(data["meta-data/" + key], md[key])
# verify that 'userdata' is not returned as part of the metadata
self.assertFalse(('user-data' in md))
self.assertIsNone(vd)
def test_seed_dir_valid_extra(self):
"""Verify extra files do not affect seed_dir validity."""
userdata = b'valid-extra-userdata'
data = {'meta-data/instance-id': 'i-valid-extra',
'meta-data/local-hostname': 'valid-extra-hostname',
'user-data': userdata, 'foo': 'bar'}
my_d = os.path.join(self.tmp, "valid_extra")
populate_dir(my_d, data)
ud, md, _vd = DataSourceMAAS.read_maas_seed_dir(my_d)
self.assertEqual(userdata, ud)
for key in ('instance-id', 'local-hostname'):
self.assertEqual(data['meta-data/' + key], md[key])
# additional files should not just appear as keys in metadata atm
self.assertFalse(('foo' in md))
def test_seed_dir_invalid(self):
"""Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
'local-hostname': 'test-hostname', 'user-data': ''}
my_based = os.path.join(self.tmp, "valid_extra")
# missing 'userdata' file
my_d = "%s-01" % my_based
invalid_data = copy(valid)
del invalid_data['local-hostname']
populate_dir(my_d, invalid_data)
self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed,
DataSourceMAAS.read_maas_seed_dir, my_d)
# missing 'instance-id'
my_d = "%s-02" % my_based
invalid_data = copy(valid)
del invalid_data['instance-id']
populate_dir(my_d, invalid_data)
self.assertRaises(DataSourceMAAS.MAASSeedDirMalformed,
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_none(self):
"""Verify that empty seed_dir raises MAASSeedDirNone."""
my_d = os.path.join(self.tmp, "valid_empty")
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_missing(self):
"""Verify that missing seed_dir raises MAASSeedDirNone."""
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
def mock_read_maas_seed_url(self, data, seed, version="19991231"):
"""mock up readurl to appear as a web server at seed has provided data.
return what read_maas_seed_url returns."""
def my_readurl(*args, **kwargs):
if len(args):
url = args[0]
else:
url = kwargs['url']
prefix = "%s/%s/" % (seed, version)
if not url.startswith(prefix):
raise ValueError("unexpected call %s" % url)
short = url[len(prefix):]
if short not in data:
raise url_helper.UrlError("not found", code=404, url=url)
return url_helper.StringResponse(data[short])
# Now do the actual call of the code under test.
with mock.patch("cloudinit.url_helper.readurl") as mock_readurl:
mock_readurl.side_effect = my_readurl
return DataSourceMAAS.read_maas_seed_url(seed, version=version)
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
valid = {
'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
'meta-data/vendor-data': b'my-vendordata',
'user-data': b'foodata',
}
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
ud, md, vd = self.mock_read_maas_seed_url(valid, my_seed, my_ver)
self.assertEqual(valid['meta-data/instance-id'], md['instance-id'])
self.assertEqual(
valid['meta-data/local-hostname'], md['local-hostname'])
self.assertEqual(valid['meta-data/public-keys'], md['public-keys'])
self.assertEqual(valid['user-data'], ud)
# vendor-data is yaml, which decodes a string
self.assertEqual(valid['meta-data/vendor-data'].decode(), vd)
def test_seed_url_vendor_data_dict(self):
expected_vd = {'key1': 'value1'}
valid = {
'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/vendor-data': yaml.safe_dump(expected_vd).encode(),
}
_ud, md, vd = self.mock_read_maas_seed_url(
valid, "http://example.com/foo")
self.assertEqual(valid['meta-data/instance-id'], md['instance-id'])
self.assertEqual(expected_vd, vd)
@mock.patch("cloudinit.sources.DataSourceMAAS.url_helper.OauthUrlHelper")
class TestGetOauthHelper(CiTestCase):
base_cfg = {'consumer_key': 'FAKE_CONSUMER_KEY',
'token_key': 'FAKE_TOKEN_KEY',
'token_secret': 'FAKE_TOKEN_SECRET',
'consumer_secret': None}
def test_all_required(self, m_helper):
"""Valid config as expected."""
DataSourceMAAS.get_oauth_helper(self.base_cfg.copy())
m_helper.assert_has_calls([mock.call(**self.base_cfg)])
def test_other_fields_not_passed_through(self, m_helper):
"""Only relevant fields are passed through."""
mycfg = self.base_cfg.copy()
mycfg['unrelated_field'] = 'unrelated'
DataSourceMAAS.get_oauth_helper(mycfg)
m_helper.assert_has_calls([mock.call(**self.base_cfg)])
class TestGetIdHash(CiTestCase):
v1_cfg = {'consumer_key': 'CKEY', 'token_key': 'TKEY',
'token_secret': 'TSEC'}
v1_id = (
'v1:'
'403ee5f19c956507f1d0e50814119c405902137ea4f8838bde167c5da8110392')
def test_v1_expected(self):
"""Test v1 id generated as expected working behavior from config."""
result = DataSourceMAAS.get_id_from_ds_cfg(self.v1_cfg.copy())
self.assertEqual(self.v1_id, result)
def test_v1_extra_fields_are_ignored(self):
"""Test v1 id ignores unused entries in config."""
cfg = self.v1_cfg.copy()
cfg['consumer_secret'] = "BOO"
cfg['unrelated'] = "HI MOM"
result = DataSourceMAAS.get_id_from_ds_cfg(cfg)
self.assertEqual(self.v1_id, result)
# vi: ts=4 expandtab