# Author: Nathaniel McCallum <npmccallum@redhat.com>
#
# Copyright (c) 2013 Red Hat, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
#
# This script tests OTP, both UDP and Unix Sockets, with a variety of
# configurations. It requires pyrad to run, but exits gracefully if not found.
# It also deliberately shuts down the test daemons between tests in order to
# test how OTP handles the case of short daemon restarts.
#
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
from k5test import *
|
|
Packit |
fd8b60 |
from queue import Empty
|
|
Packit |
fd8b60 |
import io
|
|
Packit |
fd8b60 |
import struct
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
try:
|
|
Packit |
fd8b60 |
from pyrad import packet, dictionary
|
|
Packit |
fd8b60 |
except ImportError:
|
|
Packit |
fd8b60 |
skip_rest('OTP tests', 'Python pyrad module not found')
|
|
Packit |
fd8b60 |
try:
|
|
Packit |
fd8b60 |
from multiprocessing import Process, Queue
|
|
Packit |
fd8b60 |
except ImportError:
|
|
Packit |
fd8b60 |
skip_rest('OTP tests', 'Python version 2.6 required')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
# We could use a dictionary file, but since we need so few attributes,
|
|
Packit |
fd8b60 |
# we'll just include them here.
|
|
Packit |
fd8b60 |
# RADIUS attribute definitions in pyrad dictionary syntax, one literal per
# definition.  The value starts and ends with a newline.
radius_attributes = (
    '\n'
    'ATTRIBUTE User-Name 1 string\n'
    'ATTRIBUTE User-Password 2 octets\n'
    'ATTRIBUTE Service-Type 6 integer\n'
    'ATTRIBUTE NAS-Identifier 32 string\n'
)
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
class RadiusDaemon(Process):
    """Single-request RADIUS responder that runs as a child process.

    Construct it with ``args=(addr, secrfile, pswd, outq)``:

    * addr -- the address handed to listen()
    * secrfile -- path of a file holding the shared secret; a falsy value
      selects an empty secret
    * pswd -- the password that is answered with Access-Accept
    * outq -- queue used to report progress and results to the parent

    Subclasses provide the transport by overriding listen() and
    recvRequest().
    """

    # Largest request, in bytes, that the transports read in one go.
    MAX_PACKET_SIZE = 4096
    # pyrad dictionary built from the attribute definitions above.
    DICTIONARY = dictionary.Dictionary(io.StringIO(radius_attributes))

    def listen(self, addr):
        """Open the endpoint for addr; must be overridden.

        The return value is passed unchanged to recvRequest().
        """
        raise NotImplementedError()

    def recvRequest(self, data):
        """Receive one raw request; must be overridden.

        data is whatever listen() returned.  run() unpacks the result as
        (buf, sock, addr): the packet bytes, the socket to answer on, and
        the peer address (None makes run() reply with send() instead of
        sendto()).
        """
        raise NotImplementedError()

    def run(self):
        """Serve exactly one Access-Request, then close the socket.

        Two items are put on the output queue: the string "started" once
        listen() has returned, and later a dict with the keys 'user',
        'pass' and 'reply' describing the request and the verdict.
        """
        args = self._args
        addr, secrfile, pswd, outq = args[0], args[1], args[2], args[3]

        # No secret file means an empty shared secret.
        secr = b''
        if secrfile:
            with open(secrfile, 'rb') as fobj:
                secr = fobj.read().strip()

        endpoint = self.listen(addr)
        outq.put("started")
        # From here on addr is the peer address (or None), not the
        # listening address.
        (buf, sock, addr) = self.recvRequest(endpoint)
        pkt = packet.AuthPacket(secret=secr,
                                dict=RadiusDaemon.DICTIONARY,
                                packet=buf)

        usernm = []
        passwd = []
        for key in pkt.keys():
            if key == 'User-Password':
                passwd = [pkt.PwDecrypt(value) for value in pkt[key]]
            elif key == 'User-Name':
                usernm = pkt[key]

        reply = pkt.CreateReply()
        accepted = (passwd == [pswd])
        if accepted:
            reply.code = packet.AccessAccept
        else:
            reply.code = packet.AccessReject

        # Report to the parent before answering the client.
        outq.put({'user': usernm, 'pass': passwd, 'reply': accepted})

        if addr is None:
            sock.send(reply.ReplyPacket())
        else:
            sock.sendto(reply.ReplyPacket(), addr)
        sock.close()
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
class UDPRadiusDaemon(RadiusDaemon):
    """RadiusDaemon transport that reads one datagram from an IPv4 UDP socket."""

    def listen(self, addr):
        """Return a UDP socket bound to addr, a 'host:port' string."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        fields = addr.split(':')
        endpoint = (fields[0], int(fields[1]))
        sock.bind(endpoint)
        return sock

    def recvRequest(self, sock):
        """Read one datagram; return (data, sock, sender_address)."""
        datagram, sender = sock.recvfrom(RadiusDaemon.MAX_PACKET_SIZE)
        return (datagram, sock, sender)
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
class UnixRadiusDaemon(RadiusDaemon):
    """RadiusDaemon transport that serves one connection on a Unix stream socket."""

    def listen(self, addr):
        """Listen on the Unix socket path addr; return (socket, addr).

        Any existing filesystem entry at addr is removed first so that
        bind() does not fail on a stale socket file.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        if os.path.exists(addr):
            os.remove(addr)
        sock.bind(addr)
        sock.listen(1)
        return (sock, addr)

    def recvRequest(self, sock_and_addr):
        """Accept one connection and read one length-prefixed packet.

        The listening socket is closed and its path removed as soon as
        the connection is accepted.  Bytes 2-3 of the packet header hold
        the total packet length in network byte order; reading stops
        once that many bytes have arrived.

        Returns (buf, conn, None); the None tells run() to answer with
        send() on the connected socket.

        Raises EOFError if the peer closes the connection before a
        complete packet has been received.
        """
        sock, addr = sock_and_addr
        conn = sock.accept()[0]
        sock.close()
        os.remove(addr)

        buf = b''
        remain = RadiusDaemon.MAX_PACKET_SIZE
        while True:
            chunk = conn.recv(remain)
            if not chunk:
                # recv() returns b'' once the peer has shut down; without
                # this check the loop would spin forever on a short read.
                conn.close()
                raise EOFError('connection closed after %d bytes, before '
                               'a complete packet was received' % len(buf))
            buf += chunk
            remain = RadiusDaemon.MAX_PACKET_SIZE - len(buf)
            if len(buf) >= 4:
                # Header is code (1 byte), identifier (1 byte), length (2).
                remain = struct.unpack("!BBH", buf[0:4])[2] - len(buf)
            if remain <= 0:
                return (buf, conn, None)
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
def verify(daemon, queue, reply, usernm, passwd):
    """Check the report a daemon queued for the request it served.

    Waits up to one second for an item on queue.  If none arrives, an
    error is written to stderr, the daemon is terminated and the script
    exits with status 1.  Otherwise the item's 'reply' must be the very
    object passed as reply, and its 'user' and 'pass' entries must equal
    [usernm] and [passwd]; the daemon is then joined.
    """
    try:
        report = queue.get(timeout=1)
    except Empty:
        sys.stderr.write("ERROR: Packet not received by daemon!\n")
        daemon.terminate()
        sys.exit(1)
    else:
        assert report['reply'] is reply
        assert report['user'] == [usernm]
        assert report['pass'] == [passwd]
        daemon.join()
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
# Compose a single token configuration.
|
|
Packit |
fd8b60 |
def otpconfig_1(toktype, username=None, indicators=None):
    """Return the JSON object text for a single token configuration.

    toktype is stored under "type".  username, when not None, is stored
    under "username".  indicators, when not None, is an iterable whose
    items are stored as a list of strings under "indicators".

    The text is produced by the json module so that quotes, backslashes
    and control characters in any value are escaped and the result is
    always valid JSON.
    """
    # Imported here so the function does not depend on the file's import
    # block.
    import json

    token = {'type': str(toktype)}
    if username is not None:
        token['username'] = str(username)
    if indicators is not None:
        token['indicators'] = [str(ind) for ind in indicators]
    # ensure_ascii=False leaves non-ASCII characters unescaped.
    return json.dumps(token, ensure_ascii=False)
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
# Compose a token configuration list suitable for the "otp" string
|
|
Packit |
fd8b60 |
# attribute.
|
|
Packit |
fd8b60 |
def otpconfig(toktype, username=None, indicators=None):
    """Return a one-element token configuration list as text.

    The single element is whatever otpconfig_1() produces for the same
    arguments, wrapped in square brackets.
    """
    token = otpconfig_1(toktype, username, indicators)
    return ''.join(('[', token, ']'))
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
# Scratch file names are derived from this process's pid.
prefix = "/tmp/" + str(os.getpid())
secret_file = prefix + ".secret"
socket_file = prefix + ".socket"

