#! /usr/bin/env python3
# Copyright © 2020, RedHat Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
# Authors:
#       Benjamin Berg <bberg@redhat.com>
|
|
Packit Service |
8ebd8e |
|
|
Packit Service |
8ebd8e |
import os
|
|
Packit Service |
8ebd8e |
import sys
|
|
Packit Service |
8ebd8e |
import fcntl
|
|
Packit Service |
8ebd8e |
import io
|
|
Packit Service |
8ebd8e |
import re
|
|
Packit Service |
8ebd8e |
import time
|
|
Packit Service |
8ebd8e |
import threading
|
|
Packit Service |
8ebd8e |
|
|
Packit Service |
8ebd8e |
class OutputChecker(object):
    """Tee everything written to a pipe into an output stream and record it.

    A background thread reads from the read end of an internal pipe, splits
    the data on ``\\n`` into complete lines (kept as ``bytes``), appends them
    to an internal list and forwards the raw data unchanged to ``out``.  The
    ``check_*`` methods then consume the recorded lines in order.

    Data after the last newline stays in a partial-line buffer and is only
    recorded once its terminating newline arrives.
    """

    def __init__(self, out=sys.stdout):
        """Create the pipe and start the copier thread.

        Args:
            out: Object with a ``fileno()`` method; all data read from the
                pipe is written to that file descriptor.
        """
        self._output = out
        self._pipe_fd_r, self._pipe_fd_w = os.pipe()
        self._partial_buf = b''
        # Used purely as a wake-up signal: released once per chunk read by
        # the copier thread.  Waiters always re-check the line list, so extra
        # permits only cause a harmless additional loop iteration.
        self._lines_sem = threading.Semaphore()
        self._lines = []
        self._reader_io = io.StringIO()

        # Just to be sure the descriptors are close-on-exec.  This is a
        # per-descriptor flag, so it has to be set via os.set_inheritable()
        # (F_SETFD/FD_CLOEXEC); F_SETFL only changes file *status* flags.
        os.set_inheritable(self._pipe_fd_r, False)
        os.set_inheritable(self._pipe_fd_w, False)

        # Start copier thread
        self._thread = threading.Thread(target=self._copy)
        self._thread.start()

    def _copy(self):
        """Copier thread body: record lines and forward data until EOF."""
        try:
            while True:
                chunk = os.read(self._pipe_fd_r, 1024)
                if not chunk:
                    # EOF: every write end of the pipe has been closed.
                    return

                pieces = chunk.split(b'\n')
                # Prepend what was left over from the previous chunk.
                pieces[0] = self._partial_buf + pieces[0]
                self._lines.extend(pieces[:-1])
                # The last piece has no newline yet (b'' if chunk ended on one).
                self._partial_buf = pieces[-1]

                # Wake up anybody waiting for new lines.
                self._lines_sem.release()

                os.write(self._output.fileno(), chunk)
        finally:
            # Nobody reads from the pipe once this thread is gone, so do not
            # leak the read end.
            os.close(self._pipe_fd_r)
            self._pipe_fd_r = -1

    @staticmethod
    def _as_bytes(needle):
        """Return ``needle`` ASCII-encoded if it is a ``str``, else unchanged."""
        if isinstance(needle, str):
            return needle.encode('ascii')
        return needle

    def _next_line(self, deadline):
        """Pop the oldest recorded line, waiting until ``deadline`` if needed.

        Returns:
            The line as ``bytes``, or ``None`` if waiting on the wake-up
            semaphore timed out before a line became available.
        """
        while True:
            try:
                return self._lines.pop(0)
            except IndexError:
                # Nothing buffered; wait for the copier thread to signal.
                remaining = max(0.0, deadline - time.time())
                if not self._lines_sem.acquire(timeout=remaining):
                    return None

    def check_line_re(self, needle_re, timeout=0, failmsg=None):
        """Consume lines until one matches the regular expression.

        Args:
            needle_re: Pattern searched for in each line; a ``str`` is
                encoded as ASCII because the recorded lines are ``bytes``.
            timeout: Seconds to wait for a matching line.
            failmsg: Optional message for the ``AssertionError``.

        Returns:
            List of all consumed lines, ending with the matching one.

        Raises:
            AssertionError: If no matching line showed up in time.
        """
        deadline = time.time() + timeout
        needle_re = self._as_bytes(needle_re)
        pattern = re.compile(needle_re)
        consumed = []

        while True:
            line = self._next_line(deadline)
            if line is None:
                if failmsg:
                    raise AssertionError(failmsg)
                raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % (str(needle_re), timeout))

            consumed.append(line)
            if pattern.search(line):
                return consumed

    def check_line(self, needle, timeout=0, failmsg=None):
        """Like ``check_line_re`` but ``needle`` is matched as literal text."""
        needle_re = re.escape(self._as_bytes(needle))

        return self.check_line_re(needle_re, timeout=timeout, failmsg=failmsg)

    def check_no_line_re(self, needle_re, wait=0, failmsg=None):
        """Consume lines for ``wait`` seconds, failing if any matches.

        Args:
            needle_re: Pattern that must not be found; a ``str`` is encoded
                as ASCII because the recorded lines are ``bytes``.
            wait: Seconds to keep watching for new lines.
            failmsg: Optional message for the ``AssertionError``.

        Returns:
            List of all lines consumed while waiting.

        Raises:
            AssertionError: If a line matching the pattern was found.
        """
        deadline = time.time() + wait
        needle_re = self._as_bytes(needle_re)
        pattern = re.compile(needle_re)
        consumed = []

        while True:
            line = self._next_line(deadline)
            if line is None:
                # Timed out, so everything is good
                return consumed

            consumed.append(line)
            if pattern.search(line):
                if failmsg:
                    raise AssertionError(failmsg)
                raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % (str(needle_re), wait))

    def check_no_line(self, needle, wait=0, failmsg=None):
        """Like ``check_no_line_re`` but ``needle`` is matched as literal text."""
        needle_re = re.escape(self._as_bytes(needle))

        return self.check_no_line_re(needle_re, wait=wait, failmsg=failmsg)

    def clear(self):
        """Drop all recorded lines and return them as a list."""
        ret = self._lines
        self._lines = []
        return ret

    def assert_closed(self, timeout=1):
        """Assert that the copier thread exits within ``timeout`` seconds.

        The thread only exits once it reads EOF, i.e. after every write end
        of the pipe has been closed.

        Raises:
            AssertionError: If the thread is still running after the wait.
        """
        self._thread.join(timeout)

        if self._thread.is_alive():
            raise AssertionError("OutputCheck: Write side has not been closed yet!")

    @property
    def fd(self):
        """Write end of the pipe (``-1`` after ``writer_attached()``)."""
        return self._pipe_fd_w

    def writer_attached(self):
        """Close this object's copy of the pipe's write end."""
        os.close(self._pipe_fd_w)
        self._pipe_fd_w = -1
|
|
Packit Service |
8ebd8e |
|