|
Packit Service |
a04d08 |
# This file is part of cloud-init. See LICENSE file for license information.
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
9bfd13 |
import io
|
|
Packit Service |
a04d08 |
import logging
|
|
Packit Service |
a04d08 |
import os
|
|
Packit Service |
a04d08 |
import re
|
|
Packit Service |
a04d08 |
import shutil
|
|
Packit Service |
a04d08 |
import stat
|
|
Packit Service |
a04d08 |
import tempfile
|
|
Packit Service |
9bfd13 |
import pytest
|
|
Packit Service |
a04d08 |
import yaml
|
|
Packit Service |
9bfd13 |
from unittest import mock
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
9bfd13 |
from cloudinit import subp
|
|
Packit Service |
a04d08 |
from cloudinit import importer, util
|
|
Packit Service |
a04d08 |
from cloudinit.tests import helpers
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class FakeSelinux:
    """Minimal stand-in for the ``selinux`` module.

    Only the calls made on it by the tests in this file are implemented.
    Every path handed to ``restorecon`` is appended to ``self.restored``
    so a test can inspect what was restored.
    """

    def __init__(self, match_what):
        # The single path for which matchpathcon() succeeds.
        self.match_what = match_what
        # Paths passed to restorecon(), in call order.
        self.restored = []

    def matchpathcon(self, path, mode):
        """Return None when ``path`` is the configured one, else raise.

        ``mode`` is accepted for signature compatibility and is not used.
        """
        if path != self.match_what:
            raise OSError("No match!")
        return None

    def is_selinux_enabled(self):
        """Always report SELinux as enabled."""
        return True

    def restorecon(self, path, recursive):
        """Record ``path``; ``recursive`` is accepted but not used."""
        self.restored.append(path)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetCfgOptionListOrStr(helpers.TestCase):
    """Check util.get_cfg_option_list for missing, present and None keys."""

    def test_not_found_no_default(self):
        """A missing key with no default given yields None."""
        self.assertIsNone(util.get_cfg_option_list({}, "key"))

    def test_not_found_with_default(self):
        """A missing key yields the supplied default."""
        found = util.get_cfg_option_list({}, "key", default=["DEFAULT"])
        self.assertEqual(["DEFAULT"], found)

    def test_found_with_default(self):
        """A key that is present takes precedence over the default."""
        cfg = {"key": ["value1"]}
        found = util.get_cfg_option_list(cfg, "key", default=["DEFAULT"])
        self.assertEqual(["value1"], found)

    def test_found_convert_to_list(self):
        """A single string value is returned as a one-element list."""
        found = util.get_cfg_option_list({"key": "value1"}, "key")
        self.assertEqual(["value1"], found)

    def test_value_is_none(self):
        """A value of None is returned as an empty list."""
        found = util.get_cfg_option_list({"key": None}, "key")
        self.assertEqual([], found)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestWriteFile(helpers.TestCase):
    """Check util.write_file and util.SeLinuxGuard inside a temp directory."""

    def setUp(self):
        """Create a scratch directory that is removed on cleanup."""
        super().setUp()
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)

    def test_basic_usage(self):
        """Verify basic usage with default args (content and mode 0o644)."""
        target = os.path.join(self.tmp, "NewFile.txt")
        text = "Hey there"

        util.write_file(target, text)

        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.isfile(target))
        with open(target) as stream:
            on_disk = stream.read()
        self.assertEqual(text, on_disk)
        self.assertEqual(0o644, stat.S_IMODE(os.stat(target).st_mode))

    def test_dir_is_created_if_required(self):
        """Verify that directories are created if required."""
        parent = os.path.join(self.tmp, "subdir")
        target = os.path.join(parent, "NewFile.txt")

        util.write_file(target, "Hey there")

        self.assertTrue(os.path.isdir(parent))
        self.assertTrue(os.path.isfile(target))

    def test_dir_is_not_created_if_ensure_dir_false(self):
        """Verify directories are not created if ensure_dir_exists is False."""
        parent = os.path.join(self.tmp, "subdir")
        target = os.path.join(parent, "NewFile.txt")

        with self.assertRaises(FileNotFoundError):
            util.write_file(target, "Hey there", ensure_dir_exists=False)

        self.assertFalse(os.path.isdir(parent))

    def test_explicit_mode(self):
        """Verify an explicit file mode is applied to the new file."""
        target = os.path.join(self.tmp, "NewFile.txt")

        util.write_file(target, "Hey there", mode=0o666)

        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.isfile(target))
        self.assertEqual(0o666, stat.S_IMODE(os.stat(target).st_mode))

    def test_preserve_mode_no_existing(self):
        """Verify mode 0o644 is used when preserve_mode is true and
        there is no prior existing file."""
        target = os.path.join(self.tmp, "NewFile.txt")

        util.write_file(target, "Hey there", preserve_mode=True)

        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.isfile(target))
        self.assertEqual(0o644, stat.S_IMODE(os.stat(target).st_mode))

    def test_preserve_mode_with_existing(self):
        """Verify the existing file's mode is kept when preserve_mode
        is true."""
        target = os.path.join(self.tmp, "NewFile.txt")

        # Pre-create an empty file carrying a non-default mode.
        with open(target, 'w'):
            pass
        os.chmod(target, 0o666)

        util.write_file(target, "Hey there", preserve_mode=True)

        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.isfile(target))
        self.assertEqual(0o666, stat.S_IMODE(os.stat(target).st_mode))

    def test_custom_omode(self):
        """Verify a custom omode ("a") appends to existing content."""
        target = os.path.join(self.tmp, "NewFile.txt")

        # Create file first with basic content
        with open(target, "wb") as stream:
            stream.write(b"LINE1\n")
        util.write_file(target, "Hey there", omode="a")

        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.isfile(target))
        with open(target) as stream:
            on_disk = stream.read()
        self.assertEqual("LINE1\nHey there", on_disk)

    def test_restorecon_if_possible_is_called(self):
        """Make sure the selinux guard is called correctly."""
        guarded = os.path.join(self.tmp, "my_file")
        with open(guarded, "w") as stream:
            stream.write("My Content")

        selinux_double = FakeSelinux(guarded)
        import_patch = mock.patch.object(
            importer, 'import_module', return_value=selinux_double)

        with import_patch as m_import:
            with util.SeLinuxGuard(guarded) as is_on:
                self.assertTrue(is_on)

        # Exactly one restorecon call, and it was for the guarded path.
        self.assertEqual(1, len(selinux_double.restored))
        self.assertEqual(guarded, selinux_double.restored[0])

        m_import.assert_called_once_with('selinux')
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestDeleteDirContents(helpers.TestCase):
    """Check that util.delete_dir_contents empties a temp directory."""

    def setUp(self):
        """Create a scratch directory that is removed on cleanup."""
        super().setUp()
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)

    def assertDirEmpty(self, dirname):
        """Assert that ``dirname`` has no entries."""
        self.assertEqual([], os.listdir(dirname))

    def test_does_not_delete_dir(self):
        """Ensure directory itself is not deleted."""
        util.delete_dir_contents(self.tmp)

        self.assertTrue(os.path.isdir(self.tmp))
        self.assertDirEmpty(self.tmp)

    def test_deletes_files(self):
        """Single file should be deleted."""
        victim = os.path.join(self.tmp, "new_file.txt")
        with open(victim, "wb") as stream:
            stream.write(b"DELETE ME")

        util.delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_empty_dirs(self):
        """Empty directories should be deleted."""
        os.mkdir(os.path.join(self.tmp, "new_dir"))

        util.delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_nested_dirs(self):
        """Nested directories should be deleted."""
        outer = os.path.join(self.tmp, "new_dir")
        os.mkdir(outer)
        os.mkdir(os.path.join(outer, "new_subdir"))

        util.delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_non_empty_dirs(self):
        """Non-empty directories should be deleted."""
        os.mkdir(os.path.join(self.tmp, "new_dir"))
        victim = os.path.join(self.tmp, "new_dir", "new_file.txt")
        with open(victim, "wb") as stream:
            stream.write(b"DELETE ME")

        util.delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_symlinks(self):
        """Symlinks should be deleted."""
        real_file = os.path.join(self.tmp, "new_file.txt")
        link = os.path.join(self.tmp, "new_file_link.txt")
        with open(real_file, "wb") as stream:
            stream.write(b"DELETE ME")
        os.symlink(real_file, link)

        util.delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestKeyValStrings(helpers.TestCase):
    """Check util.keyval_str_to_dict on a kernel-cmdline style string."""

    def test_keyval_str_to_dict(self):
        """key=value tokens map to strings; a bare token maps to True."""
        parsed = util.keyval_str_to_dict("1=one ro 2=one+one")
        self.assertEqual({'1': 'one', '2': 'one+one', 'ro': True}, parsed)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetCmdline(helpers.TestCase):
    """Check util.get_cmdline with DEBUG_PROC_CMDLINE set."""

    def test_cmdline_reads_debug_env(self):
        """get_cmdline returns the value of DEBUG_PROC_CMDLINE when set."""
        fake_env = {'DEBUG_PROC_CMDLINE': 'abcd 123'}
        with mock.patch.dict("os.environ", values=fake_env):
            cmdline = util.get_cmdline()
        self.assertEqual("abcd 123", cmdline)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestLoadYaml(helpers.CiTestCase):
    """Check util.load_yaml results, fallbacks to default, and log output."""

    # Sentinel default; any test seeing this back knows load_yaml fell back.
    mydefault = "7b03a8ebace993d806255121073fed52"
    # Read by the tests below through self.logs.
    with_logs = True

    def test_simple(self):
        """A dumped dict of strings round-trips through load_yaml."""
        payload = {'1': "one", '2': "two"}
        loaded = util.load_yaml(yaml.dump(payload))
        self.assertEqual(loaded, payload)

    def test_nonallowed_returns_default(self):
        '''Any unallowed types result in returning default; log the issue.'''
        # for now, anything not in the allowed list just returns the default.
        blob = yaml.dump({'1': "one"})
        loaded = util.load_yaml(
            blob=blob, default=self.mydefault, allowed=(str,))
        self.assertEqual(loaded, self.mydefault)

        pattern = re.compile(
            r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but'
            r' got dict')
        self.assertTrue(pattern.search(self.logs.getvalue()),
                        msg='Missing expected yaml load error')

    def test_bogus_scan_error_returns_default(self):
        '''On Yaml scan error, load_yaml returns the default and logs issue.'''
        loaded = util.load_yaml(blob="1\n 2:", default=self.mydefault)
        self.assertEqual(loaded, self.mydefault)

        expected_log = (
            'Failed loading yaml blob. Invalid format at line 2 column 3:'
            ' "mapping values are not allowed here')
        self.assertIn(expected_log, self.logs.getvalue())

    def test_bogus_parse_error_returns_default(self):
        '''On Yaml parse error, load_yaml returns default and logs issue.'''
        loaded = util.load_yaml(blob="{}}", default=self.mydefault)
        self.assertEqual(loaded, self.mydefault)

        expected_log = (
            'Failed loading yaml blob. Invalid format at line 1 column 3:'
            " \"expected \'<document start>\', but found \'}\'")
        self.assertIn(expected_log, self.logs.getvalue())

    def test_unsafe_types(self):
        """A dumped tuple is not loaded; the default comes back instead."""
        # should not load complex types
        blob = yaml.dump((1, 2, 3,))
        loaded = util.load_yaml(blob=blob, default=self.mydefault)
        self.assertEqual(loaded, self.mydefault)

    def test_python_unicode(self):
        """A dumped dict of plain strings loads back unchanged."""
        # complex type of python/unicode is explicitly allowed
        payload = {'1': "FOOBAR"}
        loaded = util.load_yaml(
            blob=yaml.dump(payload), default=self.mydefault)
        self.assertEqual(loaded, payload)

    def test_none_returns_default(self):
        """If yaml.load returns None, then default should be returned."""
        blobs = ("", " ", "# foo\n", "#")
        fallback = self.mydefault
        # Pair each blob with its result so a failure shows which blob.
        wanted = [(blob, self.mydefault) for blob in blobs]
        got = [(blob, util.load_yaml(blob=blob, default=fallback))
               for blob in blobs]
        self.assertEqual(wanted, got)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestMountinfoParsing(helpers.ResourceUsingTestCase):
    """Check util.parse_mount_info, util.parse_mount and
    util.get_device_info_from_zpool against literal and resource inputs."""

    def test_invalid_mountinfo(self):
        """Truncated mountinfo lines give None; the full line parses.

        The line is rebuilt from its first ``i`` fields for every ``i``;
        fewer than 10 fields is expected to yield None, and the complete
        10-field line is expected to yield the root mount tuple.
        """
        # The space that ends the first literal is required. Without it the
        # device path and the mount options fuse into a single token, the
        # line has only 9 fields, and the positive ``else`` expectation
        # below is never exercised.
        line = ("20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root "
                "rw,errors=remount-ro,data=ordered")
        elements = line.split()
        for i in range(len(elements) + 1):
            lines = [' '.join(elements[0:i])]
            if i < 10:
                expected = None
            else:
                expected = ('/dev/mapper/vg0-root', 'ext4', '/')
            self.assertEqual(expected, util.parse_mount_info('/', lines))

    def test_precise_ext4_root(self):
        """Paths resolve to the expected mount in the precise/ext4 sample."""
        lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines()

        expected = ('/dev/mapper/vg0-root', 'ext4', '/')
        self.assertEqual(expected, util.parse_mount_info('/', lines))
        self.assertEqual(expected, util.parse_mount_info('/usr', lines))
        self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))

        expected = ('/dev/md0', 'ext4', '/boot')
        self.assertEqual(expected, util.parse_mount_info('/boot', lines))
        self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))

        expected = ('/dev/mapper/vg0-root', 'ext4', '/')
        self.assertEqual(expected, util.parse_mount_info('/home', lines))
        self.assertEqual(expected, util.parse_mount_info('/home/me', lines))

        expected = ('tmpfs', 'tmpfs', '/run')
        self.assertEqual(expected, util.parse_mount_info('/run', lines))

        expected = ('none', 'tmpfs', '/run/lock')
        self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))

    def test_raring_btrfs_root(self):
        """Paths resolve to the expected mount in the raring/btrfs sample."""
        lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines()

        expected = ('/dev/vda1', 'btrfs', '/')
        self.assertEqual(expected, util.parse_mount_info('/', lines))
        self.assertEqual(expected, util.parse_mount_info('/usr', lines))
        self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
        self.assertEqual(expected, util.parse_mount_info('/boot', lines))
        self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))

        expected = ('/dev/vda1', 'btrfs', '/home')
        self.assertEqual(expected, util.parse_mount_info('/home', lines))
        self.assertEqual(expected, util.parse_mount_info('/home/me', lines))

        expected = ('tmpfs', 'tmpfs', '/run')
        self.assertEqual(expected, util.parse_mount_info('/run', lines))

        expected = ('none', 'tmpfs', '/run/lock')
        self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))

    @mock.patch('cloudinit.util.os')
    @mock.patch('cloudinit.subp.subp')
    def test_get_device_info_from_zpool(self, zpool_output, m_os):
        """The device is read from zpool output when /dev/zfs exists."""
        # mock /dev/zfs exists
        m_os.path.exists.return_value = True
        # mock subp command from util.get_mount_info_fs_on_zpool
        zpool_output.return_value = (
            helpers.readResource('zpool_status_simple.txt'), ''
        )
        # save function return values and do asserts
        ret = util.get_device_info_from_zpool('vmzroot')
        self.assertEqual('gpt/system', ret)
        self.assertIsNotNone(ret)
        m_os.path.exists.assert_called_with('/dev/zfs')

    @mock.patch('cloudinit.util.os')
    def test_get_device_info_from_zpool_no_dev_zfs(self, m_os):
        """None is returned when /dev/zfs does not exist."""
        # mock /dev/zfs missing
        m_os.path.exists.return_value = False
        # save function return values and do asserts
        ret = util.get_device_info_from_zpool('vmzroot')
        self.assertIsNone(ret)

    @mock.patch('cloudinit.util.os')
    @mock.patch('cloudinit.subp.subp')
    def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os):
        """Handle case where there is no zpool command"""
        # mock /dev/zfs exists
        m_os.path.exists.return_value = True
        m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd")
        ret = util.get_device_info_from_zpool('vmzroot')
        self.assertIsNone(ret)

    @mock.patch('cloudinit.util.os')
    @mock.patch('cloudinit.subp.subp')
    def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os):
        """None is returned when the mocked subp call reports stderr text."""
        # mock /dev/zfs exists
        m_os.path.exists.return_value = True
        # mock subp command from util.get_mount_info_fs_on_zpool
        zpool_output.return_value = (
            helpers.readResource('zpool_status_simple.txt'), 'error'
        )
        # save function return values and do asserts
        ret = util.get_device_info_from_zpool('vmzroot')
        self.assertIsNone(ret)

    @mock.patch('cloudinit.subp.subp')
    def test_parse_mount_with_ext(self, mount_out):
        """parse_mount resolves ext4 mounts from the sample mount output."""
        mount_out.return_value = (
            helpers.readResource('mount_parse_ext.txt'), '')
        # this one is valid and exists in mount_parse_ext.txt
        ret = util.parse_mount('/var')
        self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret)
        # another one that is valid and exists
        ret = util.parse_mount('/')
        self.assertEqual(('/dev/mapper/vg00-lv_root', 'ext4', '/'), ret)
        # this one exists in mount_parse_ext.txt
        ret = util.parse_mount('/sys/kernel/debug')
        self.assertIsNone(ret)
        # this one does not even exist in mount_parse_ext.txt
        ret = util.parse_mount('/not/existing/mount')
        self.assertIsNone(ret)

    @mock.patch('cloudinit.subp.subp')
    def test_parse_mount_with_zfs(self, mount_out):
        """parse_mount resolves zfs mounts from the sample mount output."""
        mount_out.return_value = (
            helpers.readResource('mount_parse_zfs.txt'), '')
        # this one is valid and exists in mount_parse_zfs.txt
        ret = util.parse_mount('/var')
        self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret)
        # this one is the root, valid and also exists in mount_parse_zfs.txt
        ret = util.parse_mount('/')
        self.assertEqual(('vmzroot/ROOT/freebsd', 'zfs', '/'), ret)
        # this one does not even exist in mount_parse_zfs.txt
        ret = util.parse_mount('/not/existing/mount')
        self.assertIsNone(ret)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestIsX86(helpers.CiTestCase):
    """Check util.is_x86 with explicit and uname-derived architectures."""

    def test_is_x86_matches_x86_types(self):
        """is_x86 returns True if CPU architecture matches."""
        for arch in ('x86_64', 'i386', 'i586', 'i686'):
            failure_note = 'Expected is_x86 for arch "%s"' % arch
            self.assertTrue(util.is_x86(arch), failure_note)

    def test_is_x86_unmatched_types(self):
        """is_x86 returns False on non-intel x86 architectures."""
        for arch in ('ia64', '9000/800', 'arm64v71'):
            failure_note = 'Expected not is_x86 for arch "%s"' % arch
            self.assertFalse(util.is_x86(arch), failure_note)

    @mock.patch('cloudinit.util.os.uname')
    def test_is_x86_calls_uname_for_architecture(self, m_uname):
        """is_x86 returns True if platform from uname matches."""
        # Only the fifth uname field carries a meaningful value here.
        m_uname.return_value = [0, 1, 2, 3, 'x86_64']
        self.assertTrue(util.is_x86())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestReadDMIData(helpers.FilesystemMockingTestCase):
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def setUp(self):
    """Prepare a temp root and patch util.is_container to return False.

    The temp root is passed to patchOS and patchUtils and removed on
    cleanup; the started is_container mock is kept on the instance.
    """
    super(TestReadDMIData, self).setUp()
    self.new_root = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, self.new_root)
    self.patchOS(self.new_root)
    self.patchUtils(self.new_root)
    container_patch = mock.patch(
        "cloudinit.util.is_container", return_value=False)
    # Register the stop before starting, as the original ordering does.
    self.addCleanup(container_patch.stop)
    self._m_is_container = container_patch.start()
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _create_sysfs_parent_directory(self):
    """Ensure the relative directory sys/class/dmi/id exists."""
    dmi_dir = os.path.join('sys', 'class', 'dmi', 'id')
    util.ensure_dir(dmi_dir)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _create_sysfs_file(self, key, content):
    """Mock the sys path found on Linux: write ``content`` for ``key``."""
    self._create_sysfs_parent_directory()
    util.write_file("/sys/class/dmi/id/{0}".format(key), content)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _configure_dmidecode_return(self, key, content, error=None):
    """Fake dmidecode so a lookup of ``key`` yields ``(content, error)``.

    Used to test a missing sys path and the resulting call-out to
    dmidecode: ``cloudinit.subp.which`` is patched to always return True
    and ``cloudinit.subp.subp`` is patched with the fake below.
    """
    def fake_dmidecode(cmd):
        # Only a command whose last argument is ``key`` succeeds.
        if cmd[-1] == key:
            return (content, error)
        raise subp.ProcessExecutionError()

    replacements = (
        ("cloudinit.subp.which", lambda _: True),
        ("cloudinit.subp.subp", fake_dmidecode),
    )
    for target, effect in replacements:
        self.patched_funcs.enter_context(
            mock.patch(target, side_effect=effect))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def patch_mapping(self, new_mapping):
