Blame tools/libinput-replay

Packit Service ce9a3e
#! /usr/libexec/platform-python
Packit Service f6b565
# vim: set expandtab shiftwidth=4:
Packit Service f6b565
# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
Packit Service f6b565
#
Packit Service f6b565
# Copyright © 2018 Red Hat, Inc.
Packit Service f6b565
#
Packit Service f6b565
# Permission is hereby granted, free of charge, to any person obtaining a
Packit Service f6b565
# copy of this software and associated documentation files (the "Software"),
Packit Service f6b565
# to deal in the Software without restriction, including without limitation
Packit Service f6b565
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
Packit Service f6b565
# and/or sell copies of the Software, and to permit persons to whom the
Packit Service f6b565
# Software is furnished to do so, subject to the following conditions:
Packit Service f6b565
#
Packit Service f6b565
# The above copyright notice and this permission notice (including the next
Packit Service f6b565
# paragraph) shall be included in all copies or substantial portions of the
Packit Service f6b565
# Software.
Packit Service f6b565
#
Packit Service f6b565
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
Packit Service f6b565
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
Packit Service f6b565
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
Packit Service f6b565
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
Packit Service f6b565
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
Packit Service f6b565
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
Packit Service f6b565
# DEALINGS IN THE SOFTWARE.
Packit Service f6b565
Packit Service f6b565
import os
Packit Service f6b565
import sys
Packit Service f6b565
import time
Packit Service f6b565
import multiprocessing
Packit Service f6b565
import argparse
Packit Service f6b565
Packit Service f6b565
try:
Packit Service f6b565
    import libevdev
Packit Service f6b565
    import yaml
Packit Service f6b565
except ModuleNotFoundError as e:
Packit Service f6b565
    print('Error: {}'.format(e), file=sys.stderr)
Packit Service f6b565
    print('One or more python modules are missing. Please install those '
Packit Service f6b565
          'modules and re-run this tool.')
Packit Service f6b565
    sys.exit(1)
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
# Version of the recording file format this tool can replay; recordings
# declaring any other 'version' are rejected.
SUPPORTED_FILE_VERSION = 1
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def error(msg, **kwargs):
    '''Print msg to standard error.

    Extra keyword arguments are handed to print() unchanged.'''
    print(msg, file=sys.stderr, **kwargs)
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
class YamlException(Exception):
    '''Raised when a recording lacks required data or is malformed.'''


