Blame test/probe.py

Packit b802ec
#!/usr/bin/env python
Packit b802ec
#
Packit b802ec
#   mtr  --  a network diagnostic tool
Packit b802ec
#   Copyright (C) 2016  Matt Kimball
Packit b802ec
#
Packit b802ec
#   This program is free software; you can redistribute it and/or modify
Packit b802ec
#   it under the terms of the GNU General Public License version 2 as
Packit b802ec
#   published by the Free Software Foundation.
Packit b802ec
#
Packit b802ec
#   This program is distributed in the hope that it will be useful,
Packit b802ec
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b802ec
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b802ec
#   GNU General Public License for more details.
Packit b802ec
#
Packit b802ec
#   You should have received a copy of the GNU General Public License
Packit b802ec
#   along with this program; if not, write to the Free Software
Packit b802ec
#   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
Packit b802ec
#
Packit b802ec
Packit b802ec
'''Test sending probes and receiving respones.'''
Packit b802ec
Packit b802ec
import socket
Packit b802ec
import sys
Packit b802ec
import time
Packit b802ec
import unittest
Packit b802ec
Packit b802ec
import mtrpacket
Packit b802ec
Packit b802ec
Packit b802ec
def resolve_ipv6_address(hostname):  # type: (str) -> str
Packit b802ec
    'Resolve a hostname to an IP version 6 address'
Packit b802ec
Packit b802ec
    for addrinfo in socket.getaddrinfo(hostname, 0):
Packit b802ec
        # pylint: disable=locally-disabled, unused-variable
Packit b802ec
        (family, socktype, proto, name, sockaddr) = addrinfo
Packit b802ec
Packit b802ec
        if family == socket.AF_INET6:
Packit b802ec
            sockaddr6 = sockaddr  # type: tuple
Packit b802ec
Packit b802ec
            (address, port, flow, scope) = sockaddr6
Packit b802ec
            return address
Packit b802ec
Packit b802ec
    raise LookupError(hostname)
Packit b802ec
Packit b802ec
Packit b802ec
def check_feature(test, feature):
Packit b802ec
    'Check for support for a particular feature with mtr-packet'
Packit b802ec
Packit b802ec
    check_cmd = '70 check-support feature ' + feature
Packit b802ec
    test.write_command(check_cmd)
Packit b802ec
Packit b802ec
    reply = test.parse_reply()
Packit b802ec
    test.assertEqual(reply.command_name, 'feature-support')
Packit b802ec
    test.assertIn('support', reply.argument)
Packit b802ec
Packit b802ec
    if reply.argument['support'] != 'ok':
Packit b802ec
        return False
Packit b802ec
Packit b802ec
    return True
Packit b802ec
Packit b802ec
Packit b802ec
def test_basic_remote_probe(test, ip_version, protocol):
Packit b802ec
    'Test a probe to a remote host with a TTL of 1'
Packit b802ec
Packit b802ec
    protocol_str = 'protocol ' + protocol
Packit b802ec
    if ip_version == 6:
Packit b802ec
        address_str = 'ip-6 ' + resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
Packit b802ec
    elif ip_version == 4:
Packit b802ec
        address_str = 'ip-4 8.8.8.8'
Packit b802ec
    else:
Packit b802ec
        raise ValueError(ip_version)
Packit b802ec
Packit b802ec
    cmd = '60 send-probe ' + \
Packit b802ec
        protocol_str + ' ' + address_str + ' port 164 ttl 1'
Packit b802ec
    test.write_command(cmd)
Packit b802ec
Packit b802ec
    reply = test.parse_reply()
Packit b802ec
    test.assertEqual(reply.command_name, 'ttl-expired')
Packit b802ec
Packit b802ec
Packit b802ec
def test_basic_local_probe(test, ip_version, protocol):
Packit b802ec
    'Test a probe to a closed port on localhost'
Packit b802ec
Packit b802ec
    protocol_str = 'protocol ' + protocol
