# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceIBMCloud as ibm
from cloudinit.tests import helpers as test_helpers
from cloudinit import util
import base64
import copy
import json
from textwrap import dedent
mock = test_helpers.mock
D_PATH = "cloudinit.sources.DataSourceIBMCloud."
@mock.patch(D_PATH + "_is_xen", return_value=True)
@mock.patch(D_PATH + "_is_ibm_provisioning")
@mock.patch(D_PATH + "util.blkid")
class TestGetIBMPlatform(test_helpers.CiTestCase):
"""Test the get_ibm_platform helper."""
blkid_base = {
"/dev/xvda1": {
"DEVNAME": "/dev/xvda1", "LABEL": "cloudimg-bootfs",
"TYPE": "ext3"},
"/dev/xvda2": {
"DEVNAME": "/dev/xvda2", "LABEL": "cloudimg-rootfs",
"TYPE": "ext4"},
}
blkid_metadata_disk = {
"/dev/xvdh1": {
"DEVNAME": "/dev/xvdh1", "LABEL": "METADATA", "TYPE": "vfat",
"SEC_TYPE": "msdos", "UUID": "681B-8C5D",
"PARTUUID": "3d631e09-01"},
}
blkid_oscode_disk = {
"/dev/xvdh": {
"DEVNAME": "/dev/xvdh", "LABEL": "config-2", "TYPE": "vfat",
"SEC_TYPE": "msdos", "UUID": ibm.IBM_CONFIG_UUID}
}
def setUp(self):
self.blkid_metadata = copy.deepcopy(self.blkid_base)
self.blkid_metadata.update(copy.deepcopy(self.blkid_metadata_disk))
self.blkid_oscode = copy.deepcopy(self.blkid_base)
self.blkid_oscode.update(copy.deepcopy(self.blkid_oscode_disk))
def test_id_template_live_metadata(self, m_blkid, m_is_prov, _m_xen):
"""identify TEMPLATE_LIVE_METADATA."""
m_blkid.return_value = self.blkid_metadata
m_is_prov.return_value = False
self.assertEqual(
(ibm.Platforms.TEMPLATE_LIVE_METADATA, "/dev/xvdh1"),
ibm.get_ibm_platform())
def test_id_template_prov_metadata(self, m_blkid, m_is_prov, _m_xen):
"""identify TEMPLATE_PROVISIONING_METADATA."""
m_blkid.return_value = self.blkid_metadata
m_is_prov.return_value = True
self.assertEqual(
(ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh1"),
ibm.get_ibm_platform())
def test_id_template_prov_nodata(self, m_blkid, m_is_prov, _m_xen):
"""identify TEMPLATE_PROVISIONING_NODATA."""
m_blkid.return_value = self.blkid_base
m_is_prov.return_value = True
self.assertEqual(
(ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None),
ibm.get_ibm_platform())
def test_id_os_code(self, m_blkid, m_is_prov, _m_xen):
"""Identify OS_CODE."""
m_blkid.return_value = self.blkid_oscode
m_is_prov.return_value = False
self.assertEqual((ibm.Platforms.OS_CODE, "/dev/xvdh"),
ibm.get_ibm_platform())
def test_id_os_code_must_match_uuid(self, m_blkid, m_is_prov, _m_xen):
"""Test against false positive on openstack with non-ibm UUID."""
blkid = self.blkid_oscode
blkid["/dev/xvdh"]["UUID"] = "9999-9999"
m_blkid.return_value = blkid
m_is_prov.return_value = False
self.assertEqual((None, None), ibm.get_ibm_platform())
@mock.patch(D_PATH + "_read_system_uuid", return_value=None)
@mock.patch(D_PATH + "get_ibm_platform")
class TestReadMD(test_helpers.CiTestCase):
"""Test the read_datasource helper."""
template_md = {
"files": [],
"network_config": {"content_path": "/content/interfaces"},
"hostname": "ci-fond-ram",
"name": "ci-fond-ram",
"domain": "testing.ci.cloud-init.org",
"meta": {"dsmode": "net"},
"uuid": "8e636730-9f5d-c4a5-327c-d7123c46e82f",
"public_keys": {"1091307": "ssh-rsa AAAAB3NzaC1...Hw== ci-pubkey"},
}
oscode_md = {
"hostname": "ci-grand-gannet.testing.ci.cloud-init.org",
"name": "ci-grand-gannet",
"uuid": "2f266908-8e6c-4818-9b5c-42e9cc66a785",
"random_seed": "bm90LXJhbmRvbQo=",
"crypt_key": "ssh-rsa AAAAB3NzaC1yc2..n6z/",
"configuration_token": "eyJhbGciOi..M3ZA",
"public_keys": {"1091307": "ssh-rsa AAAAB3N..Hw== ci-pubkey"},
}
content_interfaces = dedent("""\
auto lo
iface lo inet loopback
auto eth0
allow-hotplug eth0
iface eth0 inet static
address 10.82.43.5
netmask 255.255.255.192
""")
userdata = b"#!/bin/sh\necho hi mom\n"
# meta.js file gets json encoded userdata as a list.
meta_js = '["#!/bin/sh\necho hi mom\n"]'
vendor_data = {
"cloud-init": "#!/bin/bash\necho 'root:$6$5ab01p1m1' | chpasswd -e"}
network_data = {
"links": [
{"id": "interface_29402281", "name": "eth0", "mtu": None,
"type": "phy", "ethernet_mac_address": "06:00:f1:bd:da:25"},
{"id": "interface_29402279", "name": "eth1", "mtu": None,
"type": "phy", "ethernet_mac_address": "06:98:5e:d0:7f:86"}
],
"networks": [
{"id": "network_109887563", "link": "interface_29402281",
"type": "ipv4", "ip_address": "10.82.43.2",
"netmask": "255.255.255.192",
"routes": [
{"network": "10.0.0.0", "netmask": "255.0.0.0",
"gateway": "10.82.43.1"},
{"network": "161.26.0.0", "netmask": "255.255.0.0",
"gateway": "10.82.43.1"}]},
{"id": "network_109887551", "link": "interface_29402279",
"type": "ipv4", "ip_address": "108.168.194.252",
"netmask": "255.255.255.248",
"routes": [
{"network": "0.0.0.0", "netmask": "0.0.0.0",
"gateway": "108.168.194.249"}]}
],
"services": [
{"type": "dns", "address": "10.0.80.11"},
{"type": "dns", "address": "10.0.80.12"}
],
}
sysuuid = '7f79ebf5-d791-43c3-a723-854e8389d59f'
def _get_expected_metadata(self, os_md):
"""return expected 'metadata' for data loaded from meta_data.json."""
os_md = copy.deepcopy(os_md)
renames = (
('hostname', 'local-hostname'),
('uuid', 'instance-id'),
('public_keys', 'public-keys'))
ret = {}
for osname, mdname in renames:
if osname in os_md:
ret[mdname] = os_md[osname]
if 'random_seed' in os_md:
ret['random_seed'] = base64.b64decode(os_md['random_seed'])
return ret
def test_provisioning_md(self, m_platform, m_sysuuid):
"""Provisioning env with a metadata disk should return None."""
m_platform.return_value = (
ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh")
self.assertIsNone(ibm.read_md())
def test_provisioning_no_metadata(self, m_platform, m_sysuuid):
"""Provisioning env with no metadata disk should return None."""
m_platform.return_value = (
ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None)
self.assertIsNone(ibm.read_md())
def test_provisioning_not_ibm(self, m_platform, m_sysuuid):
"""Provisioning env but not identified as IBM should return None."""
m_platform.return_value = (None, None)
self.assertIsNone(ibm.read_md())
def test_template_live(self, m_platform, m_sysuuid):
"""Template live environment should be identified."""
tmpdir = self.tmp_dir()
m_platform.return_value = (
ibm.Platforms.TEMPLATE_LIVE_METADATA, tmpdir)
m_sysuuid.return_value = self.sysuuid
test_helpers.populate_dir(tmpdir, {
'openstack/latest/meta_data.json': json.dumps(self.template_md),
'openstack/latest/user_data': self.userdata,
'openstack/content/interfaces': self.content_interfaces,
'meta.js': self.meta_js})
ret = ibm.read_md()
self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA,
ret['platform'])
self.assertEqual(tmpdir, ret['source'])
self.assertEqual(self.userdata, ret['userdata'])
self.assertEqual(self._get_expected_metadata(self.template_md),
ret['metadata'])
self.assertEqual(self.sysuuid, ret['system-uuid'])
def test_os_code_live(self, m_platform, m_sysuuid):
"""Verify an os_code metadata path."""
tmpdir = self.tmp_dir()
m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
netdata = json.dumps(self.network_data)
test_helpers.populate_dir(tmpdir, {
'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
'openstack/latest/user_data': self.userdata,
'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
'openstack/latest/network_data.json': netdata,
})
ret = ibm.read_md()
self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
self.assertEqual(tmpdir, ret['source'])
self.assertEqual(self.userdata, ret['userdata'])
self.assertEqual(self._get_expected_metadata(self.oscode_md),
ret['metadata'])
def test_os_code_live_no_userdata(self, m_platform, m_sysuuid):
"""Verify os_code without user-data."""
tmpdir = self.tmp_dir()
m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
test_helpers.populate_dir(tmpdir, {
'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
})
ret = ibm.read_md()
self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
self.assertEqual(tmpdir, ret['source'])
self.assertIsNone(ret['userdata'])
self.assertEqual(self._get_expected_metadata(self.oscode_md),
ret['metadata'])
class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase):
"""Test the _is_ibm_provisioning method."""
inst_log = "/root/swinstall.log"
prov_cfg = "/root/provisioningConfiguration.cfg"
boot_ref = "/proc/1/environ"
with_logs = True
def _call_with_root(self, rootd):
self.reRoot(rootd)
return ibm._is_ibm_provisioning()
def test_no_config(self):
"""No provisioning config means not provisioning."""
self.assertFalse(self._call_with_root(self.tmp_dir()))
def test_config_only(self):
"""A provisioning config without a log means provisioning."""
rootd = self.tmp_dir()
test_helpers.populate_dir(rootd, {self.prov_cfg: "key=value"})
self.assertTrue(self._call_with_root(rootd))
def test_config_with_old_log(self):
"""A config with a log from previous boot is not provisioning."""
rootd = self.tmp_dir()
data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
self.inst_log: ("log data\n", -30),
self.boot_ref: ("PWD=/", 0)}
test_helpers.populate_dir_with_ts(rootd, data)
self.assertFalse(self._call_with_root(rootd=rootd))
self.assertIn("from previous boot", self.logs.getvalue())
def test_config_with_new_log(self):
"""A config with a log from this boot is provisioning."""
rootd = self.tmp_dir()
data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
self.inst_log: ("log data\n", 30),
self.boot_ref: ("PWD=/", 0)}
test_helpers.populate_dir_with_ts(rootd, data)
self.assertTrue(self._call_with_root(rootd=rootd))
self.assertIn("from current boot", self.logs.getvalue())
def test_config_and_log_no_reference(self):
"""If the config and log existed, but no reference, assume not."""
rootd = self.tmp_dir()
test_helpers.populate_dir(
rootd, {self.prov_cfg: "key=value", self.inst_log: "log data\n"})
self.assertFalse(self._call_with_root(rootd=rootd))
self.assertIn("no reference file", self.logs.getvalue())
class TestDataSourceIBMCloud(test_helpers.CiTestCase):
def setUp(self):
super(TestDataSourceIBMCloud, self).setUp()
self.tmp = self.tmp_dir()
self.cloud_dir = self.tmp_path('cloud', dir=self.tmp)
util.ensure_dir(self.cloud_dir)
paths = Paths({'run_dir': self.tmp, 'cloud_dir': self.cloud_dir})
self.ds = ibm.DataSourceIBMCloud(
sys_cfg={}, distro=None, paths=paths)
def test_get_data_false(self):
"""When read_md returns None, get_data returns False."""
with mock.patch(D_PATH + 'read_md', return_value=None):
self.assertFalse(self.ds.get_data())
def test_get_data_processes_read_md(self):
"""get_data processes and caches content returned by read_md."""
md = {
'metadata': {}, 'networkdata': 'net', 'platform': 'plat',
'source': 'src', 'system-uuid': 'uuid', 'userdata': 'ud',
'vendordata': 'vd'}
with mock.patch(D_PATH + 'read_md', return_value=md):
self.assertTrue(self.ds.get_data())
self.assertEqual('src', self.ds.source)
self.assertEqual('plat', self.ds.platform)
self.assertEqual({}, self.ds.metadata)
self.assertEqual('ud', self.ds.userdata_raw)
self.assertEqual('net', self.ds.network_json)
self.assertEqual('vd', self.ds.vendordata_pure)
self.assertEqual('uuid', self.ds.system_uuid)
self.assertEqual('ibmcloud', self.ds.cloud_name)
self.assertEqual('ibmcloud', self.ds.platform_type)
self.assertEqual('plat (src)', self.ds.subplatform)
# vi: ts=4 expandtab