#! /usr/bin/env python3
# Copyright © 2020, RedHat Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
# Authors:
#       Benjamin Berg <bberg@redhat.com>
Packit Service 8ebd8e
import os
Packit Service 8ebd8e
import sys
Packit Service 8ebd8e
import fcntl
Packit Service 8ebd8e
import io
Packit Service 8ebd8e
import re
Packit Service 8ebd8e
import time
Packit Service 8ebd8e
import threading
Packit Service 8ebd8e
Packit Service 8ebd8e
class OutputChecker(object):
    """Tee data written to a pipe into an output stream and record its lines.

    The constructor creates a pipe and starts a thread that copies everything
    arriving on the read end to ``out``.  The data is also split on ``\\n``
    and each complete line is queued as ``bytes`` (without the newline) so
    that the ``check_*`` methods can assert on it.  Something else is expected
    to write to the descriptor exposed by :attr:`fd`.

    Data following the last newline is only queued once a later read
    completes the line; an unterminated trailing line is never queued.
    """

    def __init__(self, out=sys.stdout):
        """Create the pipe and start the copier thread.

        Args:
            out: Object providing ``fileno()``; all data read from the pipe is
                written to that descriptor.
        """
        self._output = out
        self._pipe_fd_r, self._pipe_fd_w = os.pipe()
        # Bytes received after the last newline, completed by a later read
        self._partial_buf = b''
        # Released by the copier after every read to wake up a waiting check.
        # It starts at 1 and may accumulate, so waiters must tolerate wake-ups
        # that find no new line.
        self._lines_sem = threading.Semaphore()
        # Complete lines that no check has consumed yet
        self._lines = []
        self._reader_io = io.StringIO()

        # Just to be sure, shouldn't be a problem even if we didn't set it.
        # Close-on-exec is a descriptor flag, so it has to be set through
        # F_SETFD/FD_CLOEXEC; F_SETFL ignores O_CLOEXEC.
        for pipe_fd in (self._pipe_fd_r, self._pipe_fd_w):
            fd_flags = fcntl.fcntl(pipe_fd, fcntl.F_GETFD)
            fcntl.fcntl(pipe_fd, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)

        # Start copier thread
        self._thread = threading.Thread(target=self._copy)
        self._thread.start()

    def _copy(self):
        """Copier thread body: forward pipe data and queue complete lines.

        Runs until ``os.read`` reports end-of-file, i.e. until every copy of
        the write end has been closed, then closes the read end.
        """
        while True:
            data = os.read(self._pipe_fd_r, 1024)
            if not data:
                # EOF: release the read end instead of leaking it
                os.close(self._pipe_fd_r)
                self._pipe_fd_r = -1
                return

            pieces = data.split(b'\n')
            # The first piece continues whatever the previous read left over
            pieces[0] = self._partial_buf + pieces[0]
            self._lines.extend(pieces[:-1])
            # The last piece has no newline yet (it is b'' if data ended in one)
            self._partial_buf = pieces[-1]

            self._lines_sem.release()

            # os.write() may write less than requested, so loop until done
            out_fd = self._output.fileno()
            remaining = memoryview(data)
            while remaining:
                written = os.write(out_fd, remaining)
                remaining = remaining[written:]

    @staticmethod
    def _compile(needle_re):
        """Compile *needle_re* as a bytes pattern (``str`` is ASCII-encoded)."""
        if isinstance(needle_re, str):
            needle_re = needle_re.encode('ascii')
        return needle_re, re.compile(needle_re)

    def _next_line(self, deadline):
        """Pop the oldest queued line, waiting for one until *deadline*.

        Returns:
            The line as ``bytes``, or ``None`` if the wait timed out.
        """
        while True:
            try:
                return self._lines.pop(0)
            except IndexError:
                # Nothing queued: sleep until the copier signals new data.
                # A wake-up does not guarantee a line, hence the loop.
                remaining = max(0.0, deadline - time.time())
                if not self._lines_sem.acquire(timeout=remaining):
                    return None

    def check_line_re(self, needle_re, timeout=0, failmsg=None):
        """Consume queued lines until one matches the regular expression.

        Args:
            needle_re: Pattern as ``bytes`` or ASCII ``str``; matched with
                ``search`` against each line.
            timeout: Seconds to wait for a matching line to arrive.
            failmsg: Message for the ``AssertionError`` instead of the default.

        Returns:
            The consumed lines, the matching line being the last element.

        Raises:
            AssertionError: If no matching line arrived before the timeout.
        """
        deadline = time.time() + timeout
        needle_re, pattern = self._compile(needle_re)
        consumed = []

        while True:
            line = self._next_line(deadline)
            if line is None:
                if failmsg:
                    raise AssertionError(failmsg)
                raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % (str(needle_re), timeout))

            consumed.append(line)
            if pattern.search(line):
                return consumed

    def check_line(self, needle, timeout=0, failmsg=None):
        """Like :meth:`check_line_re`, but *needle* is a literal substring."""
        if isinstance(needle, str):
            needle = needle.encode('ascii')

        needle_re = re.escape(needle)

        return self.check_line_re(needle_re, timeout=timeout, failmsg=failmsg)

    def check_no_line_re(self, needle_re, wait=0, failmsg=None):
        """Consume queued lines and assert that none matches the pattern.

        Args:
            needle_re: Pattern as ``bytes`` or ASCII ``str``; matched with
                ``search`` against each line.
            wait: Seconds to keep watching for newly arriving lines.
            failmsg: Message for the ``AssertionError`` instead of the default.

        Returns:
            The lines consumed while waiting.

        Raises:
            AssertionError: If a matching line was seen.
        """
        deadline = time.time() + wait
        needle_re, pattern = self._compile(needle_re)
        consumed = []

        while True:
            line = self._next_line(deadline)
            if line is None:
                # Timed out, so everything is good
                return consumed

            consumed.append(line)
            if pattern.search(line):
                if failmsg:
                    raise AssertionError(failmsg)
                raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % (str(needle_re), wait))

    def check_no_line(self, needle, wait=0, failmsg=None):
        """Like :meth:`check_no_line_re`, but *needle* is a literal substring."""
        if isinstance(needle, str):
            needle = needle.encode('ascii')

        needle_re = re.escape(needle)

        return self.check_no_line_re(needle_re, wait=wait, failmsg=failmsg)

    def clear(self):
        """Drop all queued lines and return them as a list."""
        ret = self._lines
        self._lines = []
        return ret

    def assert_closed(self, timeout=1):
        """Assert that the copier thread has finished (it exits on EOF).

        Args:
            timeout: Seconds to wait for the thread to finish.

        Raises:
            AssertionError: If the thread is still running after the wait.
        """
        self._thread.join(timeout)
        if self._thread.is_alive():
            raise AssertionError("OutputCheck: Write side has not been closed yet!")

    @property
    def fd(self):
        """Write end of the pipe (``-1`` after :meth:`writer_attached`)."""
        return self._pipe_fd_w

    def writer_attached(self):
        """Close this object's copy of the write end of the pipe.

        The copier only sees end-of-file once every copy of the write end is
        closed, so call this after the descriptor has been handed to the
        writer.  Afterwards :attr:`fd` is ``-1``.
        """
        os.close(self._pipe_fd_w)
        self._pipe_fd_w = -1
Packit Service 8ebd8e