Blame tests/unittests/test_datasource/test_azure_helper.py

Packit bc9a3a
# This file is part of cloud-init. See LICENSE file for license information.
Packit bc9a3a
Packit bc9a3a
import os
Packit bc9a3a
import unittest2
Packit bc9a3a
from textwrap import dedent
Packit bc9a3a
Packit bc9a3a
from cloudinit.sources.helpers import azure as azure_helper
Packit bc9a3a
from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, populate_dir
Packit bc9a3a
Packit bc9a3a
from cloudinit.util import load_file
Packit bc9a3a
from cloudinit.sources.helpers.azure import WALinuxAgentShim as wa_shim
Packit bc9a3a
Packit bc9a3a
# XML goal state document used as a fixture.  The placeholders
# {incarnation}, {container_id}, {instance_id} and {certificates_url} are
# filled in with str.format() before the document is parsed.
GOAL_STATE_TEMPLATE = """\
<?xml version="1.0" encoding="utf-8"?>
<GoalState xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:noNamespaceSchemaLocation="goalstate10.xsd">
  <Version>2012-11-30</Version>
  <Incarnation>{incarnation}</Incarnation>
  <Machine>
    <ExpectedState>Started</ExpectedState>
    <StopRolesDeadlineHint>300000</StopRolesDeadlineHint>
    <LBProbePorts>
      <Port>16001</Port>
    </LBProbePorts>
    <ExpectHealthReport>FALSE</ExpectHealthReport>
  </Machine>
  <Container>
    <ContainerId>{container_id}</ContainerId>
    <RoleInstanceList>
      <RoleInstance>
        <InstanceId>{instance_id}</InstanceId>
        <State>Started</State>
        <Configuration>
          <HostingEnvironmentConfig>
            http://100.86.192.70:80/...hostingEnvironmentConfig...
          </HostingEnvironmentConfig>
          <SharedConfig>http://100.86.192.70:80/..SharedConfig..</SharedConfig>
          <ExtensionsConfig>
            http://100.86.192.70:80/...extensionsConfig...
          </ExtensionsConfig>
          <FullConfig>http://100.86.192.70:80/...fullConfig...</FullConfig>
          <Certificates>{certificates_url}</Certificates>
          <ConfigName>68ce47.0.68ce47.0.utl-trusty--292258.1.xml</ConfigName>
        </Configuration>
      </RoleInstance>
    </RoleInstanceList>
  </Container>
</GoalState>
"""
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestFindEndpoint(CiTestCase):
    """Tests for WALinuxAgentShim.find_endpoint with lease sources mocked."""

    def setUp(self):
        super(TestFindEndpoint, self).setUp()
        stack = ExitStack()
        self.addCleanup(stack.close)

        def _patch(target, attribute):
            # Activate the patch for the lifetime of this test only.
            return stack.enter_context(mock.patch.object(target, attribute))

        self.load_file = _patch(azure_helper.util, 'load_file')
        self.dhcp_options = _patch(wa_shim, '_load_dhclient_json')
        self.networkd_leases = _patch(
            wa_shim, '_networkd_get_value_from_leases')
        # By default the networkd lease lookup yields nothing.
        self.networkd_leases.return_value = None

    def test_missing_file(self):
        """The default endpoint is used when no lease file is found."""
        endpoint = wa_shim.find_endpoint()
        self.assertEqual(endpoint, "168.63.129.16")

    def test_missing_special_azure_line(self):
        """The default endpoint is used when the lease file is found but
        lacks DHCP Option 245 (whose value is the endpoint).
        """
        self.dhcp_options.return_value = {'eth0': {'key': 'value'}}
        self.load_file.return_value = ''
        endpoint = wa_shim.find_endpoint()
        self.assertEqual(endpoint, "168.63.129.16")

    @staticmethod
    def _build_lease_content(encoded_address):
        """Return a one-lease dhclient stanza carrying the endpoint option."""
        option_name = azure_helper._get_dhcp_endpoint_option_name()
        option_line = ' option {0} {1};'.format(option_name, encoded_address)
        return '\n'.join(('lease {', ' interface "eth0";', option_line, '}'))

    def test_from_dhcp_client(self):
        """The endpoint is read from the dhclient options when present."""
        options = {"eth0": {"unknown_245": "5:4:3:2"}}
        self.dhcp_options.return_value = options
        self.assertEqual('5.4.3.2', wa_shim.find_endpoint(None))

    @mock.patch('cloudinit.sources.helpers.azure.util.is_FreeBSD')
    def test_latest_lease_used(self, m_is_freebsd):
        """With several leases in the file, the last one wins."""
        m_is_freebsd.return_value = False  # To avoid hitting load_file
        encoded_addresses = ['5:4:3:2', '4:3:2:1']
        stanzas = [self._build_lease_content(address)
                   for address in encoded_addresses]
        self.load_file.return_value = '\n'.join(stanzas)
        expected = encoded_addresses[-1].replace(':', '.')
        self.assertEqual(expected, wa_shim.find_endpoint("foobar"))
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestExtractIpAddressFromLeaseValue(CiTestCase):
    """Tests for WALinuxAgentShim.get_ip_from_lease_value."""

    def _assert_decodes_to(self, ip_address, encoded_address):
        """Assert that the lease value decodes to the dotted-quad address."""
        decoded = wa_shim.get_ip_from_lease_value(encoded_address)
        self.assertEqual(ip_address, decoded)

    def test_hex_string(self):
        # Colon-separated hex octets: 0x62.0x4c.0x36.0x20
        self._assert_decodes_to('98.76.54.32', '62:4c:36:20')

    def test_hex_string_with_single_character_part(self):
        self._assert_decodes_to('4.3.2.1', '4:3:2:1')

    def test_packed_string(self):
        # One character per octet: 'b'=98, 'L'=76, '6'=54, ' '=32
        self._assert_decodes_to('98.76.54.32', 'bL6 ')

    def test_packed_string_with_escaped_quote(self):
        # The third octet (34) is a double quote, escaped with a backslash
        self._assert_decodes_to('100.72.34.108', 'dH\\"l')

    def test_packed_string_containing_a_colon(self):
        # The third octet (58) is itself a ':' character
        self._assert_decodes_to('100.72.58.108', 'dH:l')
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestGoalStateParsing(CiTestCase):
    """Tests for azure_helper.GoalState parsing and is_byte_swapped."""

    # Values substituted into GOAL_STATE_TEMPLATE unless a test overrides them
    default_parameters = {
        'incarnation': 1,
        'container_id': 'MyContainerId',
        'instance_id': 'MyInstanceId',
        'certificates_url': 'MyCertificatesUrl',
    }

    def _get_goal_state(self, http_client=None, **kwargs):
        """Build a GoalState from the template.

        Keyword arguments override entries of default_parameters.  When
        certificates_url is None, every line mentioning 'Certificates' is
        removed from the rendered XML.
        """
        client = mock.MagicMock() if http_client is None else http_client
        parameters = dict(self.default_parameters, **kwargs)
        xml = GOAL_STATE_TEMPLATE.format(**parameters)
        if parameters['certificates_url'] is None:
            xml = '\n'.join(
                line for line in xml.splitlines()
                if 'Certificates' not in line)
        return azure_helper.GoalState(xml, client)

    def test_incarnation_parsed_correctly(self):
        expected = '123'
        parsed = self._get_goal_state(incarnation=expected)
        self.assertEqual(expected, parsed.incarnation)

    def test_container_id_parsed_correctly(self):
        expected = 'TestContainerId'
        parsed = self._get_goal_state(container_id=expected)
        self.assertEqual(expected, parsed.container_id)

    def test_instance_id_parsed_correctly(self):
        expected = 'TestInstanceId'
        parsed = self._get_goal_state(instance_id=expected)
        self.assertEqual(expected, parsed.instance_id)

    def test_instance_id_byte_swap(self):
        """is_byte_swapped is true for a byte-swapped previous/current pair"""
        before = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
        after = "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8"
        swapped = azure_helper.is_byte_swapped(before, after)
        self.assertTrue(swapped)

    def test_instance_id_no_byte_swap_same_instance_id(self):
        before = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
        after = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
        swapped = azure_helper.is_byte_swapped(before, after)
        self.assertFalse(swapped)

    def test_instance_id_no_byte_swap_diff_instance_id(self):
        before = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
        after = "G0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
        swapped = azure_helper.is_byte_swapped(before, after)
        self.assertFalse(swapped)

    def test_certificates_xml_parsed_and_fetched_correctly(self):
        client = mock.MagicMock()
        url = 'TestCertificatesUrl'
        parsed = self._get_goal_state(
            http_client=client, certificates_url=url)
        # Reading the attribute is what triggers the HTTP fetch
        fetched = parsed.certificates_xml
        self.assertEqual(1, client.get.call_count)
        call_positional, call_keywords = client.get.call_args
        self.assertEqual(url, call_positional[0])
        self.assertTrue(call_keywords.get('secure', False))
        self.assertEqual(client.get.return_value.contents, fetched)

    def test_missing_certificates_skips_http_get(self):
        client = mock.MagicMock()
        parsed = self._get_goal_state(
            http_client=client, certificates_url=None)
        fetched = parsed.certificates_xml
        self.assertEqual(0, client.get.call_count)
        self.assertIsNone(fetched)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestAzureEndpointHttpClient(CiTestCase):
    """Tests for the requests AzureEndpointHttpClient hands to url_helper."""

    # Headers expected on every request
    regular_headers = {
        'x-ms-agent-name': 'WALinuxAgent',
        'x-ms-version': '2012-11-30',
    }

    def setUp(self):
        super(TestAzureEndpointHttpClient, self).setUp()
        patcher = mock.patch.object(
            azure_helper.url_helper, 'read_file_or_url')
        self.read_file_or_url = patcher.start()
        self.addCleanup(patcher.stop)

    def test_non_secure_get(self):
        url = 'MyTestUrl'
        client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
        response = client.get(url, secure=False)
        self.assertEqual(1, self.read_file_or_url.call_count)
        self.assertEqual(self.read_file_or_url.return_value, response)
        expected_call = mock.call(
            url, headers=self.regular_headers, retries=10, timeout=5)
        self.assertEqual(expected_call, self.read_file_or_url.call_args)

    def test_secure_get(self):
        url = 'MyTestUrl'
        certificate = mock.MagicMock()
        # A secure GET adds the cipher name and the client certificate
        expected_headers = dict(self.regular_headers)
        expected_headers["x-ms-cipher-name"] = "DES_EDE3_CBC"
        expected_headers["x-ms-guest-agent-public-x509-cert"] = certificate
        client = azure_helper.AzureEndpointHttpClient(certificate)
        response = client.get(url, secure=True)
        self.assertEqual(1, self.read_file_or_url.call_count)
        self.assertEqual(self.read_file_or_url.return_value, response)
        expected_call = mock.call(
            url, headers=expected_headers, retries=10, timeout=5)
        self.assertEqual(expected_call, self.read_file_or_url.call_args)

    def test_post(self):
        url = 'MyTestUrl'
        payload = mock.MagicMock()
        client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
        response = client.post(url, data=payload)
        self.assertEqual(1, self.read_file_or_url.call_count)
        self.assertEqual(self.read_file_or_url.return_value, response)
        expected_call = mock.call(
            url, data=payload, headers=self.regular_headers, retries=10,
            timeout=5)
        self.assertEqual(expected_call, self.read_file_or_url.call_args)

    def test_post_with_extra_headers(self):
        url = 'MyTestUrl'
        extra_headers = {'test': 'header'}
        client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
        client.post(url, extra_headers=extra_headers)
        self.assertEqual(1, self.read_file_or_url.call_count)
        # Extra headers are expected on top of the regular ones
        expected_headers = dict(self.regular_headers)
        expected_headers.update(extra_headers)
        expected_call = mock.call(
            mock.ANY, data=mock.ANY, headers=expected_headers,
            retries=10, timeout=5)
        self.assertEqual(expected_call, self.read_file_or_url.call_args)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestOpenSSLManager(CiTestCase):
    """Tests for OpenSSLManager with util.subp and open() patched out."""

    def setUp(self):
        super(TestOpenSSLManager, self).setUp()
        patches = ExitStack()
        self.addCleanup(patches.close)

        self.subp = patches.enter_context(
            mock.patch.object(azure_helper.util, 'subp'))
        # The builtins module is named '__builtin__' on Python 2 and
        # 'builtins' on Python 3; patching the wrong name raises ImportError.
        try:
            self.open = patches.enter_context(
                mock.patch('__builtin__.open'))
        except ImportError:
            self.open = patches.enter_context(
                mock.patch('builtins.open'))

    @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
    @mock.patch.object(azure_helper.temp_utils, 'mkdtemp')
    def test_openssl_manager_creates_a_tmpdir(self, mkdtemp):
        """The manager's tmpdir is whatever temp_utils.mkdtemp returned."""
        manager = azure_helper.OpenSSLManager()
        self.assertEqual(mkdtemp.return_value, manager.tmpdir)

    def test_generate_certificate_uses_tmpdir(self):
        """util.subp is invoked while the cwd is the manager's tmpdir."""
        subp_directory = {}

        def capture_directory(*args, **kwargs):
            # Record the working directory at the moment subp is called
            subp_directory['path'] = os.getcwd()

        self.subp.side_effect = capture_directory
        manager = azure_helper.OpenSSLManager()
        # mkdtemp is not patched in this test, so always clean up, even when
        # the assertion (or the 'path' lookup) fails.
        try:
            self.assertEqual(manager.tmpdir, subp_directory['path'])
        finally:
            manager.clean_up()

    @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
    @mock.patch.object(azure_helper.temp_utils, 'mkdtemp', mock.MagicMock())
    @mock.patch.object(azure_helper.util, 'del_dir')
    def test_clean_up(self, del_dir):
        """clean_up deletes the tmpdir exactly once via util.del_dir."""
        manager = azure_helper.OpenSSLManager()
        manager.clean_up()
        self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestOpenSSLManagerActions(CiTestCase):
    """OpenSSLManager tests that use data files; both are currently skipped."""

    def setUp(self):
        super(TestOpenSSLManagerActions, self).setUp()

        self.allowed_subp = True

    def _data_file(self, name):
        """Return the path of *name* under tests/data/azure."""
        return os.path.join('tests/data/azure', name)

    @unittest2.skip("todo move to cloud_test")
    def test_pubkey_extract(self):
        cert = load_file(self._data_file('pubkey_extract_cert'))
        expected_key = load_file(self._data_file('pubkey_extract_ssh_key'))
        sslmgr = azure_helper.OpenSSLManager()
        self.assertEqual(expected_key, sslmgr._get_ssh_key_from_cert(cert))

        expected_fingerprint = '073E19D14D1C799224C6A0FD8DDAB6A8BF27D473'
        self.assertEqual(
            expected_fingerprint, sslmgr._get_fingerprint_from_cert(cert))

    @unittest2.skip("todo move to cloud_test")
    @mock.patch.object(azure_helper.OpenSSLManager, '_decrypt_certs_from_xml')
    def test_parse_certificates(self, mock_decrypt_certs):
        """The Azure control plane puts private keys as well as certificates
           into the Certificates XML object. Check that only the public keys
           from certs are extracted and that fingerprints are converted to
           the form specified in the ovf-env.xml file.
        """
        cert_contents = load_file(self._data_file('parse_certificates_pem'))
        fingerprints_path = self._data_file('parse_certificates_fingerprints')
        fingerprints = load_file(fingerprints_path).splitlines()
        mock_decrypt_certs.return_value = cert_contents
        sslmgr = azure_helper.OpenSSLManager()
        keys_by_fp = sslmgr.parse_certificates('')
        # Membership is checked in both directions
        for parsed_fp in keys_by_fp:
            self.assertIn(parsed_fp, fingerprints)
        for expected_fp in fingerprints:
            self.assertIn(expected_fp, keys_by_fp)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestWALinuxAgentShim(CiTestCase):
    """Tests for WALinuxAgentShim with its collaborators replaced by mocks.

    setUp patches AzureEndpointHttpClient, GoalState, OpenSSLManager,
    WALinuxAgentShim.find_endpoint and azure_helper.time.sleep for the
    duration of each test.
    """

    def setUp(self):
        super(TestWALinuxAgentShim, self).setUp()
        patches = ExitStack()
        self.addCleanup(patches.close)

        self.AzureEndpointHttpClient = patches.enter_context(
            mock.patch.object(azure_helper, 'AzureEndpointHttpClient'))
        self.find_endpoint = patches.enter_context(
            mock.patch.object(wa_shim, 'find_endpoint'))
        self.GoalState = patches.enter_context(
            mock.patch.object(azure_helper, 'GoalState'))
        self.OpenSSLManager = patches.enter_context(
            mock.patch.object(azure_helper, 'OpenSSLManager'))
        # Replace time.sleep with a mock so no test actually sleeps
        patches.enter_context(
            mock.patch.object(azure_helper.time, 'sleep', mock.MagicMock()))

    def test_http_client_uses_certificate(self):
        """The HTTP client is built once, from the manager's certificate."""
        shim = wa_shim()
        shim.register_with_azure_and_fetch_data()
        self.assertEqual(
            [mock.call(self.OpenSSLManager.return_value.certificate)],
            self.AzureEndpointHttpClient.call_args_list)

    def test_correct_url_used_for_goalstate(self):
        """The goal state is fetched from the endpoint and handed to
        GoalState together with the HTTP client.
        """
        self.find_endpoint.return_value = 'test_endpoint'
        shim = wa_shim()
        shim.register_with_azure_and_fetch_data()
        get = self.AzureEndpointHttpClient.return_value.get
        self.assertEqual(
            [mock.call('http://test_endpoint/machine/?comp=goalstate')],
            get.call_args_list)
        self.assertEqual(
            [mock.call(get.return_value.contents,
                       self.AzureEndpointHttpClient.return_value)],
            self.GoalState.call_args_list)

    def test_certificates_used_to_determine_public_keys(self):
        """if register_with_azure_and_fetch_data() isn't passed some info about
           the user's public keys, there's no point in even trying to parse
           the certificates
        """
        shim = wa_shim()
        mypk = [{'fingerprint': 'fp1', 'path': 'path1'},
                {'fingerprint': 'fp3', 'path': 'path3', 'value': ''}]
        certs = {'fp1': 'expected-key',
                 'fp2': 'should-not-be-found',
                 'fp3': 'expected-no-value-key',
                 }
        sslmgr = self.OpenSSLManager.return_value
        sslmgr.parse_certificates.return_value = certs
        data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
        self.assertEqual(
            [mock.call(self.GoalState.return_value.certificates_xml)],
            sslmgr.parse_certificates.call_args_list)
        # Only keys whose fingerprint was requested in mypk are returned
        self.assertIn('expected-key', data['public-keys'])
        self.assertIn('expected-no-value-key', data['public-keys'])
        self.assertNotIn('should-not-be-found', data['public-keys'])

    def test_absent_certificates_produces_empty_public_keys(self):
        """No certificates XML in the goal state yields no public keys."""
        mypk = [{'fingerprint': 'fp1', 'path': 'path1'}]
        self.GoalState.return_value.certificates_xml = None
        shim = wa_shim()
        data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
        self.assertEqual([], data['public-keys'])

    def test_correct_url_used_for_report_ready(self):
        """The ready report is POSTed once to the endpoint's health URL."""
        self.find_endpoint.return_value = 'test_endpoint'
        shim = wa_shim()
        shim.register_with_azure_and_fetch_data()
        expected_url = 'http://test_endpoint/machine?comp=health'
        self.assertEqual(
            [mock.call(expected_url, data=mock.ANY, extra_headers=mock.ANY)],
            self.AzureEndpointHttpClient.return_value.post.call_args_list)

    def test_goal_state_values_used_for_report_ready(self):
        """The posted document contains the goal state's identifiers."""
        self.GoalState.return_value.incarnation = 'TestIncarnation'
        self.GoalState.return_value.container_id = 'TestContainerId'
        self.GoalState.return_value.instance_id = 'TestInstanceId'
        shim = wa_shim()
        shim.register_with_azure_and_fetch_data()
        posted_document = (
            self.AzureEndpointHttpClient.return_value.post.call_args[1]['data']
        )
        self.assertIn('TestIncarnation', posted_document)
        self.assertIn('TestContainerId', posted_document)
        self.assertIn('TestInstanceId', posted_document)

    def test_clean_up_can_be_called_at_any_time(self):
        """clean_up must not raise when nothing has been registered yet."""
        shim = wa_shim()
        shim.clean_up()

    def test_clean_up_will_clean_up_openssl_manager_if_instantiated(self):
        """After registration, clean_up cleans the OpenSSLManager once."""
        shim = wa_shim()
        shim.register_with_azure_and_fetch_data()
        shim.clean_up()
        self.assertEqual(
            1, self.OpenSSLManager.return_value.clean_up.call_count)

    def test_failure_to_fetch_goalstate_bubbles_up(self):
        """An exception raised by the HTTP GET reaches the caller."""
        class SentinelException(Exception):
            pass
        self.AzureEndpointHttpClient.return_value.get.side_effect = (
            SentinelException)
        shim = wa_shim()
        self.assertRaises(SentinelException,
                          shim.register_with_azure_and_fetch_data)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestGetMetadataFromFabric(CiTestCase):
    """Tests for get_metadata_from_fabric with WALinuxAgentShim mocked."""

    @mock.patch.object(azure_helper, 'WALinuxAgentShim')
    def test_data_from_shim_returned(self, shim):
        """The shim's registration result is returned unchanged."""
        register = shim.return_value.register_with_azure_and_fetch_data
        result = azure_helper.get_metadata_from_fabric()
        self.assertEqual(register.return_value, result)

    @mock.patch.object(azure_helper, 'WALinuxAgentShim')
    def test_success_calls_clean_up(self, shim):
        """clean_up runs exactly once after a successful registration."""
        azure_helper.get_metadata_from_fabric()
        clean_up = shim.return_value.clean_up
        self.assertEqual(1, clean_up.call_count)

    @mock.patch.object(azure_helper, 'WALinuxAgentShim')
    def test_failure_in_registration_calls_clean_up(self, shim):
        """clean_up still runs once when registration raises."""
        class SentinelException(Exception):
            pass

        register = shim.return_value.register_with_azure_and_fetch_data
        register.side_effect = SentinelException
        self.assertRaises(SentinelException,
                          azure_helper.get_metadata_from_fabric)
        clean_up = shim.return_value.clean_up
        self.assertEqual(1, clean_up.call_count)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
