# This file is part of cloud-init. See LICENSE file for license information.
import re
from cloudinit import distros
from cloudinit import ssh_util
from cloudinit.tests.helpers import (CiTestCase, mock)
class MyBaseDistro(distros.Distro):
    """Minimal Distro subclass for testing base Distro implementations.

    Every hook below simply raises NotImplementedError; they are presumably
    defined only so the base class can be instantiated (confirm against
    distros.Distro).
    """

    def __init__(self, name="basedistro", cfg=None, paths=None):
        # Any falsy cfg/paths is swapped for a fresh empty dict.
        super(MyBaseDistro, self).__init__(name, cfg or {}, paths or {})

    # -- package management stubs --

    def install_packages(self, pkglist):
        raise NotImplementedError()

    def package_command(self, cmd, args=None, pkgs=None):
        raise NotImplementedError()

    def update_package_sources(self):
        raise NotImplementedError()

    # -- network / locale / timezone stubs --

    def _write_network(self, settings):
        raise NotImplementedError()

    def apply_locale(self, locale, out_fn=None):
        raise NotImplementedError()

    def set_timezone(self, tz):
        raise NotImplementedError()

    # -- hostname stubs --

    def _read_hostname(self, filename, default=None):
        raise NotImplementedError()

    def _write_hostname(self, hostname, filename):
        raise NotImplementedError()

    def _read_system_hostname(self):
        raise NotImplementedError()
