Blob Blame History Raw
#! /usr/bin/env python3
# Copyright © 2017, 2019 Red Hat, Inc
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
# Authors:
#       Christian J. Kellner <christian@kellner.me>
#       Benjamin Berg <bberg@redhat.com>

import unittest
import time
import subprocess
import os
import os.path
import sys
import tempfile
import glob
import shutil
import socket
import struct
import dbusmock
import gi
from gi.repository import GLib, Gio
import cairo

# Use the DEVNULL constant exported by subprocess when it exists; otherwise
# fall back to a writable binary handle on the null device.
DEVNULL = getattr(subprocess, 'DEVNULL', None)
if DEVNULL is None:
    DEVNULL = open(os.devnull, 'wb')

# D-Bus system service file that is parsed for the installed daemon's path.
SERVICE_FILE = '/usr/share/dbus-1/system-services/net.reactivated.Fprint.service'

def get_timeout(topic='default'):
    """Return the timeout, in seconds, configured for *topic*.

    Known topics are 'test', 'default' and 'daemon_start'.  A longer set of
    values is used whenever the VALGRIND environment variable is set (even
    to an empty string).

    Raises:
        ValueError: if *topic* is not one of the known topics.
    """
    regular = {'test': 60, 'default': 3, 'daemon_start': 5}
    under_valgrind = {'test': 300, 'default': 20, 'daemon_start': 60}

    if os.getenv('VALGRIND') is None:
        table = regular
    else:
        table = under_valgrind

    if topic in table:
        return table[topic]
    raise ValueError('invalid topic')


# Copied from libfprint tests
class Connection:
    """Context manager that yields a connected UNIX stream socket.

    The socket is created and connected to *addr* on entry and closed on
    exit.  While the context is active the socket is also available as
    ``self.con``; the attribute is removed again once the socket is closed.
    """

    def __init__(self, addr):
        # Filesystem path of the UNIX socket to connect to.
        self.addr = addr

    def __enter__(self):
        self.con = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.con.connect(self.addr)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the socket
            # has to be released here to avoid leaking the descriptor.
            self.con.close()
            del self.con
            raise
        return self.con

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.con.close()
        del self.con

def load_image(img):
    """Load the PNG at *img* and return it painted onto an A8 cairo surface.

    The returned surface has both its width and its height rounded up to the
    next multiple of four.
    """
    source = cairo.ImageSurface.create_from_png(img)

    # Cairo wants 4 byte aligned rows, so pad by a few pixels if necessary.
    # The height is rounded up the same way as the width.
    width = source.get_width()
    height = source.get_height()
    width = (width + 3) // 4 * 4
    height = (height + 3) // 4 * 4

    surface = cairo.ImageSurface(cairo.Format.A8, width, height)
    painter = cairo.Context(surface)

    # Fill the whole (padded) surface first.
    painter.set_source_rgba(1, 1, 1, 1)
    painter.paint()

    # NOTE(review): this colour source is replaced by set_source_surface()
    # below before anything is painted with it -- confirm it is needed.
    painter.set_source_rgba(0, 0, 0, 0)
    painter.set_operator(cairo.OPERATOR_SOURCE)

    # Copy the PNG onto the surface using the SOURCE operator.
    painter.set_source_surface(source)
    painter.paint()

    return surface

# Locate the top of the source tree.  TOPSRCDIR wins when it is present in
# the environment; otherwise the tree is assumed to start one directory above
# this file.  (The membership test must be on the mapping's keys: hasattr()
# looks for an attribute on the os.environ object and never sees variables.)
if 'TOPSRCDIR' in os.environ:
    root = os.environ['TOPSRCDIR']
else:
    root = os.path.join(os.path.dirname(__file__), '..')

# Directory searched for the sample fingerprint PNG images.
imgdir = os.path.join(root, 'tests', 'prints')

# Default GLib main context; the tests iterate it while they wait for the
# device proxy's signal callback to flag that a result has arrived.
ctx = GLib.main_context_default()

