|
Packit Service |
ce9a3e |
#! /usr/libexec/platform-python
|
|
Packit Service |
f6b565 |
# vim: set expandtab shiftwidth=4:
|
|
Packit Service |
f6b565 |
# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
|
|
Packit Service |
f6b565 |
#
|
|
Packit Service |
f6b565 |
# Copyright © 2018 Red Hat, Inc.
|
|
Packit Service |
f6b565 |
#
|
|
Packit Service |
f6b565 |
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
Packit Service |
f6b565 |
# copy of this software and associated documentation files (the "Software"),
|
|
Packit Service |
f6b565 |
# to deal in the Software without restriction, including without limitation
|
|
Packit Service |
f6b565 |
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
Packit Service |
f6b565 |
# and/or sell copies of the Software, and to permit persons to whom the
|
|
Packit Service |
f6b565 |
# Software is furnished to do so, subject to the following conditions:
|
|
Packit Service |
f6b565 |
#
|
|
Packit Service |
f6b565 |
# The above copyright notice and this permission notice (including the next
|
|
Packit Service |
f6b565 |
# paragraph) shall be included in all copies or substantial portions of the
|
|
Packit Service |
f6b565 |
# Software.
|
|
Packit Service |
f6b565 |
#
|
|
Packit Service |
f6b565 |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
Packit Service |
f6b565 |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
Packit Service |
f6b565 |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
Packit Service |
f6b565 |
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
Packit Service |
f6b565 |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
Packit Service |
f6b565 |
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
Packit Service |
f6b565 |
# DEALINGS IN THE SOFTWARE.
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
import os
|
|
Packit Service |
f6b565 |
import sys
|
|
Packit Service |
f6b565 |
import time
|
|
Packit Service |
f6b565 |
import multiprocessing
|
|
Packit Service |
f6b565 |
import argparse
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
try:
|
|
Packit Service |
f6b565 |
import libevdev
|
|
Packit Service |
f6b565 |
import yaml
|
|
Packit Service |
f6b565 |
except ModuleNotFoundError as e:
|
|
Packit Service |
f6b565 |
print('Error: {}'.format(e), file=sys.stderr)
|
|
Packit Service |
f6b565 |
print('One or more python modules are missing. Please install those '
|
|
Packit Service |
f6b565 |
'modules and re-run this tool.')
|
|
Packit Service |
f6b565 |
sys.exit(1)
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
# Recording file format version this tool can replay; loop() rejects any
# recording whose 'version' entry differs from this value.
SUPPORTED_FILE_VERSION = 1
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def error(msg, **kwargs):
    '''Print msg to stderr.

    msg -- the text to print
    kwargs -- forwarded unchanged to print() (e.g. end=, flush=)

    stderr is only the default destination: a caller-supplied ``file``
    keyword takes precedence instead of colliding with it.
    '''
    kwargs.setdefault('file', sys.stderr)
    print(msg, **kwargs)
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
class YamlException(Exception):
    '''Raised when the recording lacks an expected key or holds an
    unexpected value.'''
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def fetch(yaml, key):
    '''Return yaml[key], reporting a failed lookup as a YamlException.

    This keeps a malformed recording distinguishable from a genuine
    KeyError bug elsewhere in the program.

    yaml -- container parsed from the recording (normally a dict)
    key -- key (or index) to look up

    Raises YamlException if key is missing, or if yaml cannot be indexed
    by key at all (for example because it is None).
    '''
    try:
        return yaml[key]
    except (LookupError, TypeError) as err:
        # LookupError covers KeyError and IndexError; TypeError covers
        # containers that are not subscriptable, such as None.
        raise YamlException('Failed to get \'{}\' from recording.'.format(key)) from err
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def create(device):
    '''Create a uinput device mirroring one recorded device.

    device -- one entry of the recording's 'devices' list. Its 'evdev'
              mapping must provide 'name', 'id', 'codes' and
              'properties'; 'absinfo' is only consulted for EV_ABS codes.

    Returns the object produced by libevdev's create_uinput_device().

    Raises YamlException if a required key is missing, or if the id or an
    absinfo entry does not have the expected number of fields.
    '''
    evdev = fetch(device, 'evdev')

    d = libevdev.Device()
    d.name = fetch(evdev, 'name')

    ids = fetch(evdev, 'id')
    if len(ids) != 4:
        raise YamlException('Invalid ID format: {}'.format(ids))
    d.id = dict(zip(['bustype', 'vendor', 'product', 'version'], ids))

    codes = fetch(evdev, 'codes')
    for evtype, evcodes in codes.items():
        for code in evcodes:
            data = None
            if evtype == libevdev.EV_ABS.value:
                # Look the axis up through fetch() so an axis without an
                # absinfo entry is reported as a recording error rather
                # than escaping as a bare KeyError.
                values = fetch(fetch(evdev, 'absinfo'), code)
                try:
                    minimum, maximum, fuzz, flat, resolution = values[:5]
                except (TypeError, ValueError) as err:
                    raise YamlException(
                        'Invalid absinfo format: {}'.format(values)) from err
                data = libevdev.InputAbsInfo(minimum=minimum,
                                             maximum=maximum,
                                             fuzz=fuzz,
                                             flat=flat,
                                             resolution=resolution)
            elif evtype == libevdev.EV_REP.value:
                # Repeat settings are hard-coded here rather than read
                # from the recording.
                if code == libevdev.EV_REP.REP_DELAY.value:
                    data = 500
                elif code == libevdev.EV_REP.REP_PERIOD.value:
                    data = 20
            d.enable(libevdev.evbit(evtype, code), data=data)

    properties = fetch(evdev, 'properties')
    for prop in properties:
        d.enable(libevdev.propbit(prop))

    uinput = d.create_uinput_device()
    return uinput
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def print_events(devnode, indent, evs):
    '''Print one line per event, prefixed with the device node's basename.

    devnode -- path of the device node; only its basename is printed
    indent -- indentation level; each level shifts the event text right
              by eight spaces
    evs -- iterable of events exposing sec, usec, type.name, code.name
           and value
    '''
    node = os.path.basename(devnode)
    template = "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}"
    for ev in evs:
        padding = ' ' * (indent * 8)
        line = template.format(node, padding, ev.sec, ev.usec,
                               ev.type.name, ev.code.name, ev.value)
        print(line)
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def replay(device, verbose):
    '''Replay the recorded events of one device through its uinput device.

    device -- device entry from the recording; must carry the '__uinput'
              device to write to and, when verbose is set, the '__index'
              used to indent the printed events
    verbose -- if true, print every event frame after it has been sent

    Frames are sent with the same relative timing as recorded, except
    that the first frame is sent immediately. Returns None.
    '''
    events = fetch(device, 'events')
    if events is None:
        return
    uinput = device['__uinput']

    margin = 150 / 1e6  # 150 µs error margin

    # Use the monotonic clock so that wall-clock adjustments during the
    # replay cannot distort the pacing.
    offset = time.monotonic()
    handled_first_event = False

    # each 'evdev' set contains one SYN_REPORT so we only need to check for
    # the time offset once per event
    for event in events:
        try:
            evdev = fetch(event, 'evdev')
        except YamlException:
            continue
        if not evdev:
            # An empty frame has no timestamp to pace on and nothing to send.
            continue

        sec, usec = evdev[0][0], evdev[0][1]
        stamp = sec + usec / 1e6

        # The first event may have a nonzero offset but we want to replay
        # immediately regardless.
        if not handled_first_event:
            offset -= stamp
            handled_first_event = True

        delay = stamp + offset - time.monotonic()
        if delay > margin:
            time.sleep(delay - margin)

        evs = [libevdev.InputEvent(libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1])
               for e in evdev]
        uinput.send_events(evs)
        if verbose:
            print_events(uinput.devnode, device['__index'], evs)
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def wrap(func, *args):
    '''Call func(*args), swallowing a KeyboardInterrupt raised by it.

    The return value of func is discarded; wrap always returns None.
    Any other exception propagates to the caller.
    '''
    try:
        func(*args)
    except KeyboardInterrupt:
        return None
    return None
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def loop(args, recording):
    '''Create all recorded devices, then replay them each time enter is hit.

    args -- parsed command-line arguments; only args.verbose is used
    recording -- parsed recording; must provide 'version', 'ndevices'
                 and 'devices'

    Raises YamlException if the file version is unsupported or a required
    key is missing. Returns when stdin reaches end-of-file; otherwise it
    keeps prompting until interrupted.
    '''
    version = fetch(recording, 'version')
    if version != SUPPORTED_FILE_VERSION:
        raise YamlException('Invalid file format: {}, expected {}'.format(version, SUPPORTED_FILE_VERSION))

    ndevices = fetch(recording, 'ndevices')
    devices = fetch(recording, 'devices')
    if ndevices != len(devices):
        error('WARNING: truncated file, expected {} devices, got {}'.format(ndevices, len(devices)))

    for idx, d in enumerate(devices):
        uinput = create(d)
        print('{}: {}'.format(uinput.devnode, uinput.name))
        d['__uinput'] = uinput  # cheaper to hide it in the dict than work around it
        d['__index'] = idx

    while True:
        try:
            input('Hit enter to start replaying')
        except EOFError:
            # stdin was closed, so no further replay can be requested.
            # Finish the prompt line and leave cleanly.
            print()
            break

        # One process per device; all are started before any is joined
        # so the devices replay side by side.
        processes = [
            multiprocessing.Process(target=wrap, args=(replay, d, args.verbose))
            for d in devices
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
def main():
    '''Parse the command line, load the recording and run the replay loop.

    A KeyboardInterrupt ends the program silently. OS-level errors,
    malformed YAML and recordings with missing or invalid content are
    reported on stderr instead of surfacing as a traceback.
    '''
    parser = argparse.ArgumentParser(
        description='Replay a device recording'
    )
    parser.add_argument('recording', metavar='recorded-file.yaml',
                        type=str, help='Path to device recording')
    parser.add_argument('--verbose', action='store_true')
    args = parser.parse_args()

    try:
        # The file is only needed for parsing; close it before replaying.
        with open(args.recording) as f:
            y = yaml.safe_load(f)
        loop(args, y)
    except KeyboardInterrupt:
        pass
    except OSError as e:
        # PermissionError is a subclass of OSError, so one clause covers
        # both. This also catches a failure to open the recording itself.
        error('Error: failed to open device: {}'.format(e))
    except (yaml.YAMLError, YamlException) as e:
        # yaml.YAMLError: the file is not valid YAML at all.
        # YamlException: valid YAML, but not a usable recording.
        error('Error: failed to parse recording: {}'.format(e))
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
|
|
Packit Service |
f6b565 |
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|