Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.config.cc_snappy import (
    makeop, get_package_ops, render_snap_op)
from cloudinit.config.cc_snap_config import (
    add_assertions, add_snap_user, ASSERTIONS_FILE)
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.config.cc_snap_config import handle as snap_handle
from cloudinit.sources import DataSourceNone
from cloudinit.tests.helpers import FilesystemMockingTestCase, mock

from cloudinit.tests import helpers as t_help

import logging
import os
import shutil
import tempfile
import textwrap
import yaml

# Module-level logger; passed to snap_handle() by the TestSnapConfig tests.
LOG = logging.getLogger(__name__)
# NOTE(review): ALLOWED is not referenced by any code visible in this file;
# presumably it lists the value types a snap config entry may take --
# confirm before relying on it.
ALLOWED = (dict, list, int, str)


class TestInstallPackages(t_help.TestCase):
    """Tests for cc_snappy's get_package_ops() and render_snap_op().

    For the duration of each test util.subp is replaced by self._subp, so
    no command is executed.  Every call is recorded in self.subp_called and
    'snappy config' / 'snappy install' invocations are additionally parsed
    into self.snapcmds as [mode, pkg, config] entries.
    """

    def setUp(self):
        super(TestInstallPackages, self).setUp()
        self.unapply = []
        self.subp_called = []
        self.snapcmds = []
        self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")
        # Registered as a cleanup (rather than removed in tearDown) so the
        # directory goes away even if a later setUp step or tearDown fails.
        self.addCleanup(shutil.rmtree, self.tmp)

        # route every util.subp call to the recorder below
        self.apply_patches([(util, 'subp', self._subp)])

    def tearDown(self):
        try:
            # Restore the original attributes, last applied first.  The
            # module-level apply_patches is used on purpose: the method of
            # the same name would append to self.unapply again.
            apply_patches(list(reversed(self.unapply)))
        finally:
            super(TestInstallPackages, self).tearDown()

    def apply_patches(self, patches):
        """Apply (ref, name, replacement) patches and remember the undo.

        The (ref, name, original) entries returned by the module-level
        apply_patches() are accumulated in self.unapply for tearDown.
        """
        ret = apply_patches(patches)
        self.unapply += ret

    def populate_tmp(self, files):
        """Create 'files' (a name -> content dict) under self.tmp."""
        return t_help.populate_dir(self.tmp, files)

    def _subp(self, *args, **kwargs):
        """Recording stand-in for util.subp; runs nothing, returns None.

        The keyword arguments of each call (with the command normalised
        into kwargs['args']) are appended to self.subp_called.
        """
        # supports subp calling with cmd as args or kwargs
        if 'args' not in kwargs:
            kwargs['args'] = args[0]
        self.subp_called.append(kwargs)
        args = kwargs['args']
        # here we basically parse the snappy command invoked
        # and append to snapcmds a list of (mode, pkg, config)
        if args[0:2] == ['snappy', 'config']:
            # args[2] is the package, args[3] the config source; "-" means
            # the config was handed over via the 'data' keyword.
            if args[3] == "-":
                config = kwargs.get('data', '')
            else:
                with open(args[3], "rb") as fp:
                    config = yaml.safe_load(fp.read())
            self.snapcmds.append(['config', args[2], config])
        elif args[0:2] == ['snappy', 'install']:
            config = None
            pkg = None
            # options (anything starting with "-") are skipped; the first
            # remaining argument is the package, the second its config.
            for arg in args[2:]:
                if arg.startswith("-"):
                    continue
                if not pkg:
                    pkg = arg
                elif not config:
                    cfgfile = arg
                    if cfgfile == "-":
                        config = kwargs.get('data', '')
                    elif cfgfile:
                        with open(cfgfile, "rb") as fp:
                            config = yaml.safe_load(fp.read())
            self.snapcmds.append(['install', pkg, config])

    def test_package_ops_1(self):
        # nothing installed: expect one install op per package, in order,
        # with the config attached to the package that has one
        ret = get_package_ops(
            packages=['pkg1', 'pkg2', 'pkg3'],
            configs={'pkg2': b'mycfg2'}, installed=[])
        self.assertEqual(
            ret, [makeop('install', 'pkg1', None, None),
                  makeop('install', 'pkg2', b'mycfg2', None),
                  makeop('install', 'pkg3', None, None)])

    def test_package_ops_config_only(self):
        # no packages requested: expect only a config op for the already
        # installed package that has config
        ret = get_package_ops(
            packages=None,
            configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
        self.assertEqual(
            ret, [makeop('config', 'pkg2', b'mycfg2')])

    def test_package_ops_install_and_config(self):
        # expect installs first (config attached where given), then a
        # config op for the installed package
        ret = get_package_ops(
            packages=['pkg3', 'pkg2'],
            configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
            installed=['xinstalled'])
        self.assertEqual(
            ret, [makeop('install', 'pkg3'),
                  makeop('install', 'pkg2', b'mycfg2'),
                  makeop('config', 'xinstalled', b'xcfg')])

    def test_package_ops_install_long_config_short(self):
        # a package can be installed by full name, but have config by short
        cfg = {'k1': 'k2'}
        ret = get_package_ops(
            packages=['config-example.canonical'],
            configs={'config-example': cfg}, installed=[])
        self.assertEqual(
            ret, [makeop('install', 'config-example.canonical', cfg)])

    def test_package_ops_with_file(self):
        # local .snap files in fspath are expected before the named
        # packages; a matching .config becomes cfgfile and files with
        # other extensions are ignored
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
             "snapf2.snap": b"foo2", "foo.bar": "ignored"})
        ret = get_package_ops(
            packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
                         cfgfile="snapf1.config"),
             makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
             makeop('install', 'pkg1')])

    def test_package_ops_common_filename(self):
        # fish package name from filename
        # package names likely look like: pkgname.namespace_version_arch.snap

        # find filenames
        self.populate_tmp(
            {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
             "pkg-ws.config": "pkg-ws-config",
             "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
             "pkg1.smoser.config": "pkg1.smoser.config-data",
             "pkg1.config": "pkg1.config-data",
             "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
             "pkg2.smoser_0.0_amd64.config": "pkg2.config"})

        ret = get_package_ops(
            packages=[], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
                         path="pkg-ws.smoser_0.3.4_all.snap",
                         cfgfile="pkg-ws.config"),
             makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
                         path="pkg1.smoser_1.2.3_all.snap",
                         cfgfile="pkg1.smoser.config"),
             makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
                         path="pkg2.smoser_0.0_amd64.snap",
                         cfgfile="pkg2.smoser_0.0_amd64.config"),
             ])

    def test_package_ops_config_overrides_file(self):
        # config data overrides local file .config
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
        ret = get_package_ops(
            packages=[], configs={'snapf1': 'snapf1cfg-config'},
            installed=[], fspath=self.tmp)
        self.assertEqual(
            ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
                              path="snapf1.snap", config="snapf1cfg-config")])

    def test_package_ops_namespacing(self):
        # short-name configs are expected to apply to the namespaced
        # package being installed and to installed packages, while config
        # for a package that is neither is expected to produce no op
        cfgs = {
            'config-example': {'k1': 'v1'},
            'pkg1': {'p1': 'p2'},
            'ubuntu-core': {'c1': 'c2'},
            'notinstalled.smoser': {'s1': 's2'},
        }
        ret = get_package_ops(
            packages=['config-example.canonical'], configs=cfgs,
            installed=['config-example.smoser', 'pkg1.canonical',
                       'ubuntu-core'])

        expected_configs = [
            makeop('config', 'pkg1', config=cfgs['pkg1']),
            makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
        expected_installs = [
            makeop('install', 'config-example.canonical',
                   config=cfgs['config-example'])]

        installs = [i for i in ret if i['op'] == 'install']
        configs = [c for c in ret if c['op'] == 'config']

        self.assertEqual(installs, expected_installs)
        # configs are not ordered
        self.assertEqual(len(configs), len(expected_configs))
        self.assertTrue(all(found in expected_configs for found in configs))

    def test_render_op_localsnap(self):
        # a local snap is expected to be installed by its path, no config
        self.populate_tmp({"snapf1.snap": b"foo1"})
        op = makeop_tmpd(self.tmp, 'install', 'snapf1',
                         path='snapf1.snap')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', op['path'], None]])

    def test_render_op_localsnap_localconfig(self):
        # the cfgfile content is read back (yaml-loaded) by _subp
        self.populate_tmp(
            {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
        op = makeop_tmpd(self.tmp, 'install', 'snapf1',
                         path='snapf1.snap', cfgfile='snapf1.config')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', op['path'], 'snapf1cfg']])

    def test_render_op_snap(self):
        # plain install by name: no config is passed along
        op = makeop('install', 'snapf1')
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', 'snapf1', None]])

    def test_render_op_snap_config(self):
        # config is expected to arrive wrapped as {'config': {name: cfg}}
        mycfg = {'key1': 'value1'}
        name = "snapf1"
        op = makeop('install', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['install', name, {'config': {name: mycfg}}]])

    def test_render_op_config_bytes(self):
        # config entry for package can be bytes
        name = "snapf1"
        mycfg = b'myconfig'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])

    def test_render_op_config_string(self):
        # config entry for package can be a string
        name = 'snapf1'
        mycfg = 'myconfig: foo\nhisconfig: bar\n'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        self.assertEqual(
            self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])

    def test_render_op_config_dict(self):
        # config entry for package can be a dict, not a string blob
        mycfg = {'foo': 'bar'}
        name = 'snapf1'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        # snapcmds is a list of 3-entry lists. data_found will be the
        # blob of data in the file in 'snappy install --config=<file>'
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_op_config_list(self):
        # config entry for package can be a list, not a string blob
        mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
        name = "snapf1"
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_op_config_int(self):
        # config entry for package can be an int, not a string blob
        mycfg = 1
        name = 'snapf1'
        op = makeop('config', name, config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_long_configs_short(self):
        # install a namespaced package should have un-namespaced config
        mycfg = {'k1': 'k2'}
        name = 'snapf1'
        op = makeop('install', name + ".smoser", config=mycfg)
        render_snap_op(**op)
        data_found = self.snapcmds[0][2]
        self.assertEqual(mycfg, data_found['config'][name])

    def test_render_does_not_pad_cfgfile(self):
        # package_ops with cfgfile should not modify --file= content.
        mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
        self.populate_tmp(
            {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
        ret = get_package_ops(
            packages=[], configs={}, installed=[], fspath=self.tmp)
        self.assertEqual(
            ret,
            [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
                         cfgfile="snapf1.config")])

        # now the op was ok, but test that render didn't mess it up.
        render_snap_op(**ret[0])
        data_found = self.snapcmds[0][2]
        # the data found gets loaded in the snapcmd interpretation
        # so this comparison is a bit lossy, but input to snappy config
        # is expected to be yaml loadable, so it should be OK.
        self.assertEqual(yaml.safe_load(mydata), data_found)


class TestSnapConfig(FilesystemMockingTestCase):
    """Tests for cc_snap_config: add_assertions, add_snap_user and handle.

    The handle tests replace the module's 'util' reference and its
    add_assertions function with mocks and run against a cloud object
    whose distro is a MagicMock.
    """

    SYSTEM_USER_ASSERTION = textwrap.dedent("""
    type: system-user
    authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
    brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
    email: foo@bar.com
    password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
    series:
      - 16
    since: 2016-09-10T16:34:00+03:00
    until: 2017-11-10T16:34:00+03:00
    username: baz
    sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj

    AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
    Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
    zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
    s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
    +to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
    Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
    d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
    BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
    f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
    v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q==""")

    ACCOUNT_ASSERTION = textwrap.dedent("""
    type: account-key
    authority-id: canonical
    revision: 2
    public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
    account-id: canonical
    name: store
    since: 2016-04-01T00:00:00.0Z
    body-length: 717
    sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH

    AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
    qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
    vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
    UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
    Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
    o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
    VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
    2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
    Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
    vUvV7RjVzv17ut0AEQEAAQ==

    AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
    WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
    nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
    3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
    eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
    inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
    rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
    rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
    aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
    6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
    haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
    yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
    HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
    skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
    CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
    ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
    qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
    IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
    oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k""")

    # assertion list handed to add_assertions / put under cfg['snappy']
    test_assertions = [ACCOUNT_ASSERTION, SYSTEM_USER_ASSERTION]

    def setUp(self):
        super(TestSnapConfig, self).setUp()
        self.subp = util.subp
        # scratch root given to patchUtils(); removed when the test ends
        self.new_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.new_root)

    def _get_cloud(self, distro, metadata=None):
        """Build a cloud.Cloud for 'distro' on top of a DataSourceNone.

        'metadata', when given, is merged into the datasource metadata.
        """
        self.patchUtils(self.new_root)
        paths = helpers.Paths({})
        distro_obj = distros.fetch(distro)(distro, {}, paths)
        datasource = DataSourceNone.DataSourceNone({}, distro_obj, paths)
        if metadata:
            datasource.metadata.update(metadata)
        return cloud.Cloud(datasource, paths, {}, distro_obj, None)

    def _ubuntu_cloud_mock_distro(self):
        """Return an 'ubuntu' cloud whose distro is a MagicMock."""
        cc = self._get_cloud('ubuntu')
        cc.distro = mock.MagicMock()
        cc.distro.name = 'ubuntu'
        return cc

    @mock.patch('cloudinit.util.write_file')
    @mock.patch('cloudinit.util.subp')
    def test_snap_config_add_assertions(self, msubp, mwrite):
        # assertions are expected to be newline-joined, written utf-8
        # encoded to ASSERTIONS_FILE and then acked via 'snap ack'
        add_assertions(self.test_assertions)

        expected_blob = "\n".join(self.test_assertions).encode('utf-8')
        mwrite.assert_any_call(ASSERTIONS_FILE, expected_blob)
        msubp.assert_called_with(
            ['snap', 'ack', ASSERTIONS_FILE], capture=True)

    def test_snap_config_add_assertions_empty(self):
        # an empty assertion list is rejected
        with self.assertRaises(ValueError):
            add_assertions([])

    def test_add_assertions_nonlist(self):
        # a non-list (here an empty dict) is rejected
        with self.assertRaises(ValueError):
            add_assertions({})

    @mock.patch('cloudinit.util.write_file')
    @mock.patch('cloudinit.util.subp')
    def test_snap_config_add_assertions_ack_fails(self, msubp, mwrite):
        # an error raised by subp is expected to reach the caller
        msubp.side_effect = [util.ProcessExecutionError("Invalid assertion")]
        with self.assertRaises(util.ProcessExecutionError):
            add_assertions(self.test_assertions)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_no_config(self, mock_util, mock_add):
        # with an empty config no assertions are expected to be added
        cc = self._ubuntu_cloud_mock_distro()
        mock_util.which.return_value = None
        snap_handle('snap_config', {}, cc, LOG, None)
        mock_add.assert_not_called()

    def test_snap_config_add_snap_user_no_config(self):
        self.assertIsNone(add_snap_user(cfg=None))

    def test_snap_config_add_snap_user_not_dict(self):
        with self.assertRaises(ValueError):
            add_snap_user(['foobar'])

    def test_snap_config_add_snap_user_no_email(self):
        # without an 'email' key no user config is expected
        self.assertIsNone(
            add_snap_user(cfg={'assertions': [], 'known': True}))

    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_add_snap_user_email_only(self, mock_util):
        addr = 'janet@planetjanet.org'
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
        ]

        # 'known' is expected to default to False
        self.assertEqual(
            add_snap_user(cfg={'email': addr}),
            {'snapuser': addr, 'known': False})

    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_add_snap_user_email_known(self, mock_util):
        addr = 'janet@planetjanet.org'
        is_known = True
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            (self.SYSTEM_USER_ASSERTION, ""),  # snap known system-user
        ]

        self.assertEqual(
            add_snap_user(cfg={'email': addr, 'known': is_known}),
            {'snapuser': addr, 'known': is_known})

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_system_not_snappy(self, mock_util, mock_add):
        # on a non-snappy system no assertions are expected to be added
        snappy_cfg = {'snappy': {'assertions': self.test_assertions}}
        cc = self._ubuntu_cloud_mock_distro()
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = False

        snap_handle('snap_config', snappy_cfg, cc, LOG, None)

        mock_add.assert_not_called()

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser(self, mock_util, mock_add):
        addr = 'janet@planetjanet.org'
        snappy_cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': addr,
            }
        }
        cc = self._ubuntu_cloud_mock_distro()
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
        ]

        snap_handle('snap_config', snappy_cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        cc.distro.create_user.assert_called_with(
            addr, snapuser=addr, known=False)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known(self, mock_util, mock_add):
        addr = 'janet@planetjanet.org'
        snappy_cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': addr,
                'known': True,
            }
        }
        cc = self._ubuntu_cloud_mock_distro()
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("false\n", ""),  # snap managed
            (self.SYSTEM_USER_ASSERTION, ""),  # snap known system-user
        ]

        snap_handle('snap_config', snappy_cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        cc.distro.create_user.assert_called_with(
            addr, snapuser=addr, known=True)

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known_managed(self, mock_util,
                                                       mock_add):
        # 'snap managed' reports true: no user is expected to be created
        addr = 'janet@planetjanet.org'
        snappy_cfg = {
            'snappy': {
                'assertions': self.test_assertions,
                'email': addr,
                'known': True,
            }
        }
        cc = self._ubuntu_cloud_mock_distro()
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("true\n", ""),  # snap managed
        ]

        snap_handle('snap_config', snappy_cfg, cc, LOG, None)

        mock_add.assert_called_with(self.test_assertions)
        cc.distro.create_user.assert_not_called()

    @mock.patch('cloudinit.config.cc_snap_config.add_assertions')
    @mock.patch('cloudinit.config.cc_snap_config.util')
    def test_snap_config_handle_snapuser_known_no_assertion(self, mock_util,
                                                            mock_add):
        # only the account assertion is supplied; no user is expected
        addr = 'janet@planetjanet.org'
        snappy_cfg = {
            'snappy': {
                'assertions': [self.ACCOUNT_ASSERTION],
                'email': addr,
                'known': True,
            }
        }
        cc = self._ubuntu_cloud_mock_distro()
        mock_util.which.return_value = None
        mock_util.system_is_snappy.return_value = True
        mock_util.subp.side_effect = [
            ("true\n", ""),  # snap managed
            ("", ""),        # snap known system-user
        ]

        snap_handle('snap_config', snappy_cfg, cc, LOG, None)

        mock_add.assert_called_with([self.ACCOUNT_ASSERTION])
        cc.distro.create_user.assert_not_called()


def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
    """Like makeop(), but with 'path' and 'cfgfile' placed under 'tmpd'.

    A truthy 'path' / 'cfgfile' is prefixed with 'tmpd' and the path
    separator; a falsy one is handed to makeop() unchanged.
    """
    def _under_tmpd(relname):
        # plain separator join (not os.path.join), falsy values untouched
        return os.path.sep.join([tmpd, relname]) if relname else relname

    return makeop(op=op, name=name, config=config,
                  path=_under_tmpd(path), cfgfile=_under_tmpd(cfgfile))


def apply_patches(patches):
    """Set attributes on objects and return what is needed to undo it.

    'patches' is an iterable of (ref, name, replace) tuples: attribute
    'name' of 'ref' is set to 'replace'.  Entries whose 'replace' is None
    are skipped.  Returns a list of (ref, name, original) tuples, in the
    order applied, which can itself be passed to apply_patches to restore
    the previous values.
    """
    undo = []
    for target, attr, new_value in patches:
        if new_value is not None:
            # capture the current value before overwriting it
            undo.append((target, attr, getattr(target, attr)))
            setattr(target, attr, new_value)
    return undo

# vi: ts=4 expandtab