|
Packit |
130fc8 |
#!/usr/bin/env python
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
|
|
Packit |
130fc8 |
# Copyright (C) 2005-2007 Collabora Ltd. <http://www.collabora.co.uk/>
|
|
Packit |
130fc8 |
#
|
|
Packit |
130fc8 |
# Permission is hereby granted, free of charge, to any person
|
|
Packit |
130fc8 |
# obtaining a copy of this software and associated documentation
|
|
Packit |
130fc8 |
# files (the "Software"), to deal in the Software without
|
|
Packit |
130fc8 |
# restriction, including without limitation the rights to use, copy,
|
|
Packit |
130fc8 |
# modify, merge, publish, distribute, sublicense, and/or sell copies
|
|
Packit |
130fc8 |
# of the Software, and to permit persons to whom the Software is
|
|
Packit |
130fc8 |
# furnished to do so, subject to the following conditions:
|
|
Packit |
130fc8 |
#
|
|
Packit |
130fc8 |
# The above copyright notice and this permission notice shall be
|
|
Packit |
130fc8 |
# included in all copies or substantial portions of the Software.
|
|
Packit |
130fc8 |
#
|
|
Packit |
130fc8 |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
Packit |
130fc8 |
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
Packit |
130fc8 |
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
Packit |
130fc8 |
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
|
|
Packit |
130fc8 |
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
Packit |
130fc8 |
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
Packit |
130fc8 |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
Packit |
130fc8 |
# DEALINGS IN THE SOFTWARE.
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
from __future__ import print_function
|
|
Packit |
130fc8 |
import os
|
|
Packit |
130fc8 |
import unittest
|
|
Packit |
130fc8 |
import time
|
|
Packit |
130fc8 |
import logging
|
|
Packit |
130fc8 |
import weakref
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
import dbus
|
|
Packit |
130fc8 |
import _dbus_bindings
|
|
Packit |
130fc8 |
import dbus.glib
|
|
Packit |
130fc8 |
import dbus.service
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
from dbus._compat import is_py2, is_py3
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
from gi.repository import GObject as gobject
|
|
Packit |
130fc8 |
except ImportError:
|
|
Packit |
130fc8 |
raise SystemExit(77)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
logging.basicConfig()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
if 'DBUS_TEST_UNINSTALLED' in os.environ:
|
|
Packit |
130fc8 |
builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"])
|
|
Packit |
130fc8 |
pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"])
|
|
Packit |
130fc8 |
pkg = dbus.__file__
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
if not pkg.startswith(pydir):
|
|
Packit |
130fc8 |
raise Exception("DBus modules (%s) are not being picked up from the "
|
|
Packit |
130fc8 |
"package" % pkg)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
if not _dbus_bindings.__file__.startswith(builddir):
|
|
Packit |
130fc8 |
raise Exception("DBus modules (%s) are not being picked up from the "
|
|
Packit |
130fc8 |
"package" % _dbus_bindings.__file__)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
test_types_vals = [1, 12323231, 3.14159265, 99999999.99,
|
|
Packit |
130fc8 |
"dude", "123", "What is all the fuss about?", "gob@gob.com",
|
|
Packit |
130fc8 |
'\\u310c\\u310e\\u3114', '\\u0413\\u0414\\u0415',
|
|
Packit |
130fc8 |
'\\u2200software \\u2203crack', '\\xf4\\xe5\\xe8',
|
|
Packit |
130fc8 |
[1,2,3], ["how", "are", "you"], [1.23,2.3], [1], ["Hello"],
|
|
Packit |
130fc8 |
(1,2,3), (1,), (1,"2",3), ("2", "what"), ("you", 1.2),
|
|
Packit |
130fc8 |
{1:"a", 2:"b"}, {"a":1, "b":2}, #{"a":(1,"B")},
|
|
Packit |
130fc8 |
{1:1.1, 2:2.2}, [[1,2,3],[2,3,4]], [["a","b"],["c","d"]],
|
|
Packit |
130fc8 |
True, False,
|
|
Packit |
130fc8 |
dbus.Int16(-10), dbus.UInt16(10), 'SENTINEL',
|
|
Packit |
130fc8 |
#([1,2,3],"c", 1.2, ["a","b","c"], {"a": (1,"v"), "b": (2,"d")})
|
|
Packit |
130fc8 |
]
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
NAME = "org.freedesktop.DBus.TestSuitePythonService"
|
|
Packit |
130fc8 |
IFACE = "org.freedesktop.DBus.TestSuiteInterface"
|
|
Packit |
130fc8 |
OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
class TestDBusBindings(unittest.TestCase):
|
|
Packit |
130fc8 |
def setUp(self):
|
|
Packit |
130fc8 |
self.bus = dbus.SessionBus()
|
|
Packit |
130fc8 |
self.remote_object = self.bus.get_object(NAME, OBJECT)
|
|
Packit |
130fc8 |
self.remote_object_follow = self.bus.get_object(NAME, OBJECT,
|
|
Packit |
130fc8 |
follow_name_owner_changes=True)
|
|
Packit |
130fc8 |
self.iface = dbus.Interface(self.remote_object, IFACE)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testGObject(self):
|
|
Packit |
130fc8 |
print("Testing ExportedGObject... ", end='')
|
|
Packit |
130fc8 |
remote_gobject = self.bus.get_object(NAME, OBJECT + '/GObject')
|
|
Packit |
130fc8 |
iface = dbus.Interface(remote_gobject, IFACE)
|
|
Packit |
130fc8 |
print("introspection, ", end='')
|
|
Packit |
130fc8 |
remote_gobject.Introspect(dbus_interface=dbus.INTROSPECTABLE_IFACE)
|
|
Packit |
130fc8 |
print("method call, ", end='')
|
|
Packit |
130fc8 |
self.assertEqual(iface.Echo('123'), '123')
|
|
Packit |
130fc8 |
print("... OK")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testWeakRefs(self):
|
|
Packit |
130fc8 |
# regression test for Sugar crash caused by smcv getting weak refs
|
|
Packit |
130fc8 |
# wrong - pre-bugfix, this would segfault
|
|
Packit |
130fc8 |
bus = dbus.SessionBus(private=True)
|
|
Packit |
130fc8 |
ref = weakref.ref(bus)
|
|
Packit |
130fc8 |
self.assertTrue(ref() is bus)
|
|
Packit |
130fc8 |
del bus
|
|
Packit |
130fc8 |
self.assertTrue(ref() is None)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testInterfaceKeyword(self):
|
|
Packit |
130fc8 |
#test dbus_interface parameter
|
|
Packit |
130fc8 |
print(self.remote_object.Echo("dbus_interface on Proxy test Passed",
|
|
Packit |
130fc8 |
dbus_interface = IFACE))
|
|
Packit |
130fc8 |
print(self.iface.Echo("dbus_interface on Interface test Passed",
|
|
Packit |
130fc8 |
dbus_interface = IFACE))
|
|
Packit |
130fc8 |
self.assertTrue(True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testGetDBusMethod(self):
|
|
Packit |
130fc8 |
self.assertEqual(self.iface.get_dbus_method('AcceptListOfByte')(b'\1\2\3'), [1,2,3])
|
|
Packit |
130fc8 |
self.assertEqual(self.remote_object.get_dbus_method('AcceptListOfByte', dbus_interface=IFACE)(b'\1\2\3'), [1,2,3])
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testCallingConventionOptions(self):
|
|
Packit |
130fc8 |
self.assertEqual(self.iface.AcceptListOfByte(b'\1\2\3'), [1,2,3])
|
|
Packit |
130fc8 |
self.assertEqual(self.iface.AcceptListOfByte(b'\1\2\3', byte_arrays=True), b'\1\2\3')
|
|
Packit |
130fc8 |
self.assertEqual(self.iface.AcceptByteArray(b'\1\2\3'), [1,2,3])
|
|
Packit |
130fc8 |
self.assertEqual(self.iface.AcceptByteArray(b'\1\2\3', byte_arrays=True), b'\1\2\3')
|
|
Packit |
130fc8 |
if is_py2:
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(self.iface.AcceptUTF8String('abc'), unicode))
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(self.iface.AcceptUTF8String('abc', utf8_strings=True), str))
|
|
Packit |
130fc8 |
unicode_type = (str if is_py3 else unicode)
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(self.iface.AcceptUnicodeString('abc'),
|
|
Packit |
130fc8 |
unicode_type))
|
|
Packit |
130fc8 |
kwargs = {}
|
|
Packit |
130fc8 |
if is_py2:
|
|
Packit |
130fc8 |
kwargs['utf8_strings'] = True
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(self.iface.AcceptUnicodeString('abc', **kwargs), str))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testIntrospection(self):
|
|
Packit |
130fc8 |
#test introspection
|
|
Packit |
130fc8 |
print("\n********* Introspection Test ************")
|
|
Packit |
130fc8 |
print(self.remote_object.Introspect(
|
|
Packit |
130fc8 |
dbus_interface="org.freedesktop.DBus.Introspectable"))
|
|
Packit |
130fc8 |
print("Introspection test passed")
|
|
Packit |
130fc8 |
self.assertTrue(True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testMultiPathIntrospection(self):
|
|
Packit |
130fc8 |
# test introspection on an object exported in multiple places
|
|
Packit |
130fc8 |
# https://bugs.freedesktop.org/show_bug.cgi?id=11794
|
|
Packit |
130fc8 |
remote_object = self.bus.get_object(NAME, OBJECT + '/Multi1')
|
|
Packit |
130fc8 |
remote_object.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable")
|
|
Packit |
130fc8 |
remote_object = self.bus.get_object(NAME, OBJECT + '/Multi2')
|
|
Packit |
130fc8 |
remote_object.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable")
|
|
Packit |
130fc8 |
remote_object = self.bus.get_object(NAME, OBJECT + '/Multi2/3')
|
|
Packit |
130fc8 |
remote_object.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable")
|
|
Packit |
130fc8 |
self.assertTrue(True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testPythonTypes(self):
|
|
Packit |
130fc8 |
#test sending python types and getting them back
|
|
Packit |
130fc8 |
print("\n********* Testing Python Types ***********")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
for send_val in test_types_vals:
|
|
Packit |
130fc8 |
print("Testing %s"% str(send_val))
|
|
Packit |
130fc8 |
recv_val = self.iface.Echo(send_val)
|
|
Packit |
130fc8 |
self.assertEqual(send_val, recv_val)
|
|
Packit |
130fc8 |
self.assertEqual(recv_val.variant_level, 1)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testMethodExtraInfoKeywords(self):
|
|
Packit |
130fc8 |
print("Testing MethodExtraInfoKeywords...")
|
|
Packit |
130fc8 |
sender, path, destination, message_cls = self.iface.MethodExtraInfoKeywords()
|
|
Packit |
130fc8 |
self.assertTrue(sender.startswith(':'))
|
|
Packit |
130fc8 |
self.assertEqual(path, '/org/freedesktop/DBus/TestSuitePythonObject')
|
|
Packit |
130fc8 |
# we're using the "early binding" form of get_object (without
|
|
Packit |
130fc8 |
# follow_name_owner_changes), so the destination we actually sent it
|
|
Packit |
130fc8 |
# to will be the unique name
|
|
Packit |
130fc8 |
self.assertTrue(destination.startswith(':'))
|
|
Packit |
130fc8 |
self.assertEqual(message_cls, 'dbus.lowlevel.MethodCallMessage')
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testUtf8StringsSync(self):
|
|
Packit |
130fc8 |
if is_py3:
|
|
Packit |
130fc8 |
return
|
|
Packit |
130fc8 |
send_val = 'foo'
|
|
Packit |
130fc8 |
recv_val = self.iface.Echo(send_val, utf8_strings=True)
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(recv_val, str))
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(recv_val, dbus.UTF8String))
|
|
Packit |
130fc8 |
recv_val = self.iface.Echo(send_val, utf8_strings=False)
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(recv_val, unicode))
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(recv_val, dbus.String))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBenchmarkIntrospect(self):
|
|
Packit |
130fc8 |
print("\n********* Benchmark Introspect ************")
|
|
Packit |
130fc8 |
a = time.time()
|
|
Packit |
130fc8 |
print(a)
|
|
Packit |
130fc8 |
print(self.iface.GetComplexArray())
|
|
Packit |
130fc8 |
b = time.time()
|
|
Packit |
130fc8 |
print(b)
|
|
Packit |
130fc8 |
print("Delta: %f" % (b - a))
|
|
Packit |
130fc8 |
self.assertTrue(True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testAsyncCalls(self):
|
|
Packit |
130fc8 |
#test sending python types and getting them back async
|
|
Packit |
130fc8 |
print("\n********* Testing Async Calls ***********")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
failures = []
|
|
Packit |
130fc8 |
main_loop = gobject.MainLoop()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
class async_check:
|
|
Packit |
130fc8 |
def __init__(self, test_controler, expected_result, do_exit, **kwargs):
|
|
Packit |
130fc8 |
self.expected_result = expected_result
|
|
Packit |
130fc8 |
self.do_exit = do_exit
|
|
Packit |
130fc8 |
self.test_controler = test_controler
|
|
Packit |
130fc8 |
if is_py2:
|
|
Packit |
130fc8 |
self.utf8 = kwargs['utf8']
|
|
Packit |
130fc8 |
elif 'utf8' in kwargs:
|
|
Packit |
130fc8 |
raise TypeError("unexpected keyword argument 'utf8'")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def callback(self, val):
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
if self.do_exit:
|
|
Packit |
130fc8 |
main_loop.quit()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
self.test_controler.assertEqual(val, self.expected_result)
|
|
Packit |
130fc8 |
self.test_controler.assertEqual(val.variant_level, 1)
|
|
Packit |
130fc8 |
if is_py2:
|
|
Packit |
130fc8 |
if self.utf8 and not isinstance(val, dbus.UTF8String):
|
|
Packit |
130fc8 |
failures.append('%r should have been utf8 but was not' % val)
|
|
Packit |
130fc8 |
return
|
|
Packit |
130fc8 |
elif not self.utf8 and isinstance(val, dbus.UTF8String):
|
|
Packit |
130fc8 |
failures.append('%r should not have been utf8' % val)
|
|
Packit |
130fc8 |
return
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
failures.append("%s:\n%s" % (e.__class__, e))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def error_handler(self, error):
|
|
Packit |
130fc8 |
print(error)
|
|
Packit |
130fc8 |
if self.do_exit:
|
|
Packit |
130fc8 |
main_loop.quit()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
failures.append('%s: %s' % (error.__class__, error))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
last_type = test_types_vals[-1]
|
|
Packit |
130fc8 |
for send_val in test_types_vals:
|
|
Packit |
130fc8 |
print("Testing %s" % str(send_val))
|
|
Packit |
130fc8 |
kwargs = {}
|
|
Packit |
130fc8 |
if is_py2:
|
|
Packit |
130fc8 |
utf8 = (send_val == 'gob@gob.com')
|
|
Packit |
130fc8 |
kwargs['utf8'] = utf8
|
|
Packit |
130fc8 |
kwargs['utf8_strings'] = utf8
|
|
Packit |
130fc8 |
check = async_check(self, send_val, last_type == send_val,
|
|
Packit |
130fc8 |
**kwargs)
|
|
Packit |
130fc8 |
recv_val = self.iface.Echo(send_val,
|
|
Packit |
130fc8 |
reply_handler=check.callback,
|
|
Packit |
130fc8 |
error_handler=check.error_handler,
|
|
Packit |
130fc8 |
**kwargs)
|
|
Packit |
130fc8 |
main_loop.run()
|
|
Packit |
130fc8 |
if failures:
|
|
Packit |
130fc8 |
self.assertTrue(False, failures)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testStrictMarshalling(self):
|
|
Packit |
130fc8 |
print("\n********* Testing strict return & signal marshalling ***********")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# these values are the same as in the server, and the
|
|
Packit |
130fc8 |
# methods should only succeed when they are called with
|
|
Packit |
130fc8 |
# the right value number, because they have out_signature
|
|
Packit |
130fc8 |
# decorations, and return an unmatching type when called
|
|
Packit |
130fc8 |
# with a different number
|
|
Packit |
130fc8 |
values = ["", ("",""), ("","",""), [], {}, ["",""], ["","",""]]
|
|
Packit |
130fc8 |
methods = [
|
|
Packit |
130fc8 |
(self.iface.ReturnOneString, 'SignalOneString', set([0]), set([0])),
|
|
Packit |
130fc8 |
(self.iface.ReturnTwoStrings, 'SignalTwoStrings', set([1, 5]), set([1])),
|
|
Packit |
130fc8 |
(self.iface.ReturnStruct, 'SignalStruct', set([1, 5]), set([1])),
|
|
Packit |
130fc8 |
# all of our test values are sequences so will marshall correctly
|
|
Packit |
130fc8 |
# into an array :P
|
|
Packit |
130fc8 |
(self.iface.ReturnArray, 'SignalArray', set(range(len(values))), set([3, 5, 6])),
|
|
Packit |
130fc8 |
(self.iface.ReturnDict, 'SignalDict', set([0, 3, 4]), set([4]))
|
|
Packit |
130fc8 |
]
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
for (method, signal, success_values, return_values) in methods:
|
|
Packit |
130fc8 |
print("\nTrying correct behaviour of", method._method_name)
|
|
Packit |
130fc8 |
for value in range(len(values)):
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
ret = method(value)
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
print("%s(%r) raised %s: %s" %
|
|
Packit |
130fc8 |
(method._method_name, values[value], e.__class__, e))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# should fail if it tried to marshal the wrong type
|
|
Packit |
130fc8 |
self.assertTrue(value not in success_values,
|
|
Packit |
130fc8 |
"%s should succeed when we ask it to "
|
|
Packit |
130fc8 |
"return %r\n%s\n%s" % (
|
|
Packit |
130fc8 |
method._method_name, values[value],
|
|
Packit |
130fc8 |
e.__class__, e))
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
print("%s(%r) returned %r" % (
|
|
Packit |
130fc8 |
method._method_name, values[value], ret))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# should only succeed if it's the right return type
|
|
Packit |
130fc8 |
self.assertTrue(value in success_values,
|
|
Packit |
130fc8 |
"%s should fail when we ask it to "
|
|
Packit |
130fc8 |
"return %r" % (
|
|
Packit |
130fc8 |
method._method_name, values[value]))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# check the value is right too :D
|
|
Packit |
130fc8 |
returns = map(lambda n: values[n], return_values)
|
|
Packit |
130fc8 |
self.assertTrue(ret in returns,
|
|
Packit |
130fc8 |
"%s should return one of %r but it "
|
|
Packit |
130fc8 |
"returned %r instead" % (
|
|
Packit |
130fc8 |
method._method_name, returns, ret))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print("\nTrying correct emission of", signal)
|
|
Packit |
130fc8 |
for value in range(len(values)):
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
self.iface.EmitSignal(signal, value)
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
print("EmitSignal(%s, %r) raised %s" % (signal, values[value], e.__class__))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# should fail if it tried to marshal the wrong type
|
|
Packit |
130fc8 |
self.assertTrue(value not in success_values, "EmitSignal(%s) should succeed when we ask it to return %r\n%s\n%s" % (signal, values[value], e.__class__, e))
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
print("EmitSignal(%s, %r) appeared to succeed" % (signal, values[value]))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# should only succeed if it's the right return type
|
|
Packit |
130fc8 |
self.assertTrue(value in success_values, "EmitSignal(%s) should fail when we ask it to return %r" % (signal, values[value]))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# FIXME: wait for the signal here
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testInheritance(self):
|
|
Packit |
130fc8 |
print("\n********* Testing inheritance from dbus.method.Interface ***********")
|
|
Packit |
130fc8 |
ret = self.iface.CheckInheritance()
|
|
Packit |
130fc8 |
print("CheckInheritance returned %s" % ret)
|
|
Packit |
130fc8 |
self.assertTrue(ret, "overriding CheckInheritance from TestInterface failed")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testAsyncMethods(self):
|
|
Packit |
130fc8 |
print("\n********* Testing asynchronous method implementation *******")
|
|
Packit |
130fc8 |
for async in (True, False):
|
|
Packit |
130fc8 |
for fail in (True, False):
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
val = ('a', 1, False, [1,2], {1:2})
|
|
Packit |
130fc8 |
print("calling AsynchronousMethod with %s %s %s" % (async, fail, val))
|
|
Packit |
130fc8 |
ret = self.iface.AsynchronousMethod(async, fail, val)
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
self.assertTrue(fail, '%s: %s' % (e.__class__, e))
|
|
Packit |
130fc8 |
print("Expected failure: %s: %s" % (e.__class__, e))
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
self.assertTrue(not fail, 'Expected failure but succeeded?!')
|
|
Packit |
130fc8 |
self.assertEqual(val, ret)
|
|
Packit |
130fc8 |
self.assertEqual(1, ret.variant_level)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBusInstanceCaching(self):
|
|
Packit |
130fc8 |
print("\n********* Testing dbus.Bus instance sharing *********")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# unfortunately we can't test the system bus here
|
|
Packit |
130fc8 |
# but the codepaths are the same
|
|
Packit |
130fc8 |
for (cls, type, func) in ((dbus.SessionBus, dbus.Bus.TYPE_SESSION, dbus.Bus.get_session), (dbus.StarterBus, dbus.Bus.TYPE_STARTER, dbus.Bus.get_starter)):
|
|
Packit |
130fc8 |
print("\nTesting %s:" % cls.__name__)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
share_cls = cls()
|
|
Packit |
130fc8 |
share_type = dbus.Bus(bus_type=type)
|
|
Packit |
130fc8 |
share_func = func()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
private_cls = cls(private=True)
|
|
Packit |
130fc8 |
private_type = dbus.Bus(bus_type=type, private=True)
|
|
Packit |
130fc8 |
private_func = func(private=True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print(" - checking shared instances are the same...")
|
|
Packit |
130fc8 |
self.assertTrue(share_cls == share_type, '%s should equal %s' % (share_cls, share_type))
|
|
Packit |
130fc8 |
self.assertTrue(share_type == share_func, '%s should equal %s' % (share_type, share_func))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print(" - checking private instances are distinct from the shared instance...")
|
|
Packit |
130fc8 |
self.assertTrue(share_cls != private_cls, '%s should not equal %s' % (share_cls, private_cls))
|
|
Packit |
130fc8 |
self.assertTrue(share_type != private_type, '%s should not equal %s' % (share_type, private_type))
|
|
Packit |
130fc8 |
self.assertTrue(share_func != private_func, '%s should not equal %s' % (share_func, private_func))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print(" - checking private instances are distinct from each other...")
|
|
Packit |
130fc8 |
self.assertTrue(private_cls != private_type, '%s should not equal %s' % (private_cls, private_type))
|
|
Packit |
130fc8 |
self.assertTrue(private_type != private_func, '%s should not equal %s' % (private_type, private_func))
|
|
Packit |
130fc8 |
self.assertTrue(private_func != private_cls, '%s should not equal %s' % (private_func, private_cls))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testSenderName(self):
|
|
Packit |
130fc8 |
print('\n******** Testing sender name keyword ********')
|
|
Packit |
130fc8 |
myself = self.iface.WhoAmI()
|
|
Packit |
130fc8 |
print("I am", myself)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBusGetNameOwner(self):
|
|
Packit |
130fc8 |
ret = self.bus.get_name_owner(NAME)
|
|
Packit |
130fc8 |
self.assertTrue(ret.startswith(':'), ret)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBusListNames(self):
|
|
Packit |
130fc8 |
ret = self.bus.list_names()
|
|
Packit |
130fc8 |
self.assertTrue(NAME in ret, ret)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBusListActivatableNames(self):
|
|
Packit |
130fc8 |
ret = self.bus.list_activatable_names()
|
|
Packit |
130fc8 |
self.assertTrue(NAME in ret, ret)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBusNameHasOwner(self):
|
|
Packit |
130fc8 |
self.assertTrue(self.bus.name_has_owner(NAME))
|
|
Packit |
130fc8 |
self.assertTrue(not self.bus.name_has_owner('badger.mushroom.snake'))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testBusNameCreation(self):
|
|
Packit |
130fc8 |
print('\n******** Testing BusName creation ********')
|
|
Packit |
130fc8 |
test = [('org.freedesktop.DBus.Python.TestName', True),
|
|
Packit |
130fc8 |
('org.freedesktop.DBus.Python.TestName', True),
|
|
Packit |
130fc8 |
('org.freedesktop.DBus.Python.InvalidName&^*%$', False)]
|
|
Packit |
130fc8 |
# Do some more intelligent handling/testing of queueing vs success?
|
|
Packit |
130fc8 |
# ('org.freedesktop.DBus.TestSuitePythonService', False)]
|
|
Packit |
130fc8 |
# For some reason this actually succeeds
|
|
Packit |
130fc8 |
# ('org.freedesktop.DBus', False)]
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# make a method call to ensure the test service is active
|
|
Packit |
130fc8 |
self.iface.Echo("foo")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
names = {}
|
|
Packit |
130fc8 |
for (name, succeed) in test:
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
print("requesting %s" % name)
|
|
Packit |
130fc8 |
busname = dbus.service.BusName(name, dbus.SessionBus())
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
print("%s:\n%s" % (e.__class__, e))
|
|
Packit |
130fc8 |
self.assertTrue(not succeed, 'did not expect registering bus name %s to fail' % name)
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
print(busname)
|
|
Packit |
130fc8 |
self.assertTrue(succeed, 'expected registering bus name %s to fail'% name)
|
|
Packit |
130fc8 |
if name in names:
|
|
Packit |
130fc8 |
self.assertTrue(names[name] == busname, 'got a new instance for same name %s' % name)
|
|
Packit |
130fc8 |
print("instance of %s re-used, good!" % name)
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
names[name] = busname
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
del busname
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
del names
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
bus = dbus.Bus()
|
|
Packit |
130fc8 |
ret = bus.name_has_owner('org.freedesktop.DBus.Python.TestName')
|
|
Packit |
130fc8 |
self.assertTrue(not ret, 'deleting reference failed to release BusName org.freedesktop.DBus.Python.TestName')
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testMultipleReturnWithoutSignature(self):
|
|
Packit |
130fc8 |
# https://bugs.freedesktop.org/show_bug.cgi?id=10174
|
|
Packit |
130fc8 |
ret = self.iface.MultipleReturnWithoutSignature()
|
|
Packit |
130fc8 |
self.assertTrue(not isinstance(ret, dbus.Struct), repr(ret))
|
|
Packit |
130fc8 |
self.assertEqual(ret, ('abc', 123))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testListExportedChildObjects(self):
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.TestListExportedChildObjects())
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testRemoveFromConnection(self):
|
|
Packit |
130fc8 |
# https://bugs.freedesktop.org/show_bug.cgi?id=10457
|
|
Packit |
130fc8 |
self.assertTrue(not self.iface.HasRemovableObject())
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.AddRemovableObject())
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.HasRemovableObject())
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
removable = self.bus.get_object(NAME, OBJECT + '/RemovableObject')
|
|
Packit |
130fc8 |
iface = dbus.Interface(removable, IFACE)
|
|
Packit |
130fc8 |
self.assertTrue(iface.IsThere())
|
|
Packit |
130fc8 |
self.assertTrue(iface.RemoveSelf())
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
self.assertTrue(not self.iface.HasRemovableObject())
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
# and again...
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.AddRemovableObject())
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.HasRemovableObject())
|
|
Packit |
130fc8 |
self.assertTrue(iface.IsThere())
|
|
Packit |
130fc8 |
self.assertTrue(iface.RemoveSelf())
|
|
Packit |
130fc8 |
self.assertTrue(not self.iface.HasRemovableObject())
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testFallbackObjectTrivial(self):
|
|
Packit |
130fc8 |
obj = self.bus.get_object(NAME, OBJECT + '/Fallback')
|
|
Packit |
130fc8 |
iface = dbus.Interface(obj, IFACE)
|
|
Packit |
130fc8 |
path, rel, unique_name = iface.TestPathAndConnKeywords()
|
|
Packit |
130fc8 |
self.assertEqual(path, OBJECT + '/Fallback')
|
|
Packit |
130fc8 |
self.assertEqual(rel, '/')
|
|
Packit |
130fc8 |
self.assertEqual(unique_name, obj.bus_name)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testFallbackObjectNested(self):
|
|
Packit |
130fc8 |
obj = self.bus.get_object(NAME, OBJECT + '/Fallback/Nested')
|
|
Packit |
130fc8 |
iface = dbus.Interface(obj, IFACE)
|
|
Packit |
130fc8 |
path, rel, unique_name = iface.TestPathAndConnKeywords()
|
|
Packit |
130fc8 |
self.assertEqual(path, OBJECT + '/Fallback/Nested')
|
|
Packit |
130fc8 |
self.assertEqual(rel, '/')
|
|
Packit |
130fc8 |
self.assertEqual(unique_name, obj.bus_name)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
obj = self.bus.get_object(NAME, OBJECT + '/Fallback/Nested/Badger/Mushroom')
|
|
Packit |
130fc8 |
iface = dbus.Interface(obj, IFACE)
|
|
Packit |
130fc8 |
path, rel, unique_name = iface.TestPathAndConnKeywords()
|
|
Packit |
130fc8 |
self.assertEqual(path, OBJECT + '/Fallback/Nested/Badger/Mushroom')
|
|
Packit |
130fc8 |
self.assertEqual(rel, '/Badger/Mushroom')
|
|
Packit |
130fc8 |
self.assertEqual(unique_name, obj.bus_name)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testFallbackObject(self):
|
|
Packit |
130fc8 |
obj = self.bus.get_object(NAME, OBJECT + '/Fallback/Badger/Mushroom')
|
|
Packit |
130fc8 |
iface = dbus.Interface(obj, IFACE)
|
|
Packit |
130fc8 |
path, rel, unique_name = iface.TestPathAndConnKeywords()
|
|
Packit |
130fc8 |
self.assertEqual(path, OBJECT + '/Fallback/Badger/Mushroom')
|
|
Packit |
130fc8 |
self.assertEqual(rel, '/Badger/Mushroom')
|
|
Packit |
130fc8 |
self.assertEqual(unique_name, obj.bus_name)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testTimeoutSync(self):
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.BlockFor500ms(timeout=1.0) is None)
|
|
Packit |
130fc8 |
self.assertRaises(dbus.DBusException,
|
|
Packit |
130fc8 |
lambda: self.iface.BlockFor500ms(timeout=0.25))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testAsyncRaise(self):
|
|
Packit |
130fc8 |
self.assertRaises(dbus.DBusException, self.iface.AsyncRaise)
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
self.iface.AsyncRaise()
|
|
Packit |
130fc8 |
except dbus.DBusException as e:
|
|
Packit |
130fc8 |
self.assertTrue(e.get_dbus_name() ==
|
|
Packit |
130fc8 |
'org.freedesktop.bugzilla.bug12403',
|
|
Packit |
130fc8 |
e.get_dbus_name())
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
self.assertTrue(False)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testClosePrivateBus(self):
|
|
Packit |
130fc8 |
# fd.o #12096
|
|
Packit |
130fc8 |
dbus.Bus(private=True).close()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testTimeoutAsyncClient(self):
|
|
Packit |
130fc8 |
loop = gobject.MainLoop()
|
|
Packit |
130fc8 |
passes = []
|
|
Packit |
130fc8 |
fails = []
|
|
Packit |
130fc8 |
def correctly_returned():
|
|
Packit |
130fc8 |
passes.append('1000')
|
|
Packit |
130fc8 |
if len(passes) + len(fails) >= 2:
|
|
Packit |
130fc8 |
loop.quit()
|
|
Packit |
130fc8 |
def correctly_failed(exc):
|
|
Packit |
130fc8 |
passes.append('250')
|
|
Packit |
130fc8 |
if len(passes) + len(fails) >= 2:
|
|
Packit |
130fc8 |
loop.quit()
|
|
Packit |
130fc8 |
def incorrectly_returned():
|
|
Packit |
130fc8 |
fails.append('250')
|
|
Packit |
130fc8 |
if len(passes) + len(fails) >= 2:
|
|
Packit |
130fc8 |
loop.quit()
|
|
Packit |
130fc8 |
def incorrectly_failed(exc):
|
|
Packit |
130fc8 |
fails.append('1000')
|
|
Packit |
130fc8 |
if len(passes) + len(fails) >= 2:
|
|
Packit |
130fc8 |
loop.quit()
|
|
Packit |
130fc8 |
self.iface.BlockFor500ms(timeout=1.0,
|
|
Packit |
130fc8 |
reply_handler=correctly_returned,
|
|
Packit |
130fc8 |
error_handler=incorrectly_failed)
|
|
Packit |
130fc8 |
self.iface.BlockFor500ms(timeout=0.25,
|
|
Packit |
130fc8 |
reply_handler=incorrectly_returned,
|
|
Packit |
130fc8 |
error_handler=correctly_failed)
|
|
Packit |
130fc8 |
loop.run()
|
|
Packit |
130fc8 |
self.assertEqual(passes, ['250', '1000'])
|
|
Packit |
130fc8 |
self.assertEqual(fails, [])
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testTimeoutAsyncService(self):
|
|
Packit |
130fc8 |
self.assertTrue(self.iface.AsyncWait500ms(timeout=1.0) is None)
|
|
Packit |
130fc8 |
self.assertRaises(dbus.DBusException,
|
|
Packit |
130fc8 |
lambda: self.iface.AsyncWait500ms(timeout=0.25))
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testExceptions(self):
|
|
Packit |
130fc8 |
#self.assertRaises(dbus.DBusException,
|
|
Packit |
130fc8 |
# lambda: self.iface.RaiseValueError)
|
|
Packit |
130fc8 |
#self.assertRaises(dbus.DBusException,
|
|
Packit |
130fc8 |
# lambda: self.iface.RaiseDBusExceptionNoTraceback)
|
|
Packit |
130fc8 |
#self.assertRaises(dbus.DBusException,
|
|
Packit |
130fc8 |
# lambda: self.iface.RaiseDBusExceptionWithTraceback)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
self.iface.RaiseValueError()
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(e, dbus.DBusException), e.__class__)
|
|
Packit |
130fc8 |
self.assertTrue('.ValueError: Traceback ' in str(e),
|
|
Packit |
130fc8 |
'Wanted a traceback but got:\n"""%s"""' % str(e))
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
raise AssertionError('Wanted an exception')
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
self.iface.RaiseDBusExceptionNoTraceback()
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(e, dbus.DBusException), e.__class__)
|
|
Packit |
130fc8 |
self.assertEqual(e.get_dbus_name(),
|
|
Packit |
130fc8 |
'com.example.Networking.ServerError')
|
|
Packit |
130fc8 |
self.assertEqual(str(e),
|
|
Packit |
130fc8 |
'com.example.Networking.ServerError: '
|
|
Packit |
130fc8 |
'Server not responding')
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
raise AssertionError('Wanted an exception')
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
try:
|
|
Packit |
130fc8 |
self.iface.RaiseDBusExceptionWithTraceback()
|
|
Packit |
130fc8 |
except Exception as e:
|
|
Packit |
130fc8 |
self.assertTrue(isinstance(e, dbus.DBusException), e.__class__)
|
|
Packit |
130fc8 |
self.assertEqual(e.get_dbus_name(),
|
|
Packit |
130fc8 |
'com.example.Misc.RealityFailure')
|
|
Packit |
130fc8 |
self.assertTrue(str(e).startswith('com.example.Misc.RealityFailure: '
|
|
Packit |
130fc8 |
'Traceback '),
|
|
Packit |
130fc8 |
'Wanted a traceback but got:\n%s' % str(e))
|
|
Packit |
130fc8 |
else:
|
|
Packit |
130fc8 |
raise AssertionError('Wanted an exception')
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
""" Remove this for now
|
|
Packit |
130fc8 |
class TestDBusPythonToGLibBindings(unittest.TestCase):
|
|
Packit |
130fc8 |
def setUp(self):
|
|
Packit |
130fc8 |
self.bus = dbus.SessionBus()
|
|
Packit |
130fc8 |
self.remote_object = self.bus.get_object("org.freedesktop.DBus.TestSuiteGLibService", "/org/freedesktop/DBus/Tests/MyTestObject")
|
|
Packit |
130fc8 |
self.iface = dbus.Interface(self.remote_object, "org.freedesktop.DBus.Tests.MyObject")
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testIntrospection(self):
|
|
Packit |
130fc8 |
#test introspection
|
|
Packit |
130fc8 |
print "\n********* Introspection Test ************"
|
|
Packit |
130fc8 |
print self.remote_object.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable")
|
|
Packit |
130fc8 |
print "Introspection test passed"
|
|
Packit |
130fc8 |
self.assertTrue(True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testCalls(self):
|
|
Packit |
130fc8 |
print "\n********* Call Test ************"
|
|
Packit |
130fc8 |
result = self.iface.ManyArgs(1000, 'Hello GLib', 2)
|
|
Packit |
130fc8 |
print result
|
|
Packit |
130fc8 |
self.assertTrue(result == [2002.0, 'HELLO GLIB'])
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
arg0 = {"Dude": 1, "john": "palmieri", "python": 2.4}
|
|
Packit |
130fc8 |
result = self.iface.ManyStringify(arg0)
|
|
Packit |
130fc8 |
print result
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
print "Call test passed"
|
|
Packit |
130fc8 |
self.assertTrue(True)
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
def testPythonTypes(self):
|
|
Packit |
130fc8 |
print "\n********* Testing Python Types ***********"
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
for send_val in test_types_vals:
|
|
Packit |
130fc8 |
print "Testing %s"% str(send_val)
|
|
Packit |
130fc8 |
recv_val = self.iface.EchoVariant(send_val)
|
|
Packit |
130fc8 |
self.assertEqual(send_val, recv_val.object)
|
|
Packit |
130fc8 |
"""
|
|
Packit |
130fc8 |
if __name__ == '__main__':
|
|
Packit |
130fc8 |
gobject.threads_init()
|
|
Packit |
130fc8 |
dbus.glib.init_threads()
|
|
Packit |
130fc8 |
|
|
Packit |
130fc8 |
unittest.main()
|