@mock.patch("cloudinit.distros.util.system_is_snappy", return_value=False)
@mock.patch("cloudinit.distros.subp.subp")
class TestCreateUser(CiTestCase):
    """Check the commands the base Distro issues for user management.

    The two class-level patches are handed to every test method as
    ``m_subp`` and ``m_is_snappy``; mocks patched at method level are
    passed in ahead of them.
    """

    # The warning tests read self.logs; with_logs presumably turns that
    # capture on in CiTestCase (confirm against the helper).
    with_logs = True

    def setUp(self):
        super(TestCreateUser, self).setUp()
        self.dist = MyBaseDistro()

    @staticmethod
    def _passwd_lock_call(user):
        # the mock call expected when a user's password gets locked
        return mock.call(['passwd', '-l', user])

    @staticmethod
    def _disable_opts(redirect_user, disabled_user):
        # DISABLE_USER_OPTS with both placeholders filled in
        opts = ssh_util.DISABLE_USER_OPTS.replace('$USER', redirect_user)
        return opts.replace('$DISABLE_USER', disabled_user)

    def _useradd2call(self, args):
        # Build the mock call expected for a useradd invocation, including
        # the 'logstring' copy in which the password value is masked.
        cmd = ['useradd'] + args
        masked = list(cmd)
        for pos, token in enumerate(cmd):
            if token == '--password':
                # the value following the flag must never be logged
                masked[pos + 1] = 'REDACTED'
        return mock.call(cmd, logstring=masked)

    def test_basic(self, m_subp, m_is_snappy):
        """Default create_user runs 'useradd -m' and locks the password."""
        name = 'foouser'
        self.dist.create_user(name)
        expected = [
            self._useradd2call([name, '-m']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)

    def test_no_home(self, m_subp, m_is_snappy):
        """no_create_home=True passes -M to useradd."""
        name = 'foouser'
        self.dist.create_user(name, no_create_home=True)
        expected = [
            self._useradd2call([name, '-M']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)

    def test_system_user(self, m_subp, m_is_snappy):
        """system=True adds --system and skips home creation (-M)."""
        name = 'foouser'
        self.dist.create_user(name, system=True)
        expected = [
            self._useradd2call([name, '--system', '-M']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)

    def test_explicit_no_home_false(self, m_subp, m_is_snappy):
        """no_create_home=False behaves like the default (-m)."""
        name = 'foouser'
        self.dist.create_user(name, no_create_home=False)
        expected = [
            self._useradd2call([name, '-m']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)

    def test_unlocked(self, m_subp, m_is_snappy):
        """lock_passwd=False issues useradd only, no passwd lock."""
        name = 'foouser'
        self.dist.create_user(name, lock_passwd=False)
        expected = [self._useradd2call([name, '-m'])]
        self.assertEqual(m_subp.call_args_list, expected)

    def test_set_password(self, m_subp, m_is_snappy):
        """passwd is forwarded via --password and masked in logstring."""
        name = 'foouser'
        secret = 'passfoo'
        self.dist.create_user(name, passwd=secret)
        expected = [
            self._useradd2call([name, '--password', secret, '-m']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)

    @mock.patch("cloudinit.distros.util.is_group")
    def test_group_added(self, m_is_group, m_subp, m_is_snappy):
        """A group that does not exist is created before useradd runs."""
        m_is_group.return_value = False
        name = 'foouser'
        self.dist.create_user(name, groups=['group1'])
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(['groupadd', 'group1']),
                self._useradd2call([name, '--groups', 'group1', '-m']),
                self._passwd_lock_call(name),
            ])

    @mock.patch("cloudinit.distros.util.is_group")
    def test_only_new_group_added(self, m_is_group, m_subp, m_is_snappy):
        """groupadd is issued only for groups is_group reports missing."""
        already_there = ['existing_group']
        requested = ['group1', already_there[0]]
        m_is_group.side_effect = lambda m: m in already_there
        name = 'foouser'
        self.dist.create_user(name, groups=requested)
        joined = ','.join(requested)
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(['groupadd', 'group1']),
                self._useradd2call([name, '--groups', joined, '-m']),
                self._passwd_lock_call(name),
            ])

    @mock.patch("cloudinit.distros.util.is_group")
    def test_create_groups_with_whitespace_string(
            self, m_is_group, m_subp, m_is_snappy):
        """groups may be a comma-delimited string, even with whitespace."""
        m_is_group.return_value = False
        name = 'foouser'
        self.dist.create_user(name, groups='group1, group2')
        self.assertEqual(
            m_subp.call_args_list,
            [
                mock.call(['groupadd', 'group1']),
                mock.call(['groupadd', 'group2']),
                self._useradd2call(
                    [name, '--groups', 'group1,group2', '-m']),
                self._passwd_lock_call(name),
            ])

    def test_explicit_sudo_false(self, m_subp, m_is_snappy):
        """sudo=False leaves the issued commands at the default pair."""
        name = 'foouser'
        self.dist.create_user(name, sudo=False)
        expected = [
            self._useradd2call([name, '-m']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_setup_ssh_authorized_keys_with_string(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """ssh_authorized_keys allows string and calls setup_user_keys."""
        name = 'foouser'
        self.dist.create_user(name, ssh_authorized_keys='mykey')
        expected = [
            self._useradd2call([name, '-m']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)
        m_setup_user_keys.assert_called_once_with({'mykey'}, name)

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_setup_ssh_authorized_keys_with_list(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """ssh_authorized_keys allows lists and calls setup_user_keys."""
        name = 'foouser'
        self.dist.create_user(name, ssh_authorized_keys=['key1', 'key2'])
        expected = [
            self._useradd2call([name, '-m']),
            self._passwd_lock_call(name),
        ]
        self.assertEqual(m_subp.call_args_list, expected)
        m_setup_user_keys.assert_called_once_with({'key1', 'key2'}, name)

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_setup_ssh_authorized_keys_with_integer(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """ssh_authorized_keys warns on non-iterable/string type."""
        name = 'foouser'
        self.dist.create_user(name, ssh_authorized_keys=-1)
        m_setup_user_keys.assert_called_once_with(set(), name)
        # '(type|class)' accepts either spelling of the int type's repr
        warning_re = re.compile(
            r'.*WARNING: Invalid type \'<(type|class) \'int\'>\' detected for'
            ' \'ssh_authorized_keys\'.*',
            re.DOTALL)
        found = warning_re.match(self.logs.getvalue())
        self.assertIsNotNone(
            found, 'Missing ssh_authorized_keys invalid type warning')

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_create_user_with_ssh_redirect_user_no_cloud_keys(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """Warn when ssh_redirect_user is set but no cloud ssh keys exist."""
        name = 'foouser'
        self.dist.create_user(name, ssh_redirect_user='someuser')
        logged = self.logs.getvalue()
        self.assertIn(
            'WARNING: Unable to disable SSH logins for foouser given '
            'ssh_redirect_user: someuser. No cloud public-keys present.\n',
            logged)
        m_setup_user_keys.assert_not_called()

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_create_user_with_ssh_redirect_user_with_cloud_keys(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """Disable ssh when ssh_redirect_user and cloud ssh keys are set."""
        name = 'foouser'
        self.dist.create_user(
            name, ssh_redirect_user='someuser',
            cloud_public_ssh_keys=['key1'])
        m_setup_user_keys.assert_called_once_with(
            {'key1'}, 'foouser',
            options=self._disable_opts('someuser', name))

    @mock.patch('cloudinit.ssh_util.setup_user_keys')
    def test_create_user_with_ssh_redirect_user_does_not_disable_auth_keys(
            self, m_setup_user_keys, m_subp, m_is_snappy):
        """Do not disable ssh_authorized_keys when ssh_redirect_user is set."""
        name = 'foouser'
        self.dist.create_user(
            name, ssh_authorized_keys='auth1',
            ssh_redirect_user='someuser',
            cloud_public_ssh_keys=['key1'])
        opts = self._disable_opts('someuser', name)
        expected = [
            mock.call({'auth1'}, name),  # not disabled
            mock.call({'key1'}, 'foouser', options=opts),
        ]
        self.assertEqual(m_setup_user_keys.call_args_list, expected)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_with_usermod_if_no_passwd(self, m_which, m_subp,
                                            m_is_snappy):
        """Lock uses usermod --lock if no 'passwd' cmd available."""
        m_which.side_effect = lambda m: m == 'usermod'
        self.dist.lock_passwd("bob")
        expected = [mock.call(['usermod', '--lock', 'bob'])]
        self.assertEqual(expected, m_subp.call_args_list)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_with_passwd_if_available(self, m_which, m_subp,
                                           m_is_snappy):
        """Lock with only passwd will use passwd."""
        m_which.side_effect = lambda m: m == 'passwd'
        self.dist.lock_passwd("bob")
        expected = [mock.call(['passwd', '-l', 'bob'])]
        self.assertEqual(expected, m_subp.call_args_list)

    @mock.patch("cloudinit.distros.subp.which")
    def test_lock_raises_runtime_if_no_commands(self, m_which, m_subp,
                                                m_is_snappy):
        """Lock with no commands available raises RuntimeError."""
        m_which.return_value = None
        self.assertRaises(RuntimeError, self.dist.lock_passwd, "bob")
# vi: ts=4 expandtab