Blob Blame History Raw
# -*- coding: utf-8 -*-

# Copyright (C) 2012-2018 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.

from __future__ import absolute_import
from __future__ import unicode_literals

import dnf.callback
import dnf.cli.progress
import dnf.pycomp

import tests.support
from tests.support import mock


class MockStdout(dnf.pycomp.StringIO):
    def visible_lines(self):
        lines = self.lines()
        last = len(lines) - 1
        return [l[:-1] for (i, l) in enumerate(lines)
                if l.endswith('\n') or i == last]

    def lines(self):
        return self.getvalue().splitlines(True)


class FakePayload(object):
    def __init__(self, string, size):
        self.string = string
        self._size = size

    def __str__(self):
        return self.string

    @property
    def download_size(self):
        return self._size


class ProgressTest(tests.support.TestCase):
    def test_single(self):
        now = 1379406823.9
        fo = MockStdout()
        with mock.patch('dnf.cli.progress._term_width', return_value=60), \
                mock.patch('dnf.cli.progress.time', lambda: now):

            p = dnf.cli.progress.MultiFileProgressMeter(fo)
            p.isatty = True
            pload = FakePayload('dummy-text', 5)
            p.start(1, 1)
            for i in range(6):
                now += 1.0
                p.progress(pload, i)
            p.end(pload, None, None)
        self.assertEqual(fo.lines(), [
            'dummy-text  0% [          ] ---  B/s |   0  B     --:-- ETA\r',
            'dummy-text 20% [==        ] 1.0  B/s |   1  B     00:04 ETA\r',
            'dummy-text 40% [====      ] 1.0  B/s |   2  B     00:03 ETA\r',
            'dummy-text 60% [======    ] 1.0  B/s |   3  B     00:02 ETA\r',
            'dummy-text 80% [========  ] 1.0  B/s |   4  B     00:01 ETA\r',
            'dummy-text100% [==========] 1.0  B/s |   5  B     00:00 ETA\r',
            'dummy-text                  1.0  B/s |   5  B     00:05    \n'])

    def test_mirror(self):
        fo = MockStdout()
        p = dnf.cli.progress.MultiFileProgressMeter(fo, update_period=-1)
        p.isatty = True
        p.start(1, 5)
        pload = FakePayload('foo', 5.0)
        now = 1379406823.9

        with mock.patch('dnf.cli.progress._term_width', return_value=60), \
                mock.patch('dnf.cli.progress.time', lambda: now):
            p.progress(pload, 3)
            p.end(pload, dnf.callback.STATUS_MIRROR, 'Timeout.')
            p.progress(pload, 4)
        self.assertEqual(fo.visible_lines(), [
            '[MIRROR] foo: Timeout.                                     ',
            'foo        80% [========  ] ---  B/s |   4  B     --:-- ETA'])

    _REFERENCE_TAB = [
        ['(1-2/2): f  0% [          ] ---  B/s |   0  B     --:-- ETA'],
        ['(1-2/2): b 10% [=         ] 2.2  B/s |   3  B     00:12 ETA'],
        ['(1-2/2): f 20% [==        ] 2.4  B/s |   6  B     00:10 ETA'],
        ['(1-2/2): b 30% [===       ] 2.5  B/s |   9  B     00:08 ETA'],
        ['(1-2/2): f 40% [====      ] 2.6  B/s |  12  B     00:06 ETA'],
        ['(1-2/2): b 50% [=====     ] 2.7  B/s |  15  B     00:05 ETA'],
        ['(1-2/2): f 60% [======    ] 2.8  B/s |  18  B     00:04 ETA'],
        ['(1-2/2): b 70% [=======   ] 2.8  B/s |  21  B     00:03 ETA'],
        ['(1-2/2): f 80% [========  ] 2.9  B/s |  24  B     00:02 ETA'],
        ['(1-2/2): b 90% [========= ] 2.9  B/s |  27  B     00:01 ETA'],
        ['(1/2): foo                  1.0  B/s |  10  B     00:10    ',
         '(2/2): bar100% [==========] 2.9  B/s |  30  B     00:00 ETA']]

    def test_multi(self):
        now = 1379406823.9
        fo = MockStdout()
        with mock.patch('dnf.cli.progress._term_width', return_value=60), \
                mock.patch('dnf.cli.progress.time', lambda: now):

            p = dnf.cli.progress.MultiFileProgressMeter(fo)
            p.isatty = True
            p.start(2, 30)
            pload1 = FakePayload('foo', 10.0)
            pload2 = FakePayload('bar', 20.0)
            for i in range(11):
                p.progress(pload1, float(i))
                if i == 10:
                    p.end(pload1, None, None)
                now += 0.5

                p.progress(pload2, float(i * 2))
                self.assertEqual(self._REFERENCE_TAB[i], fo.visible_lines())
                if i == 10:
                    p.end(pload2, dnf.callback.STATUS_FAILED, 'some error')
                now += 0.5

        # check "end" events
        self.assertEqual(fo.visible_lines(), [
            '(1/2): foo                  1.0  B/s |  10  B     00:10    ',
            '[FAILED] bar: some error                                   '])
        self.assertTrue(2.0 < p.rate < 4.0)

    @mock.patch('dnf.cli.progress._term_width', return_value=40)
    def test_skip(self, mock_term_width):
        fo = MockStdout()
        p = dnf.cli.progress.MultiFileProgressMeter(fo)
        p.start(2, 30)
        pload1 = FakePayload('club', 20.0)
        p.end(pload1, dnf.callback.STATUS_ALREADY_EXISTS, 'already got')
        self.assertEqual(p.done_files, 1)
        self.assertEqual(p.done_size, pload1._size)
        self.assertEqual(fo.getvalue(),
                         '[SKIPPED] club: already got            \n')