# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit import helpers
from cloudinit.sources.DataSourceNoCloud import (
DataSourceNoCloud as dsNoCloud,
_maybe_remove_top_network,
parse_cmdline_data)
from cloudinit import util
from cloudinit.tests.helpers import CiTestCase, populate_dir, mock, ExitStack
import os
import textwrap
import yaml
@mock.patch('cloudinit.sources.DataSourceNoCloud.util.is_lxd')
class TestNoCloudDataSource(CiTestCase):
    """Exercise DataSourceNoCloud with seed dirs, config seeds and fs labels.

    The class-level patch replaces ``util.is_lxd`` for every test method.
    Its mock is handed to each test as the *last* positional argument,
    after any mocks injected by method-level ``mock.patch`` decorators.
    """

    def setUp(self):
        """Create temporary paths and patch kernel cmdline and DMI reads."""
        super(TestNoCloudDataSource, self).setUp()
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths(
            {'cloud_dir': self.tmp, 'run_dir': self.tmp})

        self.cmdline = "root=TESTCMDLINE"

        # Every patch entered on this stack is undone by the cleanup hook.
        self.mocks = ExitStack()
        self.addCleanup(self.mocks.close)

        self.mocks.enter_context(
            mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
        self.mocks.enter_context(
            mock.patch.object(util, 'read_dmi_data', return_value=None))

    def _test_fs_config_is_read(self, fs_label, fs_label_to_search):
        """Assert meta-data is read from a fake device located by label.

        :param fs_label: label the fake ``find_devs_with`` answers to.
        :param fs_label_to_search: value configured as NoCloud ``fs_label``.
        """
        vfat_device = 'device-1'

        def m_mount_cb(device, callback, mtype):
            # Only the fake vfat device carries seed content.
            if device == vfat_device:
                return {'meta-data': yaml.dump({'instance-id': 'IID'})}
            return {}

        def m_find_devs_with(query='', path=''):
            if query == 'TYPE=vfat':
                return [vfat_device]
            if query == 'LABEL={}'.format(fs_label):
                return [vfat_device]
            return []

        self.mocks.enter_context(
            mock.patch.object(util, 'find_devs_with',
                              side_effect=m_find_devs_with))
        self.mocks.enter_context(
            mock.patch.object(util, 'mount_cb',
                              side_effect=m_mount_cb))

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': fs_label_to_search}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()

        self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
        self.assertTrue(ret)

    def test_nocloud_seed_dir_on_lxd(self, m_is_lxd):
        """A seed dir read while is_lxd is true reports platform 'lxd'."""
        # State the precondition explicitly rather than relying on the
        # default MagicMock return value happening to be truthy.
        m_is_lxd.return_value = True
        md = {'instance-id': 'IID', 'dsmode': 'local'}
        ud = b"USER_DATA_HERE"
        seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
        populate_dir(seed_dir,
                     {'user-data': ud, 'meta-data': yaml.safe_dump(md)})

        sys_cfg = {
            'datasource': {'NoCloud': {'fs_label': None}}
        }

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, ud)
        self.assertEqual(dsrc.metadata, md)
        self.assertEqual(dsrc.platform_type, 'lxd')
        self.assertEqual(
            dsrc.subplatform, 'seed-dir (%s)' % seed_dir)
        self.assertTrue(ret)

    def test_nocloud_seed_dir_non_lxd_platform_is_nocloud(self, m_is_lxd):
        """Non-lxd environments will list nocloud as the platform."""
        m_is_lxd.return_value = False
        md = {'instance-id': 'IID', 'dsmode': 'local'}
        seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
        populate_dir(seed_dir,
                     {'user-data': '', 'meta-data': yaml.safe_dump(md)})

        sys_cfg = {
            'datasource': {'NoCloud': {'fs_label': None}}
        }

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        self.assertTrue(dsrc.get_data())
        self.assertEqual(dsrc.platform_type, 'nocloud')
        self.assertEqual(
            dsrc.subplatform, 'seed-dir (%s)' % seed_dir)

    def test_fs_label(self, m_is_lxd):
        """find_devs_with is used by default but not when fs_label is None."""
        # find_devs_with should not be called if fs_label is None
        class PseudoException(Exception):
            pass

        self.mocks.enter_context(
            mock.patch.object(util, 'find_devs_with',
                              side_effect=PseudoException))

        # by default, NoCloud should search for filesystems by label
        sys_cfg = {'datasource': {'NoCloud': {}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        self.assertRaises(PseudoException, dsrc.get_data)

        # but disabling searching should just end up with None found
        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertFalse(ret)

    def test_fs_config_lowercase_label(self, m_is_lxd):
        """Lowercase device label, lowercase configured label."""
        self._test_fs_config_is_read('cidata', 'cidata')

    def test_fs_config_uppercase_label(self, m_is_lxd):
        """Uppercase device label, lowercase configured label."""
        self._test_fs_config_is_read('CIDATA', 'cidata')

    def test_fs_config_lowercase_label_search_uppercase(self, m_is_lxd):
        """Lowercase device label, uppercase configured label."""
        self._test_fs_config_is_read('cidata', 'CIDATA')

    def test_fs_config_uppercase_label_search_uppercase(self, m_is_lxd):
        """Uppercase device label, uppercase configured label."""
        self._test_fs_config_is_read('CIDATA', 'CIDATA')

    def test_no_datasource_expected(self, m_is_lxd):
        """get_data is false with no cmdline, no config and fs_label=None."""
        # no source should be found if no cmdline, config, and fs_label=None
        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        self.assertFalse(dsrc.get_data())

    def test_seed_in_config(self, m_is_lxd):
        """meta-data and user-data given in the ds config are used."""
        data = {
            'fs_label': None,
            'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
            'user-data': b"USER_DATA_RAW",
        }

        sys_cfg = {'datasource': {'NoCloud': data}}
        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
        self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
        self.assertTrue(ret)

    def test_nocloud_seed_with_vendordata(self, m_is_lxd):
        """vendor-data in the seed dir is exposed as vendordata_raw."""
        md = {'instance-id': 'IID', 'dsmode': 'local'}
        ud = b"USER_DATA_HERE"
        vd = b"THIS IS MY VENDOR_DATA"

        populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
                     {'user-data': ud, 'meta-data': yaml.safe_dump(md),
                      'vendor-data': vd})

        sys_cfg = {
            'datasource': {'NoCloud': {'fs_label': None}}
        }

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, ud)
        self.assertEqual(dsrc.metadata, md)
        self.assertEqual(dsrc.vendordata_raw, vd)
        self.assertTrue(ret)

    def test_nocloud_no_vendordata(self, m_is_lxd):
        """vendordata is falsy when the seed dir has no vendor-data file."""
        populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
                     {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertEqual(dsrc.userdata_raw, b"ud")
        self.assertFalse(dsrc.vendordata)
        self.assertTrue(ret)

    def test_metadata_network_interfaces(self, m_is_lxd):
        """meta-data 'network-interfaces' content reaches network_config."""
        gateway = "103.225.10.1"
        # textwrap.dedent strips the common leading indent, so the literal
        # may follow the surrounding code's indentation.
        md = {
            'instance-id': 'i-abcd',
            'local-hostname': 'hostname1',
            'network-interfaces': textwrap.dedent("""\
                auto eth0
                iface eth0 inet static
                hwaddr 00:16:3e:70:e1:04
                address 103.225.10.12
                netmask 255.255.255.0
                gateway """ + gateway + """
                dns-servers 8.8.8.8""")}

        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': yaml.dump(md) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        # very simple check just for the strings above
        self.assertIn(gateway, str(dsrc.network_config))

    def test_metadata_network_config(self, m_is_lxd):
        """A 'network-config' seed file is exposed as network_config."""
        # network-config needs to get into network_config
        netconf = {'version': 1,
                   'config': [{'type': 'physical', 'name': 'interface0',
                               'subnets': [{'type': 'dhcp'}]}]}
        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': "instance-id: IID\n",
             'network-config': yaml.dump(netconf) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        self.assertEqual(netconf, dsrc.network_config)

    def test_metadata_network_config_with_toplevel_network(self, m_is_lxd):
        """network-config may have 'network' top level key."""
        netconf = {'config': 'disabled'}
        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': "instance-id: IID\n",
             'network-config': yaml.dump({'network': netconf}) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        self.assertEqual(netconf, dsrc.network_config)

    def test_metadata_network_config_over_interfaces(self, m_is_lxd):
        """network-config wins over meta-data 'network-interfaces'."""
        # network-config should override meta-data/network-interfaces
        gateway = "103.225.10.1"
        # textwrap.dedent strips the common leading indent, so the literal
        # may follow the surrounding code's indentation.
        md = {
            'instance-id': 'i-abcd',
            'local-hostname': 'hostname1',
            'network-interfaces': textwrap.dedent("""\
                auto eth0
                iface eth0 inet static
                hwaddr 00:16:3e:70:e1:04
                address 103.225.10.12
                netmask 255.255.255.0
                gateway """ + gateway + """
                dns-servers 8.8.8.8""")}

        netconf = {'version': 1,
                   'config': [{'type': 'physical', 'name': 'interface0',
                               'subnets': [{'type': 'dhcp'}]}]}
        populate_dir(
            os.path.join(self.paths.seed_dir, "nocloud"),
            {'user-data': b"ud",
             'meta-data': yaml.dump(md) + "\n",
             'network-config': yaml.dump(netconf) + "\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc.get_data()
        self.assertTrue(ret)
        self.assertEqual(netconf, dsrc.network_config)
        self.assertNotIn(gateway, str(dsrc.network_config))

    # Mocks are injected innermost decorator first: this method-level patch
    # supplies ``fake_blkid`` and the class-level is_lxd patch, applied
    # afterwards, supplies ``m_is_lxd`` as the final argument.
    @mock.patch("cloudinit.util.blkid")
    def test_nocloud_get_devices_freebsd(self, fake_blkid, m_is_lxd):
        """On FreeBSD _get_devices uses the FreeBSD finder, never blkid."""
        populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
                     {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})

        sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}

        self.mocks.enter_context(
            mock.patch.object(util, 'is_FreeBSD', return_value=True))

        def _mfind_devs_with_freebsd(
                criteria=None, oformat='device',
                tag=None, no_cache=False, path=None):
            if not criteria:
                return ["/dev/msdosfs/foo", "/dev/iso9660/foo"]
            if criteria.startswith("LABEL="):
                return ["/dev/msdosfs/foo", "/dev/iso9660/foo"]
            elif criteria == "TYPE=vfat":
                return ["/dev/msdosfs/foo"]
            elif criteria == "TYPE=iso9660":
                return ["/dev/iso9660/foo"]
            return []

        self.mocks.enter_context(
            mock.patch.object(
                util, 'find_devs_with_freebsd',
                side_effect=_mfind_devs_with_freebsd))

        dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
        ret = dsrc._get_devices('foo')
        self.assertEqual(['/dev/msdosfs/foo', '/dev/iso9660/foo'], ret)
        fake_blkid.assert_not_called()


class TestParseCommandLineData(CiTestCase):
    """Check how parse_cmdline_data handles matching and other cmdlines."""

    def test_parse_cmdline_data_valid(self):
        """Cmdlines containing the ds_id token fill the expected keys."""
        ds_id = "ds=nocloud"
        # Each case: (cmdline template, dict expected in ``fill`` afterwards).
        cases = (
            ("root=/dev/sda1 %(ds_id)s", {}),
            ("%(ds_id)s; root=/dev/foo", {}),
            ("%(ds_id)s", {}),
            ("%(ds_id)s;", {}),
            ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}),
            ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
             {'seedfrom': 'SEED', 'local-hostname': 'xhost'}),
            ("%(ds_id)s;h=xhost",
             {'local-hostname': 'xhost'}),
            ("%(ds_id)s;h=xhost;i=IID",
             {'local-hostname': 'xhost', 'instance-id': 'IID'}),
        )

        substitutions = {'ds_id': ds_id}
        for template, expected in cases:
            filled = {}
            found = parse_cmdline_data(
                ds_id=ds_id, fill=filled, cmdline=template % substitutions)
            self.assertEqual(expected, filled)
            self.assertTrue(found)

    def test_parse_cmdline_data_none(self):
        """Cmdlines without the exact ds_id leave fill empty, return false."""
        ds_id = "ds=foo"
        non_matching = (
            "root=/dev/sda1 ro",
            "console=/dev/ttyS0 root=/dev/foo",
            "",
            "ds=foocloud",
            "ds=foo-net",
            "ds=nocloud;s=SEED",
        )

        for candidate in non_matching:
            filled = {}
            found = parse_cmdline_data(
                ds_id=ds_id, fill=filled, cmdline=candidate)
            self.assertEqual(filled, {})
            self.assertFalse(found)


