Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

import functools
import httpretty
import os
from unittest import mock

from cloudinit import helpers
from cloudinit.sources import DataSourceAliYun as ay
from cloudinit.tests import helpers as test_helpers

DEFAULT_METADATA = {
    'instance-id': 'aliyun-test-vm-00',
    'eipv4': '10.0.0.1',
    'hostname': 'test-hostname',
    'image-id': 'm-test',
    'launch-index': '0',
    'mac': '00:16:3e:00:00:00',
    'network-type': 'vpc',
    'private-ipv4': '192.168.0.1',
    'serial-number': 'test-string',
    'vpc-cidr-block': '192.168.0.0/16',
    'vpc-id': 'test-vpc',
    'vswitch-id': 'test-vpc',
    'vswitch-cidr-block': '192.168.0.0/16',
    'zone-id': 'test-zone-1',
    'ntp-conf': {'ntp_servers': [
                 'ntp1.aliyun.com',
                 'ntp2.aliyun.com',
                 'ntp3.aliyun.com']},
    'source-address': ['http://mirrors.aliyun.com',
                       'http://mirrors.aliyuncs.com'],
    'public-keys': {'key-pair-1': {'openssh-key': 'ssh-rsa AAAAB3...'},
                    'key-pair-2': {'openssh-key': 'ssh-rsa AAAAB3...'}}
}

DEFAULT_USERDATA = """\
#cloud-config

hostname: localhost"""


def register_mock_metaserver(base_url, data):
    def register_helper(register, base_url, body):
        if isinstance(body, str):
            register(base_url, body)
        elif isinstance(body, list):
            register(base_url.rstrip('/'), '\n'.join(body) + '\n')
        elif isinstance(body, dict):
            if not body:
                register(base_url.rstrip('/') + '/', 'not found',
                         status_code=404)
            vals = []
            for k, v in body.items():
                if isinstance(v, (str, list)):
                    suffix = k.rstrip('/')
                else:
                    suffix = k.rstrip('/') + '/'
                vals.append(suffix)
                url = base_url.rstrip('/') + '/' + suffix
                register_helper(register, url, v)
            register(base_url, '\n'.join(vals) + '\n')

    register = functools.partial(httpretty.register_uri, httpretty.GET)
    register_helper(register, base_url, data)


