# Copyright (C) 2013 Canonical Ltd.
# Copyright 2019 Joyent, Inc.
#
# Author: Ben Howard <ben.howard@canonical.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
'''This is a testcase for the SmartOS datasource.
It replicates a serial console and acts like the SmartOS console does in
order to validate return responses.
'''
from binascii import crc32
import json
import multiprocessing
import os
import os.path
import re
import signal
import stat
import unittest
import uuid
from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
identify_file)
from cloudinit.event import EventType
from cloudinit import helpers as c_helpers
from cloudinit.util import (b64e, write_file)
from cloudinit.subp import (subp, ProcessExecutionError, which)
from cloudinit.tests.helpers import (
CiTestCase, mock, FilesystemMockingTestCase, skipIf)
try:
import serial as _pyserial
assert _pyserial # avoid pyflakes error F401: import unused
HAS_PYSERIAL = True
except ImportError:
HAS_PYSERIAL = False
DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
"nic_tag": "external",
"primary": true,
"mtu": 1500,
"model": "virtio",
"gateway": "8.12.42.1",
"netmask": "255.255.255.0",
"ip": "8.12.42.102",
"network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"gateways": [
"8.12.42.1"
],
"vlan_id": 324,
"mac": "90:b8:d0:f5:e4:f5",
"interface": "net0",
"ips": [
"8.12.42.102/24"
]
},
{
"nic_tag": "sdc_overlay/16187209",
"gateway": "192.168.128.1",
"model": "virtio",
"mac": "90:b8:d0:a5:ff:cd",
"netmask": "255.255.252.0",
"ip": "192.168.128.93",
"network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9",
"gateways": [
"192.168.128.1"
],
"vlan_id": 2,
"mtu": 8500,
"interface": "net1",
"ips": [
"192.168.128.93/22"
]
}
]
""")
SDC_NICS_ALT = json.loads("""
[
{
"interface": "net0",
"mac": "90:b8:d0:ae:64:51",
"vlan_id": 324,
"nic_tag": "external",
"gateway": "8.12.42.1",
"gateways": [
"8.12.42.1"
],
"netmask": "255.255.255.0",
"ip": "8.12.42.51",
"ips": [
"8.12.42.51/24"
],
"network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"model": "virtio",
"mtu": 1500,
"primary": true
},
{
"interface": "net1",
"mac": "90:b8:d0:bd:4f:9c",
"vlan_id": 600,
"nic_tag": "internal",
"netmask": "255.255.255.0",
"ip": "10.210.1.217",
"ips": [
"10.210.1.217/24"
],
"network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
"model": "virtio",
"mtu": 1500
}
]
""")
SDC_NICS_DHCP = json.loads("""
[
{
"interface": "net0",
"mac": "90:b8:d0:ae:64:51",
"vlan_id": 324,
"nic_tag": "external",
"gateway": "8.12.42.1",
"gateways": [
"8.12.42.1"
],
"netmask": "255.255.255.0",
"ip": "8.12.42.51",
"ips": [
"8.12.42.51/24"
],
"network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"model": "virtio",
"mtu": 1500,
"primary": true
},
{
"interface": "net1",
"mac": "90:b8:d0:bd:4f:9c",
"vlan_id": 600,
"nic_tag": "internal",
"netmask": "255.255.255.0",
"ip": "10.210.1.217",
"ips": [
"dhcp"
],
"network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
"model": "virtio",
"mtu": 1500
}
]
""")
SDC_NICS_MIP = json.loads("""
[
{
"interface": "net0",
"mac": "90:b8:d0:ae:64:51",
"vlan_id": 324,
"nic_tag": "external",
"gateway": "8.12.42.1",
"gateways": [
"8.12.42.1"
],
"netmask": "255.255.255.0",
"ip": "8.12.42.51",
"ips": [
"8.12.42.51/24",
"8.12.42.52/24"
],
"network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"model": "virtio",
"mtu": 1500,
"primary": true
},
{
"interface": "net1",
"mac": "90:b8:d0:bd:4f:9c",
"vlan_id": 600,
"nic_tag": "internal",
"netmask": "255.255.255.0",
"ip": "10.210.1.217",
"ips": [
"10.210.1.217/24",
"10.210.1.151/24"
],
"network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
"model": "virtio",
"mtu": 1500
}
]
""")
SDC_NICS_MIP_IPV6 = json.loads("""
[
{
"interface": "net0",
"mac": "90:b8:d0:ae:64:51",
"vlan_id": 324,
"nic_tag": "external",
"gateway": "8.12.42.1",
"gateways": [
"8.12.42.1"
],
"netmask": "255.255.255.0",
"ip": "8.12.42.51",
"ips": [
"2001:4800:78ff:1b:be76:4eff:fe06:96b3/64",
"8.12.42.51/24"
],
"network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"model": "virtio",
"mtu": 1500,
"primary": true
},
{
"interface": "net1",
"mac": "90:b8:d0:bd:4f:9c",
"vlan_id": 600,
"nic_tag": "internal",
"netmask": "255.255.255.0",
"ip": "10.210.1.217",
"ips": [
"10.210.1.217/24"
],
"network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
"model": "virtio",
"mtu": 1500
}
]
""")
SDC_NICS_IPV4_IPV6 = json.loads("""
[
{
"interface": "net0",
"mac": "90:b8:d0:ae:64:51",
"vlan_id": 324,
"nic_tag": "external",
"gateway": "8.12.42.1",
"gateways": ["8.12.42.1", "2001::1", "2001::2"],
"netmask": "255.255.255.0",
"ip": "8.12.42.51",
"ips": ["2001::10/64", "8.12.42.51/24", "2001::11/64",
"8.12.42.52/32"],
"network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"model": "virtio",
"mtu": 1500,
"primary": true
},
{
"interface": "net1",
"mac": "90:b8:d0:bd:4f:9c",
"vlan_id": 600,
"nic_tag": "internal",
"netmask": "255.255.255.0",
"ip": "10.210.1.217",
"ips": ["10.210.1.217/24"],
"gateways": ["10.210.1.210"],
"network_uuid": "98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
"model": "virtio",
"mtu": 1500
}
]
""")
SDC_NICS_SINGLE_GATEWAY = json.loads("""
[
{
"interface":"net0",
"mac":"90:b8:d0:d8:82:b4",
"vlan_id":324,
"nic_tag":"external",
"gateway":"8.12.42.1",
"gateways":["8.12.42.1"],
"netmask":"255.255.255.0",
"ip":"8.12.42.26",
"ips":["8.12.42.26/24"],
"network_uuid":"992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
"model":"virtio",
"mtu":1500,
"primary":true
},
{
"interface":"net1",
"mac":"90:b8:d0:0a:51:31",
"vlan_id":600,
"nic_tag":"internal",
"netmask":"255.255.255.0",
"ip":"10.210.1.27",
"ips":["10.210.1.27/24"],
"network_uuid":"98657fdf-11f4-4ee2-88a4-ce7fe73e33a6",
"model":"virtio",
"mtu":1500
}
]
""")
MOCK_RETURNS = {
'hostname': 'test-host',
'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
'disable_iptables_flag': None,
'enable_motd_sys_info': None,
'test-var1': 'some data',
'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
'sdc:datacenter_name': 'somewhere2',
'sdc:operator-script': '\n'.join(['bin/true', '']),
'sdc:uuid': str(uuid.uuid4()),
'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
'user-data': '\n'.join(['something', '']),
'user-script': '\n'.join(['/bin/true', '']),
'sdc:nics': json.dumps(SDC_NICS),
}
DMI_DATA_RETURN = 'smartdc'
# Useful for calculating the length of a frame body. A SUCCESS body will be
# followed by more characters or be one character less if SUCCESS with no
# payload. See Section 4.3 of https://eng.joyent.com/mdata/protocol.html.
SUCCESS_LEN = len('0123abcd SUCCESS ')
NOTFOUND_LEN = len('0123abcd NOTFOUND')
class PsuedoJoyentClient(object):
def __init__(self, data=None):
if data is None:
data = MOCK_RETURNS.copy()
self.data = data
self._is_open = False
return
def get(self, key, default=None, strip=False):
if key in self.data:
r = self.data[key]
if strip:
r = r.strip()
else:
r = default
return r
def get_json(self, key, default=None):
result = self.get(key, default=default)
if result is None:
return default
return json.loads(result)
def exists(self):
return True
def open_transport(self):
assert(not self._is_open)
self._is_open = True
def close_transport(self):
assert(self._is_open)
self._is_open = False
class TestSmartOSDataSource(FilesystemMockingTestCase):
jmc_cfact = None
get_smartos_environ = None
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
autospec=False, new=self.legacy_user_d)
self.add_patch(DSMOS + ".identify_file", "m_identify_file",
return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
tmpd = self.tmp_dir()
dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
'run_dir': self.tmp_path('run_dir')}
for d in dirs.values():
os.mkdir(d)
paths = c_helpers.Paths(dirs)
if sys_cfg is None:
sys_cfg = {}
if ds_cfg is not None:
sys_cfg['datasource'] = sys_cfg.get('datasource', {})
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
dsrc = self._get_ds(ds_cfg=ds_cfg)
ret = dsrc.get_data()
self.assertTrue(ret)
def test_uuid(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['sdc:uuid'],
dsrc.metadata['instance-id'])
def test_platform_info(self):
"""All platform-related attributes are properly set."""
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
self.assertEqual('joyent', dsrc.cloud_name)
self.assertEqual('joyent', dsrc.platform_type)
self.assertEqual('serial (/dev/ttyS1)', dsrc.subplatform)
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
dsrc.metadata['public-keys'])
def test_hostname_b64(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['hostname'],
dsrc.metadata['local-hostname'])
def test_hostname(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['hostname'],
dsrc.metadata['local-hostname'])
def test_hostname_if_no_sdc_hostname(self):
my_returns = MOCK_RETURNS.copy()
my_returns['sdc:hostname'] = 'sdc-' + my_returns['hostname']
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(my_returns['hostname'],
dsrc.metadata['local-hostname'])
def test_sdc_hostname_if_no_hostname(self):
my_returns = MOCK_RETURNS.copy()
my_returns['sdc:hostname'] = 'sdc-' + my_returns['hostname']
del my_returns['hostname']
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(my_returns['sdc:hostname'],
dsrc.metadata['local-hostname'])
def test_sdc_uuid_if_no_hostname_or_sdc_hostname(self):
my_returns = MOCK_RETURNS.copy()
del my_returns['hostname']
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(my_returns['sdc:uuid'],
dsrc.metadata['local-hostname'])
def test_userdata(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['user-data'],
dsrc.metadata['legacy-user-data'])
self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
dsrc.userdata_raw)
def test_sdc_nics(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']),
dsrc.metadata['network-data'])
def test_sdc_scripts(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['user-script'],
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
self.assertEqual(user_script_perm, '700')
def test_scripts_shebanged(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['user-script'],
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
shebang = None
with open(legacy_script_f, 'r') as f:
shebang = f.readlines()[0].strip()
self.assertEqual(shebang, "#!/bin/bash")
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
self.assertEqual(user_script_perm, '700')
def test_scripts_shebang_not_added(self):
"""
Test that the SmartOS requirement that plain text scripts
are executable. This test makes sure that plain texts scripts
with out file magic have it added appropriately by cloud-init.
"""
my_returns = MOCK_RETURNS.copy()
my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl',
'print("hi")', ''])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(my_returns['user-script'],
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
shebang = None
with open(legacy_script_f, 'r') as f:
shebang = f.readlines()[0].strip()
self.assertEqual(shebang, "#!/usr/bin/perl")
def test_userdata_removed(self):
"""
User-data in the SmartOS world is supposed to be written to a file
each and every boot. This tests to make sure that in the event the
legacy user-data is removed, the existing user-data is backed-up
and there is no /var/db/user-data left.
"""
user_data_f = "%s/mdata-user-data" % self.legacy_user_d
with open(user_data_f, 'w') as f:
f.write("PREVIOUS")
my_returns = MOCK_RETURNS.copy()
del my_returns['user-data']
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertFalse(dsrc.metadata.get('legacy-user-data'))
found_new = False
for root, _dirs, files in os.walk(self.legacy_user_d):
for name in files:
name_f = os.path.join(root, name)
permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
print(name_f)
self.assertEqual(permissions, '400')
self.assertFalse(found_new)
def test_vendor_data_not_default(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['sdc:vendor-data'],
dsrc.metadata['vendor-data'])
def test_default_vendor_data(self):
my_returns = MOCK_RETURNS.copy()
def_op_script = my_returns['sdc:vendor-data']
del my_returns['sdc:vendor-data']
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data'])
# we expect default vendor-data is a boothook
self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook"))
def test_disable_iptables_flag(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['disable_iptables_flag'],
dsrc.metadata['iptables_disable'])
def test_motd_sys_info(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'],
dsrc.metadata['motd_sys_info'])
def test_default_ephemeral(self):
# Test to make sure that the builtin config has the ephemeral
# configuration.
dsrc = self._get_ds()
cfg = dsrc.get_config_obj()
ret = dsrc.get_data()
self.assertTrue(ret)
assert 'disk_setup' in cfg
assert 'fs_setup' in cfg
self.assertIsInstance(cfg['disk_setup'], dict)
self.assertIsInstance(cfg['fs_setup'], list)
def test_override_disk_aliases(self):
# Test to make sure that the built-in DS is overriden
builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG
mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}}
# expect that these values are in builtin, or this is pointless
for k in mydscfg:
self.assertIn(k, builtin)
dsrc = self._get_ds(ds_cfg=mydscfg)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(mydscfg['disk_aliases']['FOO'],
dsrc.ds_cfg['disk_aliases']['FOO'])
self.assertEqual(dsrc.device_name_to_device('FOO'),
mydscfg['disk_aliases']['FOO'])
def test_reconfig_network_on_boot(self):
# Test to ensure that network is configured from metadata on each boot
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
self.assertSetEqual(set([EventType.BOOT_NEW_INSTANCE, EventType.BOOT]),
dsrc.update_events['network'])
class TestIdentifyFile(CiTestCase):
"""Test the 'identify_file' utility."""
@skipIf(not which("file"), "command 'file' not available.")
def test_file_happy_path(self):
"""Test file is available and functional on plain text."""
fname = self.tmp_path("myfile")
write_file(fname, "plain text content here\n")
with self.allow_subp(["file"]):
self.assertEqual("text/plain", identify_file(fname))
@mock.patch(DSMOS + ".subp.subp")
def test_returns_none_on_error(self, m_subp):
"""On 'file' execution error, None should be returned."""
m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
fname = self.tmp_path("myfile")
write_file(fname, "plain text content here\n")
self.assertEqual(None, identify_file(fname))
self.assertEqual(
[mock.call(["file", "--brief", "--mime-type", fname])],
m_subp.call_args_list)
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
When it is reached a short will be returned."""
def __init__(self, initial_bytes, endbyte=b'\0'):
self.data = initial_bytes
self.index = 0
self.len = len(self.data)
self.endbyte = endbyte
@property
def emptied(self):
return self.index >= self.len
def read(self, size=-1):
"""Read size bytes but not past a null."""
if size == 0 or self.index >= self.len:
return b''
rsize = size
if size < 0 or size + self.index > self.len:
rsize = self.len - self.index
next_null = self.data.find(self.endbyte, self.index, rsize)
if next_null >= 0:
rsize = next_null - self.index + 1
i = self.index
self.index += rsize
ret = self.data[i:i + rsize]
if len(ret) and ret[-1:] == self.endbyte:
ret = ret[:-1]
return ret
class TestJoyentMetadataClient(FilesystemMockingTestCase):
invalid = b'invalid command\n'
failure = b'FAILURE\n'
v2_ok = b'V2_OK\n'
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
self.serial = mock.MagicMock(spec=serial.Serial)
self.request_id = 0xabcdef12
self.metadata_value = 'value'
self.response_parts = {
'command': 'SUCCESS',
'crc': 'b5a9ff00',
'length': SUCCESS_LEN + len(b64e(self.metadata_value)),
'payload': b64e(self.metadata_value),
'request_id': '{0:08x}'.format(self.request_id),
}
def make_response():
payloadstr = ''
if 'payload' in self.response_parts:
payloadstr = ' {0}'.format(self.response_parts['payload'])
return ('V2 {length} {crc} {request_id} '
'{command}{payloadstr}\n'.format(
payloadstr=payloadstr,
**self.response_parts).encode('ascii'))
self.metasource_data = None
def read_response(length):
if not self.metasource_data:
self.metasource_data = make_response()
self.metasource_data_len = len(self.metasource_data)
resp = self.metasource_data[:length]
self.metasource_data = self.metasource_data[length:]
return resp
self.serial.read.side_effect = read_response
self.patched_funcs.enter_context(
mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
mock.Mock(return_value=self.request_id)))
def _get_client(self):
return DataSourceSmartOS.JoyentMetadataClient(
fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM)
def _get_serial_client(self):
self.serial.timeout = 1
return DataSourceSmartOS.JoyentMetadataSerialClient(None,
fp=self.serial)
def assertEndsWith(self, haystack, prefix):
self.assertTrue(haystack.endswith(prefix),
"{0} does not end with '{1}'".format(
repr(haystack), prefix))
def assertStartsWith(self, haystack, prefix):
self.assertTrue(haystack.startswith(prefix),
"{0} does not start with '{1}'".format(
repr(haystack), prefix))
def assertNoMoreSideEffects(self, obj):
self.assertRaises(StopIteration, obj)
def test_get_metadata_writes_a_single_line(self):
client = self._get_client()
client.get('some_key')
self.assertEqual(1, self.serial.write.call_count)
written_line = self.serial.write.call_args[0][0]
self.assertEndsWith(written_line.decode('ascii'),
b'\n'.decode('ascii'))
self.assertEqual(1, written_line.count(b'\n'))
def _get_written_line(self, key='some_key'):
client = self._get_client()
client.get(key)
return self.serial.write.call_args[0][0]
def test_get_metadata_writes_bytes(self):
self.assertIsInstance(self._get_written_line(), bytes)
def test_get_metadata_line_starts_with_v2(self):
foo = self._get_written_line()
self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii'))
def test_get_metadata_uses_get_command(self):
parts = self._get_written_line().decode('ascii').strip().split(' ')
self.assertEqual('GET', parts[4])
def test_get_metadata_base64_encodes_argument(self):
key = 'my_key'
parts = self._get_written_line(key).decode('ascii').strip().split(' ')
self.assertEqual(b64e(key), parts[5])
def test_get_metadata_calculates_length_correctly(self):
parts = self._get_written_line().decode('ascii').strip().split(' ')
expected_length = len(' '.join(parts[3:]))
self.assertEqual(expected_length, int(parts[1]))
def test_get_metadata_uses_appropriate_request_id(self):
parts = self._get_written_line().decode('ascii').strip().split(' ')
request_id = parts[3]
self.assertEqual(8, len(request_id))
self.assertEqual(request_id, request_id.lower())
def test_get_metadata_uses_random_number_for_request_id(self):
line = self._get_written_line()
request_id = line.decode('ascii').strip().split(' ')[3]
self.assertEqual('{0:08x}'.format(self.request_id), request_id)
def test_get_metadata_checksums_correctly(self):
parts = self._get_written_line().decode('ascii').strip().split(' ')
expected_checksum = '{0:08x}'.format(
crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff)
checksum = parts[2]
self.assertEqual(expected_checksum, checksum)
def test_get_metadata_reads_a_line(self):
client = self._get_client()
client.get('some_key')
self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
def test_get_metadata_returns_valid_value(self):
client = self._get_client()
value = client.get('some_key')
self.assertEqual(self.metadata_value, value)
def test_get_metadata_throws_exception_for_incorrect_length(self):
self.response_parts['length'] = 0
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
client.get, 'some_key')
def test_get_metadata_throws_exception_for_incorrect_crc(self):
self.response_parts['crc'] = 'deadbeef'
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
client.get, 'some_key')
def test_get_metadata_throws_exception_for_request_id_mismatch(self):
self.response_parts['request_id'] = 'deadbeef'
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
client.get, 'some_key')
def test_get_metadata_returns_None_if_value_not_found(self):
self.response_parts['payload'] = ''
self.response_parts['command'] = 'NOTFOUND'
self.response_parts['length'] = NOTFOUND_LEN
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
self.assertIsNone(client.get('some_key'))
def test_negotiate(self):
client = self._get_client()
reader = ShortReader(self.v2_ok)
client.fp.read.side_effect = reader.read
client._negotiate()
self.assertTrue(reader.emptied)
def test_negotiate_short_response(self):
client = self._get_client()
# chopped '\n' from v2_ok.
reader = ShortReader(self.v2_ok[:-1] + b'\0')
client.fp.read.side_effect = reader.read
self.assertRaises(DataSourceSmartOS.JoyentMetadataTimeoutException,
client._negotiate)
self.assertTrue(reader.emptied)
def test_negotiate_bad_response(self):
client = self._get_client()
reader = ShortReader(b'garbage\n' + self.v2_ok)
client.fp.read.side_effect = reader.read
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
client._negotiate)
self.assertEqual(self.v2_ok, client.fp.read())
def test_serial_open_transport(self):
client = self._get_serial_client()
reader = ShortReader(b'garbage\0' + self.invalid + self.v2_ok)
client.fp.read.side_effect = reader.read
client.open_transport()
self.assertTrue(reader.emptied)
def test_flush_failure(self):
client = self._get_serial_client()
reader = ShortReader(b'garbage' + b'\0' + self.failure +
self.invalid + self.v2_ok)
client.fp.read.side_effect = reader.read
client.open_transport()
self.assertTrue(reader.emptied)
def test_flush_many_timeouts(self):
client = self._get_serial_client()
reader = ShortReader(b'\0' * 100 + self.invalid + self.v2_ok)
client.fp.read.side_effect = reader.read
client.open_transport()
self.assertTrue(reader.emptied)
def test_list_metadata_returns_list(self):
parts = ['foo', 'bar']
value = b64e('\n'.join(parts))
self.response_parts['payload'] = value
self.response_parts['crc'] = '40873553'
self.response_parts['length'] = SUCCESS_LEN + len(value)
client = self._get_client()
self.assertEqual(client.list(), parts)
def test_list_metadata_returns_empty_list_if_no_customer_metadata(self):
del self.response_parts['payload']
self.response_parts['length'] = SUCCESS_LEN - 1
self.response_parts['crc'] = '14e563ba'
client = self._get_client()
self.assertEqual(client.list(), [])
class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
'address': '8.12.42.102/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
{'name': 'net1', 'type': 'physical',
'subnets': [{'type': 'static',
'address': '192.168.128.93/22'}],
'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
found = convert_net(SDC_NICS)
self.assertEqual(expected, found)
def test_convert_simple_alt(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
'address': '8.12.42.51/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
{'name': 'net1', 'type': 'physical',
'subnets': [{'type': 'static',
'address': '10.210.1.217/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
found = convert_net(SDC_NICS_ALT)
self.assertEqual(expected, found)
def test_convert_simple_dhcp(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
'address': '8.12.42.51/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
{'name': 'net1', 'type': 'physical',
'subnets': [{'type': 'dhcp4'}],
'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
found = convert_net(SDC_NICS_DHCP)
self.assertEqual(expected, found)
def test_convert_simple_multi_ip(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
'address': '8.12.42.51/24'},
{'type': 'static',
'address': '8.12.42.52/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
{'name': 'net1', 'type': 'physical',
'subnets': [{'type': 'static',
'address': '10.210.1.217/24'},
{'type': 'static',
'address': '10.210.1.151/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
found = convert_net(SDC_NICS_MIP)
self.assertEqual(expected, found)
def test_convert_with_dns(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
'address': '8.12.42.51/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
{'name': 'net1', 'type': 'physical',
'subnets': [{'type': 'dhcp4'}],
'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'},
{'type': 'nameserver',
'address': ['8.8.8.8', '8.8.8.1'], 'search': ["local"]}]}
found = convert_net(
network_data=SDC_NICS_DHCP, dns_servers=['8.8.8.8', '8.8.8.1'],
dns_domain="local")
self.assertEqual(expected, found)
def test_convert_simple_multi_ipv6(self):
expected = {
'version': 1,
'config': [
{'name': 'net0', 'type': 'physical',
'subnets': [{'type': 'static', 'address':
'2001:4800:78ff:1b:be76:4eff:fe06:96b3/64'},
{'type': 'static', 'gateway': '8.12.42.1',
'address': '8.12.42.51/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
{'name': 'net1', 'type': 'physical',
'subnets': [{'type': 'static',
'address': '10.210.1.217/24'}],
'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
found = convert_net(SDC_NICS_MIP_IPV6)
self.assertEqual(expected, found)
def test_convert_simple_both_ipv4_ipv6(self):
expected = {
'version': 1,
'config': [
{'mac_address': '90:b8:d0:ae:64:51', 'mtu': 1500,
'name': 'net0', 'type': 'physical',
'subnets': [{'address': '2001::10/64', 'gateway': '2001::1',
'type': 'static'},
{'address': '8.12.42.51/24',
'gateway': '8.12.42.1',
'type': 'static'},
{'address': '2001::11/64', 'type': 'static'},
{'address': '8.12.42.52/32', 'type': 'static'}]},
{'mac_address': '90:b8:d0:bd:4f:9c', 'mtu': 1500,
'name': 'net1', 'type': 'physical',
'subnets': [{'address': '10.210.1.217/24',
'type': 'static'}]}]}
found = convert_net(SDC_NICS_IPV4_IPV6)
self.assertEqual(expected, found)
def test_gateways_not_on_all_nics(self):
expected = {
'version': 1,
'config': [
{'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500,
'name': 'net0', 'type': 'physical',
'subnets': [{'address': '8.12.42.26/24',
'gateway': '8.12.42.1', 'type': 'static'}]},
{'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500,
'name': 'net1', 'type': 'physical',
'subnets': [{'address': '10.210.1.27/24',
'type': 'static'}]}]}
found = convert_net(SDC_NICS_SINGLE_GATEWAY)
self.assertEqual(expected, found)
def test_routes_on_all_nics(self):
routes = [
{'linklocal': False, 'dst': '3.0.0.0/8', 'gateway': '8.12.42.3'},
{'linklocal': False, 'dst': '4.0.0.0/8', 'gateway': '10.210.1.4'}]
expected = {
'version': 1,
'config': [
{'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500,
'name': 'net0', 'type': 'physical',
'subnets': [{'address': '8.12.42.26/24',
'gateway': '8.12.42.1', 'type': 'static',
'routes': [{'network': '3.0.0.0/8',
'gateway': '8.12.42.3'},
{'network': '4.0.0.0/8',
'gateway': '10.210.1.4'}]}]},
{'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500,
'name': 'net1', 'type': 'physical',
'subnets': [{'address': '10.210.1.27/24', 'type': 'static',
'routes': [{'network': '3.0.0.0/8',
'gateway': '8.12.42.3'},
{'network': '4.0.0.0/8',
'gateway': '10.210.1.4'}]}]}]}
found = convert_net(SDC_NICS_SINGLE_GATEWAY, routes=routes)
self.maxDiff = None
self.assertEqual(expected, found)
@unittest.skipUnless(get_smartos_environ() == SMARTOS_ENV_KVM,
"Only supported on KVM and bhyve guests under SmartOS")
@unittest.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
@unittest.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
a metadata socket will not be valid because a metadata socket ensures
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
This takes on the order of 2 to 3 minutes to run.
"""
allowed_subp = ['mdata-get']
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
super(TestSerialConcurrency, self).setUp()
def tearDown(self):
# os.kill() rather than mdata_proc.terminate() to avoid console spam.
os.kill(self.mdata_proc.pid, signal.SIGKILL)
self.mdata_proc.join()
super(TestSerialConcurrency, self).tearDown()
def start_mdata_loop(self):
"""
The mdata-get command is repeatedly run in a separate process so
that it may try to race with metadata operations performed in the
main test process. Use of mdata-get is better than two processes
using the protocol implementation in DataSourceSmartOS because we
are testing to be sure that cloud-init and mdata-get respect each
others locks.
"""
rcs = list(range(0, 256))
while True:
subp(['mdata-get', 'sdc:routes'], rcs=rcs)
def test_all_keys(self):
self.assertIsNotNone(self.mdata_proc.pid)
ds = DataSourceSmartOS
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running
# 10 times at roughly the same time as cloud-init fetched each key
# once. cloud-init would regularly see failures before making it
# through all keys once.
for _ in range(0, 3):
for key in keys:
# We don't care about the return value, just that it doesn't
# thrown any exceptions.
client.get(key)
self.assertIsNone(self.mdata_proc.exitcode)
# vi: ts=4 expandtab