Blame test/mtrpacket.py

Packit b802ec
#
Packit b802ec
#   mtr  --  a network diagnostic tool
Packit b802ec
#   Copyright (C) 2016  Matt Kimball
Packit b802ec
#
Packit b802ec
#   This program is free software; you can redistribute it and/or modify
Packit b802ec
#   it under the terms of the GNU General Public License version 2 as
Packit b802ec
#   published by the Free Software Foundation.
Packit b802ec
#
Packit b802ec
#   This program is distributed in the hope that it will be useful,
Packit b802ec
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b802ec
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b802ec
#   GNU General Public License for more details.
Packit b802ec
#
Packit b802ec
#   You should have received a copy of the GNU General Public License
Packit b802ec
#   along with this program; if not, write to the Free Software
Packit b802ec
#   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
Packit b802ec
#
Packit b802ec
Packit b802ec
'''Infrastructure for running tests which invoke mtr-packet.'''
Packit b802ec
Packit b802ec
import fcntl
Packit b802ec
import os
Packit b802ec
import select
Packit b802ec
import socket
Packit b802ec
import subprocess
Packit b802ec
import sys
Packit b802ec
import time
Packit b802ec
import unittest
Packit b802ec
Packit b802ec
#
Packit b802ec
#  typing is used for mypy type checking, but isn't required to run,
Packit b802ec
#  so it's okay if we can't import it.
Packit b802ec
#
Packit b802ec
try:
Packit b802ec
    # pylint: disable=locally-disabled, unused-import
Packit b802ec
    from typing import Dict, List
Packit b802ec
except ImportError:
Packit b802ec
    pass
Packit b802ec
Packit b802ec
Packit b802ec
IPV6_TEST_HOST = 'google-public-dns-a.google.com'
Packit b802ec
Packit b802ec
Packit b802ec
class MtrPacketExecuteError(Exception):
Packit b802ec
    "Exception raised when MtrPacketTest can't execute mtr-packet"
Packit b802ec
    pass
Packit b802ec
Packit b802ec
Packit b802ec
class ReadReplyTimeout(Exception):
Packit b802ec
    'Exception raised by TestProbe.read_reply upon timeout'
Packit b802ec
Packit b802ec
    pass
Packit b802ec
Packit b802ec
Packit b802ec
class WriteCommandTimeout(Exception):
Packit b802ec
    'Exception raised by TestProbe.write_command upon timeout'
Packit b802ec
Packit b802ec
    pass
Packit b802ec
Packit b802ec
Packit b802ec
class MtrPacketReplyParseError(Exception):
Packit b802ec
    "Exception raised when MtrPacketReply can't parse the reply string"
Packit b802ec
Packit b802ec
    pass
Packit b802ec
Packit b802ec
Packit b802ec
class PacketListenError(Exception):
Packit b802ec
    'Exception raised when we have unexpected results from mtr-packet-listen'
Packit b802ec
Packit b802ec
    pass
Packit b802ec
Packit b802ec
Packit b802ec
def set_nonblocking(file_descriptor):  # type: (int) -> None
Packit b802ec
    'Put a file descriptor into non-blocking mode'
Packit b802ec
Packit b802ec
    flags = fcntl.fcntl(file_descriptor, fcntl.F_GETFL)
Packit b802ec
Packit b802ec
    # pylint: disable=locally-disabled, no-member
Packit b802ec
    fcntl.fcntl(file_descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)
Packit b802ec
Packit b802ec
Packit b802ec
def check_for_local_ipv6():
Packit b802ec
    '''Check for IPv6 support on the test host, to see if we should skip
Packit b802ec
    the IPv6 tests'''
Packit b802ec
Packit b802ec
    addrinfo = socket.getaddrinfo(IPV6_TEST_HOST, 1, socket.AF_INET6)
Packit b802ec
    if len(addrinfo):
Packit b802ec
        addr = addrinfo[0][4]
Packit b802ec
Packit b802ec
    #  Create a UDP socket and check to see it can be connected to
Packit b802ec
    #  IPV6_TEST_HOST.  (Connecting UDP requires no packets sent, just
Packit b802ec
    #  a route present.)
