|
Packit |
130fc8 |
#!/usr/bin/env python
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
|
|
Packit |
130fc8 |
# Copyright (C) 2005-2007 Collabora Ltd. <http://www.collabora.co.uk/>
|
|
Packit |
130fc8 |
#
|
|
Packit |
130fc8 |
# Permission is hereby granted, free of charge, to any person
|
|
Packit |
130fc8 |
# obtaining a copy of this software and associated documentation
|
|
Packit |
130fc8 |
# files (the "Software"), to deal in the Software without
|
|
Packit |
130fc8 |
# restriction, including without limitation the rights to use, copy,
|
|
Packit |
130fc8 |
# modify, merge, publish, distribute, sublicense, and/or sell copies
|
|
Packit |
130fc8 |
# of the Software, and to permit persons to whom the Software is
|
|
Packit |
130fc8 |
# furnished to do so, subject to the following conditions:
|
|
Packit |
130fc8 |
#
|
|
Packit |
130fc8 |
# The above copyright notice and this permission notice shall be
|
|
Packit |
130fc8 |
# included in all copies or substantial portions of the Software.
|
|
Packit |
130fc8 |
#
|
|
Packit |
130fc8 |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
Packit |
130fc8 |
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
Packit |
130fc8 |
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
Packit |
130fc8 |
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
|
Packit |
130fc8 |
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
Packit |
130fc8 |
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
Packit |
130fc8 |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
Packit |
130fc8 |
# DEALINGS IN THE SOFTWARE.
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
import sys
|
|
Packit |
130fc8 |
import os
|
|
Packit |
130fc8 |
import unittest
|
|
Packit |
130fc8 |
import time
|
|
Packit |
130fc8 |
import logging
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
import dbus
|
|
Packit |
130fc8 |
import _dbus_bindings
|
|
Packit |
130fc8 |
import dbus.glib
|
|
Packit |
130fc8 |
import dbus.service
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# The tests need a GLib main loop from GObject introspection.  When it is
# not installed the script exits with status 77 (presumably the test
# harness's "skipped" status -- confirm against the build system).
try:
    from gi.repository import GObject as gobject
except ImportError:
    raise SystemExit(77)

# Log everything: level 1 is below DEBUG, so no record is filtered out.
logging.basicConfig()
logging.getLogger().setLevel(1)
logger = logging.getLogger('test-signals')

# When testing an uninstalled tree, refuse to run against modules that were
# imported from somewhere other than the source/build directories.
if os.environ.get('DBUS_TEST_UNINSTALLED') is not None:
    builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"])
    pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"])
    pkg = dbus.__file__

    # (module, directory its file must live under), checked in this order:
    # the pure-Python package first, then the compiled bindings.
    for _module, _expected_root in ((dbus, pydir), (_dbus_bindings, builddir)):
        _location = _module.__file__
        if _location.startswith(_expected_root):
            continue
        raise Exception("DBus modules (%s) are not being picked up from the "
                        "package" % _location)