Packit b802ec
    if ip_version == 6:
Packit b802ec
        address_str = 'ip-6 ::1'
Packit b802ec
    elif ip_version == 4:
Packit b802ec
        address_str = 'ip-4 127.0.0.1'
Packit b802ec
Packit b802ec
    cmd = '61 send-probe ' + \
Packit b802ec
        protocol_str + ' ' + address_str + ' port 164'
Packit b802ec
    test.write_command(cmd)
Packit b802ec
Packit b802ec
    reply = test.parse_reply()
Packit b802ec
    test.assertEqual(reply.command_name, 'reply')
Packit b802ec
Packit b802ec
    if ip_version == 6:
Packit b802ec
        test.assertIn('ip-6', reply.argument)
Packit b802ec
        test.assertEqual(reply.argument['ip-6'], '::1')
Packit b802ec
    elif ip_version == 4:
Packit b802ec
        test.assertIn('ip-4', reply.argument)
Packit b802ec
        test.assertEqual(reply.argument['ip-4'], '127.0.0.1')
Packit b802ec
Packit b802ec
Packit b802ec
def test_basic_probe(test, ip_version, protocol):
Packit b802ec
    # type: (mtrpacket.MtrPacketTest, int, unicode) -> None
Packit b802ec
Packit b802ec
    '''Test a probe with TTL expiration and a probe which reaches its
Packit b802ec
    destination with a particular protocol.'''
Packit b802ec
Packit b802ec
    if not check_feature(test, protocol):
Packit b802ec
        err_str = 'Skipping ' + protocol + ' test due to no support\n'
Packit b802ec
        sys.stderr.write(err_str.encode('utf-8'))
Packit b802ec
        return
Packit b802ec
Packit b802ec
    test_basic_remote_probe(test, ip_version, protocol)
Packit b802ec
    test_basic_local_probe(test, ip_version, protocol)
Packit b802ec
Packit b802ec
Packit b802ec
class TestProbeICMPv4(mtrpacket.MtrPacketTest):
Packit b802ec
    '''Test sending probes using IP version 4'''
Packit b802ec
Packit b802ec
    def test_probe(self):
Packit b802ec
        'Test sending regular ICMP probes to known addresses'
Packit b802ec
Packit b802ec
        #  Probe Google's well-known DNS server and expect a reply
Packit b802ec
        self.write_command('14 send-probe ip-4 8.8.8.8')
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual(reply.token, 14)
Packit b802ec
        self.assertEqual(reply.command_name, 'reply')
Packit b802ec
        self.assertIn('ip-4', reply.argument)
Packit b802ec
        self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
Packit b802ec
        self.assertIn('round-trip-time', reply.argument)
Packit b802ec
Packit b802ec
    def test_timeout(self):
Packit b802ec
        'Test timeouts when sending to a non-existant address'
Packit b802ec
Packit b802ec
        #
Packit b802ec
        #  Probe a non-existant address, and expect no reply
Packit b802ec
        #
Packit b802ec
        #  I'm not sure what the best way to find an address that doesn't
Packit b802ec
        #  exist, but is still route-able.  If we use a reserved IP
Packit b802ec
        #  address range, Windows will tell us it is non-routeable,
Packit b802ec
        #  rather than timing out when transmitting to that address.
Packit b802ec
        #
Packit b802ec
        #  We're just using a currently unused address in Google's
Packit b802ec
        #  range instead.  This is probably not the best solution.
Packit b802ec
        #
Packit b802ec
Packit b802ec
        # pylint: disable=locally-disabled, unused-variable
Packit b802ec
        for i in range(16):
Packit b802ec
            self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
Packit b802ec
            reply = self.parse_reply()
Packit b802ec
            self.assertEqual(reply.token, 15)
Packit b802ec
            self.assertEqual(reply.command_name, 'no-reply')
Packit b802ec
Packit b802ec
    def test_exhaust_probes(self):
Packit b802ec
        'Test exhausting all available probes'