|
|
Packit Service |
a04d08 |
self.patched_funcs.enter_context(
|
|
Packit Service |
a04d08 |
mock.patch('cloudinit.util.DMIDECODE_TO_DMI_SYS_MAPPING',
|
|
Packit Service |
a04d08 |
new_mapping))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self):
|
|
Packit Service |
a04d08 |
self.patch_mapping({'mapped-key': 'mapped-value'})
|
|
Packit Service |
a04d08 |
expected_dmi_value = 'sys-used-correctly'
|
|
Packit Service |
a04d08 |
self._create_sysfs_file('mapped-value', expected_dmi_value)
|
|
Packit Service |
a04d08 |
self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong')
|
|
Packit Service |
a04d08 |
self.assertEqual(expected_dmi_value, util.read_dmi_data('mapped-key'))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_dmidecode_used_if_no_sysfs_file_on_disk(self):
|
|
Packit Service |
a04d08 |
self.patch_mapping({})
|
|
Packit Service |
a04d08 |
self._create_sysfs_parent_directory()
|
|
Packit Service |
a04d08 |
expected_dmi_value = 'dmidecode-used'
|
|
Packit Service |
a04d08 |
self._configure_dmidecode_return('use-dmidecode', expected_dmi_value)
|
|
Packit Service |
a04d08 |
with mock.patch("cloudinit.util.os.uname") as m_uname:
|
|
Packit Service |
a04d08 |
m_uname.return_value = ('x-sysname', 'x-nodename',
|
|
Packit Service |
a04d08 |
'x-release', 'x-version', 'x86_64')
|
|
Packit Service |
a04d08 |
self.assertEqual(expected_dmi_value,
|
|
Packit Service |
a04d08 |
util.read_dmi_data('use-dmidecode'))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_dmidecode_not_used_on_arm(self):
|
|
Packit Service |
a04d08 |
self.patch_mapping({})
|
|
Packit Service |
9bfd13 |
print("current =%s", subp)
|
|
Packit Service |
a04d08 |
self._create_sysfs_parent_directory()
|
|
Packit Service |
a04d08 |
dmi_val = 'from-dmidecode'
|
|
Packit Service |
a04d08 |
dmi_name = 'use-dmidecode'
|
|
Packit Service |
a04d08 |
self._configure_dmidecode_return(dmi_name, dmi_val)
|
|
Packit Service |
9bfd13 |
print("now =%s", subp)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val}
|
|
Packit Service |
a04d08 |
found = {}
|
|
Packit Service |
a04d08 |
# we do not run the 'dmi-decode' binary on some arches
|
|
Packit Service |
a04d08 |
# verify that anything requested that is not in the sysfs dir
|
|
Packit Service |
a04d08 |
# will return None on those arches.
|
|
Packit Service |
a04d08 |
with mock.patch("cloudinit.util.os.uname") as m_uname:
|
|
Packit Service |
a04d08 |
for arch in expected:
|
|
Packit Service |
a04d08 |
m_uname.return_value = ('x-sysname', 'x-nodename',
|
|
Packit Service |
a04d08 |
'x-release', 'x-version', arch)
|
|
Packit Service |
9bfd13 |
print("now2 =%s", subp)
|
|
Packit Service |
a04d08 |
found[arch] = util.read_dmi_data(dmi_name)
|
|
Packit Service |
a04d08 |
self.assertEqual(expected, found)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_none_returned_if_neither_source_has_data(self):
|
|
Packit Service |
a04d08 |
self.patch_mapping({})
|
|
Packit Service |
a04d08 |
self._configure_dmidecode_return('key', 'value')
|
|
Packit Service |
a04d08 |
self.assertIsNone(util.read_dmi_data('expect-fail'))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_none_returned_if_dmidecode_not_in_path(self):
|
|
Packit Service |
a04d08 |
self.patched_funcs.enter_context(
|
|
Packit Service |
9bfd13 |
mock.patch.object(subp, 'which', lambda _: False))
|
|
Packit Service |
a04d08 |
self.patch_mapping({})
|
|
Packit Service |
a04d08 |
self.assertIsNone(util.read_dmi_data('expect-fail'))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_dots_returned_instead_of_foxfox(self):
|
|
Packit Service |
a04d08 |
# uninitialized dmi values show as \xff, return those as .
|
|
Packit Service |
a04d08 |
my_len = 32
|
|
Packit Service |
a04d08 |
dmi_value = b'\xff' * my_len + b'\n'
|
|
Packit Service |
a04d08 |
expected = ""
|
|
Packit Service |
a04d08 |
dmi_key = 'system-product-name'
|
|
Packit Service |
a04d08 |
sysfs_key = 'product_name'
|
|
Packit Service |
a04d08 |
self._create_sysfs_file(sysfs_key, dmi_value)
|
|
Packit Service |
a04d08 |
self.assertEqual(expected, util.read_dmi_data(dmi_key))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_container_returns_none(self):
|
|
Packit Service |
a04d08 |
"""In a container read_dmi_data should always return None."""
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# first verify we get the value if not in container
|
|
Packit Service |
a04d08 |
self._m_is_container.return_value = False
|
|
Packit Service |
a04d08 |
key, val = ("system-product-name", "my_product")
|
|
Packit Service |
a04d08 |
self._create_sysfs_file('product_name', val)
|
|
Packit Service |
a04d08 |
self.assertEqual(val, util.read_dmi_data(key))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# then verify in container returns None
|
|
Packit Service |
a04d08 |
self._m_is_container.return_value = True
|
|
Packit Service |
a04d08 |
self.assertIsNone(util.read_dmi_data(key))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def test_container_returns_none_on_unknown(self):
|
|
Packit Service |
a04d08 |
"""In a container even bogus keys return None."""
|
|
Packit Service |
a04d08 |
self._m_is_container.return_value = True
|
|
Packit Service |
a04d08 |
self._create_sysfs_file('product_name', "should-be-ignored")
|
|
Packit Service |
a04d08 |
self.assertIsNone(util.read_dmi_data("bogus"))
|
|
Packit Service |
a04d08 |
self.assertIsNone(util.read_dmi_data("system-product-name"))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetConfigLogfiles(helpers.CiTestCase):
    """Exercise util.get_config_logfiles with several config shapes."""

    def test_empty_cfg_returns_empty_list(self):
        """Both None and an empty dict produce an empty list."""
        for empty_cfg in (None, {}):
            self.assertEqual([], util.get_config_logfiles(empty_cfg))

    def test_default_log_file_present(self):
        """A def_log_file entry is returned as the only log file."""
        cfg = {'def_log_file': '/my.log'}
        self.assertEqual(['/my.log'], util.get_config_logfiles(cfg))

    def test_output_logs_parsed_when_teeing_files(self):
        """An output target piped through 'tee -a' is reported as well."""
        cfg = {
            'def_log_file': '/my.log',
            'output': {'all': '|tee -a /himom.log'},
        }
        found = sorted(util.get_config_logfiles(cfg))
        self.assertEqual(['/himom.log', '/my.log'], found)

    def test_output_logs_parsed_when_redirecting(self):
        """An output target written with '>' is reported as well."""
        cfg = {
            'def_log_file': '/my.log',
            'output': {'all': '>/test.log'},
        }
        found = sorted(util.get_config_logfiles(cfg))
        self.assertEqual(['/my.log', '/test.log'], found)

    def test_output_logs_parsed_when_appending(self):
        """An output target written with '>>' is reported as well."""
        cfg = {
            'def_log_file': '/my.log',
            'output': {'all': '>> /test.log'},
        }
        found = sorted(util.get_config_logfiles(cfg))
        self.assertEqual(['/my.log', '/test.log'], found)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestMultiLog(helpers.FilesystemMockingTestCase):
    """Tests for util.multi_log output to console, stdout, stderr and log."""

    def _createConsole(self, root):
        """Create an empty dev/console file underneath root."""
        os.mkdir(os.path.join(root, 'dev'))
        open(os.path.join(root, 'dev', 'console'), 'a').close()

    def setUp(self):
        """Patch os, util and open onto a temp root; capture stdout/stderr."""
        super(TestMultiLog, self).setUp()
        self.root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.root)
        self.patchOS(self.root)
        self.patchUtils(self.root)
        self.patchOpen(self.root)
        self.stdout = io.StringIO()
        self.stderr = io.StringIO()
        self.patchStdoutAndStderr(self.stdout, self.stderr)

    def test_stderr_used_by_default(self):
        """The message is written to stderr when no options are given."""
        logged_string = 'test stderr output'
        util.multi_log(logged_string)
        self.assertEqual(logged_string, self.stderr.getvalue())

    def test_stderr_not_used_if_false(self):
        """Nothing reaches stderr when stderr=False is passed."""
        util.multi_log('should not see this', stderr=False)
        self.assertEqual('', self.stderr.getvalue())

    def test_logs_go_to_console_by_default(self):
        """The message ends up in /dev/console when that file exists."""
        self._createConsole(self.root)
        logged_string = 'something very important'
        util.multi_log(logged_string)
        # Use a context manager so the file handle is closed
        # deterministically instead of being left to garbage collection.
        with open('/dev/console') as console:
            self.assertEqual(logged_string, console.read())

    def test_logs_dont_go_to_stdout_if_console_exists(self):
        """stdout stays empty when the console file exists."""
        self._createConsole(self.root)
        util.multi_log('something')
        self.assertEqual('', self.stdout.getvalue())

    def test_logs_go_to_stdout_if_console_does_not_exist(self):
        """Without a console file the message is written to stdout."""
        logged_string = 'something very important'
        util.multi_log(logged_string)
        self.assertEqual(logged_string, self.stdout.getvalue())

    def test_logs_go_to_log_if_given(self):
        """A supplied log object receives exactly one log() call."""
        log = mock.MagicMock()
        logged_string = 'something very important'
        util.multi_log(logged_string, log=log)
        self.assertEqual([((mock.ANY, logged_string), {})],
                         log.log.call_args_list)

    def test_newlines_stripped_from_log_call(self):
        """A trailing newline is not passed on to log.log."""
        log = mock.MagicMock()
        expected_string = 'something very important'
        util.multi_log('{0}\n'.format(expected_string), log=log)
        self.assertEqual((mock.ANY, expected_string), log.log.call_args[0])

    def test_log_level_defaults_to_debug(self):
        """log.log is called with logging.DEBUG when no level is given."""
        log = mock.MagicMock()
        util.multi_log('message', log=log)
        self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0])

    def test_given_log_level_used(self):
        """An explicit log_level is forwarded to log.log unchanged."""
        log = mock.MagicMock()
        log_level = mock.Mock()
        util.multi_log('message', log=log, log_level=log_level)
        self.assertEqual((log_level, mock.ANY), log.log.call_args[0])
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestMessageFromString(helpers.TestCase):
    """Round-trip check for util.message_from_string."""

    def test_unicode_not_messed_up(self):
        """The serialized message must not contain NUL characters."""
        message = util.message_from_string(u'\n')
        serialized = message.as_string()
        self.assertNotIn('\x00', serialized)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestReadSeeded(helpers.TestCase):
    """Tests for util.read_seeded against a populated temp directory."""

    def setUp(self):
        """Create a scratch directory that is removed after the test."""
        super(TestReadSeeded, self).setUp()
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)

    def test_unicode_not_messed_up(self):
        """meta-data is parsed and user-data comes back unchanged."""
        user_data = b"userdatablob"
        seed_files = {'meta-data': "key1: val1", 'user-data': user_data}
        helpers.populate_dir(self.tmp, seed_files)

        # read_seeded is handed the directory with a trailing separator.
        seed_base = self.tmp + os.path.sep
        found_md, found_ud = util.read_seeded(seed_base)

        self.assertEqual(found_md, {'key1': 'val1'})
        self.assertEqual(found_ud, user_data)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestEncode(helpers.TestCase):
    """Tests for the encoding helpers in util."""

    def test_decode_binary_plain_text_with_hex(self):
        """decode_binary returns a str containing \\x80 unchanged."""
        original = 'BOOTABLE_FLAG=\x80init=/bin/systemd'
        self.assertEqual(util.decode_binary(original), original)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestProcessExecutionError(helpers.TestCase):
    """Tests for the type and formatting of subp.ProcessExecutionError."""

    # Expected layout of str(error); the fields are filled in per test.
    template = ('{description}\n'
                'Command: {cmd}\n'
                'Exit code: {exit_code}\n'
                'Reason: {reason}\n'
                'Stdout: {stdout}\n'
                'Stderr: {stderr}')
    # Value the tests expect for attributes that were not supplied.
    empty_attr = '-'
    empty_description = 'Unexpected error while running command.'

    def test_pexec_error_indent_text(self):
        """_indent_text indents continuation lines and keeps the type."""
        error = subp.ProcessExecutionError()
        msg = 'abc\ndef'
        formatted = 'abc\n{0}def'.format(' ' * 4)
        self.assertEqual(error._indent_text(msg, indent_level=4), formatted)
        self.assertEqual(error._indent_text(msg.encode(), indent_level=4),
                         formatted.encode())
        self.assertIsInstance(
            error._indent_text(msg.encode()), type(msg.encode()))

    def test_pexec_error_type(self):
        """ProcessExecutionError is an IOError."""
        self.assertIsInstance(subp.ProcessExecutionError(), IOError)

    def test_pexec_error_empty_msgs(self):
        """An error built without arguments uses the empty placeholders."""
        error = subp.ProcessExecutionError()
        # Check each attribute on its own so a failure names the culprit.
        for attr in (error.stderr, error.stdout, error.reason):
            self.assertEqual(attr, self.empty_attr)
        self.assertEqual(error.description, self.empty_description)
        self.assertEqual(str(error), self.template.format(
            description=self.empty_description, exit_code=self.empty_attr,
            reason=self.empty_attr, stdout=self.empty_attr,
            stderr=self.empty_attr, cmd=self.empty_attr))

    def test_pexec_error_single_line_msgs(self):
        """Single-line stdout/stderr/cmd values appear verbatim."""
        stdout_msg = 'out out'
        stderr_msg = 'error error'
        cmd = 'test command'
        exit_code = 3
        # Pass the variable (not a repeated literal) so the constructor
        # argument and the expected string cannot drift apart.
        error = subp.ProcessExecutionError(
            stdout=stdout_msg, stderr=stderr_msg, exit_code=exit_code,
            cmd=cmd)
        self.assertEqual(str(error), self.template.format(
            description=self.empty_description, stdout=stdout_msg,
            stderr=stderr_msg, exit_code=str(exit_code),
            reason=self.empty_attr, cmd=cmd))

    def test_pexec_error_multi_line_msgs(self):
        """Multi-line stdout (bytes) and stderr (str) are both formatted."""
        # make sure bytes are handled properly when formatting
        stdout_msg = 'multi\nline\noutput message'.encode()
        stderr_msg = 'multi\nline\nerror message\n\n\n'
        error = subp.ProcessExecutionError(
            stdout=stdout_msg, stderr=stderr_msg)
        # NOTE(review): the leading whitespace inside the continuation-line
        # literals below has to match the indentation that
        # ProcessExecutionError applies to multi-line output -- verify
        # against cloudinit.subp before relying on it.
        self.assertEqual(
            str(error),
            '\n'.join((
                '{description}',
                'Command: {empty_attr}',
                'Exit code: {empty_attr}',
                'Reason: {empty_attr}',
                'Stdout: multi',
                ' line',
                ' output message',
                'Stderr: multi',
                ' line',
                ' error message',
            )).format(description=self.empty_description,
                      empty_attr=self.empty_attr))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestSystemIsSnappy(helpers.FilesystemMockingTestCase):
    """Tests for util.system_is_snappy detection sources."""

    def test_id_in_os_release_quoted(self):
        """os-release containing ID="ubuntu-core" is snappy."""
        orcontent = '\n'.join(['ID="ubuntu-core"', ''])
        root_d = self.tmp_dir()
        helpers.populate_dir(root_d, {'etc/os-release': orcontent})
        self.reRoot(root_d)
        self.assertTrue(util.system_is_snappy())

    def test_id_in_os_release(self):
        """os-release containing ID=ubuntu-core is snappy."""
        orcontent = '\n'.join(['ID=ubuntu-core', ''])
        root_d = self.tmp_dir()
        helpers.populate_dir(root_d, {'etc/os-release': orcontent})
        self.reRoot(root_d)
        self.assertTrue(util.system_is_snappy())

    @mock.patch('cloudinit.util.get_cmdline')
    def test_bad_content_in_os_release_no_effect(self, m_cmdline):
        """malformed os-release should not raise exception."""
        m_cmdline.return_value = 'root=/dev/sda'
        orcontent = '\n'.join(['IDubuntu-core', ''])
        root_d = self.tmp_dir()
        helpers.populate_dir(root_d, {'etc/os-release': orcontent})
        # Re-root onto the populated directory, as the sibling tests do;
        # otherwise the malformed os-release written above is never the
        # file that system_is_snappy gets to read.
        self.reRoot(root_d)
        self.assertFalse(util.system_is_snappy())

    @mock.patch('cloudinit.util.get_cmdline')
    def test_snap_core_in_cmdline_is_snappy(self, m_cmdline):
        """The string snap_core= in kernel cmdline indicates snappy."""
        cmdline = (
            "BOOT_IMAGE=(loop)/kernel.img root=LABEL=writable "
            "snap_core=core_x1.snap snap_kernel=pc-kernel_x1.snap ro "
            "net.ifnames=0 init=/lib/systemd/systemd console=tty1 "
            "console=ttyS0 panic=-1")
        m_cmdline.return_value = cmdline
        self.assertTrue(util.system_is_snappy())
        self.assertGreater(m_cmdline.call_count, 0)

    @mock.patch('cloudinit.util.get_cmdline')
    def test_nothing_found_is_not_snappy(self, m_cmdline):
        """If no positive identification, then not snappy."""
        m_cmdline.return_value = 'root=/dev/sda'
        self.reRoot()
        self.assertFalse(util.system_is_snappy())
        self.assertGreater(m_cmdline.call_count, 0)

    @mock.patch('cloudinit.util.get_cmdline')
    def test_channel_ini_with_snappy_is_snappy(self, m_cmdline):
        """A Channel.ini file with 'ubuntu-core' indicates snappy."""
        m_cmdline.return_value = 'root=/dev/sda'
        root_d = self.tmp_dir()
        content = '\n'.join(["[Foo]", "source = 'ubuntu-core'", ""])
        helpers.populate_dir(
            root_d, {'etc/system-image/channel.ini': content})
        self.reRoot(root_d)
        self.assertTrue(util.system_is_snappy())

    @mock.patch('cloudinit.util.get_cmdline')
    def test_system_image_config_dir_is_snappy(self, m_cmdline):
        """Existence of /etc/system-image/config.d indicates snappy."""
        m_cmdline.return_value = 'root=/dev/sda'
        root_d = self.tmp_dir()
        helpers.populate_dir(
            root_d, {'etc/system-image/config.d/my.file': "_unused"})
        self.reRoot(root_d)
        self.assertTrue(util.system_is_snappy())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestLoadShellContent(helpers.TestCase):
    """Tests for util.load_shell_content."""

    def test_comments_handled_correctly(self):
        """Shell comments should be allowed in the content."""
        # Each list element is one input line.  The commented-out
        # '#badkey=wark' entry is its own line: a comma was missing after
        # the key2 line, which silently glued the two literals together.
        self.assertEqual(
            {'key1': 'val1', 'key2': 'val2', 'key3': 'val3 #tricky'},
            util.load_shell_content('\n'.join([
                "#top of file comment",
                "key1=val1 #this is a comment",
                "# second comment",
                'key2="val2" # inlin comment',
                '#badkey=wark',
                'key3="val3 #tricky"',
                ''])))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetProcEnv(helpers.TestCase):
    """Tests for util.get_proc_env and util.get_proc_ppid."""

    # Separator placed between the key=value entries of the fake content.
    null = b'\x00'
    simple1 = b'HOME=/'
    simple2 = b'PATH=/bin:/sbin'
    bootflag = b'BOOTABLE_FLAG=\x80'  # from LP: #1775371
    mixed = b'MIXED=' + b'ab\xccde'

    def _val_decoded(self, blob, encoding='utf-8', errors='replace'):
        """Return the decoded value part of a key=val bytes blob."""
        pieces = blob.split(b'=', 1)
        return pieces[1].decode(encoding, errors)

    @mock.patch("cloudinit.util.load_file")
    def test_non_utf8_in_environment(self, m_load_file):
        """Entries that are not valid utf-8 are still returned, decoded."""
        entries = (self.bootflag, self.simple1, self.simple2, self.mixed)
        m_load_file.return_value = self.null.join(entries)

        expected = {
            'BOOTABLE_FLAG': self._val_decoded(self.bootflag),
            'HOME': '/',
            'PATH': '/bin:/sbin',
            'MIXED': self._val_decoded(self.mixed),
        }
        self.assertEqual(expected, util.get_proc_env(1))
        self.assertEqual(1, m_load_file.call_count)

    @mock.patch("cloudinit.util.load_file")
    def test_encoding_none_returns_bytes(self, m_load_file):
        """With encoding=None keys and values come back as bytes."""
        lines = (self.bootflag, self.simple1, self.simple2, self.mixed)
        m_load_file.return_value = self.null.join(lines)

        expected = {
            key: val for key, val in (t.split(b'=') for t in lines)}
        self.assertEqual(expected, util.get_proc_env(1, encoding=None))
        self.assertEqual(1, m_load_file.call_count)

    @mock.patch("cloudinit.util.load_file")
    def test_all_utf8_encoded(self, m_load_file):
        """Plain utf-8 content is returned as a str-to-str dict."""
        m_load_file.return_value = self.null.join(
            (self.simple1, self.simple2))
        expected = {'HOME': '/', 'PATH': '/bin:/sbin'}
        self.assertEqual(expected, util.get_proc_env(1))
        self.assertEqual(1, m_load_file.call_count)

    @mock.patch("cloudinit.util.load_file")
    def test_non_existing_file_returns_empty_dict(self, m_load_file):
        """An OSError from load_file results in an empty dict.

        This is how it was originally implemented.
        """
        m_load_file.side_effect = OSError("File does not exist.")
        self.assertEqual({}, util.get_proc_env(1))
        self.assertEqual(1, m_load_file.call_count)

    def test_get_proc_ppid(self):
        """get_proc_ppid of this process matches os.getppid()."""
        own_pid = os.getpid()
        parent_pid = os.getppid()
        self.assertEqual(parent_pid, util.get_proc_ppid(own_pid))
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
class TestKernelVersion:
    """Tests for util.kernel_version."""

    # (release string placed on the mocked uname result, expected tuple)
    params = [
        ('5.6.19-300.fc32.x86_64', (5, 6)),
        ('4.15.0-101-generic', (4, 15)),
        ('3.10.0-1062.12.1.vz7.131.10', (3, 10)),
        ('4.18.0-144.el8.x86_64', (4, 18)),
    ]

    @mock.patch('os.uname')
    @pytest.mark.parametrize("uname_release,expected", params)
    def test_kernel_version(self, m_uname, uname_release, expected):
        """kernel_version returns the expected tuple for each release."""
        m_uname.return_value.release = uname_release
        found = util.kernel_version()
        assert expected == found
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
class TestFindDevs:
|
|
Packit Service |
9bfd13 |
@mock.patch('cloudinit.subp.subp')
def test_find_devs_with(self, m_subp):
    """find_devs_with returns the mocked subp output as a one-item list."""
    dev_line = (
        '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"')
    m_subp.return_value = (dev_line, '')

    # Called first without arguments, then with a criteria string; the
    # expected list is the same both times.
    without_criteria = util.find_devs_with()
    assert without_criteria == [dev_line]

    with_criteria = util.find_devs_with("LABEL_FATBOOT=A_LABEL")
    assert with_criteria == [dev_line]
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
@mock.patch('cloudinit.subp.subp')
def test_find_devs_with_openbsd(self, m_subp):
    """Without criteria the cd0 and sd1 devices are returned."""
    fake_stdout = 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:'
    m_subp.return_value = (fake_stdout, '')
    found = util.find_devs_with_openbsd()
    assert found == ['/dev/cd0a', '/dev/sd1i']
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
@mock.patch('cloudinit.subp.subp')
def test_find_devs_with_openbsd_with_criteria(self, m_subp):
    """TYPE=iso9660 narrows the result; LABEL_FATBOOT does not."""
    fake_stdout = 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:'
    m_subp.return_value = (fake_stdout, '')

    iso_only = util.find_devs_with_openbsd(criteria="TYPE=iso9660")
    assert iso_only == ['/dev/cd0a']

    # lp: #1841466
    fatboot = util.find_devs_with_openbsd(criteria="LABEL_FATBOOT=A_LABEL")
    assert fatboot == ['/dev/cd0a', '/dev/sd1i']
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
@pytest.mark.parametrize(
    'criteria,expected_devlist', (
        (None, ['/dev/msdosfs/EFISYS', '/dev/iso9660/config-2']),
        ('TYPE=iso9660', ['/dev/iso9660/config-2']),
        ('TYPE=vfat', ['/dev/msdosfs/EFISYS']),
        ('LABEL_FATBOOT=A_LABEL', []),  # lp: #1841466
    ),
)
@mock.patch('glob.glob')
def test_find_devs_with_freebsd(self, m_glob, criteria, expected_devlist):
    """find_devs_with_freebsd returns the globbed devices per criteria."""
    # The only two patterns the fake glob knows about.
    known_patterns = {
        "/dev/msdosfs/*": ["/dev/msdosfs/EFISYS"],
        "/dev/iso9660/*": ["/dev/iso9660/config-2"],
    }

    def fake_glob(pattern):
        if pattern not in known_patterns:
            # Any other pattern is unexpected in this test.
            raise Exception
        # Hand out a fresh list on every call.
        return list(known_patterns[pattern])

    m_glob.side_effect = fake_glob

    found = util.find_devs_with_freebsd(criteria=criteria)
    assert found == expected_devlist
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
9bfd13 |
@pytest.mark.parametrize(
    'criteria,expected_devlist', (
        (None, ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']),
        ('TYPE=iso9660', ['/dev/cd0']),
        ('TYPE=vfat', ["/dev/ld0", "/dev/dk0", "/dev/dk1"]),
        ('LABEL_FATBOOT=A_LABEL',  # lp: #1841466
         ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']),
    )
)
@mock.patch("cloudinit.subp.subp")
def test_find_devs_with_netbsd(self, m_subp, criteria, expected_devlist):
    """Check find_devs_with_netbsd() against a scripted sequence of subp results.

    The patched subp returns five results in order: one device-name
    string, three identical "disklabel not written" outputs, and one
    output describing an ISO filesystem.
    """
    # Output reused for the second, third and fourth subp results.
    # NOTE(review): presumably one probe per non-ISO device -- confirm
    # against util.find_devs_with_netbsd.
    no_disklabel_out = (
        "mscdlabel: CDIOREADTOCHEADER: "
        "Inappropriate ioctl for device\n"
        "track (ctl=4) at sector 0\n"
        "disklabel not written\n"
    )
    # Output for the fifth and final subp result.
    iso_label_out = (
        "track (ctl=4) at sector 0\n"
        'ISO filesystem, label "config-2", '
        "creation time: 2020/03/31 17:29\n"
        "adding as 'a'\n"
    )
    m_subp.side_effect = (
        [("ld0 dk0 dk1 cd0", "")]
        + [(no_disklabel_out, "")] * 3
        + [(iso_label_out, "")]
    )

    assert util.find_devs_with_netbsd(criteria=criteria) == expected_devlist
|
|
Packit Service |
9bfd13 |
|
|
Packit Service |
a04d08 |
# vi: ts=4 expandtab
|