# Write the RADIUS shared secret and remove the file again at exit.
with open(secret_file, "w") as file:
    file.write("otptest")
atexit.register(os.remove, secret_file)

# Two OTP token types: "udp" reaches the daemon over UDP with a shared
# secret, "unix" reaches it through a Unix domain socket.
udp_token_type = {
    'server': '127.0.0.1:$port9',
    'secret': secret_file,
    'strip_realm': 'true',
    'indicator': ['indotp1', 'indotp2'],
}
unix_token_type = {
    'server': socket_file,
    'strip_realm': 'false',
}
conf = {
    'plugins': {'kdcpreauth': {'enable_only': 'otp'}},
    'otp': {'udp': udp_token_type, 'unix': unix_token_type},
}

# Queue on which the daemons report back to this script.
queue = Queue()

realm = K5Realm(kdc_conf=conf)
realm.run([kadminl, 'modprinc', '+requires_preauth', realm.user_princ])
flags = ['-T', realm.ccache]
radius_port = realm.portbase + 9
server_addr = '127.0.0.1:' + str(radius_port)
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
## Test UDP fail / custom username
|
|
Packit |
fd8b60 |
mark('UDP fail / custom username')
daemon_args = (server_addr, secret_file, 'accept', queue)
daemon = UDPRadiusDaemon(args=daemon_args)
daemon.start()
# Consume the "started" notice so the daemon is known to be listening.
queue.get()
token_conf = otpconfig('udp', 'custom')
realm.run([kadminl, 'setstr', realm.user_princ, 'otp', token_conf])
# The daemon only accepts 'accept', so this kinit is expected to fail.
realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
verify(daemon, queue, False, 'custom', 'reject')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
## Test UDP success / standard username
|
|
Packit |
fd8b60 |
mark('UDP success / standard username')
daemon_args = (server_addr, secret_file, 'accept', queue)
daemon = UDPRadiusDaemon(args=daemon_args)
daemon.start()
# Consume the "started" notice so the daemon is known to be listening.
queue.get()
token_conf = otpconfig('udp')
realm.run([kadminl, 'setstr', realm.user_princ, 'otp', token_conf])
realm.kinit(realm.user_princ, 'accept', flags=flags)
# The daemon is expected to see the principal name without its realm.
local_name = realm.user_princ.partition('@')[0]
verify(daemon, queue, True, local_name, 'accept')
realm.extract_keytab(realm.krbtgt_princ, realm.keytab)
adata_cmd = ['./adata', realm.krbtgt_princ]
realm.run(adata_cmd, expected_msg='+97: [indotp1, indotp2]')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
# Repeat with an indicators override in the string attribute.
|
|
Packit |
fd8b60 |
mark('auth indicator override')
daemon_args = (server_addr, secret_file, 'accept', queue)
daemon = UDPRadiusDaemon(args=daemon_args)
daemon.start()
# Consume the "started" notice so the daemon is known to be listening.
queue.get()
token_indicators = ['indtok1', 'indtok2']
oconf = otpconfig('udp', indicators=token_indicators)
realm.run([kadminl, 'setstr', realm.user_princ, 'otp', oconf])
realm.kinit(realm.user_princ, 'accept', flags=flags)
local_name = realm.user_princ.partition('@')[0]
verify(daemon, queue, True, local_name, 'accept')
realm.extract_keytab(realm.krbtgt_princ, realm.keytab)
adata_cmd = ['./adata', realm.krbtgt_princ]
# The indicators from the string attribute are the ones expected here.
realm.run(adata_cmd, expected_msg='+97: [indtok1, indtok2]')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
# Detect upstream pyrad bug
|
|
Packit |
fd8b60 |
# https://github.com/wichert/pyrad/pull/18
|
|
Packit |
fd8b60 |
try:
    # Building a reply with an empty secret trips an assertion in
    # affected pyrad versions.
    auth = packet.Packet.CreateAuthenticator()
    probe = packet.Packet(authenticator=auth, secret=b'')
    probe.ReplyPacket()