Packit b802ec
Packit b802ec
        probe_count = 4 * 1024
Packit b802ec
        token = 1024
Packit b802ec
Packit b802ec
        # pylint: disable=locally-disabled, unused-variable
Packit b802ec
        for i in range(probe_count):
Packit b802ec
            command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
Packit b802ec
            token += 1
Packit b802ec
            self.write_command(command)
Packit b802ec
Packit b802ec
            reply = None
Packit b802ec
            try:
Packit b802ec
                reply = self.parse_reply(0)
Packit b802ec
            except mtrpacket.ReadReplyTimeout:
Packit b802ec
                pass
Packit b802ec
Packit b802ec
            if reply:
Packit b802ec
                if reply.command_name == 'probes-exhausted':
Packit b802ec
                    break
Packit b802ec
Packit b802ec
        self.assertIsNotNone(reply)
Packit b802ec
        self.assertEqual(reply.command_name, 'probes-exhausted')
Packit b802ec
Packit b802ec
    def test_timeout_values(self):
Packit b802ec
        '''Test that timeout values wait the right amount of time
Packit b802ec
Packit b802ec
        Give each probe a half-second grace period to probe a timeout
Packit b802ec
        reply after the expected timeout time.'''
Packit b802ec
Packit b802ec
        begin = time.time()
Packit b802ec
        self.write_command('19 send-probe ip-4 8.8.254.254 timeout 0')
Packit b802ec
        self.parse_reply()
Packit b802ec
        elapsed = time.time() - begin
Packit b802ec
        self.assertLess(elapsed, 0.5)
Packit b802ec
Packit b802ec
        begin = time.time()
Packit b802ec
        self.write_command('20 send-probe ip-4 8.8.254.254 timeout 1')
Packit b802ec
        self.parse_reply()
Packit b802ec
        elapsed = time.time() - begin
Packit b802ec
        self.assertGreaterEqual(elapsed, 0.9)
Packit b802ec
        self.assertLess(elapsed, 1.5)
Packit b802ec
Packit b802ec
        begin = time.time()
Packit b802ec
        self.write_command('21 send-probe ip-4 8.8.254.254 timeout 3')
Packit b802ec
        self.parse_reply()
Packit b802ec
        elapsed = time.time() - begin
Packit b802ec
        self.assertGreaterEqual(elapsed, 2.9)
Packit b802ec
        self.assertLess(elapsed, 3.5)
Packit b802ec
Packit b802ec
    def test_ttl_expired(self):
Packit b802ec
        'Test sending a probe which will have its time-to-live expire'
Packit b802ec
Packit b802ec
        #  Probe Goolge's DNS server, but give the probe only one hop
Packit b802ec
        #  to live.
Packit b802ec
        self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual(reply.command_name, 'ttl-expired')
Packit b802ec
        self.assertIn('ip-4', reply.argument)
Packit b802ec
        self.assertIn('round-trip-time', reply.argument)
Packit b802ec
Packit b802ec
    def test_parallel_probes(self):
Packit b802ec
        '''Test sending multiple probes in parallel
Packit b802ec
Packit b802ec
        We will expect the probes to complete out-of-order by sending
Packit b802ec
        a probe to a distant host immeidately followed by a probe to
Packit b802ec
        the local host.'''
Packit b802ec
Packit b802ec
        success_count = 0
Packit b802ec
        loop_count = 32
Packit b802ec
Packit b802ec
        # pylint: disable=locally-disabled, unused-variable
Packit b802ec
        for i in range(loop_count):
Packit b802ec
            #  Probe the distant host before the local host.
Packit b802ec
            self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
Packit b802ec
            self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')
Packit b802ec
Packit b802ec
            reply = self.parse_reply()
Packit b802ec
            if reply.command_name == 'no-reply':
Packit b802ec
                continue
Packit b802ec
Packit b802ec
            self.assertEqual(reply.command_name, 'reply')
Packit b802ec
            self.assertIn('ip-4', reply.argument)
