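# Blob Blame History Raw
# NOTE(review): the line above looks like leftover repository web-viewer
# navigation text; it is kept only as a comment so the module parses.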
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit import helpers
from cloudinit import util
from cloudinit.sources.DataSourceCloudStack import (
    DataSourceCloudStack, get_latest_lease)

from cloudinit.tests.helpers import CiTestCase, ExitStack, mock

import os
import time

# Dotted import path of the module under test; prefix for mock.patch targets.
MOD_PATH = 'cloudinit.sources.DataSourceCloudStack'
# Dotted path of the datasource class inside that module.
DS_PATH = '.'.join((MOD_PATH, 'DataSourceCloudStack'))


class TestCloudStackPasswordFetching(CiTestCase):
    """Exercise how DataSourceCloudStack reacts to password-server replies.

    ``setUp`` replaces the module's ``ec2``, ``uhelp``, lease, gateway and
    networkd helpers with mocks.  Each test then fakes the ``subp.subp``
    reply and inspects either the resulting config object or the request
    types that were issued.
    """

    def setUp(self):
        """Install the module-level patches shared by every test."""
        super(TestCloudStackPasswordFetching, self).setUp()
        self.patches = ExitStack()
        self.addCleanup(self.patches.close)

        # Attributes replaced wholesale by auto-created mocks.
        for attr in ('ec2', 'uhelp'):
            self.patches.enter_context(
                mock.patch('{0}.{1}'.format(MOD_PATH, attr)))

        # Helpers replaced by mocks with a fixed return value, patched in
        # the order listed here.
        canned = (
            ('.get_latest_lease', None),
            ('.get_default_gateway', "192.201.20.0"),
            ('.dhcp.networkd_get_option_from_leases', None),
        )
        for suffix, result in canned:
            fake = mock.MagicMock(return_value=result)
            self.patches.enter_context(mock.patch(MOD_PATH + suffix, fake))

        self.tmp = self.tmp_dir()

    def _make_ds(self):
        """Build a DataSourceCloudStack whose paths use self.tmp as run_dir."""
        paths = helpers.Paths({'run_dir': self.tmp})
        return DataSourceCloudStack({}, None, paths)

    def _set_password_server_response(self, response_string):
        """Patch ``subp.subp`` in the datasource module and return the mock.

        Every call to the mock returns ``(response_string, '')``.
        """
        fake_subp = mock.MagicMock(return_value=(response_string, ''))
        patcher = mock.patch(MOD_PATH + '.subp.subp', fake_subp)
        self.patches.enter_context(patcher)
        return fake_subp

    def test_empty_password_doesnt_create_config(self):
        """An empty reply leaves the config object empty."""
        self._set_password_server_response('')
        ds = self._make_ds()
        ds.get_data()
        self.assertEqual({}, ds.get_config_obj())

    def test_saved_password_doesnt_create_config(self):
        """A 'saved_password' reply leaves the config object empty."""
        self._set_password_server_response('saved_password')
        ds = self._make_ds()
        ds.get_data()
        self.assertEqual({}, ds.get_config_obj())

    @mock.patch(DS_PATH + '.wait_for_metadata_service', return_value=True)
    def test_password_sets_password(self, m_wait):
        """A real password reply is exposed under the 'password' key."""
        secret = 'SekritSquirrel'
        self._set_password_server_response(secret)
        ds = self._make_ds()
        ds.get_data()
        config = ds.get_config_obj()
        self.assertEqual(secret, config['password'])

    @mock.patch(DS_PATH + '.wait_for_metadata_service', return_value=True)
    def test_bad_request_doesnt_stop_ds_from_working(self, m_wait):
        """get_data() still reports success on a 'bad_request' reply."""
        self._set_password_server_response('bad_request')
        ds = self._make_ds()
        self.assertTrue(ds.get_data())

    def assertRequestTypesSent(self, subp, expected_request_types):
        """Assert which request types were passed to the ``subp`` mock.

        For each recorded call, the first positional argument is scanned
        for items starting with 'DomU_Request'; the second
        whitespace-separated token of each such item is collected, in call
        order, and compared with ``expected_request_types``.
        """
        sent = []
        for recorded in subp.call_args_list:
            argv = recorded[0][0]
            sent.extend(
                item.split()[1] for item in argv
                if item.startswith('DomU_Request'))
        self.assertEqual(expected_request_types, sent)

    @mock.patch(DS_PATH + '.wait_for_metadata_service', return_value=True)
    def test_valid_response_means_password_marked_as_saved(self, m_wait):
        """A real password triggers 'send_my_password' then 'saved_password'."""
        fake_subp = self._set_password_server_response('SekritSquirrel')
        ds = self._make_ds()
        ds.get_data()
        expected = ['send_my_password', 'saved_password']
        self.assertRequestTypesSent(fake_subp, expected)

    def _check_password_not_saved_for(self, response_string):
        """Assert only 'send_my_password' is sent for ``response_string``."""
        fake_subp = self._set_password_server_response(response_string)
        ds = self._make_ds()
        wait_target = DS_PATH + '.wait_for_metadata_service'
        with mock.patch(wait_target, return_value=True):
            ds.get_data()
        self.assertRequestTypesSent(fake_subp, ['send_my_password'])

    def test_password_not_saved_if_empty(self):
        """No 'saved_password' request follows an empty reply."""
        self._check_password_not_saved_for('')

    def test_password_not_saved_if_already_saved(self):
        """No 'saved_password' request follows a 'saved_password' reply."""
        self._check_password_not_saved_for('saved_password')

    def test_password_not_saved_if_bad_request(self):
        """No 'saved_password' request follows a 'bad_request' reply."""
        self._check_password_not_saved_for('bad_request')


class TestGetLatestLease(CiTestCase):
    """Check which file in a lease directory get_latest_lease returns."""

    def _populate_dir_list(self, bdir, files):
        """Create every name in ``files`` under ``bdir`` with rising mtimes.

        Each file's content is its own name.  The last entry is stamped
        with the current time (whole seconds) and each preceding entry is
        one second older, so later entries in ``files`` are always newer.
        """
        stamp = int(time.time())
        for fname in reversed(files):
            fpath = bdir + os.path.sep + fname
            util.write_file(fpath, fname.encode())
            os.utime(fpath, (stamp, stamp))
            stamp -= 1

    def _pop_and_test(self, files, expected):
        """Fill a fresh temp dir with ``files``; expect ``expected`` chosen."""
        lease_d = self.tmp_dir()
        self._populate_dir_list(lease_d, files)
        want = self.tmp_path(expected, lease_d)
        got = get_latest_lease(lease_d)
        self.assertEqual(want, got)

    def test_skips_dhcpv6_files(self):
        """A dhclient6 file is passed over even when it is the newest."""
        wanted = "dhclient.lease"
        candidates = [wanted, "dhclient6.lease"]
        self._pop_and_test(candidates, wanted)

    def test_selects_dhclient_dot_files(self):
        """dhclient.lease and dhclient.leases are each accepted.

        Ubuntu names files dhclient.eth0.leases dhclient6.leases and
        sometimes dhclient.leases.
        """
        for name in ("dhclient.lease", "dhclient.leases"):
            self._pop_and_test([name], name)

    def test_selects_dhclient_dash_files(self):
        """dhclient-<iface> and dhclient--<iface> lease files are accepted.

        Redhat/Centos names files with dhclient--eth0.lease (centos 7) or
        dhclient-eth0.leases (centos 6).
        """
        for name in ("dhclient-eth0.lease", "dhclient--eth0.lease"):
            self._pop_and_test([name], name)

    def test_ignores_by_extension(self):
        """Only the name ending in .lease wins over look-alike suffixes."""
        wanted = "dhclient.lease"
        candidates = [
            wanted,
            "dhclient.lease.bk",
            "dhclient.lease-old",
            "dhclient.leaselease",
        ]
        self._pop_and_test(candidates, wanted)

    def test_selects_newest_matching(self):
        """With several matching files, the most recent mtime wins."""
        lease_d = self.tmp_dir()
        first, second = "dhclient.leases", "dhclient.lease"
        first_path = self.tmp_path(first, lease_d)
        second_path = self.tmp_path(second, lease_d)

        # Listed last, so `second` gets the newer timestamp.
        self._populate_dir_list(lease_d, [first, second])
        self.assertEqual(second_path, get_latest_lease(lease_d))

        # Age `second` to one second before `first` and expect the flip.
        earlier = int(os.path.getmtime(first_path)) - 1
        os.utime(second_path, (earlier, earlier))

        self.assertEqual(first_path, get_latest_lease(lease_d))


# vi: ts=4 expandtab