Packit b802ec
    sock = socket.socket(
Packit b802ec
        socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
Packit b802ec
Packit b802ec
    connect_success = False
Packit b802ec
    try:
Packit b802ec
        sock.connect(addr)
Packit b802ec
        connect_success = True
Packit b802ec
    except socket.error:
Packit b802ec
        pass
Packit b802ec
Packit b802ec
    sock.close()
Packit b802ec
Packit b802ec
    if not connect_success:
Packit b802ec
        sys.stderr.write(
Packit b802ec
            'This host has no IPv6.  Skipping IPv6 tests.\n')
Packit b802ec
Packit b802ec
    return connect_success
Packit b802ec
Packit b802ec
Packit b802ec
HAVE_IPV6 = check_for_local_ipv6()
Packit b802ec
Packit b802ec
Packit b802ec
# pylint: disable=locally-disabled, too-few-public-methods
Packit b802ec
class MtrPacketReply(object):
Packit b802ec
    'A parsed reply from mtr-packet'
Packit b802ec
Packit b802ec
    def __init__(self, reply):  # type: (unicode) -> None
Packit b802ec
        self.token = 0  # type: int
Packit b802ec
        self.command_name = None  # type: unicode
Packit b802ec
        self.argument = {}  # type: Dict[unicode, unicode]
Packit b802ec
Packit b802ec
        self.parse_reply(reply)
Packit b802ec
Packit b802ec
    def parse_reply(self, reply):  # type (unicode) -> None
Packit b802ec
        'Parses a reply string into members for the instance of this class'
Packit b802ec
Packit b802ec
        tokens = reply.split()  # type List[unicode]
Packit b802ec
Packit b802ec
        try:
Packit b802ec
            self.token = int(tokens[0])
Packit b802ec
            self.command_name = tokens[1]
Packit b802ec
        except IndexError:
Packit b802ec
            raise MtrPacketReplyParseError(reply)
Packit b802ec
Packit b802ec
        i = 2
Packit b802ec
        while i < len(tokens):
Packit b802ec
            try:
Packit b802ec
                name = tokens[i]
Packit b802ec
                value = tokens[i + 1]
Packit b802ec
            except IndexError:
Packit b802ec
                raise MtrPacketReplyParseError(reply)
Packit b802ec
Packit b802ec
            self.argument[name] = value
Packit b802ec
            i += 2
Packit b802ec
Packit b802ec
Packit b802ec
class PacketListen(object):
Packit b802ec
    'A test process which listens for a single packet'
Packit b802ec
Packit b802ec
    def __init__(self, *args):
Packit b802ec
        self.process_args = list(args)  # type: List[unicode]
Packit b802ec
        self.listen_process = None  # type: subprocess.Popen
Packit b802ec
        self.attrib = None  # type: Dict[unicode, unicode]
Packit b802ec
Packit b802ec
    def __enter__(self):
Packit b802ec
        try:
Packit b802ec
            self.listen_process = subprocess.Popen(
Packit b802ec
                ['./mtr-packet-listen'] + self.process_args,
Packit b802ec
                stdin=subprocess.PIPE,
Packit b802ec
                stdout=subprocess.PIPE)
Packit b802ec
        except OSError:
Packit b802ec
            raise PacketListenError('unable to launch mtr-packet-listen')
Packit b802ec
Packit b802ec
        status = self.listen_process.stdout.readline().decode('utf-8')
Packit b802ec
        if status != 'status listening\n':
Packit b802ec
            raise PacketListenError('unexpected status')
Packit b802ec
Packit b802ec
        return self
Packit b802ec
Packit b802ec
    def __exit__(self, exc_type, exc_value, traceback):
Packit b802ec
        self.wait_for_exit()
Packit b802ec
Packit b802ec
        self.attrib = {}
Packit b802ec
        for line in self.listen_process.stdout.readlines():
Packit b802ec
            tokens = line.decode('utf-8').split()
Packit b802ec
Packit b802ec
            if len(tokens) >= 2:
Packit b802ec
                name = tokens[0]
Packit b802ec
                value = tokens[1]
Packit b802ec
Packit b802ec
                self.attrib[name] = value
Packit b802ec
Packit b802ec
        self.listen_process.stdin.close()
Packit b802ec
        self.listen_process.stdout.close()
Packit b802ec
Packit b802ec
    def wait_for_exit(self):
Packit b802ec
        '''Poll the subprocess for up to ten seconds, until it exits.
Packit b802ec
Packit b802ec
        We need to wait for its exit to ensure we are able to read its
Packit b802ec
        output.'''
Packit b802ec
Packit b802ec
        wait_time = 10
Packit b802ec
        wait_step = 0.1
Packit b802ec
Packit b802ec
        steps = int(wait_time / wait_step)
Packit b802ec
Packit b802ec
        exit_value = None
Packit b802ec
Packit b802ec
        # pylint: disable=locally-disabled, unused-variable
Packit b802ec
        for i in range(steps):
Packit b802ec
            exit_value = self.listen_process.poll()
Packit b802ec
            if exit_value is not None:
Packit b802ec
                break
Packit b802ec
Packit b802ec
            time.sleep(wait_step)
Packit b802ec
Packit b802ec
        if exit_value is None:
Packit b802ec
            raise PacketListenError('mtr-packet-listen timeout')
Packit b802ec
Packit b802ec
        if exit_value != 0:
Packit b802ec
            raise PacketListenError('mtr-packet-listen unexpected error')
Packit b802ec
Packit b802ec
Packit b802ec
class MtrPacketTest(unittest.TestCase):
Packit b802ec
    '''Base class for tests invoking mtr-packet.
Packit b802ec
Packit b802ec
    Start a new mtr-packet subprocess for each test, and kill it
Packit b802ec
    at the conclusion of the test.
Packit b802ec
Packit b802ec
    Provide methods for writing commands and reading replies.
Packit b802ec
    '''
Packit b802ec
Packit b802ec
    def __init__(self, *args):
Packit b802ec
        self.reply_buffer = None  # type: unicode
Packit b802ec
        self.packet_process = None  # type: subprocess.Popen
Packit b802ec
        self.stdout_fd = None  # type: int
Packit b802ec
Packit b802ec
        super(MtrPacketTest, self).__init__(*args)
Packit b802ec
Packit b802ec
    def setUp(self):
Packit b802ec
        'Set up a test case by spawning a mtr-packet process'
Packit b802ec
Packit b802ec
        packet_path = os.environ.get('MTR_PACKET', './mtr-packet')
Packit b802ec
Packit b802ec
        self.reply_buffer = ''
Packit b802ec
        try:
Packit b802ec
            self.packet_process = subprocess.Popen(
Packit b802ec
                [packet_path],
Packit b802ec
                stdin=subprocess.PIPE,
Packit b802ec
                stdout=subprocess.PIPE)
Packit b802ec
        except OSError:
Packit b802ec
            raise MtrPacketExecuteError(packet_path)
Packit b802ec
Packit b802ec
        #  Put the mtr-packet process's stdout in non-blocking mode
Packit b802ec
        #  so that we can read from it without a timeout when
Packit b802ec
        #  no reply is available.
Packit b802ec
        self.stdout_fd = self.packet_process.stdout.fileno()
Packit b802ec
        set_nonblocking(self.stdout_fd)
Packit b802ec
Packit b802ec
        self.stdin_fd = self.packet_process.stdin.fileno()
Packit b802ec
        set_nonblocking(self.stdin_fd)
Packit b802ec
Packit b802ec
    def tearDown(self):
Packit b802ec
        'After a test, kill the running mtr-packet instance'
Packit b802ec
Packit b802ec
        self.packet_process.stdin.close()
Packit b802ec
        self.packet_process.stdout.close()
Packit b802ec
Packit b802ec
        try:
Packit b802ec
            self.packet_process.kill()
Packit b802ec
        except OSError:
Packit b802ec
            return
Packit b802ec
Packit b802ec
    def parse_reply(self, timeout=10.0):  # type: (float) -> MtrPacketReply
Packit b802ec
        '''Read the next reply from mtr-packet and parse it into
Packit b802ec
        an MtrPacketReply object.'''
Packit b802ec
Packit b802ec
        reply_str = self.read_reply(timeout)
Packit b802ec
Packit b802ec
        return MtrPacketReply(reply_str)
Packit b802ec
Packit b802ec
    def read_reply(self, timeout=10.0):  # type: (float) -> unicode
Packit b802ec
        '''Read the next reply from mtr-packet.
Packit b802ec
Packit b802ec
        Attempt to read the next command reply from mtr-packet.  If no reply
Packit b802ec
        is available withing the timeout time, raise ReadReplyTimeout
Packit b802ec
        instead.'''
Packit b802ec
Packit b802ec
        start_time = time.time()
Packit b802ec
Packit b802ec
        #  Read from mtr-packet until either the timeout time has elapsed
Packit b802ec
        #  or we read a newline character, which indicates a finished
Packit b802ec
        #  reply.
Packit b802ec
        while True:
Packit b802ec
            now = time.time()
Packit b802ec
            elapsed = now - start_time
Packit b802ec
Packit b802ec
            select_time = timeout - elapsed
Packit b802ec
            if select_time < 0:
Packit b802ec
                select_time = 0
Packit b802ec
Packit b802ec
            select.select([self.stdout_fd], [], [], select_time)
Packit b802ec
Packit b802ec
            reply_bytes = None
Packit b802ec
Packit b802ec
            try:
Packit b802ec
                reply_bytes = os.read(self.stdout_fd, 1024)
Packit b802ec
            except OSError:
Packit b802ec
                pass
Packit b802ec
Packit b802ec
            if reply_bytes:
Packit b802ec
                self.reply_buffer += reply_bytes.decode('utf-8')
Packit b802ec
Packit b802ec
            #  If we have read a newline character, we can stop waiting
Packit b802ec
            #  for more input.
Packit b802ec
            newline_ix = self.reply_buffer.find('\n')
Packit b802ec
            if newline_ix != -1:
Packit b802ec
                break
Packit b802ec
Packit b802ec
            if elapsed >= timeout:
Packit b802ec
                raise ReadReplyTimeout()
Packit b802ec
Packit b802ec
        reply = self.reply_buffer[:newline_ix]
Packit b802ec
        self.reply_buffer = self.reply_buffer[newline_ix + 1:]
Packit b802ec
        return reply
Packit b802ec
Packit b802ec
    def write_command(self, cmd, timeout=10.0):
Packit b802ec
        # type: (unicode, float) -> None
Packit b802ec
Packit b802ec
        '''Send a command string to the mtr-packet instance, timing out
Packit b802ec
        if we are unable to write for an extended period of time.  The
Packit b802ec
        timeout is to avoid deadlocks with the child process where both
Packit b802ec
        the parent and the child are writing to their end of the pipe
Packit b802ec
        and expecting the other end to be reading.'''
Packit b802ec
Packit b802ec
        command_str = cmd + '\n'
Packit b802ec
        command_bytes = command_str.encode('utf-8')
Packit b802ec
Packit b802ec
        start_time = time.time()
Packit b802ec
Packit b802ec
        while True:
Packit b802ec
            now = time.time()
Packit b802ec
            elapsed = now - start_time
Packit b802ec
Packit b802ec
            select_time = timeout - elapsed
Packit b802ec
            if select_time < 0:
Packit b802ec
                select_time = 0
Packit b802ec
Packit b802ec
            select.select([], [self.stdin_fd], [], select_time)
Packit b802ec
Packit b802ec
            bytes_written = 0
Packit b802ec
            try:
Packit b802ec
                bytes_written = os.write(self.stdin_fd, command_bytes)
Packit b802ec
            except OSError:
Packit b802ec
                pass
Packit b802ec
Packit b802ec
            command_bytes = command_bytes[bytes_written:]
Packit b802ec
            if not len(command_bytes):
Packit b802ec
                break
Packit b802ec
Packit b802ec
            if elapsed >= timeout:
Packit b802ec
                raise WriteCommandTimeout()
Packit b802ec
Packit b802ec
Packit b802ec
def check_running_as_root():
Packit b802ec
    'Print a warning to stderr if we are not running as root.'
Packit b802ec
Packit b802ec
    # pylint: disable=locally-disabled, no-member
Packit b802ec
    if sys.platform != 'cygwin' and os.getuid() > 0:
Packit b802ec
        sys.stderr.write(
Packit b802ec
            'Warning: many tests require running as root\n')