class TestExtractIpAddressFromNetworkd(CiTestCase):
    """Tests for WALinuxAgentShim._networkd_get_value_from_leases."""

    # Sample lease file content that includes an OPTION_245 entry
    azure_lease = dedent("""\
    # This is private data. Do not parse.
    ADDRESS=10.132.0.5
    NETMASK=255.255.255.255
    ROUTER=10.132.0.1
    SERVER_ADDRESS=169.254.169.254
    NEXT_SERVER=10.132.0.1
    MTU=1460
    T1=43200
    T2=75600
    LIFETIME=86400
    DNS=169.254.169.254
    NTP=169.254.169.254
    DOMAINNAME=c.ubuntu-foundations.internal
    DOMAIN_SEARCH_LIST=c.ubuntu-foundations.internal google.internal
    HOSTNAME=tribaal-test-171002-1349.c.ubuntu-foundations.internal
    ROUTES=10.132.0.1/32,0.0.0.0 0.0.0.0/0,10.132.0.1
    CLIENTID=ff405663a200020000ab11332859494d7a8b4c
    OPTION_245=624c3620
    """)

    def setUp(self):
        super(TestExtractIpAddressFromNetworkd, self).setUp()
        # Fresh temporary directory standing in for the lease directory
        self.lease_d = self.tmp_dir()

    def test_no_valid_leases_is_none(self):
        """An empty lease directory gives None."""
        found = wa_shim._networkd_get_value_from_leases(self.lease_d)
        self.assertIsNone(found)

    def test_option_245_is_found_in_single(self):
        """One lease carrying option 245 gives that option's value."""
        populate_dir(self.lease_d, {'9': self.azure_lease})
        found = wa_shim._networkd_get_value_from_leases(self.lease_d)
        self.assertEqual('624c3620', found)

    def test_option_245_not_found_returns_None(self):
        """A lease without option 245 gives None."""
        lease_without_245 = self.azure_lease.replace(
            "OPTION_245", "OPTION_999")
        populate_dir(self.lease_d, {'9': lease_without_245})
        found = wa_shim._networkd_get_value_from_leases(self.lease_d)
        self.assertIsNone(found)

    def test_multiple_returns_first(self):
        """Somewhat arbitrarily, the first address is returned when several
        leases exist.

        What matters for now is that the result is consistent instead of
        varying randomly like the order of a dictionary."""
        myval = "624c3601"
        leases = {
            '9': self.azure_lease,
            '2': self.azure_lease.replace("624c3620", myval),
        }
        populate_dir(self.lease_d, leases)
        found = wa_shim._networkd_get_value_from_leases(self.lease_d)
        self.assertEqual(myval, found)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
# vi: ts=4 expandtab