Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

import sys

from cloudinit.config import cc_power_state_change as psc

from cloudinit.tests import helpers as t_help
from cloudinit.tests.helpers import mock


class TestLoadPowerState(t_help.TestCase):
    def test_no_config(self):
        # completely empty config should mean do nothing
        (cmd, _timeout, _condition) = psc.load_power_state({}, 'ubuntu')
        self.assertIsNone(cmd)

    def test_irrelevant_config(self):
        # no power_state field in config should return None for cmd
        (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'},
                                                           'ubuntu')
        self.assertIsNone(cmd)

    def test_invalid_mode(self):
        cfg = {'power_state': {'mode': 'gibberish'}}
        self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')

        cfg = {'power_state': {'mode': ''}}
        self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')

    def test_empty_mode(self):
        cfg = {'power_state': {'message': 'goodbye'}}
        self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')

    def test_valid_modes(self):
        cfg = {'power_state': {}}
        for mode in ('halt', 'poweroff', 'reboot'):
            cfg['power_state']['mode'] = mode
            check_lps_ret(psc.load_power_state(cfg, 'ubuntu'), mode=mode)

    def test_invalid_delay(self):
        cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}}
        self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')

    def test_valid_delay(self):
        cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}}
        for delay in ("now", "+1", "+30"):
            cfg['power_state']['delay'] = delay
            check_lps_ret(psc.load_power_state(cfg, 'ubuntu'))

    def test_message_present(self):
        cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}}
        ret = psc.load_power_state(cfg, 'ubuntu')
        check_lps_ret(psc.load_power_state(cfg, 'ubuntu'))
        self.assertIn(cfg['power_state']['message'], ret[0])

    def test_no_message(self):
        # if message is not present, then no argument should be passed for it
        cfg = {'power_state': {'mode': 'poweroff'}}
        (cmd, _timeout, _condition) = psc.load_power_state(cfg, 'ubuntu')
        self.assertNotIn("", cmd)
        check_lps_ret(psc.load_power_state(cfg, 'ubuntu'))
        self.assertTrue(len(cmd) == 3)

    def test_condition_null_raises(self):
        cfg = {'power_state': {'mode': 'poweroff', 'condition': None}}
        self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')

    def test_condition_default_is_true(self):
        cfg = {'power_state': {'mode': 'poweroff'}}
        _cmd, _timeout, cond = psc.load_power_state(cfg, 'ubuntu')
        self.assertEqual(cond, True)


class TestCheckCondition(t_help.TestCase):
    def cmd_with_exit(self, rc):
        return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc])

    def test_true_is_true(self):
        self.assertEqual(psc.check_condition(True), True)

    def test_false_is_false(self):
        self.assertEqual(psc.check_condition(False), False)

    def test_cmd_exit_zero_true(self):
        self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True)

    def test_cmd_exit_one_false(self):
        self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False)

    def test_cmd_exit_nonzero_warns(self):
        mocklog = mock.Mock()
        self.assertEqual(
            psc.check_condition(self.cmd_with_exit(2), mocklog), False)
        self.assertEqual(mocklog.warning.call_count, 1)


def check_lps_ret(psc_return, mode=None):
    if len(psc_return) != 3:
        raise TypeError("length returned = %d" % len(psc_return))

    errs = []
    cmd = psc_return[0]
    timeout = psc_return[1]
    condition = psc_return[2]

    if 'shutdown' not in psc_return[0][0]:
        errs.append("string 'shutdown' not in cmd")

    if condition is None:
        errs.append("condition was not returned")

    if mode is not None:
        opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
        if opt not in psc_return[0]:
            errs.append("opt '%s' not in cmd: %s" % (opt, cmd))

    if len(cmd) != 3 and len(cmd) != 4:
        errs.append("Invalid command length: %s" % len(cmd))

    try:
        float(timeout)
    except Exception:
        errs.append("timeout failed convert to float")

    if len(errs):
        lines = ["Errors in result: %s" % str(psc_return)] + errs
        raise Exception('\n'.join(lines))

# vi: ts=4 expandtab