Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit import helpers
from cloudinit.sources.DataSourceNoCloud import (
    DataSourceNoCloud as dsNoCloud,
    _maybe_remove_top_network,
    parse_cmdline_data)
from cloudinit import util
from cloudinit.tests.helpers import CiTestCase, populate_dir, mock, ExitStack

import os
import textwrap
import yaml


@mock.patch('cloudinit.sources.DataSourceNoCloud.util.is_lxd')
class TestNoCloudDataSource(CiTestCase):

    def setUp(self):
        super(TestNoCloudDataSource, self).setUp()
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths(
            {'cloud_dir': self.tmp, 'run_dir': self.tmp})

        self.cmdline = "root=TESTCMDLINE"

        self.mocks = ExitStack()
        self.addCleanup(self.mocks.close)

        self.mocks.enter_context(
            mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
        self.mocks.enter_context(
            mock.patch.object(util, 'read_dmi_data', return_value=None))

    def _test_fs_config_is_read(self, fs_label, fs_label_to_search):
        vfat_device = 'device-1'

        def m_mount_cb(device, callback, mtype):
            if (device == vfat_device):
                return {'meta-data': yaml.dump({'instance-id': 'IID'})}
            else:
                return {}

        def m_find_devs_with(query='', path=''):
            if 'TYPE=vfat' == query:
                return [vfat_device]
            elif 'LABEL={}'.format(fs_label) == query:
                return [vfat_device]
            else:
                return []

        self.mocks.enter_context(
            mock.patch.object(util, 'find_devs_with',
                              side_effect=m_find_devs_with))
        self.mocks.enter_context(
            mock.patch.object(util, 'mount_cb',
                              side_effect=m_mount_cb))
        sys_cfg = {'datasource': {'NoCloud': {'fs_label': fs_label_to_search}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()

        self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
        self.assertTrue(ret)

    def test_nocloud_seed_dir_on_lxd(self, m_is_lxd):
        md = {'instance-id': 'IID', 'dsmode': 'local'}
        ud = b"USER_DATA_HERE"
        seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
        populate_dir(seed_dir,
                     {'user-data': ud, 'meta-data': yaml.safe_dump(md)})

        sys_cfg = {
            'datasource': {'NoCloud': {'fs_label': None}}
        }

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, ud)
        self.assertEqual(dsrc.metadata, md)
        self.assertEqual(dsrc.platform_type, 'lxd')
        self.assertEqual(
            dsrc.subplatform, 'seed-dir (%s)' % seed_dir)
        self.assertTrue(ret)

    def test_nocloud_seed_dir_non_lxd_platform_is_nocloud(self, m_is_lxd):
        """Non-lxd environments will list nocloud as the platform."""
        m_is_lxd.return_value = False
        md = {'instance-id': 'IID', 'dsmode': 'local'}
        seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
        populate_dir(seed_dir,
                     {'user-data': '', 'meta-data': yaml.safe_dump(md)})

        sys_cfg = {
            'datasource': {'NoCloud': {'fs_label': None}}
        }

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        self.assertTrue(dsrc.get_data())
        self.assertEqual(dsrc.platform_type, 'nocloud')
        self.assertEqual(
            dsrc.subplatform, 'seed-dir (%s)' % seed_dir)

    def test_fs_label(self, m_is_lxd):
        # find_devs_with should not be called ff fs_label is None
        class PsuedoException(Exception):
            pass

        self.mocks.enter_context(
            mock.patch.object(util, 'find_devs_with',
                              side_effect=PsuedoException))

        # by default, NoCloud should search for filesystems by label
        sys_cfg = {'datasource': {'NoCloud': {}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        self.assertRaises(PsuedoException, dsrc.get_data)

        # but disabling searching should just end up with None found
        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertFalse(ret)

    def test_fs_config_lowercase_label(self, m_is_lxd):
        self._test_fs_config_is_read('cidata', 'cidata')

    def test_fs_config_uppercase_label(self, m_is_lxd):
        self._test_fs_config_is_read('CIDATA', 'cidata')

    def test_fs_config_lowercase_label_search_uppercase(self, m_is_lxd):
        self._test_fs_config_is_read('cidata', 'CIDATA')

    def test_fs_config_uppercase_label_search_uppercase(self, m_is_lxd):
        self._test_fs_config_is_read('CIDATA', 'CIDATA')

    def test_no_datasource_expected(self, m_is_lxd):
        # no source should be found if no cmdline, config, and fs_label=None
        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        self.assertFalse(dsrc.get_data())

    def test_seed_in_config(self, m_is_lxd):
        data = {
            'fs_label': None,
            'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
            'user-data': b"USER_DATA_RAW",
        }

        sys_cfg = {'datasource': {'NoCloud': data}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
        self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
        self.assertTrue(ret)

    def test_nocloud_seed_with_vendordata(self, m_is_lxd):
        md = {'instance-id': 'IID', 'dsmode': 'local'}
        ud = b"USER_DATA_HERE"
        vd = b"THIS IS MY VENDOR_DATA"

        populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
                     {'user-data': ud, 'meta-data': yaml.safe_dump(md),
                      'vendor-data': vd})

        sys_cfg = {
            'datasource': {'NoCloud': {'fs_label': None}}
        }

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, ud)
        self.assertEqual(dsrc.metadata, md)
        self.assertEqual(dsrc.vendordata_raw, vd)
        self.assertTrue(ret)

    def test_nocloud_no_vendordata(self, m_is_lxd):
        populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
                     {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, b"ud")
        self.assertFalse(dsrc.vendordata)
        self.assertTrue(ret)

    def test_metadata_network_interfaces(self, m_is_lxd):
        gateway = "103.225.10.1"
        md = {
            'instance-id': 'i-abcd',
            'local-hostname': 'hostname1',
            'network-interfaces': textwrap.dedent("""\
                auto eth0
                iface eth0 inet static
                hwaddr 00:16:3e:70:e1:04
                address 103.225.10.12
                netmask 255.255.255.0
                gateway """ + gateway + """
                dns-servers 8.8.8.8""")}

        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': yaml.dump(md) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        # very simple check just for the strings above
        self.assertIn(gateway, str(dsrc.network_config))

    def test_metadata_network_config(self, m_is_lxd):
        # network-config needs to get into network_config
        netconf = {'version': 1,
                   'config': [{'type': 'physical', 'name': 'interface0',
                               'subnets': [{'type': 'dhcp'}]}]}
        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': "instance-id: IID\n",
             'network-config': yaml.dump(netconf) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        self.assertEqual(netconf, dsrc.network_config)

    def test_metadata_network_config_with_toplevel_network(self, m_is_lxd):
        """network-config may have 'network' top level key."""
        netconf = {'config': 'disabled'}
        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': "instance-id: IID\n",
             'network-config': yaml.dump({'network': netconf}) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        self.assertEqual(netconf, dsrc.network_config)

    def test_metadata_network_config_over_interfaces(self, m_is_lxd):
        # network-config should override meta-data/network-interfaces
        gateway = "103.225.10.1"
        md = {
            'instance-id': 'i-abcd',
            'local-hostname': 'hostname1',
            'network-interfaces': textwrap.dedent("""\
                auto eth0
                iface eth0 inet static
                hwaddr 00:16:3e:70:e1:04
                address 103.225.10.12
                netmask 255.255.255.0
                gateway """ + gateway + """
                dns-servers 8.8.8.8""")}

        netconf = {'version': 1,
                   'config': [{'type': 'physical', 'name': 'interface0',
                               'subnets': [{'type': 'dhcp'}]}]}
        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': yaml.dump(md) + "\n",
             'network-config': yaml.dump(netconf) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        self.assertEqual(netconf, dsrc.network_config)
        self.assertNotIn(gateway, str(dsrc.network_config))

    @mock.patch("cloudinit.util.blkid")
    def test_nocloud_get_devices_freebsd(self, m_is_lxd, fake_blkid):
        populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
                     {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        self.mocks.enter_context(
            mock.patch.object(util, 'is_FreeBSD', return_value=True))

        def _mfind_devs_with_freebsd(
                criteria=None, oformat='device',
                tag=None, no_cache=False, path=None):
            if not criteria:
                return ["/dev/msdosfs/foo", "/dev/iso9660/foo"]
            if criteria.startswith("LABEL="):
                return ["/dev/msdosfs/foo", "/dev/iso9660/foo"]
            elif criteria == "TYPE=vfat":
                return ["/dev/msdosfs/foo"]
            elif criteria == "TYPE=iso9660":
                return ["/dev/iso9660/foo"]
            return []

        self.mocks.enter_context(
            mock.patch.object(
                util, 'find_devs_with_freebsd',
                side_effect=_mfind_devs_with_freebsd))

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc._get_devices('foo')
        self.assertEqual(['/dev/msdosfs/foo', '/dev/iso9660/foo'], ret)
        fake_blkid.assert_not_called()


class TestParseCommandLineData(CiTestCase):

    def test_parse_cmdline_data_valid(self):
        ds_id = "ds=nocloud"
        pairs = (
            ("root=/dev/sda1 %(ds_id)s", {}),
            ("%(ds_id)s; root=/dev/foo", {}),
            ("%(ds_id)s", {}),
            ("%(ds_id)s;", {}),
            ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}),
            ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
             {'seedfrom': 'SEED', 'local-hostname': 'xhost'}),
            ("%(ds_id)s;h=xhost",
             {'local-hostname': 'xhost'}),
            ("%(ds_id)s;h=xhost;i=IID",
             {'local-hostname': 'xhost', 'instance-id': 'IID'}),
        )

        for (fmt, expected) in pairs:
            fill = {}
            cmdline = fmt % {'ds_id': ds_id}
            ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline)
            self.assertEqual(expected, fill)
            self.assertTrue(ret)

    def test_parse_cmdline_data_none(self):
        ds_id = "ds=foo"
        cmdlines = (
            "root=/dev/sda1 ro",
            "console=/dev/ttyS0 root=/dev/foo",
            "",
            "ds=foocloud",
            "ds=foo-net",
            "ds=nocloud;s=SEED",
        )

        for cmdline in cmdlines:
            fill = {}
            ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline)
            self.assertEqual(fill, {})
            self.assertFalse(ret)


class TestMaybeRemoveToplevelNetwork(CiTestCase):
    """test _maybe_remove_top_network function."""
    basecfg = [{'type': 'physical', 'name': 'interface0',
                'subnets': [{'type': 'dhcp'}]}]

    def test_should_remove_safely(self):
        mcfg = {'config': self.basecfg, 'version': 1}
        self.assertEqual(mcfg, _maybe_remove_top_network({'network': mcfg}))

    def test_no_remove_if_other_keys(self):
        """should not shift if other keys at top level."""
        mcfg = {'network': {'config': self.basecfg, 'version': 1},
                'unknown_keyname': 'keyval'}
        self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))

    def test_no_remove_if_non_dict(self):
        """should not shift if not a dict."""
        mcfg = {'network': '"content here'}
        self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))

    def test_no_remove_if_missing_config_or_version(self):
        """should not shift unless network entry has config and version."""
        mcfg = {'network': {'config': self.basecfg}}
        self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))

        mcfg = {'network': {'version': 1}}
        self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))

    def test_remove_with_config_disabled(self):
        """network/config=disabled should be shifted."""
        mcfg = {'config': 'disabled'}
        self.assertEqual(mcfg, _maybe_remove_top_network({'network': mcfg}))


# vi: ts=4 expandtab