# Bus name, interface and object path of the service under test.
NAME = "org.freedesktop.DBus.TestSuitePythonService"
IFACE = "org.freedesktop.DBus.TestSuiteInterface"
OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
class TestSignals(unittest.TestCase):
    """Signal delivery tests against the test-suite D-Bus service.

    Every test connects a handler for ``SignalOneString``, asks the remote
    object to emit it through ``EmitSignal`` and runs the module-level
    ``main_loop`` until either the signal arrives or a one-second timeout
    fires.
    """

    def setUp(self):
        """Create the proxies and interfaces used by the tests."""
        logger.info('setUp()')
        self.bus = dbus.SessionBus()
        self.remote_object = self.bus.get_object(NAME, OBJECT)
        self.remote_object_fallback_trivial = self.bus.get_object(
            NAME, OBJECT + '/Fallback')
        self.remote_object_fallback = self.bus.get_object(
            NAME, OBJECT + '/Fallback/Badger')
        self.remote_object_follow = self.bus.get_object(
            NAME, OBJECT, follow_name_owner_changes=True)
        self.iface = dbus.Interface(self.remote_object, IFACE)
        self.iface_follow = dbus.Interface(self.remote_object_follow, IFACE)
        self.fallback_iface = dbus.Interface(self.remote_object_fallback, IFACE)
        self.fallback_trivial_iface = dbus.Interface(
            self.remote_object_fallback_trivial, IFACE)
        # Label of the phase currently running; handlers ignore events that
        # arrive while a different phase is active.
        self.in_test = None

    def signal_test_impl(self, iface, name, test_removal=False):
        """Check that ``SignalOneString`` emitted via *iface* is delivered.

        :param iface: interface object providing ``connect_to_signal`` and
            ``EmitSignal``.
        :param name: label for this test, stored in ``self.in_test`` and
            used in log messages.
        :param test_removal: if true, additionally emit the signal again
            after the match has been removed and check that the handler is
            no longer called.
        :raises AssertionError: if the signal does not arrive within one
            second, or (with *test_removal*) arrives after the match was
            removed.

        The signal match and any pending timeout are always released
        before returning, even on failure, so that a stale handler cannot
        quit the shared main loop during a later test.
        """
        self.in_test = name
        # using append rather than assignment here to avoid scoping issues
        result = []
        # Id of the currently armed timeout source (at most one entry).
        armed = []

        def _disarm():
            # Only remove sources that have not fired yet: a timeout whose
            # callback returned False has already been destroyed by GLib,
            # and removing a non-existent source id is a programmer error.
            while armed:
                gobject.source_remove(armed.pop())

        def _timeout_handler():
            logger.debug('_timeout_handler for %s: current state %s', name, self.in_test)
            del armed[:]  # this source is destroyed once we return False
            if self.in_test == name:
                main_loop.quit()
            return False

        def _signal_handler(s, sender, path):
            logger.debug('_signal_handler for %s: current state %s', name, self.in_test)
            if self.in_test not in (name, name + '+removed'):
                return
            logger.info('Received signal from %s:%s, argument is %r',
                        sender, path, s)
            result.append('received')
            main_loop.quit()

        def _rm_timeout_handler():
            logger.debug('_timeout_handler for %s: current state %s', name, self.in_test)
            del armed[:]  # this source is destroyed once we return False
            if self.in_test == name + '+removed':
                main_loop.quit()
            return False

        logger.info('Testing %s', name)
        match = iface.connect_to_signal('SignalOneString', _signal_handler,
                                        sender_keyword='sender',
                                        path_keyword='path')
        try:
            logger.info('Waiting for signal...')
            iface.EmitSignal('SignalOneString', 0)
            armed.append(gobject.timeout_add(1000, _timeout_handler))
            main_loop.run()
            if not result:
                raise AssertionError('Signal did not arrive within 1 second')
        finally:
            logger.debug('Removing match')
            match.remove()
            _disarm()

        if test_removal:
            self.in_test = name + '+removed'
            logger.info('Testing %s', name)
            del result[:]
            iface.EmitSignal('SignalOneString', 0)
            armed.append(gobject.timeout_add(1000, _rm_timeout_handler))
            try:
                main_loop.run()
            finally:
                _disarm()
            if result:
                raise AssertionError('Signal should not have arrived, but did')

    # Objects reached through the '/Fallback' paths.
    def testFallback(self):
        self.signal_test_impl(self.fallback_iface, 'Fallback')

    def testFallbackTrivial(self):
        self.signal_test_impl(self.fallback_trivial_iface, 'FallbackTrivial')

    # Plain proxy.
    def testSignal(self):
        self.signal_test_impl(self.iface, 'Signal')

    def testRemoval(self):
        self.signal_test_impl(self.iface, 'Removal', True)

    def testSignalAgain(self):
        self.signal_test_impl(self.iface, 'SignalAgain')

    def testRemovalAgain(self):
        self.signal_test_impl(self.iface, 'RemovalAgain', True)

    # Proxy created with follow_name_owner_changes=True.
    def testSignalF(self):
        self.signal_test_impl(self.iface_follow, 'Signal')

    def testRemovalF(self):
        self.signal_test_impl(self.iface_follow, 'Removal', True)

    def testSignalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'SignalAgain')

    def testRemovalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'RemovalAgain', True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
if __name__ == '__main__':
    # Module-level main loop: TestSignals.signal_test_impl runs and quits it
    # through this global name, so it has to exist before the tests start.
    main_loop = gobject.MainLoop()
    # Initialise threading support in GObject and in the dbus GLib glue
    # before any test touches the bus.
    gobject.threads_init()
    dbus.glib.init_threads()

    logger.info('Starting unit test')
    # Discovers and runs the TestCase classes defined in this module.
    unittest.main()
|