|
Packit |
b802ec |
#!/usr/bin/env python
|
|
Packit |
b802ec |
#
|
|
Packit |
b802ec |
# mtr -- a network diagnostic tool
|
|
Packit |
b802ec |
# Copyright (C) 2016 Matt Kimball
|
|
Packit |
b802ec |
#
|
|
Packit |
b802ec |
# This program is free software; you can redistribute it and/or modify
|
|
Packit |
b802ec |
# it under the terms of the GNU General Public License version 2 as
|
|
Packit |
b802ec |
# published by the Free Software Foundation.
|
|
Packit |
b802ec |
#
|
|
Packit |
b802ec |
# This program is distributed in the hope that it will be useful,
|
|
Packit |
b802ec |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
b802ec |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
Packit |
b802ec |
# GNU General Public License for more details.
|
|
Packit |
b802ec |
#
|
|
Packit |
b802ec |
# You should have received a copy of the GNU General Public License
|
|
Packit |
b802ec |
# along with this program; if not, write to the Free Software
|
|
Packit |
b802ec |
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
|
Packit |
b802ec |
#
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
'''Test sending probes and receiving responses.'''
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
import socket
|
|
Packit |
b802ec |
import sys
|
|
Packit |
b802ec |
import time
|
|
Packit |
b802ec |
import unittest
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
import mtrpacket
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
def resolve_ipv6_address(hostname):  # type: (str) -> str
    '''Resolve a hostname to an IP version 6 address.

    Walks the results of socket.getaddrinfo() in order and returns the
    address string of the first AF_INET6 entry.

    Raises LookupError (carrying the hostname) when resolution succeeds
    but yields no IPv6 entry.  Errors raised by getaddrinfo() itself
    propagate unchanged.
    '''

    for addrinfo in socket.getaddrinfo(hostname, 0):
        # Each entry is (family, type, proto, canonname, sockaddr); only
        # the family and the socket address are needed here.
        family, sockaddr = addrinfo[0], addrinfo[4]

        if family == socket.AF_INET6:
            # The textual address is always the first sockaddr field.
            return sockaddr[0]

    raise LookupError(hostname)
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
def check_feature(test, feature):
    '''Check for support for a particular feature with mtr-packet.

    Issues a 'check-support' command for the named feature through
    test.write_command() and reads one reply with test.parse_reply().
    The reply must be a 'feature-support' reply carrying a 'support'
    argument; otherwise the test's own assertions fail.

    Returns True when the 'support' argument is 'ok', and False for any
    other value.
    '''

    test.write_command('70 check-support feature ' + feature)

    reply = test.parse_reply()
    test.assertEqual(reply.command_name, 'feature-support')
    test.assertIn('support', reply.argument)

    return reply.argument['support'] == 'ok'
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
def test_basic_remote_probe(test, ip_version, protocol):
    '''Probe a remote host with a TTL of 1 and expect 'ttl-expired'.

    The destination is 8.8.8.8 for ip_version 4, or the resolved IPv6
    address of mtrpacket.IPV6_TEST_HOST for ip_version 6.  Any other
    ip_version raises ValueError.
    '''

    protocol_clause = 'protocol ' + protocol

    if ip_version not in (4, 6):
        raise ValueError(ip_version)

    if ip_version == 4:
        destination = 'ip-4 8.8.8.8'
    else:
        remote_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)
        destination = 'ip-6 ' + remote_addr

    pieces = (
        '60 send-probe ',
        protocol_clause,
        ' ',
        destination,
        ' port 164 ttl 1',
    )
    test.write_command(''.join(pieces))

    # With a single hop to live, the probe is expected to expire in
    # transit rather than reach the destination.
    outcome = test.parse_reply()
    test.assertEqual(outcome.command_name, 'ttl-expired')
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
def test_basic_local_probe(test, ip_version, protocol):
    '''Test a probe to a closed port on localhost.

    Sends a single probe for the given protocol to port 164 on the
    loopback address of the requested IP version, then asserts that a
    'reply' comes back carrying that same loopback address.

    Raises ValueError if ip_version is neither 4 nor 6, in the same way
    as test_basic_remote_probe().
    '''

    if ip_version == 6:
        address_key, loopback = 'ip-6', '::1'
    elif ip_version == 4:
        address_key, loopback = 'ip-4', '127.0.0.1'
    else:
        # Reject anything else up front instead of failing further down
        # on a variable that was never assigned.
        raise ValueError(ip_version)

    cmd = '61 send-probe protocol ' + protocol + \
        ' ' + address_key + ' ' + loopback + ' port 164'
    test.write_command(cmd)

    reply = test.parse_reply()
    test.assertEqual(reply.command_name, 'reply')

    # The reply must name the loopback address that was probed.
    test.assertIn(address_key, reply.argument)
    test.assertEqual(reply.argument[address_key], loopback)
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
def test_basic_probe(test, ip_version, protocol):
    # type: (mtrpacket.MtrPacketTest, int, str) -> None

    '''Test a probe with TTL expiration and a probe which reaches its
    destination with a particular protocol.

    If check_feature() reports that the protocol is unsupported, a
    notice is written to stderr and neither probe is attempted.
    '''

    if not check_feature(test, protocol):
        err_str = 'Skipping ' + protocol + ' test due to no support\n'
        # Write text, not encoded bytes: on Python 3 sys.stderr is a text
        # stream and raises TypeError when handed a bytes object.
        sys.stderr.write(err_str)
        return

    test_basic_remote_probe(test, ip_version, protocol)
    test_basic_local_probe(test, ip_version, protocol)
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
class TestProbeICMPv4(mtrpacket.MtrPacketTest):
    '''Test sending probes using IP version 4'''

    def test_probe(self):
        'Test sending regular ICMP probes to known addresses'

        # Probe Google's well-known DNS server and expect a reply
        self.write_command('14 send-probe ip-4 8.8.8.8')
        reply = self.parse_reply()
        self.assertEqual(reply.token, 14)
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-4', reply.argument)
        self.assertEqual(reply.argument['ip-4'], '8.8.8.8')
        self.assertIn('round-trip-time', reply.argument)

    def test_timeout(self):
        'Test timeouts when sending to a non-existent address'

        #
        # Probe a non-existent address, and expect no reply
        #
        # I'm not sure what the best way is to find an address that
        # doesn't exist, but is still route-able.  If we use a reserved
        # IP address range, Windows will tell us it is non-routeable,
        # rather than timing out when transmitting to that address.
        #
        # We're just using a currently unused address in Google's
        # range instead.  This is probably not the best solution.
        #

        for _ in range(16):
            self.write_command('15 send-probe ip-4 8.8.254.254 timeout 1')
            reply = self.parse_reply()
            self.assertEqual(reply.token, 15)
            self.assertEqual(reply.command_name, 'no-reply')

    def test_exhaust_probes(self):
        '''Test exhausting all available probes

        Keeps issuing long-timeout probes, each with a fresh token,
        until a 'probes-exhausted' reply is seen or the probe budget
        for this test has been used up.'''

        probe_count = 4 * 1024
        first_token = 1024

        reply = None
        for token in range(first_token, first_token + probe_count):
            command = str(token) + ' send-probe ip-4 8.8.254.254 timeout 60'
            self.write_command(command)

            # Check for a reply with a timeout of 0; finding none yet is
            # the normal case and is signalled by ReadReplyTimeout.
            reply = None
            try:
                reply = self.parse_reply(0)
            except mtrpacket.ReadReplyTimeout:
                pass

            if reply and reply.command_name == 'probes-exhausted':
                break

        self.assertIsNotNone(reply)
        self.assertEqual(reply.command_name, 'probes-exhausted')

    def _timed_probe(self, token, timeout):
        '''Send one probe to the unused test address with the given
        token and timeout (in seconds), wait for the next reply, and
        return the wall-clock seconds that took.'''

        begin = time.time()
        command = '{0} send-probe ip-4 8.8.254.254 timeout {1}'.format(
            token, timeout)
        self.write_command(command)
        self.parse_reply()
        return time.time() - begin

    def test_timeout_values(self):
        '''Test that timeout values wait the right amount of time

        Give each probe a half-second grace period to produce a timeout
        reply after the expected timeout time.'''

        # A zero timeout should be reported almost immediately.
        elapsed = self._timed_probe(19, 0)
        self.assertLess(elapsed, 0.5)

        # (token, timeout, earliest acceptable, latest acceptable)
        expectations = (
            (20, 1, 0.9, 1.5),
            (21, 3, 2.9, 3.5),
        )
        for token, timeout, earliest, latest in expectations:
            elapsed = self._timed_probe(token, timeout)
            self.assertGreaterEqual(elapsed, earliest)
            self.assertLess(elapsed, latest)

    def test_ttl_expired(self):
        'Test sending a probe which will have its time-to-live expire'

        # Probe Google's DNS server, but give the probe only one hop
        # to live.
        self.write_command('16 send-probe ip-4 8.8.8.8 ttl 1')
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'ttl-expired')
        self.assertIn('ip-4', reply.argument)
        self.assertIn('round-trip-time', reply.argument)

    def test_parallel_probes(self):
        '''Test sending multiple probes in parallel

        We will expect the probes to complete out-of-order by sending
        a probe to a distant host immediately followed by a probe to
        the local host.'''

        success_count = 0
        loop_count = 32

        for _ in range(loop_count):
            # Probe the distant host before the local host.
            self.write_command('17 send-probe ip-4 8.8.8.8 timeout 1')
            self.write_command('18 send-probe ip-4 127.0.0.1 timeout 1')

            # Two probes were sent, so consume two replies before
            # deciding anything.  Bailing out after the first one would
            # leave the other queued, to be misread as the first reply
            # of the next iteration.
            first = self.parse_reply()
            second = self.parse_reply()

            if first.command_name == 'no-reply':
                continue

            self.assertEqual(first.command_name, 'reply')
            self.assertIn('ip-4', first.argument)
            self.assertEqual(first.argument['ip-4'], '127.0.0.1')
            self.assertIn('round-trip-time', first.argument)
            first_time = int(first.argument['round-trip-time'])

            if second.command_name == 'no-reply':
                continue

            self.assertEqual(second.command_name, 'reply')
            self.assertIn('ip-4', second.argument)
            self.assertEqual(second.argument['ip-4'], '8.8.8.8')
            self.assertIn('round-trip-time', second.argument)
            second_time = int(second.argument['round-trip-time'])

            # Ensure we got a reply from the host with the lowest latency
            # first.
            self.assertLess(first_time, second_time)

            success_count += 1

        # We need 90% success to pass.  This allows a few probes to be
        # occasionally dropped by the network without failing the test.
        required_success = int(loop_count * 0.90)
        self.assertGreaterEqual(success_count, required_success)
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
class TestProbeICMPv6(mtrpacket.MtrPacketTest):
    '''Test sending probes using IP version 6'''

    def __init__(self, *args, **kwargs):
        # unittest constructs a test case instance for every test method
        # while loading, whether or not the method will later be skipped.
        # Only resolve the remote host when IPv6 is reported available,
        # so a host without IPv6 can still load this module and skip.
        self.google_addr = None  # address string, or None without IPv6
        if mtrpacket.HAVE_IPV6:
            self.google_addr = resolve_ipv6_address(mtrpacket.IPV6_TEST_HOST)

        super(TestProbeICMPv6, self).__init__(*args, **kwargs)

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_probe(self):
        "Test a probe to Google's public DNS server"

        # Probe Google's well-known DNS server and expect a reply
        self.write_command('51 send-probe ip-6 ' + self.google_addr)
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)

        # Probe the loopback, and check the address we get a reply from is
        # also the loopback.  While implementing IPv6, I had a bug where
        # the low bits of the received address got zeroed.  This checks for
        # that bug.
        self.write_command('52 send-probe ip-6 ::1')
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'reply')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)
        self.assertEqual(reply.argument['ip-6'], '::1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_ttl_expired(self):
        'Test sending a probe which will have its time-to-live expire'

        # Probe Google's DNS server, but give the probe only one hop
        # to live.
        cmd = '53 send-probe ip-6 ' + self.google_addr + ' ttl 1'
        self.write_command(cmd)
        reply = self.parse_reply()
        self.assertEqual(reply.command_name, 'ttl-expired')
        self.assertIn('ip-6', reply.argument)
        self.assertIn('round-trip-time', reply.argument)
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
class TestProbeUDP(mtrpacket.MtrPacketTest):
    'Exercise probes transmitted over UDP'

    def udp_port_test(self, address):  # type: (unicode) -> None
        '''Send UDP probes to the given address clause with default
        ports, an explicit destination port and an explicit local port,
        expecting a 'reply' to each.

        Does nothing when check_feature() reports no UDP support.'''

        if not check_feature(self, 'udp'):
            return

        # One command prefix per port variation; the address clause is
        # appended to each in turn.
        command_prefixes = (
            '80 send-probe protocol udp ',
            '81 send-probe protocol udp port 990 ',
            '82 send-probe protocol udp local-port 1991 ',
        )

        for prefix in command_prefixes:
            self.write_command(prefix + address)
            outcome = self.parse_reply()
            self.assertEqual('reply', outcome.command_name)

    def test_udp_v4(self):
        'Run the basic and port-variation UDP probes over IPv4'

        test_basic_probe(self, 4, 'udp')

        self.udp_port_test('ip-4 127.0.0.1')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_udp_v6(self):
        'Run the basic and port-variation UDP probes over IPv6'

        test_basic_probe(self, 6, 'udp')

        self.udp_port_test('ip-6 ::1')
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
class TestProbeTCP(mtrpacket.MtrPacketTest):
    'Exercise TCP probe support'

    def _expect_reply_if_tcp(self, command):
        '''Send the given probe command and require a 'reply', unless
        check_feature() reports that TCP probing is unsupported.'''

        if not check_feature(self, 'tcp'):
            return

        self.write_command(command)

        outcome = self.parse_reply()
        self.assertEqual(outcome.command_name, 'reply')

    def test_tcp_v4(self):
        '''Run the basic TCP probes over IPv4 (TTL expiration and a
        refused port), then probe a port expected to be open'''

        test_basic_probe(self, 4, 'tcp')

        # Probe a local port assumed to be open (ssh)
        self._expect_reply_if_tcp(
            '80 send-probe ip-4 127.0.0.1 protocol tcp port 22')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_tcp_v6(self):
        '''Run the basic TCP probes over IPv6, then probe a port
        expected to be open'''

        test_basic_probe(self, 6, 'tcp')

        # Probe a local port assumed to be open (ssh)
        self._expect_reply_if_tcp(
            '80 send-probe ip-6 ::1 protocol tcp port 22')
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
class TestProbeSCTP(mtrpacket.MtrPacketTest):
    'Exercise SCTP probes'

    def test_sctp_v4(self):
        'Run the basic remote and local SCTP probes over IPv4'

        test_basic_probe(self, ip_version=4, protocol='sctp')

    @unittest.skipUnless(mtrpacket.HAVE_IPV6, 'No IPv6')
    def test_sctp_v6(self):
        'Run the basic remote and local SCTP probes over IPv6'

        test_basic_probe(self, ip_version=6, protocol='sctp')
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
|
|
Packit |
b802ec |
if __name__ == '__main__':
    # Script entry point only; importing this module runs nothing.
    # NOTE(review): check_running_as_root() presumably refuses to go on
    # without root privileges -- confirm against mtrpacket.
    mtrpacket.check_running_as_root()

    # Hand over to unittest's command-line runner for the test cases
    # defined in this module.
    unittest.main()
|