# Blob Blame History Raw  (web-viewer navigation residue; not part of the module)
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceIBMCloud as ibm
from cloudinit.tests import helpers as test_helpers
from cloudinit import util

import base64
import copy
import json
from textwrap import dedent

# Use the mock module exposed by the shared test helpers.
mock = test_helpers.mock

# Dotted prefix of the module under test; mock.patch targets in this file
# are built by appending an attribute name to it.
D_PATH = "cloudinit.sources.DataSourceIBMCloud."


@mock.patch(D_PATH + "_is_xen", return_value=True)
@mock.patch(D_PATH + "_is_ibm_provisioning")
@mock.patch(D_PATH + "util.blkid")
class TestGetIBMPlatform(test_helpers.CiTestCase):
    """Test the get_ibm_platform helper.

    The class-level patches are applied to every test method.  Decorators
    are applied bottom-up, so each test receives its mocks in the order
    (m_blkid, m_is_prov, _m_xen).
    """

    # blkid output for the two ordinary disks present in every scenario.
    blkid_base = {
        "/dev/xvda1": {
            "DEVNAME": "/dev/xvda1", "LABEL": "cloudimg-bootfs",
            "TYPE": "ext3"},
        "/dev/xvda2": {
            "DEVNAME": "/dev/xvda2", "LABEL": "cloudimg-rootfs",
            "TYPE": "ext4"},
    }

    # Extra blkid entry: a vfat partition labelled METADATA.
    blkid_metadata_disk = {
        "/dev/xvdh1": {
            "DEVNAME": "/dev/xvdh1", "LABEL": "METADATA", "TYPE": "vfat",
            "SEC_TYPE": "msdos", "UUID": "681B-8C5D",
            "PARTUUID": "3d631e09-01"},
    }

    # Extra blkid entry: a config-2 disk carrying the IBM config UUID.
    blkid_oscode_disk = {
        "/dev/xvdh": {
            "DEVNAME": "/dev/xvdh", "LABEL": "config-2", "TYPE": "vfat",
            "SEC_TYPE": "msdos", "UUID": ibm.IBM_CONFIG_UUID}
    }

    def setUp(self):
        """Build fresh, per-test blkid results from the class fixtures."""
        # Run the base class setup too, as TestDataSourceIBMCloud.setUp
        # in this module does; overriding setUp without it skips whatever
        # the base test case prepares.
        super(TestGetIBMPlatform, self).setUp()

        # Deep copies keep the class-level fixtures pristine even when a
        # test mutates the dictionaries it is handed.
        self.blkid_metadata = copy.deepcopy(self.blkid_base)
        self.blkid_metadata.update(copy.deepcopy(self.blkid_metadata_disk))

        self.blkid_oscode = copy.deepcopy(self.blkid_base)
        self.blkid_oscode.update(copy.deepcopy(self.blkid_oscode_disk))

    def test_id_template_live_metadata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_LIVE_METADATA."""
        m_blkid.return_value = self.blkid_metadata
        m_is_prov.return_value = False
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_LIVE_METADATA, "/dev/xvdh1"),
            ibm.get_ibm_platform())

    def test_id_template_prov_metadata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_PROVISIONING_METADATA."""
        m_blkid.return_value = self.blkid_metadata
        m_is_prov.return_value = True
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh1"),
            ibm.get_ibm_platform())

    def test_id_template_prov_nodata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_PROVISIONING_NODATA."""
        m_blkid.return_value = self.blkid_base
        m_is_prov.return_value = True
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None),
            ibm.get_ibm_platform())

    def test_id_os_code(self, m_blkid, m_is_prov, _m_xen):
        """Identify OS_CODE."""
        m_blkid.return_value = self.blkid_oscode
        m_is_prov.return_value = False
        self.assertEqual((ibm.Platforms.OS_CODE, "/dev/xvdh"),
                         ibm.get_ibm_platform())

    def test_id_os_code_must_match_uuid(self, m_blkid, m_is_prov, _m_xen):
        """Test against false positive on openstack with non-ibm UUID."""
        # Safe to mutate: setUp hands every test its own deep copy.
        blkid = self.blkid_oscode
        blkid["/dev/xvdh"]["UUID"] = "9999-9999"
        m_blkid.return_value = blkid
        m_is_prov.return_value = False
        self.assertEqual((None, None), ibm.get_ibm_platform())


@mock.patch(D_PATH + "_read_system_uuid", return_value=None)
@mock.patch(D_PATH + "get_ibm_platform")
class TestReadMD(test_helpers.CiTestCase):
    """Test the read_md helper.

    The class-level patches are applied to every test method.  Decorators
    are applied bottom-up, so each test receives its mocks in the order
    (m_platform, m_sysuuid).
    """

    # meta_data.json content used for the "template" platform tests.
    template_md = {
        "files": [],
        "network_config": {"content_path": "/content/interfaces"},
        "hostname": "ci-fond-ram",
        "name": "ci-fond-ram",
        "domain": "testing.ci.cloud-init.org",
        "meta": {"dsmode": "net"},
        "uuid": "8e636730-9f5d-c4a5-327c-d7123c46e82f",
        "public_keys": {"1091307": "ssh-rsa AAAAB3NzaC1...Hw== ci-pubkey"},
    }

    # meta_data.json content used for the OS_CODE platform tests.
    oscode_md = {
        "hostname": "ci-grand-gannet.testing.ci.cloud-init.org",
        "name": "ci-grand-gannet",
        "uuid": "2f266908-8e6c-4818-9b5c-42e9cc66a785",
        "random_seed": "bm90LXJhbmRvbQo=",
        "crypt_key": "ssh-rsa AAAAB3NzaC1yc2..n6z/",
        "configuration_token": "eyJhbGciOi..M3ZA",
        "public_keys": {"1091307": "ssh-rsa AAAAB3N..Hw== ci-pubkey"},
    }

    content_interfaces = dedent("""\
        auto lo
        iface lo inet loopback

        auto eth0
        allow-hotplug eth0
        iface eth0 inet static
        address 10.82.43.5
        netmask 255.255.255.192
        """)

    userdata = b"#!/bin/sh\necho hi mom\n"
    # meta.js file gets json encoded userdata as a list.  Encode it with
    # json.dumps so the newlines are escaped; raw newline characters inside
    # a JSON string are not valid JSON.
    meta_js = json.dumps([userdata.decode("utf-8")])
    vendor_data = {
        "cloud-init": "#!/bin/bash\necho 'root:$6$5ab01p1m1' | chpasswd -e"}

    network_data = {
        "links": [
            {"id": "interface_29402281", "name": "eth0", "mtu": None,
             "type": "phy", "ethernet_mac_address": "06:00:f1:bd:da:25"},
            {"id": "interface_29402279", "name": "eth1", "mtu": None,
             "type": "phy", "ethernet_mac_address": "06:98:5e:d0:7f:86"}
        ],
        "networks": [
            {"id": "network_109887563", "link": "interface_29402281",
             "type": "ipv4", "ip_address": "10.82.43.2",
             "netmask": "255.255.255.192",
             "routes": [
                 {"network": "10.0.0.0", "netmask": "255.0.0.0",
                  "gateway": "10.82.43.1"},
                 {"network": "161.26.0.0", "netmask": "255.255.0.0",
                  "gateway": "10.82.43.1"}]},
            {"id": "network_109887551", "link": "interface_29402279",
             "type": "ipv4", "ip_address": "108.168.194.252",
             "netmask": "255.255.255.248",
             "routes": [
                 {"network": "0.0.0.0", "netmask": "0.0.0.0",
                  "gateway": "108.168.194.249"}]}
        ],
        "services": [
            {"type": "dns", "address": "10.0.80.11"},
            {"type": "dns", "address": "10.0.80.12"}
        ],
    }

    sysuuid = '7f79ebf5-d791-43c3-a723-854e8389d59f'

    def _get_expected_metadata(self, os_md):
        """return expected 'metadata' for data loaded from meta_data.json.

        Only the renamed keys and a base64-decoded 'random_seed' are
        carried over; every other key of os_md is dropped.
        """
        os_md = copy.deepcopy(os_md)
        renames = (
            ('hostname', 'local-hostname'),
            ('uuid', 'instance-id'),
            ('public_keys', 'public-keys'))
        ret = {}
        for osname, mdname in renames:
            if osname in os_md:
                ret[mdname] = os_md[osname]
        if 'random_seed' in os_md:
            ret['random_seed'] = base64.b64decode(os_md['random_seed'])

        return ret

    def test_provisioning_md(self, m_platform, _m_sysuuid):
        """Provisioning env with a metadata disk should return None."""
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh")
        self.assertIsNone(ibm.read_md())

    def test_provisioning_no_metadata(self, m_platform, _m_sysuuid):
        """Provisioning env with no metadata disk should return None."""
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None)
        self.assertIsNone(ibm.read_md())

    def test_provisioning_not_ibm(self, m_platform, _m_sysuuid):
        """Provisioning env but not identified as IBM should return None."""
        m_platform.return_value = (None, None)
        self.assertIsNone(ibm.read_md())

    def test_template_live(self, m_platform, m_sysuuid):
        """Template live environment should be identified."""
        tmpdir = self.tmp_dir()
        # A populated directory is reported as the platform's source.
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_LIVE_METADATA, tmpdir)
        m_sysuuid.return_value = self.sysuuid

        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.template_md),
            'openstack/latest/user_data': self.userdata,
            'openstack/content/interfaces': self.content_interfaces,
            'meta.js': self.meta_js})

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA,
                         ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertEqual(self.userdata, ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.template_md),
                         ret['metadata'])
        self.assertEqual(self.sysuuid, ret['system-uuid'])

    def test_os_code_live(self, m_platform, _m_sysuuid):
        """Verify an os_code metadata path."""
        tmpdir = self.tmp_dir()
        m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
        netdata = json.dumps(self.network_data)
        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
            'openstack/latest/user_data': self.userdata,
            'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
            'openstack/latest/network_data.json': netdata,
        })

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertEqual(self.userdata, ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.oscode_md),
                         ret['metadata'])

    def test_os_code_live_no_userdata(self, m_platform, _m_sysuuid):
        """Verify os_code without user-data."""
        tmpdir = self.tmp_dir()
        m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
        # No user_data file is written for this scenario.
        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
            'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
        })

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertIsNone(ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.oscode_md),
                         ret['metadata'])


class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase):
    """Exercise _is_ibm_provisioning against temporary root directories."""

    # Capture log output so tests can assert on the logged reason.
    with_logs = True

    # Paths (relative to the test root) that the tests populate.
    boot_ref = "/proc/1/environ"
    prov_cfg = "/root/provisioningConfiguration.cfg"
    inst_log = "/root/swinstall.log"

    def _call_with_root(self, rootd):
        """Call reRoot(rootd), then return _is_ibm_provisioning()'s result."""
        self.reRoot(rootd)
        outcome = ibm._is_ibm_provisioning()
        return outcome

    def test_no_config(self):
        """An empty root has no provisioning config: not provisioning."""
        empty_root = self.tmp_dir()
        self.assertFalse(self._call_with_root(empty_root))

    def test_config_only(self):
        """Config present but no install log: provisioning."""
        root = self.tmp_dir()
        files = {self.prov_cfg: "key=value"}
        test_helpers.populate_dir(root, files)
        self.assertTrue(self._call_with_root(root))

    def test_config_with_old_log(self):
        """Install log older than the boot reference: not provisioning."""
        root = self.tmp_dir()
        # Values are (content, timestamp offset) pairs.
        files = {
            self.prov_cfg: ("key=value\nkey2=val2\n", -10),
            self.inst_log: ("log data\n", -30),
            self.boot_ref: ("PWD=/", 0),
        }
        test_helpers.populate_dir_with_ts(root, files)
        self.assertFalse(self._call_with_root(root))
        logged = self.logs.getvalue()
        self.assertIn("from previous boot", logged)

    def test_config_with_new_log(self):
        """Install log newer than the boot reference: provisioning."""
        root = self.tmp_dir()
        # Values are (content, timestamp offset) pairs.
        files = {
            self.prov_cfg: ("key=value\nkey2=val2\n", -10),
            self.inst_log: ("log data\n", 30),
            self.boot_ref: ("PWD=/", 0),
        }
        test_helpers.populate_dir_with_ts(root, files)
        self.assertTrue(self._call_with_root(root))
        logged = self.logs.getvalue()
        self.assertIn("from current boot", logged)

    def test_config_and_log_no_reference(self):
        """Config and log exist without a boot reference: assume not."""
        root = self.tmp_dir()
        files = {
            self.prov_cfg: "key=value",
            self.inst_log: "log data\n",
        }
        test_helpers.populate_dir(root, files)
        self.assertFalse(self._call_with_root(root))
        logged = self.logs.getvalue()
        self.assertIn("no reference file", logged)


