# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.config import cc_ntp
from cloudinit.sources import DataSourceNone
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.tests.helpers import (
CiTestCase, FilesystemMockingTestCase, mock, skipUnlessJsonSchema)
import copy
import os
from os.path import dirname
import shutil
NTP_TEMPLATE = """\
## template: jinja
servers {{servers}}
pools {{pools}}
"""
TIMESYNCD_TEMPLATE = """\
## template:jinja
[Time]
{% if servers or pools -%}
NTP={% for host in servers|list + pools|list %}{{ host }} {% endfor -%}
{% endif -%}
"""
class TestNtp(FilesystemMockingTestCase):
with_logs = True
def setUp(self):
super(TestNtp, self).setUp()
self.new_root = self.tmp_dir()
self.add_patch('cloudinit.util.system_is_snappy', 'm_snappy')
self.m_snappy.return_value = False
self.add_patch('cloudinit.util.system_info', 'm_sysinfo')
self.m_sysinfo.return_value = {'dist': ('Distro', '99.1', 'Codename')}
def _get_cloud(self, distro, sys_cfg=None):
self.new_root = self.reRoot(root=self.new_root)
paths = helpers.Paths({'templates_dir': self.new_root})
cls = distros.fetch(distro)
if not sys_cfg:
sys_cfg = {}
mydist = cls(distro, sys_cfg, paths)
myds = DataSourceNone.DataSourceNone(sys_cfg, mydist, paths)
return cloud.Cloud(myds, paths, sys_cfg, mydist, None)
def _get_template_path(self, template_name, distro, basepath=None):
# ntp.conf.{distro} -> ntp.conf.debian.tmpl
template_fn = '{0}.tmpl'.format(
template_name.replace('{distro}', distro))
if not basepath:
basepath = self.new_root
path = os.path.join(basepath, template_fn)
return path
def _generate_template(self, template=None):
if not template:
template = NTP_TEMPLATE
confpath = os.path.join(self.new_root, 'client.conf')
template_fn = os.path.join(self.new_root, 'client.conf.tmpl')
util.write_file(template_fn, content=template)
return (confpath, template_fn)
def _mock_ntp_client_config(self, client=None, distro=None):
if not client:
client = 'ntp'
if not distro:
distro = 'ubuntu'
dcfg = cc_ntp.distro_ntp_client_configs(distro)
if client == 'systemd-timesyncd':
template = TIMESYNCD_TEMPLATE
else:
template = NTP_TEMPLATE
(confpath, _template_fn) = self._generate_template(template=template)
ntpconfig = copy.deepcopy(dcfg[client])
ntpconfig['confpath'] = confpath
ntpconfig['template_name'] = os.path.basename(confpath)
return ntpconfig
@mock.patch("cloudinit.config.cc_ntp.subp")
def test_ntp_install(self, mock_subp):
"""ntp_install_client runs install_func when check_exe is absent."""
mock_subp.which.return_value = None # check_exe not found.
install_func = mock.MagicMock()
cc_ntp.install_ntp_client(install_func,
packages=['ntpx'], check_exe='ntpdx')
mock_subp.which.assert_called_with('ntpdx')
install_func.assert_called_once_with(['ntpx'])
@mock.patch("cloudinit.config.cc_ntp.subp")
def test_ntp_install_not_needed(self, mock_subp):
"""ntp_install_client doesn't install when check_exe is found."""
client = 'chrony'
mock_subp.which.return_value = [client] # check_exe found.
install_func = mock.MagicMock()
cc_ntp.install_ntp_client(install_func, packages=[client],
check_exe=client)
install_func.assert_not_called()
@mock.patch("cloudinit.config.cc_ntp.subp")
def test_ntp_install_no_op_with_empty_pkg_list(self, mock_subp):
"""ntp_install_client runs install_func with empty list"""
mock_subp.which.return_value = None # check_exe not found
install_func = mock.MagicMock()
cc_ntp.install_ntp_client(install_func, packages=[],
check_exe='timesyncd')
install_func.assert_called_once_with([])
@mock.patch("cloudinit.config.cc_ntp.subp")
def test_reload_ntp_defaults(self, mock_subp):
"""Test service is restarted/reloaded (defaults)"""
service = 'ntp_service_name'
cmd = ['service', service, 'restart']
cc_ntp.reload_ntp(service)
mock_subp.subp.assert_called_with(cmd, capture=True)
@mock.patch("cloudinit.config.cc_ntp.subp")
def test_reload_ntp_systemd(self, mock_subp):
"""Test service is restarted/reloaded (systemd)"""
service = 'ntp_service_name'
cc_ntp.reload_ntp(service, systemd=True)
cmd = ['systemctl', 'reload-or-restart', service]
mock_subp.subp.assert_called_with(cmd, capture=True)
def test_ntp_rename_ntp_conf(self):
"""When NTP_CONF exists, rename_ntp moves it."""
ntpconf = self.tmp_path("ntp.conf", self.new_root)
util.write_file(ntpconf, "")
cc_ntp.rename_ntp_conf(confpath=ntpconf)
self.assertFalse(os.path.exists(ntpconf))
self.assertTrue(os.path.exists("{0}.dist".format(ntpconf)))
def test_ntp_rename_ntp_conf_skip_missing(self):
"""When NTP_CONF doesn't exist rename_ntp doesn't create a file."""
ntpconf = self.tmp_path("ntp.conf", self.new_root)
self.assertFalse(os.path.exists(ntpconf))
cc_ntp.rename_ntp_conf(confpath=ntpconf)
self.assertFalse(os.path.exists("{0}.dist".format(ntpconf)))
self.assertFalse(os.path.exists(ntpconf))
def test_write_ntp_config_template_uses_ntp_conf_distro_no_servers(self):
"""write_ntp_config_template reads from $client.conf.distro.tmpl"""
servers = []
pools = ['10.0.0.1', '10.0.0.2']
(confpath, template_fn) = self._generate_template()
mock_path = 'cloudinit.config.cc_ntp.temp_utils._TMPDIR'
with mock.patch(mock_path, self.new_root):
cc_ntp.write_ntp_config_template('ubuntu',
servers=servers, pools=pools,
path=confpath,
template_fn=template_fn,
template=None)
self.assertEqual(
"servers []\npools ['10.0.0.1', '10.0.0.2']\n",
util.load_file(confpath))
def test_write_ntp_config_template_defaults_pools_w_empty_lists(self):
"""write_ntp_config_template defaults pools servers upon empty config.
When both pools and servers are empty, default NR_POOL_SERVERS get
configured.
"""
distro = 'ubuntu'
pools = cc_ntp.generate_server_names(distro)
servers = []
(confpath, template_fn) = self._generate_template()
mock_path = 'cloudinit.config.cc_ntp.temp_utils._TMPDIR'
with mock.patch(mock_path, self.new_root):
cc_ntp.write_ntp_config_template(distro,
servers=servers, pools=pools,
path=confpath,
template_fn=template_fn,
template=None)
self.assertEqual(
"servers []\npools {0}\n".format(pools),
util.load_file(confpath))
def test_defaults_pools_empty_lists_sles(self):
"""write_ntp_config_template defaults opensuse pools upon empty config.
When both pools and servers are empty, default NR_POOL_SERVERS get
configured.
"""
distro = 'sles'
default_pools = cc_ntp.generate_server_names(distro)
(confpath, template_fn) = self._generate_template()
cc_ntp.write_ntp_config_template(distro,
servers=[], pools=[],
path=confpath,
template_fn=template_fn,
template=None)
for pool in default_pools:
self.assertIn('opensuse', pool)
self.assertEqual(
"servers []\npools {0}\n".format(default_pools),
util.load_file(confpath))
self.assertIn(
"Adding distro default ntp pool servers: {0}".format(
",".join(default_pools)),
self.logs.getvalue())
def test_timesyncd_template(self):
"""Test timesycnd template is correct"""
pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
servers = ['192.168.23.3', '192.168.23.4']
(confpath, template_fn) = self._generate_template(
template=TIMESYNCD_TEMPLATE)
cc_ntp.write_ntp_config_template('ubuntu',
servers=servers, pools=pools,
path=confpath,
template_fn=template_fn,
template=None)
self.assertEqual(
"[Time]\nNTP=%s %s \n" % (" ".join(servers), " ".join(pools)),
util.load_file(confpath))
def test_distro_ntp_client_configs(self):
"""Test we have updated ntp client configs on different distros"""
delta = copy.deepcopy(cc_ntp.DISTRO_CLIENT_CONFIG)
base = copy.deepcopy(cc_ntp.NTP_CLIENT_CONFIG)
# confirm no-delta distros match the base config
for distro in cc_ntp.distros:
if distro not in delta:
result = cc_ntp.distro_ntp_client_configs(distro)
self.assertEqual(base, result)
# for distros with delta, ensure the merged config values match
# what is set in the delta
for distro in delta.keys():
result = cc_ntp.distro_ntp_client_configs(distro)
for client in delta[distro].keys():
for key in delta[distro][client].keys():
self.assertEqual(delta[distro][client][key],
result[client][key])
def _get_expected_pools(self, pools, distro, client):
if client in ['ntp', 'chrony']:
if client == 'ntp' and distro == 'alpine':
# NTP for Alpine Linux is Busybox's ntp which does not
# support 'pool' lines in its configuration file.
expected_pools = []
else:
expected_pools = [
'pool {0} iburst'.format(pool) for pool in pools]
elif client == 'systemd-timesyncd':
expected_pools = " ".join(pools)
return expected_pools
def _get_expected_servers(self, servers, distro, client):
if client in ['ntp', 'chrony']:
if client == 'ntp' and distro == 'alpine':
# NTP for Alpine Linux is Busybox's ntp which only supports
# 'server' lines without iburst option.
expected_servers = [
'server {0}'.format(srv) for srv in servers]
else:
expected_servers = [
'server {0} iburst'.format(srv) for srv in servers]
elif client == 'systemd-timesyncd':
expected_servers = " ".join(servers)
return expected_servers
def test_ntp_handler_real_distro_ntp_templates(self):
"""Test ntp handler renders the shipped distro ntp client templates."""
pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
servers = ['192.168.23.3', '192.168.23.4']
for client in ['ntp', 'systemd-timesyncd', 'chrony']:
for distro in cc_ntp.distros:
distro_cfg = cc_ntp.distro_ntp_client_configs(distro)
ntpclient = distro_cfg[client]
confpath = (
os.path.join(self.new_root, ntpclient.get('confpath')[1:]))
template = ntpclient.get('template_name')
# find sourcetree template file
root_dir = (
dirname(dirname(os.path.realpath(util.__file__))) +
'/templates')
source_fn = self._get_template_path(template, distro,
basepath=root_dir)
template_fn = self._get_template_path(template, distro)
# don't fail if cloud-init doesn't have a template for
# a distro,client pair
if not os.path.exists(source_fn):
continue
# Create a copy in our tmp_dir
shutil.copy(source_fn, template_fn)
cc_ntp.write_ntp_config_template(distro, servers=servers,
pools=pools, path=confpath,
template_fn=template_fn)
content = util.load_file(confpath)
if client in ['ntp', 'chrony']:
content_lines = content.splitlines()
expected_servers = self._get_expected_servers(servers,
distro,
client)
print('distro=%s client=%s' % (distro, client))
for sline in expected_servers:
self.assertIn(sline, content_lines,
('failed to render {0} conf'
' for distro:{1}'.format(client,
distro)))
expected_pools = self._get_expected_pools(pools, distro,
client)
if expected_pools != []:
for pline in expected_pools:
self.assertIn(pline, content_lines,
('failed to render {0} conf'
' for distro:{1}'.format(client,
distro)))
elif client == 'systemd-timesyncd':
expected_servers = self._get_expected_servers(servers,
distro,
client)
expected_pools = self._get_expected_pools(pools,
distro,
client)
expected_content = (
"# cloud-init generated file\n" +
"# See timesyncd.conf(5) for details.\n\n" +
"[Time]\nNTP=%s %s \n" % (expected_servers,
expected_pools))
self.assertEqual(expected_content, content)
def test_no_ntpcfg_does_nothing(self):
"""When no ntp section is defined handler logs a warning and noops."""
cc_ntp.handle('cc_ntp', {}, None, None, [])
self.assertEqual(
'DEBUG: Skipping module named cc_ntp, '
'not present or disabled by cfg\n',
self.logs.getvalue())
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_schema_validation_allows_empty_ntp_config(self,
m_select):
"""Ntp schema validation allows for an empty ntp: configuration."""
valid_empty_configs = [{'ntp': {}}, {'ntp': None}]
for valid_empty_config in valid_empty_configs:
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro)
confpath = ntpconfig['confpath']
m_select.return_value = ntpconfig
cc_ntp.handle('cc_ntp', valid_empty_config, mycloud, None, [])
if distro == 'alpine':
# _mock_ntp_client_config call above did not specify a
# client value and so it defaults to "ntp" which on
# Alpine Linux only supports servers and not pools.
servers = cc_ntp.generate_server_names(mycloud.distro.name)
self.assertEqual(
"servers {0}\npools []\n".format(servers),
util.load_file(confpath))
else:
pools = cc_ntp.generate_server_names(mycloud.distro.name)
self.assertEqual(
"servers []\npools {0}\n".format(pools),
util.load_file(confpath))
self.assertNotIn('Invalid config:', self.logs.getvalue())
@skipUnlessJsonSchema()
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_schema_validation_warns_non_string_item_type(self,
m_sel):
"""Ntp schema validation warns of non-strings in pools or servers.
Schema validation is not strict, so ntp config is still be rendered.
"""
invalid_config = {'ntp': {'pools': [123], 'servers': ['valid', None]}}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro)
confpath = ntpconfig['confpath']
m_sel.return_value = ntpconfig
cc_ntp.handle('cc_ntp', invalid_config, mycloud, None, [])
self.assertIn(
"Invalid config:\nntp.pools.0: 123 is not of type 'string'\n"
"ntp.servers.1: None is not of type 'string'",
self.logs.getvalue())
self.assertEqual("servers ['valid', None]\npools [123]\n",
util.load_file(confpath))
@skipUnlessJsonSchema()
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_schema_validation_warns_of_non_array_type(self,
m_select):
"""Ntp schema validation warns of non-array pools or servers types.
Schema validation is not strict, so ntp config is still be rendered.
"""
invalid_config = {'ntp': {'pools': 123, 'servers': 'non-array'}}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro)
confpath = ntpconfig['confpath']
m_select.return_value = ntpconfig
cc_ntp.handle('cc_ntp', invalid_config, mycloud, None, [])
self.assertIn(
"Invalid config:\nntp.pools: 123 is not of type 'array'\n"
"ntp.servers: 'non-array' is not of type 'array'",
self.logs.getvalue())
self.assertEqual("servers non-array\npools 123\n",
util.load_file(confpath))
@skipUnlessJsonSchema()
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_schema_validation_warns_invalid_key_present(self,
m_select):
"""Ntp schema validation warns of invalid keys present in ntp config.
Schema validation is not strict, so ntp config is still be rendered.
"""
invalid_config = {
'ntp': {'invalidkey': 1, 'pools': ['0.mycompany.pool.ntp.org']}}
for distro in cc_ntp.distros:
if distro != 'alpine':
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro)
confpath = ntpconfig['confpath']
m_select.return_value = ntpconfig
cc_ntp.handle('cc_ntp', invalid_config, mycloud, None, [])
self.assertIn(
"Invalid config:\nntp: Additional properties are not "
"allowed ('invalidkey' was unexpected)",
self.logs.getvalue())
self.assertEqual(
"servers []\npools ['0.mycompany.pool.ntp.org']\n",
util.load_file(confpath))
@skipUnlessJsonSchema()
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_schema_validation_warns_of_duplicates(self, m_select):
"""Ntp schema validation warns of duplicates in servers or pools.
Schema validation is not strict, so ntp config is still be rendered.
"""
invalid_config = {
'ntp': {'pools': ['0.mypool.org', '0.mypool.org'],
'servers': ['10.0.0.1', '10.0.0.1']}}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro)
confpath = ntpconfig['confpath']
m_select.return_value = ntpconfig
cc_ntp.handle('cc_ntp', invalid_config, mycloud, None, [])
self.assertIn(
"Invalid config:\nntp.pools: ['0.mypool.org', '0.mypool.org']"
" has non-unique elements\nntp.servers: "
"['10.0.0.1', '10.0.0.1'] has non-unique elements",
self.logs.getvalue())
self.assertEqual(
"servers ['10.0.0.1', '10.0.0.1']\n"
"pools ['0.mypool.org', '0.mypool.org']\n",
util.load_file(confpath))
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_timesyncd(self, m_select):
"""Test ntp handler configures timesyncd"""
servers = ['192.168.2.1', '192.168.2.2']
pools = ['0.mypool.org']
cfg = {'ntp': {'servers': servers, 'pools': pools}}
client = 'systemd-timesyncd'
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro,
client=client)
confpath = ntpconfig['confpath']
m_select.return_value = ntpconfig
cc_ntp.handle('cc_ntp', cfg, mycloud, None, [])
self.assertEqual(
"[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n",
util.load_file(confpath))
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_handler_enabled_false(self, m_select):
"""Test ntp handler does not run if enabled: false """
cfg = {'ntp': {'enabled': False}}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
cc_ntp.handle('notimportant', cfg, mycloud, None, None)
self.assertEqual(0, m_select.call_count)
@mock.patch("cloudinit.config.cc_ntp.subp")
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
@mock.patch("cloudinit.distros.Distro.uses_systemd")
def test_ntp_the_whole_package(self, m_sysd, m_select, m_subp):
"""Test enabled config renders template, and restarts service """
cfg = {'ntp': {'enabled': True}}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(distro=distro)
confpath = ntpconfig['confpath']
service_name = ntpconfig['service_name']
m_select.return_value = ntpconfig
hosts = cc_ntp.generate_server_names(mycloud.distro.name)
uses_systemd = True
expected_service_call = ['systemctl', 'reload-or-restart',
service_name]
expected_content = "servers []\npools {0}\n".format(hosts)
if distro == 'alpine':
uses_systemd = False
expected_service_call = ['service', service_name, 'restart']
# _mock_ntp_client_config call above did not specify a client
# value and so it defaults to "ntp" which on Alpine Linux only
# supports servers and not pools.
expected_content = "servers {0}\npools []\n".format(hosts)
m_sysd.return_value = uses_systemd
with mock.patch('cloudinit.config.cc_ntp.util') as m_util:
# allow use of util.mergemanydict
m_util.mergemanydict.side_effect = util.mergemanydict
# default client is present
m_subp.which.return_value = True
# use the config 'enabled' value
m_util.is_false.return_value = util.is_false(
cfg['ntp']['enabled'])
cc_ntp.handle('notimportant', cfg, mycloud, None, None)
m_subp.subp.assert_called_with(
expected_service_call, capture=True)
self.assertEqual(expected_content, util.load_file(confpath))
def test_opensuse_picks_chrony(self):
"""Test opensuse picks chrony or ntp on certain distro versions"""
# < 15.0 => ntp
self.m_sysinfo.return_value = {'dist':
('openSUSE', '13.2', 'Harlequin')}
mycloud = self._get_cloud('opensuse')
expected_client = mycloud.distro.preferred_ntp_clients[0]
self.assertEqual('ntp', expected_client)
# >= 15.0 and not openSUSE => chrony
self.m_sysinfo.return_value = {'dist':
('SLES', '15.0',
'SUSE Linux Enterprise Server 15')}
mycloud = self._get_cloud('sles')
expected_client = mycloud.distro.preferred_ntp_clients[0]
self.assertEqual('chrony', expected_client)
# >= 15.0 and openSUSE and ver != 42 => chrony
self.m_sysinfo.return_value = {'dist': ('openSUSE Tumbleweed',
'20180326',
'timbleweed')}
mycloud = self._get_cloud('opensuse')
expected_client = mycloud.distro.preferred_ntp_clients[0]
self.assertEqual('chrony', expected_client)
def test_ubuntu_xenial_picks_ntp(self):
"""Test Ubuntu picks ntp on xenial release"""
self.m_sysinfo.return_value = {'dist': ('Ubuntu', '16.04', 'xenial')}
mycloud = self._get_cloud('ubuntu')
expected_client = mycloud.distro.preferred_ntp_clients[0]
self.assertEqual('ntp', expected_client)
@mock.patch('cloudinit.config.cc_ntp.subp.which')
def test_snappy_system_picks_timesyncd(self, m_which):
"""Test snappy systems prefer installed clients"""
# we are on ubuntu-core here
self.m_snappy.return_value = True
# ubuntu core systems will have timesyncd installed
m_which.side_effect = iter([None, '/lib/systemd/systemd-timesyncd',
None, None, None])
distro = 'ubuntu'
mycloud = self._get_cloud(distro)
distro_configs = cc_ntp.distro_ntp_client_configs(distro)
expected_client = 'systemd-timesyncd'
expected_cfg = distro_configs[expected_client]
expected_calls = []
# we only get to timesyncd
for client in mycloud.distro.preferred_ntp_clients[0:2]:
cfg = distro_configs[client]
expected_calls.append(mock.call(cfg['check_exe']))
result = cc_ntp.select_ntp_client(None, mycloud.distro)
m_which.assert_has_calls(expected_calls)
self.assertEqual(sorted(expected_cfg), sorted(cfg))
self.assertEqual(sorted(expected_cfg), sorted(result))
@mock.patch('cloudinit.config.cc_ntp.subp.which')
def test_ntp_distro_searches_all_preferred_clients(self, m_which):
"""Test select_ntp_client search all distro perferred clients """
# nothing is installed
m_which.return_value = None
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
distro_configs = cc_ntp.distro_ntp_client_configs(distro)
expected_client = mycloud.distro.preferred_ntp_clients[0]
expected_cfg = distro_configs[expected_client]
expected_calls = []
for client in mycloud.distro.preferred_ntp_clients:
cfg = distro_configs[client]
expected_calls.append(mock.call(cfg['check_exe']))
cc_ntp.select_ntp_client({}, mycloud.distro)
m_which.assert_has_calls(expected_calls)
self.assertEqual(sorted(expected_cfg), sorted(cfg))
@mock.patch('cloudinit.config.cc_ntp.subp.which')
def test_user_cfg_ntp_client_auto_uses_distro_clients(self, m_which):
"""Test user_cfg.ntp_client='auto' defaults to distro search"""
# nothing is installed
m_which.return_value = None
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
distro_configs = cc_ntp.distro_ntp_client_configs(distro)
expected_client = mycloud.distro.preferred_ntp_clients[0]
expected_cfg = distro_configs[expected_client]
expected_calls = []
for client in mycloud.distro.preferred_ntp_clients:
cfg = distro_configs[client]
expected_calls.append(mock.call(cfg['check_exe']))
cc_ntp.select_ntp_client('auto', mycloud.distro)
m_which.assert_has_calls(expected_calls)
self.assertEqual(sorted(expected_cfg), sorted(cfg))
@mock.patch('cloudinit.config.cc_ntp.write_ntp_config_template')
@mock.patch('cloudinit.cloud.Cloud.get_template_filename')
@mock.patch('cloudinit.config.cc_ntp.subp.which')
def test_ntp_custom_client_overrides_installed_clients(self, m_which,
m_tmpfn, m_write):
"""Test user client is installed despite other clients present """
client = 'ntpdate'
cfg = {'ntp': {'ntp_client': client}}
for distro in cc_ntp.distros:
# client is not installed
m_which.side_effect = iter([None])
mycloud = self._get_cloud(distro)
with mock.patch.object(mycloud.distro,
'install_packages') as m_install:
cc_ntp.handle('notimportant', cfg, mycloud, None, None)
m_install.assert_called_with([client])
m_which.assert_called_with(client)
@mock.patch('cloudinit.config.cc_ntp.subp.which')
def test_ntp_system_config_overrides_distro_builtin_clients(self, m_which):
"""Test distro system_config overrides builtin preferred ntp clients"""
system_client = 'chrony'
sys_cfg = {'ntp_client': system_client}
# no clients installed
m_which.return_value = None
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro, sys_cfg=sys_cfg)
distro_configs = cc_ntp.distro_ntp_client_configs(distro)
expected_cfg = distro_configs[system_client]
result = cc_ntp.select_ntp_client(None, mycloud.distro)
self.assertEqual(sorted(expected_cfg), sorted(result))
m_which.assert_has_calls([])
@mock.patch('cloudinit.config.cc_ntp.subp.which')
def test_ntp_user_config_overrides_system_cfg(self, m_which):
"""Test user-data overrides system_config ntp_client"""
system_client = 'chrony'
sys_cfg = {'ntp_client': system_client}
user_client = 'systemd-timesyncd'
# no clients installed
m_which.return_value = None
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro, sys_cfg=sys_cfg)
distro_configs = cc_ntp.distro_ntp_client_configs(distro)
expected_cfg = distro_configs[user_client]
result = cc_ntp.select_ntp_client(user_client, mycloud.distro)
self.assertEqual(sorted(expected_cfg), sorted(result))
m_which.assert_has_calls([])
@mock.patch('cloudinit.config.cc_ntp.reload_ntp')
@mock.patch('cloudinit.config.cc_ntp.install_ntp_client')
def test_ntp_user_provided_config_with_template(self, m_install, m_reload):
custom = r'\n#MyCustomTemplate'
user_template = NTP_TEMPLATE + custom
confpath = os.path.join(self.new_root, 'etc/myntp/myntp.conf')
cfg = {
'ntp': {
'pools': ['mypool.org'],
'ntp_client': 'myntpd',
'config': {
'check_exe': 'myntpd',
'confpath': confpath,
'packages': ['myntp'],
'service_name': 'myntp',
'template': user_template,
}
}
}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
mock_path = 'cloudinit.config.cc_ntp.temp_utils._TMPDIR'
with mock.patch(mock_path, self.new_root):
cc_ntp.handle('notimportant', cfg, mycloud, None, None)
self.assertEqual(
"servers []\npools ['mypool.org']\n%s" % custom,
util.load_file(confpath))
@mock.patch('cloudinit.config.cc_ntp.supplemental_schema_validation')
@mock.patch('cloudinit.config.cc_ntp.reload_ntp')
@mock.patch('cloudinit.config.cc_ntp.install_ntp_client')
@mock.patch('cloudinit.config.cc_ntp.select_ntp_client')
def test_ntp_user_provided_config_template_only(self, m_select, m_install,
m_reload, m_schema):
"""Test custom template for default client"""
custom = r'\n#MyCustomTemplate'
user_template = NTP_TEMPLATE + custom
client = 'chrony'
cfg = {
'pools': ['mypool.org'],
'ntp_client': client,
'config': {
'template': user_template,
}
}
expected_merged_cfg = {
'check_exe': 'chronyd',
'confpath': '{tmpdir}/client.conf'.format(tmpdir=self.new_root),
'template_name': 'client.conf', 'template': user_template,
'service_name': 'chrony', 'packages': ['chrony']}
for distro in cc_ntp.distros:
mycloud = self._get_cloud(distro)
ntpconfig = self._mock_ntp_client_config(client=client,
distro=distro)
confpath = ntpconfig['confpath']
m_select.return_value = ntpconfig
mock_path = 'cloudinit.config.cc_ntp.temp_utils._TMPDIR'
with mock.patch(mock_path, self.new_root):
cc_ntp.handle('notimportant',
{'ntp': cfg}, mycloud, None, None)
self.assertEqual(
"servers []\npools ['mypool.org']\n%s" % custom,
util.load_file(confpath))
m_schema.assert_called_with(expected_merged_cfg)
class TestSupplementalSchemaValidation(CiTestCase):
def test_error_on_missing_keys(self):
"""ValueError raised reporting any missing required ntp:config keys"""
cfg = {}
match = (r'Invalid ntp configuration:\\nMissing required ntp:config'
' keys: check_exe, confpath, packages, service_name')
with self.assertRaisesRegex(ValueError, match):
cc_ntp.supplemental_schema_validation(cfg)
def test_error_requiring_either_template_or_template_name(self):
"""ValueError raised if both template not template_name are None."""
cfg = {'confpath': 'someconf', 'check_exe': '', 'service_name': '',
'template': None, 'template_name': None, 'packages': []}
match = (r'Invalid ntp configuration:\\nEither ntp:config:template'
' or ntp:config:template_name values are required')
with self.assertRaisesRegex(ValueError, match):
cc_ntp.supplemental_schema_validation(cfg)
def test_error_on_non_list_values(self):
"""ValueError raised when packages is not of type list."""
cfg = {'confpath': 'someconf', 'check_exe': '', 'service_name': '',
'template': 'asdf', 'template_name': None, 'packages': 'NOPE'}
match = (r'Invalid ntp configuration:\\nExpected a list of required'
' package names for ntp:config:packages. Found \\(NOPE\\)')
with self.assertRaisesRegex(ValueError, match):
cc_ntp.supplemental_schema_validation(cfg)
def test_error_on_non_string_values(self):
"""ValueError raised for any values expected as string type."""
cfg = {'confpath': 1, 'check_exe': 2, 'service_name': 3,
'template': 4, 'template_name': 5, 'packages': []}
errors = [
'Expected a config file path ntp:config:confpath. Found (1)',
'Expected a string type for ntp:config:check_exe. Found (2)',
'Expected a string type for ntp:config:service_name. Found (3)',
'Expected a string type for ntp:config:template. Found (4)',
'Expected a string type for ntp:config:template_name. Found (5)']
with self.assertRaises(ValueError) as context_mgr:
cc_ntp.supplemental_schema_validation(cfg)
error_msg = str(context_mgr.exception)
for error in errors:
self.assertIn(error, error_msg)
# vi: ts=4 expandtab