class TestAliYunDatasource(test_helpers.HttprettyTestCase):
    def setUp(self):
        super(TestAliYunDatasource, self).setUp()
        cfg = {'datasource': {'AliYun': {'timeout': '1', 'max_wait': '1'}}}
        distro = {}
        paths = helpers.Paths({'run_dir': self.tmp_dir()})
        self.ds = ay.DataSourceAliYun(cfg, distro, paths)
        self.metadata_address = self.ds.metadata_urls[0]

    @property
    def default_metadata(self):
        return DEFAULT_METADATA

    @property
    def default_userdata(self):
        return DEFAULT_USERDATA

    @property
    def metadata_url(self):
        return os.path.join(
            self.metadata_address,
            self.ds.min_metadata_version, 'meta-data') + '/'

    @property
    def userdata_url(self):
        return os.path.join(
            self.metadata_address,
            self.ds.min_metadata_version, 'user-data')

    # EC2 provides an instance-identity document which must return 404 here
    # for this test to pass.
    @property
    def default_identity(self):
        return {}

    @property
    def identity_url(self):
        return os.path.join(self.metadata_address,
                            self.ds.min_metadata_version,
                            'dynamic', 'instance-identity')

    def regist_default_server(self):
        register_mock_metaserver(self.metadata_url, self.default_metadata)
        register_mock_metaserver(self.userdata_url, self.default_userdata)
        register_mock_metaserver(self.identity_url, self.default_identity)

    def _test_get_data(self):
        self.assertEqual(self.ds.metadata, self.default_metadata)
        self.assertEqual(self.ds.userdata_raw,
                         self.default_userdata.encode('utf8'))

    def _test_get_sshkey(self):
        pub_keys = [v['openssh-key'] for (_, v) in
                    self.default_metadata['public-keys'].items()]
        self.assertEqual(self.ds.get_public_ssh_keys(), pub_keys)

    def _test_get_iid(self):
        self.assertEqual(self.default_metadata['instance-id'],
                         self.ds.get_instance_id())

    def _test_host_name(self):
        self.assertEqual(self.default_metadata['hostname'],
                         self.ds.get_hostname())

    @mock.patch("cloudinit.sources.DataSourceAliYun._is_aliyun")
    def test_with_mock_server(self, m_is_aliyun):
        m_is_aliyun.return_value = True
        self.regist_default_server()
        ret = self.ds.get_data()
        self.assertEqual(True, ret)
        self.assertEqual(1, m_is_aliyun.call_count)
        self._test_get_data()
        self._test_get_sshkey()
        self._test_get_iid()
        self._test_host_name()
        self.assertEqual('aliyun', self.ds.cloud_name)
        self.assertEqual('ec2', self.ds.platform)
        self.assertEqual(
            'metadata (http://100.100.100.200)', self.ds.subplatform)

    @mock.patch("cloudinit.sources.DataSourceAliYun._is_aliyun")
    def test_returns_false_when_not_on_aliyun(self, m_is_aliyun):
        """If is_aliyun returns false, then get_data should return False."""
        m_is_aliyun.return_value = False
        self.regist_default_server()
        ret = self.ds.get_data()
        self.assertEqual(1, m_is_aliyun.call_count)
        self.assertEqual(False, ret)

    def test_parse_public_keys(self):
        public_keys = {}
        self.assertEqual(ay.parse_public_keys(public_keys), [])

        public_keys = {'key-pair-0': 'ssh-key-0'}
        self.assertEqual(ay.parse_public_keys(public_keys),
                         [public_keys['key-pair-0']])

        public_keys = {'key-pair-0': 'ssh-key-0', 'key-pair-1': 'ssh-key-1'}
        self.assertEqual(set(ay.parse_public_keys(public_keys)),
                         set([public_keys['key-pair-0'],
                             public_keys['key-pair-1']]))

        public_keys = {'key-pair-0': ['ssh-key-0', 'ssh-key-1']}
        self.assertEqual(ay.parse_public_keys(public_keys),
                         public_keys['key-pair-0'])

        public_keys = {'key-pair-0': {'openssh-key': []}}
        self.assertEqual(ay.parse_public_keys(public_keys), [])

        public_keys = {'key-pair-0': {'openssh-key': 'ssh-key-0'}}
        self.assertEqual(ay.parse_public_keys(public_keys),
                         [public_keys['key-pair-0']['openssh-key']])

        public_keys = {'key-pair-0': {'openssh-key': ['ssh-key-0',
                                                      'ssh-key-1']}}
        self.assertEqual(ay.parse_public_keys(public_keys),
                         public_keys['key-pair-0']['openssh-key'])


class TestIsAliYun(test_helpers.CiTestCase):
    ALIYUN_PRODUCT = 'Alibaba Cloud ECS'
    read_dmi_data_expected = [mock.call('system-product-name')]

    @mock.patch("cloudinit.sources.DataSourceAliYun.util.read_dmi_data")
    def test_true_on_aliyun_product(self, m_read_dmi_data):
        """Should return true if the dmi product data has expected value."""
        m_read_dmi_data.return_value = self.ALIYUN_PRODUCT
        ret = ay._is_aliyun()
        self.assertEqual(self.read_dmi_data_expected,
                         m_read_dmi_data.call_args_list)
        self.assertEqual(True, ret)

    @mock.patch("cloudinit.sources.DataSourceAliYun.util.read_dmi_data")
    def test_false_on_empty_string(self, m_read_dmi_data):
        """Should return false on empty value returned."""
        m_read_dmi_data.return_value = ""
        ret = ay._is_aliyun()
        self.assertEqual(self.read_dmi_data_expected,
                         m_read_dmi_data.call_args_list)
        self.assertEqual(False, ret)

    @mock.patch("cloudinit.sources.DataSourceAliYun.util.read_dmi_data")
    def test_false_on_unknown_string(self, m_read_dmi_data):
        """Should return false on an unrelated string."""
        m_read_dmi_data.return_value = "cubs win"
        ret = ay._is_aliyun()
        self.assertEqual(self.read_dmi_data_expected,
                         m_read_dmi_data.call_args_list)
        self.assertEqual(False, ret)

# vi: ts=4 expandtab