except AssertionError:
    skip_rest('OTP UNIX domain socket tests', 'pyrad assertion bug detected')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
## Test Unix fail / custom username
|
|
Packit |
fd8b60 |
mark('Unix socket fail / custom username')
# No secret file: the Unix socket daemon runs with an empty secret.
daemon_args = (socket_file, None, 'accept', queue)
daemon = UnixRadiusDaemon(args=daemon_args)
daemon.start()
# Consume the "started" notice so the daemon is known to be listening.
queue.get()
token_conf = otpconfig('unix', 'custom')
realm.run([kadminl, 'setstr', realm.user_princ, 'otp', token_conf])
# The daemon only accepts 'accept', so this kinit is expected to fail.
realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
verify(daemon, queue, False, 'custom', 'reject')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
## Test Unix success / standard username
|
|
Packit |
fd8b60 |
mark('Unix socket success / standard username')
# No secret file: the Unix socket daemon runs with an empty secret.
daemon_args = (socket_file, None, 'accept', queue)
daemon = UnixRadiusDaemon(args=daemon_args)
daemon.start()
# Consume the "started" notice so the daemon is known to be listening.
queue.get()
token_conf = otpconfig('unix')
realm.run([kadminl, 'setstr', realm.user_princ, 'otp', token_conf])
realm.kinit(realm.user_princ, 'accept', flags=flags)
# The 'unix' token type sets strip_realm to 'false', so the daemon is
# expected to see the full principal name.
verify(daemon, queue, True, realm.user_princ, 'accept')
|
|
Packit |
fd8b60 |
|
|
Packit |
fd8b60 |
## Regression test for #8708: test with the standard username and two
|
|
Packit |
fd8b60 |
## tokens configured, with the first rejecting and the second
|
|
Packit |
fd8b60 |
## accepting. With the bug, the KDC incorrectly rejects the request
|
|
Packit |
fd8b60 |
## and then performs invalid memory accesses, most likely crashing.
|
|
Packit |
fd8b60 |
# Label this case like every other one, so a failure here is not
# attributed to the preceding Unix socket test.
mark('two tokens, first rejecting (#8708)')
daemon1 = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept1', queue))
daemon2 = UnixRadiusDaemon(args=(socket_file, None, 'accept2', queue))
# Start the daemons one at a time, consuming each "started" notice
# before going on.
daemon1.start()
queue.get()
daemon2.start()
queue.get()
oconf = '[' + otpconfig_1('udp') + ', ' + otpconfig_1('unix') + ']'
realm.run([kadminl, 'setstr', realm.user_princ, 'otp', oconf])
# 'accept2' is rejected by the UDP daemon and accepted by the Unix one.
realm.kinit(realm.user_princ, 'accept2', flags=flags)
verify(daemon1, queue, False, realm.user_princ.split('@')[0], 'accept2')
verify(daemon2, queue, True, realm.user_princ, 'accept2')

success('OTP tests')
|