|
Packit Service |
a04d08 |
# Copyright (C) 2009-2010 Canonical Ltd.
|
|
Packit Service |
a04d08 |
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
|
|
Packit Service |
a04d08 |
# Copyright (C) 2012 Yahoo! Inc.
|
|
Packit Service |
a04d08 |
#
|
|
Packit Service |
a04d08 |
# Author: Joe VLcek <JVLcek@RedHat.com>
|
|
Packit Service |
a04d08 |
#
|
|
Packit Service |
a04d08 |
# This file is part of cloud-init. See LICENSE file for license information.
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
This test file exercises the code in cloudinit/sources/DataSourceAltCloud.py
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
import os
|
|
Packit Service |
a04d08 |
import shutil
|
|
Packit Service |
a04d08 |
import tempfile
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
from cloudinit import helpers
|
|
Packit Service |
751c4a |
from cloudinit import subp
|
|
Packit Service |
a04d08 |
from cloudinit import util
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
from cloudinit.tests.helpers import CiTestCase, mock
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
import cloudinit.sources.DataSourceAltCloud as dsac
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# Handle on the genuine os.uname, captured at import time before any test
# replaces it; force_arch() assigns this back to os.uname.
OS_UNAME_ORIG = os.uname
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _write_user_data_files(mount_dir, value):
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
Populate the deltacloud_user_data_file the user_data_file
|
|
Packit Service |
a04d08 |
which would be populated with user data.
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
|
|
Packit Service |
a04d08 |
user_data_file = mount_dir + '/user-data.txt'
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
udfile = open(deltacloud_user_data_file, 'w')
|
|
Packit Service |
a04d08 |
udfile.write(value)
|
|
Packit Service |
a04d08 |
udfile.close()
|
|
Packit Service |
a04d08 |
os.chmod(deltacloud_user_data_file, 0o664)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
udfile = open(user_data_file, 'w')
|
|
Packit Service |
a04d08 |
udfile.write(value)
|
|
Packit Service |
a04d08 |
udfile.close()
|
|
Packit Service |
a04d08 |
os.chmod(user_data_file, 0o664)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _remove_user_data_files(mount_dir,
|
|
Packit Service |
a04d08 |
dc_file=True,
|
|
Packit Service |
a04d08 |
non_dc_file=True):
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
Remove the test files: deltacloud_user_data_file and
|
|
Packit Service |
a04d08 |
user_data_file
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
|
|
Packit Service |
a04d08 |
user_data_file = mount_dir + '/user-data.txt'
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# Ignore any failures removeing files that are already gone.
|
|
Packit Service |
a04d08 |
if dc_file:
|
|
Packit Service |
a04d08 |
try:
|
|
Packit Service |
a04d08 |
os.remove(deltacloud_user_data_file)
|
|
Packit Service |
a04d08 |
except OSError:
|
|
Packit Service |
a04d08 |
pass
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
if non_dc_file:
|
|
Packit Service |
a04d08 |
try:
|
|
Packit Service |
a04d08 |
os.remove(user_data_file)
|
|
Packit Service |
a04d08 |
except OSError:
|
|
Packit Service |
a04d08 |
pass
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def _dmi_data(expected):
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
Spoof the data received over DMI
|
|
Packit Service |
a04d08 |
'''
|
|
Packit Service |
a04d08 |
def _data(key):
|
|
Packit Service |
a04d08 |
return expected
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
return _data
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetCloudType(CiTestCase):
    '''Test to exercise method: DataSourceAltCloud.get_cloud_type()'''

    # Consumed by the CiTestCase base class; test_cloud_info_file_ioerror
    # reads the captured output through self.logs.
    with_logs = True

    def setUp(self):
        '''Set up.'''
        super(TestGetCloudType, self).setUp()
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths({'cloud_dir': self.tmp})
        # Remember the real reader; individual tests swap in a spoofed one.
        self.dmi_data = util.read_dmi_data
        # We have a different code path for arm to deal with LP1243287
        # We have to switch arch to x86_64 to avoid test failure
        force_arch('x86_64')

    def tearDown(self):
        '''Undo the module-level patches made by setUp and the tests.'''
        util.read_dmi_data = self.dmi_data
        force_arch()
        # setUp chains to the base class, so tearDown has to as well.
        # NOTE(review): presumably CiTestCase releases its log capture
        # here (with_logs is set) - confirm against the helper.
        super(TestGetCloudType, self).tearDown()

    def test_cloud_info_file_ioerror(self):
        """Return UNKNOWN when /etc/sysconfig/cloud-info exists but errors."""
        self.assertEqual('/etc/sysconfig/cloud-info', dsac.CLOUD_INFO_FILE)
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        # Attempting to read the directory generates IOError
        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.tmp):
            self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
        self.assertIn(
            "[Errno 21] Is a directory: '%s'" % self.tmp,
            self.logs.getvalue())

    def test_cloud_info_file(self):
        """Return uppercase stripped content from /etc/sysconfig/cloud-info."""
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        cloud_info = self.tmp_path('cloud-info', dir=self.tmp)
        util.write_file(cloud_info, ' OverRiDdeN CloudType ')
        # Point CLOUD_INFO_FILE at the file written above so its content
        # is what get_cloud_type() reads.
        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', cloud_info):
            self.assertEqual('OVERRIDDEN CLOUDTYPE', dsrc.get_cloud_type())

    def test_rhev(self):
        '''
        Test method get_cloud_type() for RHEVm systems.
        Forcing read_dmi_data return to match a RHEVm system: RHEV
        '''
        util.read_dmi_data = _dmi_data('RHEV')
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        self.assertEqual('RHEV', dsrc.get_cloud_type())

    def test_vsphere(self):
        '''
        Test method get_cloud_type() for vSphere systems.
        Forcing read_dmi_data return to match a vSphere system:
        VMware Virtual Platform
        '''
        util.read_dmi_data = _dmi_data('VMware Virtual Platform')
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        self.assertEqual('VSPHERE', dsrc.get_cloud_type())

    def test_unknown(self):
        '''
        Test method get_cloud_type() for unknown systems.
        Forcing read_dmi_data return to match an unrecognized return.
        '''
        util.read_dmi_data = _dmi_data('Unrecognized Platform')
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetDataCloudInfoFile(CiTestCase):
    '''
    Test to exercise method: DataSourceAltCloud.get_data()
    With a contrived CLOUD_INFO_FILE
    '''
    def setUp(self):
        '''Set up.'''
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths(
            {'cloud_dir': self.tmp, 'run_dir': self.tmp})
        self.cloud_info_file = self.tmp_path('cloud-info', dir=self.tmp)

    def _get_data_for(self, cloud_type, stubs=None):
        '''
        Write cloud_type into the contrived cloud-info file, build a
        datasource carrying the given attribute stubs and call get_data()
        while CLOUD_INFO_FILE points at that file.

        Returns the tuple (datasource, get_data() result).
        '''
        util.write_file(self.cloud_info_file, cloud_type)
        ds = dsac.DataSourceAltCloud({}, None, self.paths)
        for attr, stub in (stubs or {}).items():
            setattr(ds, attr, stub)
        with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
            outcome = ds.get_data()
        return ds, outcome

    def test_rhev(self):
        '''Success Test module get_data() forcing RHEV.'''
        ds, outcome = self._get_data_for(
            'RHEV', {'user_data_rhevm': lambda: True})
        self.assertEqual(True, outcome)
        self.assertEqual('altcloud', ds.cloud_name)
        self.assertEqual('altcloud', ds.platform_type)
        self.assertEqual('rhev (/dev/fd0)', ds.subplatform)

    def test_vsphere(self):
        '''Success Test module get_data() forcing VSPHERE.'''
        ds, outcome = self._get_data_for(
            'VSPHERE', {'user_data_vsphere': lambda: True})
        self.assertEqual(True, outcome)
        self.assertEqual('altcloud', ds.cloud_name)
        self.assertEqual('altcloud', ds.platform_type)
        self.assertEqual('vsphere (unknown)', ds.subplatform)

    def test_fail_rhev(self):
        '''Failure Test module get_data() forcing RHEV.'''
        _ds, outcome = self._get_data_for(
            'RHEV', {'user_data_rhevm': lambda: False})
        self.assertEqual(False, outcome)

    def test_fail_vsphere(self):
        '''Failure Test module get_data() forcing VSPHERE.'''
        _ds, outcome = self._get_data_for(
            'VSPHERE', {'user_data_vsphere': lambda: False})
        self.assertEqual(False, outcome)

    def test_unrecognized(self):
        '''Failure Test module get_data() forcing unrecognized.'''
        _ds, outcome = self._get_data_for('unrecognized')
        self.assertEqual(False, outcome)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestGetDataNoCloudInfoFile(CiTestCase):
    '''
    Test to exercise method: DataSourceAltCloud.get_data()
    Without a CLOUD_INFO_FILE
    '''
    def setUp(self):
        '''Set up.'''
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths(
            {'cloud_dir': self.tmp, 'run_dir': self.tmp})
        # Kept so tearDown can put the real DMI reader back.
        self.dmi_data = util.read_dmi_data
        # Point the module at a path that does not exist.
        dsac.CLOUD_INFO_FILE = 'no such file'
        # We have a different code path for arm to deal with LP1243287
        # We have to switch arch to x86_64 to avoid test failure
        force_arch('x86_64')

    def tearDown(self):
        '''Put the module-level state changed by setUp/tests back.'''
        dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
        util.read_dmi_data = self.dmi_data
        # Return back to original arch
        force_arch()

    def _datasource_reporting(self, product_name):
        '''Spoof the DMI reader with product_name, then build a datasource.'''
        util.read_dmi_data = _dmi_data(product_name)
        return dsac.DataSourceAltCloud({}, None, self.paths)

    def test_rhev_no_cloud_file(self):
        '''Test No cloud info file module get_data() forcing RHEV.'''
        ds = self._datasource_reporting('RHEV Hypervisor')
        ds.user_data_rhevm = lambda: True
        self.assertEqual(True, ds.get_data())

    def test_vsphere_no_cloud_file(self):
        '''Test No cloud info file module get_data() forcing VSPHERE.'''
        ds = self._datasource_reporting('VMware Virtual Platform')
        ds.user_data_vsphere = lambda: True
        self.assertEqual(True, ds.get_data())

    def test_failure_no_cloud_file(self):
        '''Test No cloud info file module get_data() forcing unrecognized.'''
        ds = self._datasource_reporting('Unrecognized Platform')
        self.assertEqual(False, ds.get_data())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestUserDataRhevm(CiTestCase):
    '''
    Test to exercise method: DataSourceAltCloud.user_data_rhevm()
    '''
    def setUp(self):
        '''Set up.'''
        self.paths = helpers.Paths({'cloud_dir': '/tmp'})
        self.mount_dir = self.tmp_dir()
        _write_user_data_files(self.mount_dir, 'test user data')
        # Each patch is exposed on the test case under the attribute name
        # given as the second argument (e.g. self.m_mount_cb).
        self.add_patch(
            'cloudinit.sources.DataSourceAltCloud.modprobe_floppy',
            'm_modprobe_floppy', return_value=None)
        self.add_patch(
            'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle',
            'm_udevadm_settle', return_value=('', ''))
        self.add_patch(
            'cloudinit.sources.DataSourceAltCloud.util.mount_cb',
            'm_mount_cb')

    def _assert_user_data_rhevm_fails(self):
        '''Build a fresh datasource and expect user_data_rhevm() == False.'''
        ds = dsac.DataSourceAltCloud({}, None, self.paths)
        self.assertEqual(False, ds.user_data_rhevm())

    def test_mount_cb_fails(self):
        '''Test user_data_rhevm() where mount_cb fails.'''
        self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount")
        self._assert_user_data_rhevm_fails()

    def test_modprobe_fails(self):
        '''Test user_data_rhevm() where modprobe fails.'''
        failure = subp.ProcessExecutionError("Failed modprobe")
        self.m_modprobe_floppy.side_effect = failure
        self._assert_user_data_rhevm_fails()

    def test_no_modprobe_cmd(self):
        '''Test user_data_rhevm() with no modprobe command.'''
        failure = subp.ProcessExecutionError("No such file or dir")
        self.m_modprobe_floppy.side_effect = failure
        self._assert_user_data_rhevm_fails()

    def test_udevadm_fails(self):
        '''Test user_data_rhevm() where udevadm fails.'''
        failure = subp.ProcessExecutionError("Failed settle.")
        self.m_udevadm_settle.side_effect = failure
        self._assert_user_data_rhevm_fails()

    def test_no_udevadm_cmd(self):
        '''Test user_data_rhevm() with no udevadm command.'''
        self.m_udevadm_settle.side_effect = OSError("No such file or dir")
        self._assert_user_data_rhevm_fails()
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestUserDataVsphere(CiTestCase):
    '''
    Test to exercise method: DataSourceAltCloud.user_data_vsphere()
    '''
    def setUp(self):
        '''Set up.'''
        self.tmp = self.tmp_dir()
        self.paths = helpers.Paths({'cloud_dir': self.tmp})
        # Created with tempfile directly, so tearDown removes it by hand.
        self.mount_dir = tempfile.mkdtemp()

        _write_user_data_files(self.mount_dir, 'test user data')

    def tearDown(self):
        '''Remove the fixture files and directory created by setUp.'''
        _remove_user_data_files(self.mount_dir)

        # Attempt to remove the temp dir ignoring errors
        try:
            shutil.rmtree(self.mount_dir)
        except OSError:
            pass

        dsac.CLOUD_INFO_FILE = \
            '/etc/sysconfig/cloud-info'

    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
    def test_user_data_vsphere_no_cdrom(self, m_mount_cb, m_find_devs_with):
        '''Test user_data_vsphere() where no cdrom device is found.'''

        # "No cdrom" is a property of the device lookup, so configure that
        # mock explicitly; mount_cb is asserted below to never be called,
        # so giving it a return value would set up nothing.
        m_find_devs_with.return_value = []
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        self.assertEqual(False, dsrc.user_data_vsphere())
        self.assertEqual(1, m_find_devs_with.call_count)
        self.assertEqual(0, m_mount_cb.call_count)

    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
    def test_user_data_vsphere_mcb_fail(self, m_mount_cb, m_find_devs_with):
        '''Test user_data_vsphere() where mount_cb fails.'''

        m_find_devs_with.return_value = ["/dev/mock/cdrom"]
        m_mount_cb.side_effect = util.MountFailedError("Unable To mount")
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        self.assertEqual(False, dsrc.user_data_vsphere())
        self.assertEqual(1, m_find_devs_with.call_count)
        self.assertEqual(1, m_mount_cb.call_count)

    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
    @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
    def test_user_data_vsphere_success(self, m_mount_cb, m_find_devs_with):
        """Test user_data_vsphere() where successful."""
        m_find_devs_with.return_value = ["/dev/mock/cdrom"]
        m_mount_cb.return_value = 'raw userdata from cdrom'
        dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
        cloud_info = self.tmp_path('cloud-info', dir=self.tmp)
        util.write_file(cloud_info, 'VSPHERE')
        self.assertEqual(True, dsrc.user_data_vsphere())
        m_find_devs_with.assert_called_once_with('LABEL=CDROM')
        m_mount_cb.assert_called_once_with(
            '/dev/mock/cdrom', dsac.read_user_data_callback)
        # get_cloud_type is stubbed on the instance while subplatform is
        # read, so the expected prefix does not depend on the host.
        with mock.patch.object(dsrc, 'get_cloud_type', return_value='VSPHERE'):
            self.assertEqual('vsphere (/dev/mock/cdrom)', dsrc.subplatform)
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
class TestReadUserDataCallback(CiTestCase):
    '''
    Test to exercise method: DataSourceAltCloud.read_user_data_callback()
    '''
    def setUp(self):
        '''Set up.'''
        self.paths = helpers.Paths({'cloud_dir': '/tmp'})
        self.mount_dir = tempfile.mkdtemp()

        # Every test starts with both user-data files present.
        _write_user_data_files(self.mount_dir, 'test user data')

    def tearDown(self):
        '''Delete the fixture files and the directory that held them.'''
        _remove_user_data_files(self.mount_dir)

        # Attempt to remove the temp dir ignoring errors
        try:
            shutil.rmtree(self.mount_dir)
        except OSError:
            pass

    def _read_callback(self):
        '''Run read_user_data_callback() against the fixture directory.'''
        return dsac.read_user_data_callback(self.mount_dir)

    def test_callback_both(self):
        '''Test read_user_data_callback() with both files.'''
        self.assertEqual('test user data', self._read_callback())

    def test_callback_dc(self):
        '''Test read_user_data_callback() with only DC file.'''
        # Drop the non-DC file so only deltacloud-user-data.txt remains.
        _remove_user_data_files(
            self.mount_dir, dc_file=False, non_dc_file=True)
        self.assertEqual('test user data', self._read_callback())

    def test_callback_non_dc(self):
        '''Test read_user_data_callback() with only non-DC file.'''
        # Drop the DC file so only user-data.txt remains.
        _remove_user_data_files(
            self.mount_dir, dc_file=True, non_dc_file=False)
        self.assertEqual('test user data', self._read_callback())

    def test_callback_none(self):
        '''Test read_user_data_callback() no files are found.'''
        _remove_user_data_files(self.mount_dir)
        self.assertIsNone(self._read_callback())
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
def force_arch(arch=None):
    '''
    Swap os.uname for a stub, or put the saved original back.

    @param arch: A truthy value installs a stub os.uname whose fifth field
                 is arch. None assigns OS_UNAME_ORIG back to os.uname.
                 Any other falsy value (e.g. '') leaves os.uname untouched.
    '''
    if arch is None:
        os.uname = OS_UNAME_ORIG
        return
    if not arch:
        return

    def _spoofed_uname():
        return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', arch)

    os.uname = _spoofed_uname
|
|
Packit Service |
a04d08 |
|
|
Packit Service |
a04d08 |
# vi: ts=4 expandtab
|