Packit b802ec
            self.assertEqual(reply.argument['ip-4'], '127.0.0.1')
Packit b802ec
            self.assertIn('round-trip-time', reply.argument)
Packit b802ec
            first_time = int(reply.argument['round-trip-time'])
Packit b802ec
Packit b802ec
            reply = self.parse_reply()
Packit b802ec
            if reply.command_name == 'no-reply':
Packit b802ec
                continue
Packit b802ec
Packit b802ec
            self.assertEqual(reply.command_name, 'reply')
Packit b802ec
            self.assertIn('ip-4', reply.argument)
Packit b802ec
            self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
Packit b802ec
            self.assertIn('round-trip-time', reply.argument)
Packit b802ec
            second_time = int(reply.argument['round-trip-time'])
Packit b802ec
Packit b802ec
            #  Ensure we got a reply from the host with the lowest latency
Packit b802ec
            #  first.
Packit b802ec
            self.assertLess(first_time, second_time)
Packit b802ec
Packit b802ec
            success_count += 1
Packit b802ec
Packit b802ec
        #  We need 90% success to pass.  This allows a few probes to be
Packit b802ec
        #  occasionally dropped by the network without failing the test.
Packit b802ec
        required_success = int(loop_count * 0.90)
Packit b802ec
        self.assertGreaterEqual(success_count, required_success)
Packit b802ec
Packit b802ec
Packit b802ec
class TestProbeICMPv6(mtrpacket.MtrPacketTest):
Packit b802ec
    '''Test sending probes using IP version 6'''
Packit b802ec
Packit b802ec
    def __init__(self, *args):
Packit b802ec
        google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
Packit b802ec
Packit b802ec
        self.google_addr = google_addr  # type: str
Packit b802ec
Packit b802ec
        super(TestProbeICMPv6, self).__init__(*args)
Packit b802ec
Packit b802ec
    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
Packit b802ec
    def test_probe(self):
Packit b802ec
        "Test a probe to Google's public DNS server"
Packit b802ec
Packit b802ec
        #  Probe Google's well-known DNS server and expect a reply
Packit b802ec
        self.write_command('51 send-probe ip-6 ' + self.google_addr)
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual(reply.command_name, 'reply')
Packit b802ec
        self.assertIn('ip-6', reply.argument)
Packit b802ec
        self.assertIn('round-trip-time', reply.argument)
Packit b802ec
Packit b802ec
        #  Probe the loopback, and check the address we get a reply from is
Packit b802ec
        #  also the loopback.  While implementing IPv6, I had a bug where
Packit b802ec
        #  the low bits of the received address got zeroed.  This checks for
Packit b802ec
        #  that bug.
Packit b802ec
        self.write_command('52 send-probe ip-6 ::1')
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual(reply.command_name, 'reply')
Packit b802ec
        self.assertIn('ip-6', reply.argument)
Packit b802ec
        self.assertIn('round-trip-time', reply.argument)
Packit b802ec
        self.assertEqual(reply.argument['ip-6'], '::1')
Packit b802ec
Packit b802ec
    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
Packit b802ec
    def test_ttl_expired(self):
Packit b802ec
        'Test sending a probe which will have its time-to-live expire'
Packit b802ec
Packit b802ec
        #  Probe Goolge's DNS server, but give the probe only one hop
Packit b802ec
        #  to live.
Packit b802ec
        cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
Packit b802ec
        self.write_command(cmd)
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual('ttl-expired', reply.command_name)
Packit b802ec
        self.assertIn('ip-6', reply.argument)
Packit b802ec
        self.assertIn('round-trip-time', reply.argument)
Packit b802ec
Packit b802ec
Packit b802ec
class TestProbeUDP(mtrpacket.MtrPacketTest):
Packit b802ec
    'Test transmitting probes using UDP'
Packit b802ec
Packit b802ec
    def udp_port_test(self, address):  # type: (unicode) -> None
Packit b802ec
        'Test UDP probes with variations on source port and dest port'
