# This file is part of cloud-init. See LICENSE file for license information.
from unittest import mock
from cloudinit import distros
from cloudinit.distros import ug_util
from cloudinit import helpers
from cloudinit import settings
from cloudinit.tests.helpers import TestCase
bcfg = {
'name': 'bob',
'plain_text_passwd': 'ubuntu',
'home': "/home/ubuntu",
'shell': "/bin/bash",
'lock_passwd': True,
'gecos': "Ubuntu",
'groups': ["foo"]
}
class TestUGNormalize(TestCase):
def setUp(self):
super(TestUGNormalize, self).setUp()
self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
self.add_patch('cloudinit.util.system_info', 'm_sysinfo')
self.m_sysinfo.return_value = {'dist': ('Distro', '99.1', 'Codename')}
def _make_distro(self, dtype, def_user=None):
cfg = dict(settings.CFG_BUILTIN)
cfg['system_info']['distro'] = dtype
paths = helpers.Paths(cfg['system_info']['paths'])
distro_cls = distros.fetch(dtype)
if def_user:
cfg['system_info']['default_user'] = def_user.copy()
distro = distro_cls(dtype, cfg['system_info'], paths)
return distro
def _norm(self, cfg, distro):
return ug_util.normalize_users_groups(cfg, distro)
def test_group_dict(self):
distro = self._make_distro('ubuntu')
g = {'groups':
[{'ubuntu': ['foo', 'bar'],
'bob': 'users'},
'cloud-users',
{'bob': 'users2'}]}
(_users, groups) = self._norm(g, distro)
self.assertIn('ubuntu', groups)
ub_members = groups['ubuntu']
self.assertEqual(sorted(['foo', 'bar']), sorted(ub_members))
self.assertIn('bob', groups)
b_members = groups['bob']
self.assertEqual(sorted(['users', 'users2']),
sorted(b_members))
def test_basic_groups(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'groups': ['bob'],
}
(users, groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', groups)
self.assertEqual({}, users)
def test_csv_groups(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'groups': 'bob,joe,steve',
}
(users, groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', groups)
self.assertIn('joe', groups)
self.assertIn('steve', groups)
self.assertEqual({}, users)
def test_more_groups(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'groups': ['bob', 'joe', 'steve']
}
(users, groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', groups)
self.assertIn('joe', groups)
self.assertIn('steve', groups)
self.assertEqual({}, users)
def test_member_groups(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'groups': {
'bob': ['s'],
'joe': [],
'steve': [],
}
}
(users, groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', groups)
self.assertEqual(['s'], groups['bob'])
self.assertEqual([], groups['joe'])
self.assertIn('joe', groups)
self.assertIn('steve', groups)
self.assertEqual({}, users)
def test_users_simple_dict(self):
distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': {
'default': True,
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
ug_cfg = {
'users': {
'default': 'yes',
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
ug_cfg = {
'users': {
'default': '1',
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
def test_users_simple_dict_no(self):
distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': {
'default': False,
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertEqual({}, users)
ug_cfg = {
'users': {
'default': 'no',
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertEqual({}, users)
def test_users_simple_csv(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': 'joe,bob',
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('joe', users)
self.assertIn('bob', users)
self.assertEqual({'default': False}, users['joe'])
self.assertEqual({'default': False}, users['bob'])
def test_users_simple(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': [
'joe',
'bob'
],
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('joe', users)
self.assertIn('bob', users)
self.assertEqual({'default': False}, users['joe'])
self.assertEqual({'default': False}, users['bob'])
def test_users_old_user(self):
distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'user': 'zetta',
'users': 'default'
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertNotIn('bob', users) # Bob is not the default now, zetta is
self.assertIn('zetta', users)
self.assertTrue(users['zetta']['default'])
self.assertNotIn('default', users)
ug_cfg = {
'user': 'zetta',
'users': 'default, joe'
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertNotIn('bob', users) # Bob is not the default now, zetta is
self.assertIn('joe', users)
self.assertIn('zetta', users)
self.assertTrue(users['zetta']['default'])
self.assertNotIn('default', users)
ug_cfg = {
'user': 'zetta',
'users': ['bob', 'joe']
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
self.assertIn('joe', users)
self.assertIn('zetta', users)
self.assertTrue(users['zetta']['default'])
ug_cfg = {
'user': 'zetta',
'users': {
'bob': True,
'joe': True,
}
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
self.assertIn('joe', users)
self.assertIn('zetta', users)
self.assertTrue(users['zetta']['default'])
ug_cfg = {
'user': 'zetta',
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('zetta', users)
ug_cfg = {}
(users, groups) = self._norm(ug_cfg, distro)
self.assertEqual({}, users)
self.assertEqual({}, groups)
def test_users_dict_default_additional(self):
distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': [
{'name': 'default', 'blah': True}
],
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
self.assertEqual(",".join(distro.get_default_user()['groups']),
users['bob']['groups'])
self.assertEqual(True, users['bob']['blah'])
self.assertEqual(True, users['bob']['default'])
def test_users_dict_extract(self):
distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': [
'default',
],
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
(name, config) = ug_util.extract_default(users)
self.assertEqual(name, 'bob')
expected_config = {}
def_config = None
try:
def_config = distro.get_default_user()
except NotImplementedError:
pass
if not def_config:
def_config = {}
expected_config.update(def_config)
# Ignore these for now
expected_config.pop('name', None)
expected_config.pop('groups', None)
config.pop('groups', None)
self.assertEqual(config, expected_config)
def test_users_dict_default(self):
distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': [
'default',
],
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('bob', users)
self.assertEqual(",".join(distro.get_default_user()['groups']),
users['bob']['groups'])
self.assertEqual(True, users['bob']['default'])
def test_users_dict_trans(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': [
{'name': 'joe',
'tr-me': True},
{'name': 'bob'},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('joe', users)
self.assertIn('bob', users)
self.assertEqual({'tr_me': True, 'default': False}, users['joe'])
self.assertEqual({'default': False}, users['bob'])
def test_users_dict(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': [
{'name': 'joe'},
{'name': 'bob'},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
self.assertIn('joe', users)
self.assertIn('bob', users)
self.assertEqual({'default': False}, users['joe'])
self.assertEqual({'default': False}, users['bob'])
@mock.patch('cloudinit.subp.subp')
def test_create_snap_user(self, mock_subp):
mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
'')]
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': [
{'name': 'joe', 'snapuser': 'joe@joe.com'},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
for (user, config) in users.items():
print('user=%s config=%s' % (user, config))
username = distro.create_user(user, **config)
snapcmd = ['snap', 'create-user', '--sudoer', '--json', 'joe@joe.com']
mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
self.assertEqual(username, 'joe')
@mock.patch('cloudinit.subp.subp')
def test_create_snap_user_known(self, mock_subp):
mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
'')]
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': [
{'name': 'joe', 'snapuser': 'joe@joe.com', 'known': True},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
for (user, config) in users.items():
print('user=%s config=%s' % (user, config))
username = distro.create_user(user, **config)
snapcmd = ['snap', 'create-user', '--sudoer', '--json', '--known',
'joe@joe.com']
mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
self.assertEqual(username, 'joe')
@mock.patch('cloudinit.util.system_is_snappy')
@mock.patch('cloudinit.util.is_group')
@mock.patch('cloudinit.subp.subp')
def test_add_user_on_snappy_system(self, mock_subp, mock_isgrp,
mock_snappy):
mock_isgrp.return_value = False
mock_subp.return_value = True
mock_snappy.return_value = True
distro = self._make_distro('ubuntu')
ug_cfg = {
'users': [
{'name': 'joe', 'groups': 'users', 'create_groups': True},
],
}
(users, _groups) = self._norm(ug_cfg, distro)
for (user, config) in users.items():
print('user=%s config=%s' % (user, config))
distro.add_user(user, **config)
groupcmd = ['groupadd', 'users', '--extrausers']
addcmd = ['useradd', 'joe', '--extrausers', '--groups', 'users', '-m']
mock_subp.assert_any_call(groupcmd)
mock_subp.assert_any_call(addcmd, logstring=addcmd)
# vi: ts=4 expandtab