# Copyright (C) 2020 Canonical Ltd.
#
# Author: Daniel Watkins <oddbloke@ubuntu.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
"""Tests for cloudinit/distros/__init__.py"""

from unittest import mock

import pytest

from cloudinit.distros import _get_package_mirror_info, LDH_ASCII_CHARS


# Every ASCII character with a code point in 0-126 that is not a
# Letter/Digit/Hyphen character; these are the ones we expect to be replaced.
INVALID_URL_CHARS = list(
    filter(lambda char: char not in LDH_ASCII_CHARS, map(chr, range(127)))
)
# Drop the characters that carry structural meaning in a URL:
#   ":" and "."         separate hostname parts
#   "/", "#", "?", "@"  terminate hostnames
#   "[" and "]"         cause Python to be unable to parse URLs
for separator in (":", ".", "/", "#", "?", "@", "[", "]"):
    INVALID_URL_CHARS.remove(separator)


class TestGetPackageMirrorInfo:
    """
    Tests for cloudinit.distros._get_package_mirror_info.

    These supplement the tests in tests/unittests/test_distros/test_generic.py
    which are more focused on testing a single production-like configuration.
    These tests are more focused on specific aspects of the unit under test.
    """

    @pytest.mark.parametrize('mirror_info,expected', [
        # Empty info gives empty return
        ({}, {}),
        # failsafe values used if present
        ({'failsafe': {'primary': 'http://value', 'security': 'http://other'}},
         {'primary': 'http://value', 'security': 'http://other'}),
        # search values used if present
        ({'search': {'primary': ['http://value'],
                     'security': ['http://other']}},
         {'primary': ['http://value'], 'security': ['http://other']}),
        # failsafe values used if search value not present
        ({'search': {'primary': ['http://value']},
          'failsafe': {'security': 'http://other'}},
         {'primary': ['http://value'], 'security': 'http://other'})
    ])
    def test_get_package_mirror_info_failsafe(self, mirror_info, expected):
        """
        Test the interaction between search and failsafe inputs

        An identity mirror_filter is used, so search candidates come back
        exactly as they were supplied.

        (This doesn't test the case where the mirror_filter removes all search
        options; test_failsafe_used_if_all_search_results_filtered_out covers
        that.)
        """
        assert expected == _get_package_mirror_info(mirror_info,
                                                    mirror_filter=lambda x: x)

    def test_failsafe_used_if_all_search_results_filtered_out(self):
        """Test the failsafe option used if all search options eliminated."""
        mirror_info = {
            'search': {'primary': ['http://value']},
            'failsafe': {'primary': 'http://other'}
        }
        # A mirror_filter that always returns False eliminates every search
        # candidate, so the failsafe value is the expected result.
        assert {'primary': 'http://other'} == _get_package_mirror_info(
            mirror_info, mirror_filter=lambda x: False)

    # Only a single flag/platform combination is exercised here: the EC2
    # mirror flag enabled on an 'ec2' platform.
    @pytest.mark.parametrize('allow_ec2_mirror, platform_type', [
        (True, 'ec2')
    ])
    @pytest.mark.parametrize('availability_zone,region,patterns,expected', (
        # Test ec2_region alone
        ('fk-fake-1f', None, ['http://EC2-%(ec2_region)s/ubuntu'],
         ['http://ec2-fk-fake-1/ubuntu']),
        # Test availability_zone alone
        ('fk-fake-1f', None, ['http://AZ-%(availability_zone)s/ubuntu'],
         ['http://az-fk-fake-1f/ubuntu']),
        # Test region alone
        (None, 'fk-fake-1', ['http://RG-%(region)s/ubuntu'],
         ['http://rg-fk-fake-1/ubuntu']),
        # Test that ec2_region is not available for non-matching AZs
        ('fake-fake-1f', None,
         ['http://EC2-%(ec2_region)s/ubuntu',
          'http://AZ-%(availability_zone)s/ubuntu'],
         ['http://az-fake-fake-1f/ubuntu']),
        # Test that template order maintained
        (None, 'fake-region',
         ['http://RG-%(region)s-2/ubuntu', 'http://RG-%(region)s-1/ubuntu'],
         ['http://rg-fake-region-2/ubuntu', 'http://rg-fake-region-1/ubuntu']),
        # Test that non-ASCII hostnames are IDNA encoded;
        # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
        (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com/ubuntu'],
         ['http://www.xn--idna--4kd53hh6aba3q.com/ubuntu']),
        # Test that non-ASCII hostnames with a port are IDNA encoded;
        # "IDNA-ТεЅТ̣".encode('idna') == b"xn--idna--4kd53hh6aba3q"
        (None, 'ТεЅТ̣', ['http://www.IDNA-%(region)s.com:8080/ubuntu'],
         ['http://www.xn--idna--4kd53hh6aba3q.com:8080/ubuntu']),
        # Test that non-ASCII non-hostname parts of URLs are unchanged
        (None, 'ТεЅТ̣', ['http://www.example.com/%(region)s/ubuntu'],
         ['http://www.example.com/ТεЅТ̣/ubuntu']),
        # Test that IPv4 addresses are unchanged
        (None, 'fk-fake-1', ['http://192.168.1.1:8080/%(region)s/ubuntu'],
         ['http://192.168.1.1:8080/fk-fake-1/ubuntu']),
        # Test that IPv6 addresses are unchanged
        (None, 'fk-fake-1',
         ['http://[2001:67c:1360:8001::23]/%(region)s/ubuntu'],
         ['http://[2001:67c:1360:8001::23]/fk-fake-1/ubuntu']),
        # Test that unparseable URLs are filtered out of the mirror list
        (None, 'inv[lid',
         ['http://%(region)s.in.hostname/should/be/filtered',
          'http://but.not.in.the.path/%(region)s'],
         ['http://but.not.in.the.path/inv[lid']),
        # Test that leading and trailing hyphens are removed from each
        # hostname label, whether they come from the template or from the
        # substituted value
        (None, '-some-region-',
         ['http://-lead-ing.%(region)s.trail-ing-.example.com/ubuntu'],
         ['http://lead-ing.some-region.trail-ing.example.com/ubuntu']),
    ) + tuple(
        # Dynamically generate a test case for each non-LDH
        # (Letters/Digits/Hyphen) ASCII character, testing that it is
        # substituted with a hyphen
        (None, 'fk{0}fake{0}1'.format(invalid_char),
         ['http://%(region)s/ubuntu'], ['http://fk-fake-1/ubuntu'])
        for invalid_char in INVALID_URL_CHARS
    ))
    def test_valid_substitution(self,
                                allow_ec2_mirror,
                                platform_type,
                                availability_zone,
                                region,
                                patterns,
                                expected):
        """Test substitution works as expected.

        Each case renders ``patterns`` as the 'primary' search list against a
        mocked data source exposing the given ``availability_zone``,
        ``region`` and ``platform_type``, and checks that the returned
        'primary' entry equals ``expected``.
        """
        flag_path = "cloudinit.distros." \
                    "ALLOW_EC2_MIRRORS_ON_NON_AWS_INSTANCE_TYPES"

        m_data_source = mock.Mock(
            availability_zone=availability_zone,
            region=region,
            platform_type=platform_type
        )
        mirror_info = {'search': {'primary': patterns}}

        # Pin the module-level flag to the parametrized value only for the
        # duration of the call under test.
        with mock.patch(flag_path, allow_ec2_mirror):
            ret = _get_package_mirror_info(
                mirror_info,
                data_source=m_data_source,
                mirror_filter=lambda x: x
            )
        # No debug printing here: pytest already reports the parametrized
        # arguments and both sides of the comparison when the assert fails.
        assert {'primary': expected} == ret