class FPrintdTest(dbusmock.DBusTestCase):
    # Integration tests that run an fprintd daemon on a private test bus.
    #
    # setUpClass() picks the daemon binary, loads the sample prints from
    # imgdir and brings up a Gio.TestDBus whose address is exported as the
    # system bus.  Every test spawns a mocked polkitd plus the daemon and
    # feeds images to the virtual image device through the UNIX socket whose
    # path is exported in FP_VIRTUAL_IMAGE.
    #
    # (Plain comments instead of docstrings are used on the class and the
    # test methods so the unittest runner's verbose output stays unchanged.)

    # Location, below the daemon's state directory, where the tests expect
    # the print enrolled for 'testuser' to be stored.
    ENROLLED_PRINT = 'testuser/virtual_image/0/7'

    @staticmethod
    def path_from_service_file(sf):
        """Return the value of the first ``Exec=`` line of service file *sf*.

        Returns None when the file contains no ``Exec=`` line.
        """
        with open(sf) as f:
            for line in f:
                if line.startswith('Exec='):
                    return line.split('=', 1)[1].strip()
        return None

    @classmethod
    def setUpClass(cls):
        """Locate the daemon, load the prints and start the private bus."""
        fprintd = None

        if 'FPRINT_BUILD_DIR' in os.environ:
            print('Testing local build')
            build_dir = os.environ['FPRINT_BUILD_DIR']
            fprintd = os.path.join(build_dir, 'fprintd')
        elif 'UNDER_JHBUILD' in os.environ:
            print('Testing JHBuild version')
            jhbuild_prefix = os.environ['JHBUILD_PREFIX']
            fprintd = os.path.join(jhbuild_prefix, 'libexec', 'fprintd')
        else:
            print('Testing installed system binaries')
            fprintd = cls.path_from_service_file(SERVICE_FILE)

        assert fprintd is not None, 'failed to find daemon'
        cls.paths = {'daemon': fprintd }

        cls.tmpdir = tempfile.mkdtemp(prefix='libfprint-')

        # Socket path handed to the daemon through the environment; the
        # send_*() helpers connect to the same path.
        cls.sockaddr = os.path.join(cls.tmpdir, 'virtual-image.socket')
        os.environ['FP_VIRTUAL_IMAGE'] = cls.sockaddr

        # Map "<name>" -> surface for every "<name>.png" in imgdir.
        cls.prints = {}
        for f in glob.glob(os.path.join(imgdir, '*.png')):
            n = os.path.basename(f)[:-4]
            cls.prints[n] = load_image(f)

        cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE)
        cls.test_bus.up()
        try:
            del os.environ['DBUS_SESSION_BUS_ADDRESS']
        except KeyError:
            pass
        # Make the test bus the "system" bus for this process and for the
        # children that inherit its environment.
        os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address()
        cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)

    @classmethod
    def tearDownClass(cls):
        """Shut the private bus down and remove the class-wide temp dir."""
        cls.test_bus.down()
        shutil.rmtree(cls.tmpdir)
        dbusmock.DBusTestCase.tearDownClass()

    def daemon_start(self):
        """Spawn the daemon and wait until its manager answers on the bus.

        On return ``self.manager`` is a proxy for the manager object and
        ``self.device`` is a proxy for the virtual image device, or None if
        the manager did not report such a device.  The test is failed when
        the manager cannot be reached within the 'daemon_start' timeout.
        """
        timeout = get_timeout('daemon_start')  # seconds
        env = os.environ.copy()
        env['G_DEBUG'] = 'fatal-criticals'
        env['STATE_DIRECTORY'] = self.state_dir
        env['RUNTIME_DIRECTORY'] = self.run_dir

        argv = [self.paths['daemon'], '-t']
        valgrind = os.getenv('VALGRIND')
        if valgrind is not None:
            argv.insert(0, 'valgrind')
            argv.insert(1, '--leak-check=full')
            # When VALGRIND names an existing file it is passed on as a
            # suppressions file.
            if os.path.exists(valgrind):
                argv.insert(2, '--suppressions=%s' % valgrind)
            self.valgrind = True
        self.daemon = subprocess.Popen(argv,
                                       env=env,
                                       stdout=None,
                                       stderr=subprocess.STDOUT)
        self.device = None

        # Poll ten times per second until the manager can be queried.
        timeout_count = timeout * 10
        timeout_sleep = 0.1
        while timeout_count > 0:
            time.sleep(timeout_sleep)
            timeout_count -= 1
            try:
                self.manager = Gio.DBusProxy.new_sync(self.dbus,
                                                      Gio.DBusProxyFlags.DO_NOT_AUTO_START,
                                                      None,
                                                      'net.reactivated.Fprint',
                                                      '/net/reactivated/Fprint/Manager',
                                                      'net.reactivated.Fprint.Manager',
                                                      None)

                devices = self.manager.GetDevices()
                # Find the virtual device, just in case it is a local run
                # and there is another usable sensor available locally
                for path in devices:
                    dev = Gio.DBusProxy.new_sync(self.dbus,
                                                 Gio.DBusProxyFlags.DO_NOT_AUTO_START,
                                                 None,
                                                 'net.reactivated.Fprint',
                                                 path,
                                                 'net.reactivated.Fprint.Device',
                                                 None)

                    if 'Virtual image device' in str(dev.get_cached_property('name')):
                        self.device = dev
                        break
                else:
                    print('Did not find virtual device! Probably libfprint was build without the corresponding driver!')

                break
            except GLib.GError:
                # Daemon not reachable yet; try again on the next tick.
                pass
        else:
            timeout_time = timeout * 10 * timeout_sleep
            self.fail('daemon did not start in %d seconds' % timeout_time)

    def daemon_stop(self):
        """Terminate the daemon, if one is running, and wait for it to exit.

        Safe to call repeatedly and when no daemon was started.
        """
        if self.daemon:
            try:
                self.daemon.terminate()
            except OSError:
                pass
            self.daemon.wait()

        self.daemon = None
        self.client = None

    def polkitd_start(self):
        """Spawn the dbusmock 'polkitd' template with its stdout discarded."""
        self._polkitd, self._polkitd_obj = self.spawn_server_template(
            'polkitd', {}, stdout=DEVNULL)

    def polkitd_stop(self):
        """Terminate the polkitd mock, if one is running, and wait for it.

        Safe to call repeatedly and when no mock was started.
        """
        if self._polkitd is None:
            return
        self._polkitd.terminate()
        self._polkitd.wait()
        # Forget the process so that a second call is a no-op.
        self._polkitd = None
        self._polkitd_obj = None

    def setUp(self):
        """Create the per-test directories and reset per-test bookkeeping."""
        self.test_dir = tempfile.mkdtemp()
        self.state_dir = os.path.join(self.test_dir, 'state')
        self.run_dir = os.path.join(self.test_dir, 'run')

        # Initialised here so the stop/cleanup helpers can be called safely
        # even when the matching start helper never ran.
        self.daemon = None
        self.device = None
        self._polkitd = None
        self._polkitd_obj = None
        self._timeout_id = None
        self._signal_id = None
        self._abort = False
        self._last_result = None
        self._verify_stopped = None

    def tearDown(self):
        """Stop whatever a failed or skipped test left behind, then clean up.

        After a successful test every step except the directory removal is a
        no-op because the test already performed it.
        """
        self._cancel_test_timeout()
        self._disconnect_device_signals()
        self.daemon_stop()
        self.polkitd_stop()
        shutil.rmtree(self.test_dir)

    # From libfprint tests
    def send_retry(self, retry_error=1):
        """Send a retry error to the virtual device socket.

        The message is two native ints: -1 followed by *retry_error*.  The
        tests expect the default (1) to be reported as a too-short swipe.
        """
        with Connection(self.sockaddr) as con:
            con.sendall(struct.pack('ii', -1, retry_error))

    # From libfprint tests
    def send_image(self, image):
        """Send the loaded print named *image* to the virtual device socket.

        The message is the width and height as two native ints followed by
        the raw surface data, which must be exactly one byte per pixel.
        """
        img = self.prints[image]
        with Connection(self.sockaddr) as con:
            mem = img.get_data()
            mem = mem.tobytes()
            assert len(mem) == img.get_width() * img.get_height()

            encoded_img = struct.pack('ii', img.get_width(), img.get_height())
            encoded_img += mem

            con.sendall(encoded_img)

    def _enrolled_print_path(self):
        """Return the path at which the enrolled print is expected on disk."""
        return os.path.join(self.state_dir, self.ENROLLED_PRINT)

    def _arm_test_timeout(self):
        """Install a GLib timeout that hard-exits the process if it fires."""
        def timeout_cb(*args):
            # Note: With meson we could just rely on it to kill us
            print("Test timed out, hard exiting")
            sys.exit(1)

        self._timeout_id = GLib.timeout_add(get_timeout('test') * 1000, timeout_cb)

    def _cancel_test_timeout(self):
        """Remove the timeout installed by _arm_test_timeout(), if any."""
        if self._timeout_id is not None:
            GLib.source_remove(self._timeout_id)
            self._timeout_id = None

    def _disconnect_device_signals(self):
        """Disconnect the 'g-signal' handler from the device proxy, if any."""
        if self._signal_id is not None:
            self.device.disconnect(self._signal_id)
            self._signal_id = None

    def _on_device_signal(self, proxy, sender, signal, params):
        """Record the outcome carried by a device signal.

        Sets ``self._abort`` once the waiting loop should stop and stores the
        reported result in ``self._last_result``.  After a passed enroll
        stage the next 'whorl' image is sent so the enrollment continues.
        """
        print(signal, params)
        if signal == 'EnrollStatus':
            self._abort = params[1]
            self._last_result = params[0]

            if not self._abort and self._last_result == 'enroll-stage-passed':
                self.send_image('whorl')
            elif self._abort:
                pass
            else:
                self._abort = True
                self._last_result = 'Unexpected signal values'
                print('Unexpected signal values')
        elif signal == 'VerifyFingerSelected':
            pass
        elif signal == 'VerifyStatus':
            self._abort = True
            self._last_result = params[0]
            self._verify_stopped = params[1]
        else:
            self._abort = True
            self._last_result = 'Unexpected signal'

    def _wait_for_result(self):
        """Iterate the main context until the signal handler sets _abort."""
        self._abort = False
        while not self._abort:
            ctx.iteration(True)

    def _prepare_device(self):
        """Start polkitd and the daemon and hook up the virtual device.

        Skips the test when no virtual image device is available.  Otherwise
        arms the test timeout, allows the polkit actions the tests need and
        connects the signal handler.
        """
        self.polkitd_start()
        self.daemon_start()

        if self.device is None:
            self.daemon_stop()
            self.polkitd_stop()
            self.skipTest("Need virtual_image device to run the test")

        self._arm_test_timeout()

        self._polkitd_obj.SetAllowed(['net.reactivated.fprint.device.setusername',
                                      'net.reactivated.fprint.device.enroll',
                                      'net.reactivated.fprint.device.verify'])

        self._signal_id = self.device.connect('g-signal', self._on_device_signal)

    def _enroll_whorl(self):
        """Claim the device for 'testuser' and enroll the 'whorl' print."""
        self.device.Claim('(s)', 'testuser')

        self.device.EnrollStart('(s)', 'right-index-finger')

        self.send_image('whorl')
        self._wait_for_result()
        self.assertEqual(self._last_result, 'enroll-completed')

        self.device.EnrollStop()

        self.assertTrue(os.path.exists(self._enrolled_print_path()))

    def _release_and_stop(self):
        """Undo _prepare_device() and _enroll_whorl() after a passing test."""
        self._cancel_test_timeout()
        self._disconnect_device_signals()

        self.device.Release()
        self.daemon_stop()
        self.polkitd_stop()

    # Enroll a print, exercise the verify outcomes (no match, retry, match)
    # and finally delete the prints by user name.
    def test_enroll_verify_delete(self):
        self._prepare_device()
        self._enroll_whorl()

        # Finger is enrolled, try to verify it
        self.device.VerifyStart('(s)', 'any')

        # Try a wrong print; will stop verification
        self.send_image('tented_arch')
        self._wait_for_result()
        self.assertEqual(self._verify_stopped, True)
        self.assertEqual(self._last_result, 'verify-no-match')

        self.device.VerifyStop()
        self.device.VerifyStart('(s)', 'any')

        # Send a retry error (swipe too short); will not stop verification
        self.send_retry()
        self._wait_for_result()
        self.assertEqual(self._verify_stopped, False)
        self.assertEqual(self._last_result, 'verify-swipe-too-short')

        # Try the correct print; will stop verification
        self.send_image('whorl')
        self._wait_for_result()
        self.assertEqual(self._verify_stopped, True)
        self.assertEqual(self._last_result, 'verify-match')

        # And delete the print(s) again
        self.device.DeleteEnrolledFingers('(s)', 'testuser')

        self.assertFalse(os.path.exists(self._enrolled_print_path()))

        self._release_and_stop()

    # Enroll a print and delete it again through DeleteEnrolledFingers2.
    def test_enroll_delete2(self):
        self._prepare_device()
        self._enroll_whorl()

        # And delete the print(s) again using the new API
        self.device.DeleteEnrolledFingers2()

        self.assertFalse(os.path.exists(self._enrolled_print_path()))

        self._release_and_stop()

if __name__ == '__main__':
    if len(sys.argv) == 2 and sys.argv[1] == "list-tests":
        # No list_tests() helper is defined in this file, so the test
        # methods are enumerated here.  The first column is the name that
        # unittest accepts on the command line to run a single test; the
        # second column is the bare method name.
        loader = unittest.TestLoader()
        for method in loader.getTestCaseNames(FPrintdTest):
            machine = '%s.%s' % (FPrintdTest.__name__, method)
            human = method
            print("%s %s" % (machine, human), end="\n")
        sys.exit(0)

    unittest.main(verbosity=2)