Packit b802ec
Packit b802ec
        if not check_feature(self, 'udp'):
Packit b802ec
            return
Packit b802ec
Packit b802ec
        cmd = '80 send-probe protocol udp ' + address
Packit b802ec
        self.write_command(cmd)
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual('reply', reply.command_name)
Packit b802ec
Packit b802ec
        cmd = '81 send-probe protocol udp port 990 ' + address
Packit b802ec
        self.write_command(cmd)
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual('reply', reply.command_name)
Packit b802ec
Packit b802ec
        cmd = '82 send-probe protocol udp local-port 1991 ' + address
Packit b802ec
        self.write_command(cmd)
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual('reply', reply.command_name)
Packit b802ec
Packit b802ec
    def test_udp_v4(self):
Packit b802ec
        'Test IPv4 UDP probes'
Packit b802ec
Packit b802ec
        test_basic_probe(self, 4, 'udp')
Packit b802ec
Packit b802ec
        self.udp_port_test('ip-4 127.0.0.1')
Packit b802ec
Packit b802ec
    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
Packit b802ec
    def test_udp_v6(self):
Packit b802ec
        'Test IPv6 UDP probes'
Packit b802ec
Packit b802ec
        test_basic_probe(self, 6, 'udp')
Packit b802ec
Packit b802ec
        self.udp_port_test('ip-6 ::1')
Packit b802ec
Packit b802ec
Packit b802ec
class TestProbeTCP(mtrpacket.MtrPacketTest):
Packit b802ec
    'Test TCP probe support'
Packit b802ec
Packit b802ec
    def test_tcp_v4(self):
Packit b802ec
        '''Test IPv4 TCP probes, with TTL expiration, to a refused port
Packit b802ec
        and to an open port'''
Packit b802ec
Packit b802ec
        test_basic_probe(self, 4, 'tcp')
Packit b802ec
Packit b802ec
        if not check_feature(self, 'tcp'):
Packit b802ec
            return
Packit b802ec
Packit b802ec
        #  Probe a local port assumed to be open  (ssh)
Packit b802ec
        cmd = '80 send-probe ip-4 127.0.0.1 protocol tcp port 22'
Packit b802ec
        self.write_command(cmd)
Packit b802ec
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual(reply.command_name, 'reply')
Packit b802ec
Packit b802ec
    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
Packit b802ec
    def test_tcp_v6(self):
Packit b802ec
        'Test IPv6 TCP probes'
Packit b802ec
Packit b802ec
        test_basic_probe(self, 6, 'tcp')
Packit b802ec
Packit b802ec
        if not check_feature(self, 'tcp'):
Packit b802ec
            return
Packit b802ec
Packit b802ec
        #  Probe a local port assumed to be open  (ssh)
Packit b802ec
        cmd = '80 send-probe ip-6 ::1 protocol tcp port 22'
Packit b802ec
        self.write_command(cmd)
Packit b802ec
Packit b802ec
        reply = self.parse_reply()
Packit b802ec
        self.assertEqual(reply.command_name, 'reply')
Packit b802ec
Packit b802ec
Packit b802ec
class TestProbeSCTP(mtrpacket.MtrPacketTest):
Packit b802ec
    'Test SCTP probes'
Packit b802ec
Packit b802ec
    def test_sctp_v4(self):
Packit b802ec
        'Test basic SCTP probes over IPv4'
Packit b802ec
Packit b802ec
        test_basic_probe(self, 4, 'sctp')
Packit b802ec
Packit b802ec
    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
Packit b802ec
    def test_sctp_v6(self):
Packit b802ec
        'Test basic SCTP probes over IPv6'
Packit b802ec
Packit b802ec
        test_basic_probe(self, 6, 'sctp')
Packit b802ec
Packit b802ec
Packit b802ec
if __name__ == '__main__':
Packit b802ec
    mtrpacket.check_running_as_root()
Packit b802ec
    unittest.main()