# This file is part of cloud-init. See LICENSE file for license information.
from datetime import datetime
from textwrap import dedent
from cloudinit.analyze.dump import (
dump_events, parse_ci_logline, parse_timestamp)
from cloudinit.util import write_file
from cloudinit.subp import which
from cloudinit.tests.helpers import CiTestCase, mock, skipIf
class TestParseTimestamp(CiTestCase):
def test_parse_timestamp_handles_cloud_init_default_format(self):
"""Logs with cloud-init detailed formats will be properly parsed."""
trusty_fmt = '%Y-%m-%d %H:%M:%S,%f'
trusty_stamp = '2016-09-12 14:39:20,839'
dt = datetime.strptime(trusty_stamp, trusty_fmt)
self.assertEqual(
float(dt.strftime('%s.%f')), parse_timestamp(trusty_stamp))
def test_parse_timestamp_handles_syslog_adding_year(self):
"""Syslog timestamps lack a year. Add year and properly parse."""
syslog_fmt = '%b %d %H:%M:%S %Y'
syslog_stamp = 'Aug 08 15:12:51'
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(syslog_stamp + " " + str(year), syslog_fmt)
self.assertEqual(
float(dt.strftime('%s.%f')),
parse_timestamp(syslog_stamp))
def test_parse_timestamp_handles_journalctl_format_adding_year(self):
"""Journalctl precise timestamps lack a year. Add year and parse."""
journal_fmt = '%b %d %H:%M:%S.%f %Y'
journal_stamp = 'Aug 08 17:15:50.606811'
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(journal_stamp + " " + str(year), journal_fmt)
self.assertEqual(
float(dt.strftime('%s.%f')), parse_timestamp(journal_stamp))
@skipIf(not which("date"), "'date' command not available.")
def test_parse_unexpected_timestamp_format_with_date_command(self):
"""Dump sends unexpected timestamp formats to date for processing."""
new_fmt = '%H:%M %m/%d %Y'
new_stamp = '17:15 08/08'
# convert stamp ourselves by adding the missing year value
year = datetime.now().year
dt = datetime.strptime(new_stamp + " " + str(year), new_fmt)
# use date(1)
with self.allow_subp(["date"]):
self.assertEqual(
float(dt.strftime('%s.%f')), parse_timestamp(new_stamp))
class TestParseCILogLine(CiTestCase):
def test_parse_logline_returns_none_without_separators(self):
"""When no separators are found, parse_ci_logline returns None."""
expected_parse_ignores = [
'', '-', 'adsf-asdf', '2017-05-22 18:02:01,088', 'CLOUDINIT']
for parse_ignores in expected_parse_ignores:
self.assertIsNone(parse_ci_logline(parse_ignores))
def test_parse_logline_returns_event_for_cloud_init_logs(self):
"""parse_ci_logline returns an event parse from cloud-init format."""
line = (
"2017-08-08 20:05:07,147 - util.py[DEBUG]: Cloud-init v. 0.7.9"
" running 'init-local' at Tue, 08 Aug 2017 20:05:07 +0000. Up"
" 6.26 seconds.")
dt = datetime.strptime(
'2017-08-08 20:05:07,147', '%Y-%m-%d %H:%M:%S,%f')
timestamp = float(dt.strftime('%s.%f'))
expected = {
'description': 'starting search for local datasources',
'event_type': 'start',
'name': 'init-local',
'origin': 'cloudinit',
'timestamp': timestamp}
self.assertEqual(expected, parse_ci_logline(line))
def test_parse_logline_returns_event_for_journalctl_logs(self):
"""parse_ci_logline returns an event parse from journalctl format."""
line = ("Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT]"
" util.py[DEBUG]: Cloud-init v. 0.7.8 running 'init-local' at"
" Thu, 03 Nov 2016 06:51:06 +0000. Up 1.0 seconds.")
year = datetime.now().year
dt = datetime.strptime(
'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y')
timestamp = float(dt.strftime('%s.%f'))
expected = {
'description': 'starting search for local datasources',
'event_type': 'start',
'name': 'init-local',
'origin': 'cloudinit',
'timestamp': timestamp}
self.assertEqual(expected, parse_ci_logline(line))
@mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
def test_parse_logline_returns_event_for_finish_events(self,
m_parse_from_date):
"""parse_ci_logline returns a finish event for a parsed log line."""
line = ('2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT]'
' handlers.py[DEBUG]: finish: modules-final: SUCCESS: running'
' modules for final')
expected = {
'description': 'running modules for final',
'event_type': 'finish',
'name': 'modules-final',
'origin': 'cloudinit',
'result': 'SUCCESS',
'timestamp': 1472594005.972}
m_parse_from_date.return_value = "1472594005.972"
self.assertEqual(expected, parse_ci_logline(line))
m_parse_from_date.assert_has_calls(
[mock.call("2016-08-30 21:53:25.972325+00:00")])
def test_parse_logline_returns_event_for_amazon_linux_2_line(self):
line = (
"Apr 30 19:39:11 cloud-init[2673]: handlers.py[DEBUG]: start:"
" init-local/check-cache: attempting to read from cache [check]")
# Generate the expected value using `datetime`, so that TZ
# determination is consistent with the code under test.
timestamp_dt = datetime.strptime(
"Apr 30 19:39:11", "%b %d %H:%M:%S"
).replace(year=datetime.now().year)
expected = {
'description': 'attempting to read from cache [check]',
'event_type': 'start',
'name': 'init-local/check-cache',
'origin': 'cloudinit',
'timestamp': timestamp_dt.timestamp()}
self.assertEqual(expected, parse_ci_logline(line))
SAMPLE_LOGS = dedent("""\
Nov 03 06:51:06.074410 x2 cloud-init[106]: [CLOUDINIT] util.py[DEBUG]:\
Cloud-init v. 0.7.8 running 'init-local' at Thu, 03 Nov 2016\
06:51:06 +0000. Up 1.0 seconds.
2016-08-30 21:53:25.972325+00:00 y1 [CLOUDINIT] handlers.py[DEBUG]: finish:\
modules-final: SUCCESS: running modules for final
""")
class TestDumpEvents(CiTestCase):
maxDiff = None
@mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
def test_dump_events_with_rawdata(self, m_parse_from_date):
"""Rawdata is split and parsed into a tuple of events and data"""
m_parse_from_date.return_value = "1472594005.972"
events, data = dump_events(rawdata=SAMPLE_LOGS)
expected_data = SAMPLE_LOGS.splitlines()
self.assertEqual(
[mock.call("2016-08-30 21:53:25.972325+00:00")],
m_parse_from_date.call_args_list)
self.assertEqual(expected_data, data)
year = datetime.now().year
dt1 = datetime.strptime(
'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y')
timestamp1 = float(dt1.strftime('%s.%f'))
expected_events = [{
'description': 'starting search for local datasources',
'event_type': 'start',
'name': 'init-local',
'origin': 'cloudinit',
'timestamp': timestamp1}, {
'description': 'running modules for final',
'event_type': 'finish',
'name': 'modules-final',
'origin': 'cloudinit',
'result': 'SUCCESS',
'timestamp': 1472594005.972}]
self.assertEqual(expected_events, events)
@mock.patch("cloudinit.analyze.dump.parse_timestamp_from_date")
def test_dump_events_with_cisource(self, m_parse_from_date):
"""Cisource file is read and parsed into a tuple of events and data."""
tmpfile = self.tmp_path('logfile')
write_file(tmpfile, SAMPLE_LOGS)
m_parse_from_date.return_value = 1472594005.972
events, data = dump_events(cisource=open(tmpfile))
year = datetime.now().year
dt1 = datetime.strptime(
'Nov 03 06:51:06.074410 %d' % year, '%b %d %H:%M:%S.%f %Y')
timestamp1 = float(dt1.strftime('%s.%f'))
expected_events = [{
'description': 'starting search for local datasources',
'event_type': 'start',
'name': 'init-local',
'origin': 'cloudinit',
'timestamp': timestamp1}, {
'description': 'running modules for final',
'event_type': 'finish',
'name': 'modules-final',
'origin': 'cloudinit',
'result': 'SUCCESS',
'timestamp': 1472594005.972}]
self.assertEqual(expected_events, events)
self.assertEqual(SAMPLE_LOGS.splitlines(), [d.strip() for d in data])
m_parse_from_date.assert_has_calls(
[mock.call("2016-08-30 21:53:25.972325+00:00")])