class TestDataSourceIBMCloud(test_helpers.CiTestCase):
    """Test DataSourceIBMCloud.get_data with read_md patched out."""

    def setUp(self):
        """Create a datasource whose paths live under a temp directory."""
        super(TestDataSourceIBMCloud, self).setUp()
        self.tmp = self.tmp_dir()
        self.cloud_dir = self.tmp_path('cloud', dir=self.tmp)
        util.ensure_dir(self.cloud_dir)
        path_cfg = {'run_dir': self.tmp, 'cloud_dir': self.cloud_dir}
        self.ds = ibm.DataSourceIBMCloud(
            sys_cfg={}, distro=None, paths=Paths(path_cfg))

    def test_get_data_false(self):
        """When read_md returns None, get_data returns False."""
        with mock.patch(D_PATH + 'read_md', return_value=None):
            found = self.ds.get_data()
            self.assertFalse(found)

    def test_get_data_processes_read_md(self):
        """get_data processes and caches content returned by read_md."""
        read_md_result = {
            'platform': 'plat',
            'source': 'src',
            'metadata': {},
            'userdata': 'ud',
            'networkdata': 'net',
            'vendordata': 'vd',
            'system-uuid': 'uuid',
        }
        with mock.patch(D_PATH + 'read_md', return_value=read_md_result):
            found = self.ds.get_data()
            self.assertTrue(found)

        # (attribute name, expected value), checked in this exact order.
        expectations = (
            ('source', 'src'),
            ('platform', 'plat'),
            ('metadata', {}),
            ('userdata_raw', 'ud'),
            ('network_json', 'net'),
            ('vendordata_pure', 'vd'),
            ('system_uuid', 'uuid'),
            ('cloud_name', 'ibmcloud'),
            ('platform_type', 'ibmcloud'),
            ('subplatform', 'plat (src)'),
        )
        for attr_name, expected in expectations:
            self.assertEqual(expected, getattr(self.ds, attr_name))

# vi: ts=4 expandtab