class TestMaybeRemoveToplevelNetwork(CiTestCase):
    """Cover when _maybe_remove_top_network unwraps the 'network' key."""

    # Minimal network 'config' list shared by the tests below.
    basecfg = [{'type': 'physical', 'name': 'interface0',
                'subnets': [{'type': 'dhcp'}]}]

    def test_should_remove_safely(self):
        """A lone 'network' key holding config and version is unwrapped."""
        inner = {'config': self.basecfg, 'version': 1}
        result = _maybe_remove_top_network({'network': inner})
        self.assertEqual(inner, result)

    def test_no_remove_if_other_keys(self):
        """should not shift if other keys at top level."""
        wrapped = {'network': {'config': self.basecfg, 'version': 1},
                   'unknown_keyname': 'keyval'}
        result = _maybe_remove_top_network(wrapped)
        self.assertEqual(wrapped, result)

    def test_no_remove_if_non_dict(self):
        """should not shift if not a dict."""
        wrapped = {'network': '"content here'}
        result = _maybe_remove_top_network(wrapped)
        self.assertEqual(wrapped, result)

    def test_no_remove_if_missing_config_or_version(self):
        """should not shift unless network entry has config and version."""
        only_config = {'network': {'config': self.basecfg}}
        self.assertEqual(
            only_config, _maybe_remove_top_network(only_config))

        only_version = {'network': {'version': 1}}
        self.assertEqual(
            only_version, _maybe_remove_top_network(only_version))

    def test_remove_with_config_disabled(self):
        """network/config=disabled should be shifted."""
        inner = {'config': 'disabled'}
        result = _maybe_remove_top_network({'network': inner})
        self.assertEqual(inner, result)


# vi: ts=4 expandtab