# This file is part of cloud-init. See LICENSE file for license information.

import copy
from unittest import mock

from cloudinit import distros
from cloudinit import helpers
from cloudinit import settings
from cloudinit.distros import ug_util
from cloudinit.tests.helpers import TestCase


# Default-user configuration handed to ``_make_distro`` by the tests that
# need the distro to report a default user (named 'bob').
bcfg = dict(
    name='bob',
    plain_text_passwd='ubuntu',
    home='/home/ubuntu',
    shell='/bin/bash',
    lock_passwd=True,
    gecos='Ubuntu',
    groups=['foo'],
)


class TestUGNormalize(TestCase):
    """Tests for ``ug_util.normalize_users_groups`` and distro user creation.

    Each test builds a distro object via ``_make_distro`` and feeds a
    ``users``/``groups``/``user`` config dict through ``_norm``, then checks
    the normalized ``(users, groups)`` result.  The last three tests pass the
    normalized users on to ``create_user``/``add_user`` with
    ``cloudinit.subp.subp`` mocked and verify the commands issued.
    """

    def setUp(self):
        """Patch ``util.system_is_snappy`` and ``util.system_info``.

        The ``system_info`` mock reports a fixed fake ``dist`` tuple.
        """
        super().setUp()
        self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
        self.add_patch('cloudinit.util.system_info', 'm_sysinfo')
        self.m_sysinfo.return_value = {'dist': ('Distro', '99.1', 'Codename')}

    def _make_distro(self, dtype, def_user=None):
        """Build a distro object of type ``dtype`` from the builtin config.

        :param dtype: distro name; used for ``distros.fetch`` and stored as
            ``system_info['distro']``.
        :param def_user: optional dict stored (as a private copy) under
            ``system_info['default_user']``.
        :return: the instantiated distro object.
        """
        # Deep copy on purpose: ``dict(settings.CFG_BUILTIN)`` only copies the
        # top level, so the writes to ``system_info`` below would modify the
        # module-level builtin config and leak 'distro'/'default_user' into
        # every test that runs afterwards.
        cfg = copy.deepcopy(settings.CFG_BUILTIN)
        system_info = cfg['system_info']
        system_info['distro'] = dtype
        paths = helpers.Paths(system_info['paths'])
        distro_cls = distros.fetch(dtype)
        if def_user:
            # Deep copy so nested values (e.g. the 'groups' list) are not
            # shared with the caller's dict.
            system_info['default_user'] = copy.deepcopy(def_user)
        return distro_cls(dtype, system_info, paths)

    def _norm(self, cfg, distro):
        """Return ``ug_util.normalize_users_groups(cfg, distro)``."""
        return ug_util.normalize_users_groups(cfg, distro)

    def test_group_dict(self):
        """A list mixing group dicts and names merges members per group."""
        distro = self._make_distro('ubuntu')
        g = {'groups':
             [{'ubuntu': ['foo', 'bar'],
               'bob': 'users'},
              'cloud-users',
              {'bob': 'users2'}]}
        (_users, groups) = self._norm(g, distro)
        self.assertIn('ubuntu', groups)
        ub_members = groups['ubuntu']
        self.assertEqual(sorted(['foo', 'bar']), sorted(ub_members))
        self.assertIn('bob', groups)
        b_members = groups['bob']
        self.assertEqual(sorted(['users', 'users2']),
                         sorted(b_members))

    def test_basic_groups(self):
        """A single-item group list yields that group and no users."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'groups': ['bob'],
        }
        (users, groups) = self._norm(ug_cfg, distro)
        self.assertIn('bob', groups)
        self.assertEqual({}, users)

    def test_csv_groups(self):
        """A comma-separated group string yields each named group."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'groups': 'bob,joe,steve',
        }
        (users, groups) = self._norm(ug_cfg, distro)
        for name in ('bob', 'joe', 'steve'):
            self.assertIn(name, groups)
        self.assertEqual({}, users)

    def test_more_groups(self):
        """A list of group names yields each named group."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'groups': ['bob', 'joe', 'steve']
        }
        (users, groups) = self._norm(ug_cfg, distro)
        for name in ('bob', 'joe', 'steve'):
            self.assertIn(name, groups)
        self.assertEqual({}, users)

    def test_member_groups(self):
        """A group dict keeps the member list given for each group."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'groups': {
                'bob': ['s'],
                'joe': [],
                'steve': [],
            }
        }
        (users, groups) = self._norm(ug_cfg, distro)
        # Check membership before indexing so a missing group is reported as
        # an assertion failure rather than a KeyError.
        self.assertIn('bob', groups)
        self.assertIn('joe', groups)
        self.assertIn('steve', groups)
        self.assertEqual(['s'], groups['bob'])
        self.assertEqual([], groups['joe'])
        self.assertEqual({}, users)

    def test_users_simple_dict(self):
        """``default`` set to True/'yes'/'1' pulls in the default user."""
        distro = self._make_distro('ubuntu', bcfg)
        for enabled in (True, 'yes', '1'):
            ug_cfg = {
                'users': {
                    'default': enabled,
                }
            }
            (users, _groups) = self._norm(ug_cfg, distro)
            self.assertIn('bob', users, msg='default=%r' % (enabled,))

    def test_users_simple_dict_no(self):
        """``default`` set to False/'no' yields no users."""
        distro = self._make_distro('ubuntu', bcfg)
        for disabled in (False, 'no'):
            ug_cfg = {
                'users': {
                    'default': disabled,
                }
            }
            (users, _groups) = self._norm(ug_cfg, distro)
            self.assertEqual({}, users, msg='default=%r' % (disabled,))

    def test_users_simple_csv(self):
        """A comma-separated user string yields non-default users."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': 'joe,bob',
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('joe', users)
        self.assertIn('bob', users)
        self.assertEqual({'default': False}, users['joe'])
        self.assertEqual({'default': False}, users['bob'])

    def test_users_simple(self):
        """A list of user names yields non-default users."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': [
                'joe',
                'bob'
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('joe', users)
        self.assertIn('bob', users)
        self.assertEqual({'default': False}, users['joe'])
        self.assertEqual({'default': False}, users['bob'])

    def test_users_old_user(self):
        """The legacy ``user`` key names the default user.

        Runs the same distro through several ``user``/``users`` combinations
        and finishes with an empty config, which must yield nothing.
        """
        distro = self._make_distro('ubuntu', bcfg)

        # 'user' combined with users='default'.
        ug_cfg = {
            'user': 'zetta',
            'users': 'default'
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertNotIn('bob', users)  # Bob is not the default now, zetta is
        self.assertIn('zetta', users)
        self.assertTrue(users['zetta']['default'])
        self.assertNotIn('default', users)

        # 'user' combined with a csv string containing 'default'.
        ug_cfg = {
            'user': 'zetta',
            'users': 'default, joe'
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertNotIn('bob', users)  # Bob is not the default now, zetta is
        self.assertIn('joe', users)
        self.assertIn('zetta', users)
        self.assertTrue(users['zetta']['default'])
        self.assertNotIn('default', users)

        # 'user' combined with an explicit list of other users.
        ug_cfg = {
            'user': 'zetta',
            'users': ['bob', 'joe']
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('bob', users)
        self.assertIn('joe', users)
        self.assertIn('zetta', users)
        self.assertTrue(users['zetta']['default'])

        # 'user' combined with a dict of other users.
        ug_cfg = {
            'user': 'zetta',
            'users': {
                'bob': True,
                'joe': True,
            }
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('bob', users)
        self.assertIn('joe', users)
        self.assertIn('zetta', users)
        self.assertTrue(users['zetta']['default'])

        # 'user' on its own.
        ug_cfg = {
            'user': 'zetta',
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('zetta', users)

        # Empty config: no users and no groups.
        ug_cfg = {}
        (users, groups) = self._norm(ug_cfg, distro)
        self.assertEqual({}, users)
        self.assertEqual({}, groups)

    def test_users_dict_default_additional(self):
        """Extra keys on a ``name: default`` entry land on the default user."""
        distro = self._make_distro('ubuntu', bcfg)
        ug_cfg = {
            'users': [
                {'name': 'default', 'blah': True}
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('bob', users)
        self.assertEqual(",".join(distro.get_default_user()['groups']),
                         users['bob']['groups'])
        self.assertEqual(True, users['bob']['blah'])
        self.assertEqual(True, users['bob']['default'])

    def test_users_dict_extract(self):
        """``extract_default`` returns the default user's name and config."""
        distro = self._make_distro('ubuntu', bcfg)
        ug_cfg = {
            'users': [
                'default',
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('bob', users)
        (name, config) = ug_util.extract_default(users)
        self.assertEqual(name, 'bob')

        # Expected config is the distro's default user config, or empty when
        # the distro provides none.
        def_config = None
        try:
            def_config = distro.get_default_user()
        except NotImplementedError:
            pass
        expected_config = dict(def_config or {})

        # Ignore these for now
        expected_config.pop('name', None)
        expected_config.pop('groups', None)
        config.pop('groups', None)
        self.assertEqual(config, expected_config)

    def test_users_dict_default(self):
        """A ``'default'`` list entry expands to the distro default user."""
        distro = self._make_distro('ubuntu', bcfg)
        ug_cfg = {
            'users': [
                'default',
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('bob', users)
        self.assertEqual(",".join(distro.get_default_user()['groups']),
                         users['bob']['groups'])
        self.assertEqual(True, users['bob']['default'])

    def test_users_dict_trans(self):
        """Dashes in user config keys come back as underscores."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': [
                {'name': 'joe',
                 'tr-me': True},
                {'name': 'bob'},
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('joe', users)
        self.assertIn('bob', users)
        self.assertEqual({'tr_me': True, 'default': False}, users['joe'])
        self.assertEqual({'default': False}, users['bob'])

    def test_users_dict(self):
        """A list of ``name`` dicts yields non-default users."""
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': [
                {'name': 'joe'},
                {'name': 'bob'},
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        self.assertIn('joe', users)
        self.assertIn('bob', users)
        self.assertEqual({'default': False}, users['joe'])
        self.assertEqual({'default': False}, users['bob'])

    @mock.patch('cloudinit.subp.subp')
    def test_create_snap_user(self, mock_subp):
        """A ``snapuser`` entry runs ``snap create-user`` for that email."""
        mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
                                  '')]
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': [
                {'name': 'joe', 'snapuser': 'joe@joe.com'},
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        # Collect every returned username so the check below covers all
        # created users and cannot hit an unbound name when none exist.
        created = [distro.create_user(user, **config)
                   for (user, config) in users.items()]

        snapcmd = ['snap', 'create-user', '--sudoer', '--json', 'joe@joe.com']
        mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
        self.assertEqual(['joe'], created)

    @mock.patch('cloudinit.subp.subp')
    def test_create_snap_user_known(self, mock_subp):
        """``known: True`` adds ``--known`` to the ``snap create-user`` call."""
        mock_subp.side_effect = [('{"username": "joe", "ssh-key-count": 1}\n',
                                  '')]
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': [
                {'name': 'joe', 'snapuser': 'joe@joe.com', 'known': True},
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        # Collect every returned username so the check below covers all
        # created users and cannot hit an unbound name when none exist.
        created = [distro.create_user(user, **config)
                   for (user, config) in users.items()]

        snapcmd = ['snap', 'create-user', '--sudoer', '--json', '--known',
                   'joe@joe.com']
        mock_subp.assert_called_with(snapcmd, capture=True, logstring=snapcmd)
        self.assertEqual(['joe'], created)

    @mock.patch('cloudinit.util.system_is_snappy')
    @mock.patch('cloudinit.util.is_group')
    @mock.patch('cloudinit.subp.subp')
    def test_add_user_on_snappy_system(self, mock_subp, mock_isgrp,
                                       mock_snappy):
        """With ``system_is_snappy`` true, commands carry ``--extrausers``."""
        mock_isgrp.return_value = False
        mock_subp.return_value = True
        mock_snappy.return_value = True
        distro = self._make_distro('ubuntu')
        ug_cfg = {
            'users': [
                {'name': 'joe', 'groups': 'users', 'create_groups': True},
            ],
        }
        (users, _groups) = self._norm(ug_cfg, distro)
        for (user, config) in users.items():
            distro.add_user(user, **config)

        groupcmd = ['groupadd', 'users', '--extrausers']
        addcmd = ['useradd', 'joe', '--extrausers', '--groups', 'users', '-m']

        mock_subp.assert_any_call(groupcmd)
        mock_subp.assert_any_call(addcmd, logstring=addcmd)

# vi: ts=4 expandtab