def fetch(yaml, key):
    '''Helper function to avoid confusing a YAML error with a
    normal KeyError bug.

    Returns yaml[key]. If the lookup fails, either because the key (or
    index) does not exist or because yaml cannot be subscripted with key
    at all (e.g. it is None or a scalar), a YamlException naming the key
    is raised, chained to the original error.'''
    try:
        return yaml[key]
    except (KeyError, IndexError, TypeError) as e:
        # TypeError/IndexError mean the node has the wrong shape, which is
        # just as much a broken recording as a missing key.
        raise YamlException('Failed to get \'{}\' from recording.'.format(key)) from e
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def create(device):
    '''Create a uinput device from one device entry of a recording.

    The entry's 'evdev' section supplies the device name, the four-element
    'id' list (bustype, vendor, product, version), the 'codes' to enable
    for each event type, the 'absinfo' for EV_ABS codes and the
    'properties' to enable.

    Returns the result of libevdev's create_uinput_device().
    Raises YamlException if a required key is missing or if an id or
    absinfo entry does not have the expected number of values.'''
    evdev = fetch(device, 'evdev')

    d = libevdev.Device()
    d.name = fetch(evdev, 'name')

    ids = fetch(evdev, 'id')
    if len(ids) != 4:
        raise YamlException('Invalid ID format: {}'.format(ids))
    d.id = dict(zip(['bustype', 'vendor', 'product', 'version'], ids))

    codes = fetch(evdev, 'codes')
    for evtype, evcodes in codes.items():
        for code in evcodes:
            data = None
            if evtype == libevdev.EV_ABS.value:
                # Go through fetch() so a code without absinfo is reported
                # as a recording error, not as a bare KeyError.
                values = fetch(fetch(evdev, 'absinfo'), code)
                try:
                    minimum, maximum, fuzz, flat, resolution = values[:5]
                except (TypeError, ValueError):
                    raise YamlException('Invalid absinfo format for code {}: {}'.format(code, values))
                data = libevdev.InputAbsInfo(minimum=minimum,
                                             maximum=maximum,
                                             fuzz=fuzz,
                                             flat=flat,
                                             resolution=resolution)
            elif evtype == libevdev.EV_REP.value:
                # Repeat settings are not read from the recording; fixed
                # values are used for the two known codes.
                if code == libevdev.EV_REP.REP_DELAY.value:
                    data = 500
                elif code == libevdev.EV_REP.REP_PERIOD.value:
                    data = 20
            d.enable(libevdev.evbit(evtype, code), data=data)

    properties = fetch(evdev, 'properties')
    for prop in properties:
        d.enable(libevdev.propbit(prop))

    uinput = d.create_uinput_device()
    return uinput
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def print_events(devnode, indent, evs):
    '''Print one line per event in evs to standard output.

    Each line starts with the basename of devnode, followed by
    indent * 8 spaces and then the event's timestamp, type name, code
    name and value.'''
    template = "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}"
    node = os.path.basename(devnode)
    for ev in evs:
        fields = (node, ' ' * (indent * 8), ev.sec, ev.usec,
                  ev.type.name, ev.code.name, ev.value)
        print(template.format(*fields))
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def replay(device, verbose):
    '''Replay the recorded events of one device through its uinput device.

    device is a device entry of the recording that also carries
    '__uinput' (the uinput device to send events to) and '__index' (used
    to indent verbose output). Events are sent with the same relative
    timing as recorded, except that the first one is sent immediately.
    If verbose is true, every event sent is also printed.

    Raises YamlException if the device entry has no 'events' key.'''
    events = fetch(device, 'events')
    if events is None:
        return
    uinput = device['__uinput']

    # Pace against the monotonic clock: only differences between readings
    # are used, and wall-clock adjustments must not distort the delays.
    offset = time.monotonic()
    handled_first_event = False
    margin = 150 / 1e6  # 150 µs error margin

    # each 'evdev' set contains one SYN_REPORT so we only need to check for
    # the time offset once per event
    for event in events:
        try:
            evdev = fetch(event, 'evdev')
        except YamlException:
            continue

        # Nothing to time or send for an empty set.
        if not evdev:
            continue

        sec, usec = evdev[0][:2]
        timestamp = sec + usec / 1e6

        # The first event may have a nonzero offset but we want to replay
        # immediately regardless.
        if not handled_first_event:
            offset -= timestamp
            handled_first_event = True

        delay = timestamp + offset - time.monotonic() - margin
        if delay > 0:
            time.sleep(delay)

        evs = [libevdev.InputEvent(libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1]) for e in evdev]
        uinput.send_events(evs)
        if verbose:
            print_events(uinput.devnode, device['__index'], evs)
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def wrap(func, *args):
    '''Call func(*args), silently absorbing a KeyboardInterrupt.

    The return value of func is discarded.'''
    try:
        func(*args)
    except KeyboardInterrupt:
        return
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def loop(args, recording):
    '''Create all devices of a recording and replay them on request.

    After checking the file version, one uinput device is created for
    each entry in 'devices' and its node and name are printed. The
    function then repeats forever: wait for enter, replay every device in
    its own process, wait for all of those processes to finish.

    Raises YamlException if a required key is missing or the file version
    is not SUPPORTED_FILE_VERSION.'''
    file_version = fetch(recording, 'version')
    if file_version != SUPPORTED_FILE_VERSION:
        raise YamlException('Invalid file format: {}, expected {}'.format(file_version, SUPPORTED_FILE_VERSION))

    expected = fetch(recording, 'ndevices')
    devices = fetch(recording, 'devices')
    found = len(devices)
    if expected != found:
        error('WARNING: truncated file, expected {} devices, got {}'.format(expected, found))

    for index, dev in enumerate(devices):
        node = create(dev)
        print('{}: {}'.format(node.devnode, node.name))
        # Stash the runtime state in the device dict so that replay() can
        # pick it up again.
        dev['__uinput'] = node
        dev['__index'] = index

    # No exit condition here: the loop only ends through an exception
    # such as KeyboardInterrupt.
    while True:
        input('Hit enter to start replaying')

        workers = [
            multiprocessing.Process(target=wrap, args=(replay, dev, args.verbose))
            for dev in devices
        ]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()
Packit Service f6b565
Packit Service f6b565
Packit Service f6b565
def main():
    '''Parse the command line, load the recording and replay it.

    Returns the process exit status: 0 on success or when interrupted by
    a KeyboardInterrupt, 1 if the recording cannot be opened or parsed or
    if an OSError occurs while creating or replaying the devices.'''
    parser = argparse.ArgumentParser(
            description='Replay a device recording'
    )
    parser.add_argument('recording', metavar='recorded-file.yaml',
                        type=str, help='Path to device recording')
    parser.add_argument('--verbose', action='store_true')
    args = parser.parse_args()

    try:
        # Load the file in its own step so that a problem with the
        # recording is not reported as a device failure.
        try:
            with open(args.recording) as f:
                y = yaml.safe_load(f)
        except OSError as e:
            error('Error: failed to open recording: {}'.format(e))
            return 1
        except yaml.YAMLError as e:
            error('Error: failed to parse recording: {}'.format(e))
            return 1

        loop(args, y)
    except KeyboardInterrupt:
        pass
    except OSError as e:
        error('Error: failed to open device: {}'.format(e))
        return 1
    except YamlException as e:
        error('Error: failed to parse recording: {}'.format(e))
        return 1

    return 0


if __name__ == '__main__':
    sys.exit(main())