#!/usr/bin/python3
# Copyright (C) 2008 OpenedHand Ltd
# Copyright (C) 2008 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
# St, Fifth Floor, Boston, MA 02110-1301 USA
# TODO:
# - finish code cleanup
# - currently allowedValueList is not used: could use it to turn
# current char* value to an enum
# - could warn if values outside allowedValueRange are used in *_action_set()
# - add option to generate skeleton source code that uses the bindings?
import os.path, re, xml.etree.ElementTree as ET
from optparse import OptionParser
# Maps each UPnP data type name to a tuple of
#   (C type, GType, suffix used for g_value_get_* and g_value_set_*).
# Every C type ends in a space or '*' because the code generators paste the
# parameter name directly after it (e.g. "const %sin_%s").
typemap = {
    'ui1':         ('guint ',    'G_TYPE_UINT',             'uint'),
    'ui2':         ('guint ',    'G_TYPE_UINT',             'uint'),
    'ui4':         ('guint ',    'G_TYPE_UINT',             'uint'),
    'i1':          ('gint ',     'G_TYPE_INT',              'int'),
    'i2':          ('gint ',     'G_TYPE_INT',              'int'),
    'i4':          ('gint ',     'G_TYPE_INT',              'int'),
    'int':         ('gint ',     'G_TYPE_INT',              'int'),
    'r4':          ('gfloat ',   'G_TYPE_FLOAT',            'float'),
    'r8':          ('gdouble ',  'G_TYPE_DOUBLE',           'double'),
    'number':      ('gdouble ',  'G_TYPE_DOUBLE',           'double'),
    'fixed.14.4':  ('gdouble ',  'G_TYPE_DOUBLE',           'double'),
    'float':       ('gdouble ',  'G_TYPE_DOUBLE',           'double'),
    'char':        ('gchar ',    'G_TYPE_CHAR',             'char'),
    'string':      ('gchar *',   'G_TYPE_STRING',           'string'),
    'date':        ('gchar *',   'GUPNP_TYPE_DATE',         'string'),
    'dateTime':    ('gchar *',   'GUPNP_TYPE_DATE_TIME',    'string'),
    'dateTime.tz': ('gchar *',   'GUPNP_TYPE_DATE_TIME_TZ', 'string'),
    'time':        ('gchar *',   'GUPNP_TYPE_TIME',         'string'),
    'time.tz':     ('gchar *',   'GUPNP_TYPE_TIME_TZ',      'string'),
    'boolean':     ('gboolean ', 'G_TYPE_BOOLEAN',          'boolean'),
    'bin.base64':  ('gchar *',   'GUPNP_TYPE_BIN_BASE64',   'string'),
    'bin.hex':     ('gchar *',   'GUPNP_TYPE_BIN_HEX',      'string'),
    'uri':         ('gchar *',   'GUPNP_TYPE_URI',          'string'),
    'uuid':        ('gchar *',   'GUPNP_TYPE_UUID',         'string'),
}
class Action:
    """One UPnP action together with its generated C names and arguments."""

    def __init__(self):
        # name: action name as written in the service description
        # c_name: lower-case C identifier derived from the name
        # c_prefixed_name: c_name with the binding prefix prepended
        self.name = self.c_name = self.c_prefixed_name = None
        # Per-instance lists of Argument objects, split by direction.
        self.in_args, self.out_args = [], []
        # Starts empty; nothing in the visible code adds to it.
        self.notify_vars = []
class Argument:
    """One argument of a UPnP action."""

    def __init__(self):
        # name: argument name as written in the service description
        # c_name: lower-case C identifier derived from the name
        self.name = self.c_name = None
        # direction: text of the argument's <direction> element
        # related_var: the Variable named by <relatedStateVariable>
        self.direction = self.related_var = None
class Variable:
    """One UPnP state variable and the C/GLib type details used for it."""

    def __init__(self):
        # Names: as written in the description, as a C identifier, and
        # as a C identifier with the binding prefix.
        self.name = self.c_name = self.c_prefixed_name = None
        # C type string and GType macro name taken from typemap.
        self.ctype = self.gtype = None
        # Names of the g_value_get_* / g_value_set_* accessors.
        self.get_function = self.set_function = None
        # Cleared when the variable is declared with sendEvents="no".
        self.notified = True
        # Set for A_ARG_TYPE_* variables, which only carry type information.
        self.dummy = False
def camelCaseToLowerCase (str):
    """Convert a CamelCase name into a lower-case C identifier.

    An underscore is inserted before an upper-case letter that follows a
    lower-case one, and before an upper-case letter that is followed by
    something other than an upper-case letter or the end of the string.
    The result is lower-cased, stripped of leading/trailing underscores,
    and every character outside [a-z0-9] is replaced by an underscore.
    """
    # http://www.djangosnippets.org/snippets/585/
    word_start = r'(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))'
    underscored = re.sub(word_start, r'_\1', str)
    trimmed = underscored.lower().strip('_')
    return re.sub(r'[^a-z0-9]', '_', trimmed)
def getActions(action_elements, prefix, variables):
    """
    Parse the action element list into a list of Action objects.

    action_elements -- iterable of action elements from the service
                       description
    prefix          -- string prepended to each action's C name
    variables       -- Variable objects that the arguments'
                       relatedStateVariable entries are resolved against

    Returns the Action objects in the order of action_elements. Arguments
    whose direction is "in" are collected in in_args, all others in out_args.

    Raises Exception if an action or argument has no name, or if an argument
    refers to a state variable that is not in variables.
    """
    ns = "{urn:schemas-upnp-org:service-1-0}"

    def child_text(element, tag):
        # find() returns None for a missing child; treat that like an empty
        # element instead of failing with AttributeError on None.text.
        child = element.find(ns + tag)
        return None if child is None else child.text

    # Index the variables once. setdefault keeps the first variable for a
    # repeated name, which is what a front-to-back search would pick.
    vars_by_name = {}
    for var in variables:
        vars_by_name.setdefault(var.name, var)

    actions = []
    for actionElement in action_elements:
        a = Action()
        actions.append(a)

        a.name = child_text(actionElement, "name")
        if a.name is None:
            raise Exception("No name found for action")
        a.c_name = camelCaseToLowerCase (a.name)
        a.c_prefixed_name = prefix + a.c_name

        for argElement in actionElement.findall(ns + "argumentList/" + ns + "argument"):
            arg = Argument()

            arg.name = child_text(argElement, "name")
            if arg.name is None:
                raise Exception("No name found for argument")
            arg.c_name = camelCaseToLowerCase (arg.name)

            var_name = child_text(argElement, "relatedStateVariable")
            arg.related_var = vars_by_name.get(var_name)
            if arg.related_var is None:
                raise Exception("%s: related state variable %s not found" % (arg.name, var_name))

            arg.direction = child_text(argElement, "direction")
            if arg.direction == "in":
                a.in_args.append(arg)
            else:
                a.out_args.append(arg)

    return actions
def getVariables(var_elements, prefix):
    """
    Parse the state variable element list into a list of Variable objects.

    var_elements -- iterable of stateVariable elements from the service
                    description
    prefix       -- string prepended to each variable's C name

    Returns the Variable objects in the order of var_elements.

    Raises Exception if a variable has no name or if its dataType is not a
    key of typemap.
    """
    ns = "{urn:schemas-upnp-org:service-1-0}"

    def child_text(element, tag):
        # find() returns None for a missing child; treat that like an empty
        # element instead of failing with AttributeError on None.text.
        child = element.find(ns + tag)
        return None if child is None else child.text

    variables = []
    for varElement in var_elements:
        var = Variable()
        variables.append(var)

        var.name = child_text(varElement, "name")
        if var.name is None:
            raise Exception("No name found for state variable")
        if var.name.startswith("A_ARG_TYPE_"):
            # dummy state variable, only there to give type info to getter/setter
            var.dummy = True

        var.c_name = camelCaseToLowerCase (var.name)
        var.c_prefixed_name = prefix + var.c_name

        if varElement.get("sendEvents") == "no":
            var.notified = False

        dataType = child_text(varElement, "dataType")
        if dataType not in typemap:
            raise Exception("Unknown dataType %s" % dataType)
        var.ctype, var.gtype, g_value_type = typemap[dataType]
        var.get_function = "g_value_get_" + g_value_type
        var.set_function = "g_value_set_" + g_value_type

    return variables
def printClientSyncActionBinding(a):
    """Print the C wrapper that sends action *a* synchronously.

    The wrapper takes one "in_" parameter per input argument, one "out_"
    pointer per output argument and a GError **, and forwards them to
    gupnp_service_proxy_send_action.
    """
    fn_name = a.c_prefixed_name
    # Padding for the continuation lines of the parameter list.
    pad = " " * (2 + len(fn_name))

    print("static inline gboolean")
    print(f"{fn_name} (GUPnPServiceProxy *proxy,")
    for in_arg in a.in_args:
        print(f"{pad}const {in_arg.related_var.ctype}in_{in_arg.c_name},")
    for out_arg in a.out_args:
        print(f"{pad}{out_arg.related_var.ctype}*out_{out_arg.c_name},")
    print(f"{pad}GError **error)")
    print("{")
    print(" return gupnp_service_proxy_send_action")
    print(f' (proxy, "{a.name}", error,')
    # Input triples, NULL terminator, then output triples and terminator.
    for in_arg in a.in_args:
        print(f' "{in_arg.name}", {in_arg.related_var.gtype}, in_{in_arg.c_name},')
    print(" NULL,")
    for out_arg in a.out_args:
        print(f' "{out_arg.name}", {out_arg.related_var.gtype}, out_{out_arg.c_name},')
    print(" NULL);")
    print("}\n")
def printClientAsyncActionBinding(a):
    """Print the asynchronous C binding for action *a*.

    Three pieces are printed: the typedef of the user's reply callback, a
    generated handler that ends the action and calls that callback, and the
    "<name>_async" entry point that begins the action.
    """
    fn_name = a.c_prefixed_name

    # User-callback prototype
    pad = " " * (24 + len(fn_name))
    print(f"typedef void (*{fn_name}_reply) (GUPnPServiceProxy *proxy,")
    for out_arg in a.out_args:
        print(f"{pad}const {out_arg.related_var.ctype}out_{out_arg.c_name},")
    print(f"{pad}GError *error,")
    print(f"{pad}gpointer userdata);")
    print("")

    # Generated async callback handler, calls user-provided callback
    pad = " " * (30 + len(fn_name))
    print(f"static void _{fn_name}_async_callback (GUPnPServiceProxy *proxy,")
    print(f"{pad}GUPnPServiceProxyAction *action,")
    print(f"{pad}gpointer user_data)")
    for fixed in ("{", " GUPnPAsyncData *cbdata;", " GError *error = NULL;"):
        print(fixed)
    # One local per output argument to receive the result.
    for out_arg in a.out_args:
        print(f" {out_arg.related_var.ctype}{out_arg.c_name};")
    for fixed in ("",
                  " cbdata = (GUPnPAsyncData *) user_data;",
                  " gupnp_service_proxy_end_action",
                  " (proxy, action, &error,"):
        print(fixed)
    for out_arg in a.out_args:
        print(f' "{out_arg.name}", {out_arg.related_var.gtype}, &{out_arg.c_name},')
    print(" NULL);")
    print(f" (({fn_name}_reply)cbdata->cb)")
    print(" (proxy,")
    for out_arg in a.out_args:
        print(f" {out_arg.c_name},")
    for fixed in (" error, cbdata->userdata);",
                  "",
                  " g_slice_free1 (sizeof (*cbdata), cbdata);",
                  "}",
                  ""):
        print(fixed)

    # Entry point
    pad = " " * (8 + len(fn_name))
    print("static inline GUPnPServiceProxyAction *")
    print(f"{fn_name}_async (GUPnPServiceProxy *proxy,")
    for in_arg in a.in_args:
        print(f"{pad}const {in_arg.related_var.ctype}in_{in_arg.c_name},")
    print(f"{pad}{fn_name}_reply callback,")
    print(f"{pad}gpointer userdata)")
    for fixed in ("{",
                  " GUPnPServiceProxyAction* action;",
                  " GUPnPAsyncData *cbdata;",
                  "",
                  " cbdata = (GUPnPAsyncData *) g_slice_alloc (sizeof (*cbdata));",
                  " cbdata->cb = G_CALLBACK (callback);",
                  " cbdata->userdata = userdata;",
                  " action = gupnp_service_proxy_begin_action"):
        print(fixed)
    print(f' (proxy, "{a.name}",')
    print(f" _{fn_name}_async_callback, cbdata,")
    for in_arg in a.in_args:
        print(f' "{in_arg.name}", {in_arg.related_var.gtype}, in_{in_arg.c_name},')
    for fixed in (" NULL);", "", " return action;", "}"):
        print(fixed)
def printClientVariableNotifyBinding(v):
    """Print the client-side change-notification binding for variable *v*.

    Three pieces are printed: the typedef of the user's "changed" callback,
    a private callback that unpacks the GValue and calls the user's
    callback, and the "<name>_add_notify" function that registers it.
    """
    prefixed = v.c_prefixed_name
    # A leading "gchar" in the C type is handed to the user as "const gchar".
    value_type = re.sub("^gchar", "const gchar", v.ctype)

    # callback prototype
    pad = " " * (22 + len(prefixed))
    print("typedef void")
    print(f"(*{prefixed}_changed_callback) (GUPnPServiceProxy *proxy,")
    print(f"{pad}{value_type}{v.c_name},")
    print(f"{pad}gpointer userdata);")
    print("")

    # private callback
    # NOTE(review): the generated callback frees cbdata each time it runs;
    # confirm that it is never invoked twice with the same pointer.
    pad = " " * (20 + len(prefixed))
    print("static void")
    print(f"_{prefixed}_changed_callback (GUPnPServiceProxy *proxy,")
    for param in ("const gchar *variable,", "GValue *value,", "gpointer userdata)"):
        print(pad + param)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print(f" {value_type}{v.c_name};")
    print("")
    print(" cbdata = (GUPnPAsyncData *) userdata;")
    print(f" {v.c_name} = {v.get_function} (value);")
    print(f" (({prefixed}_changed_callback)cbdata->cb)")
    print(" (proxy,")
    print(f" {v.c_name},")
    for fixed in (" cbdata->userdata);",
                  "",
                  " g_slice_free1 (sizeof (*cbdata), cbdata);",
                  "}",
                  ""):
        print(fixed)

    # public notify_add function
    pad = " " * (13 + len(prefixed))
    print("static inline gboolean")
    print(f"{prefixed}_add_notify (GUPnPServiceProxy *proxy,")
    print(f"{pad}{prefixed}_changed_callback callback,")
    print(f"{pad}gpointer userdata)")
    for fixed in ("{",
                  " GUPnPAsyncData *cbdata;",
                  "",
                  " cbdata = (GUPnPAsyncData *) g_slice_alloc (sizeof (*cbdata));",
                  " cbdata->cb = G_CALLBACK (callback);",
                  " cbdata->userdata = userdata;",
                  "",
                  " return gupnp_service_proxy_add_notify",
                  " (proxy,"):
        print(fixed)
    print(f' "{v.name}",')
    print(f" {v.gtype},")
    print(f" _{prefixed}_changed_callback,")
    print(" cbdata);")
    print("}")
def printServerVariableQueryBinding(v):
    """Print the server-side query binding for state variable *v*.

    Three pieces are printed: the typedef of the user's query callback, a
    private "query-variable" handler that stores the callback's result in
    the GValue, and the "<name>_query_connect" function that connects it.
    """
    prefixed = v.c_prefixed_name

    # User callback proto
    pad = " " * (28 + len(v.ctype) + len(prefixed))
    print(f"typedef {v.ctype}(*{prefixed}_query_callback) (GUPnPService *service,")
    print(f"{pad}gpointer userdata);")
    print("")

    # Private signal handler
    pad = " " * (12 + len(prefixed))
    print("static void")
    print(f"_{prefixed}_query_cb (GUPnPService *service,")
    for param in ("gchar *variable,", "GValue *value,", "gpointer userdata)"):
        print(pad + param)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print(f" {v.ctype}{v.c_name};")
    print("")
    pad = " " * (36 + len(v.c_name) + len(prefixed))
    print(" cbdata = (GUPnPAsyncData *) userdata;")
    print(f" {v.c_name} = (({prefixed}_query_callback)cbdata->cb) (service,")
    print(f"{pad}cbdata->userdata);")
    print(f" g_value_init (value, {v.gtype});")
    print(f" {v.set_function} (value, {v.c_name});")
    print("}")
    print("")

    # Public connect function
    pad = " " * (16 + len(prefixed))
    print("gulong")
    print(f"{prefixed}_query_connect (GUPnPService *service,")
    print(f"{pad}{prefixed}_query_callback callback,")
    print(f"{pad}gpointer userdata)")
    for fixed in ("{",
                  " GUPnPAsyncData *cbdata;",
                  "",
                  " cbdata = (GUPnPAsyncData *) g_slice_alloc (sizeof (*cbdata));",
                  " cbdata->cb = G_CALLBACK (callback);",
                  " cbdata->userdata = userdata;",
                  "",
                  " return g_signal_connect_data (service,"):
        print(fixed)
    print(f' "query-variable::{v.name}",')
    print(f" G_CALLBACK (_{prefixed}_query_cb),")
    for fixed in (" cbdata,", " _free_cb_data,", " (GConnectFlags) 0);", "}", ""):
        print(fixed)
def printServerActionBinding(a):
    """Print the server-side C helpers for action *a*.

    Prints "<name>_action_get" when the action has input arguments,
    "<name>_action_set" when it has output arguments, and always
    "<name>_action_connect", which connects a handler to the
    "action-invoked::<action name>" signal.
    """
    fn_name = a.c_prefixed_name

    # getter and setter func for GUPnPServiceAction
    pad = " " * (13 + len(fn_name))
    if a.in_args:
        print("static inline void")
        print(f"{fn_name}_action_get (GUPnPServiceAction *action,")
        # Every parameter ends with a comma except the last, which closes
        # the parameter list.
        final = len(a.in_args) - 1
        for pos, in_arg in enumerate(a.in_args):
            closer = ")" if pos == final else ","
            print(f"{pad}{in_arg.related_var.ctype}*in_{in_arg.c_name}{closer}")
        print("{")
        print(" gupnp_service_action_get (action,")
        for in_arg in a.in_args:
            print(f' "{in_arg.name}", {in_arg.related_var.gtype}, in_{in_arg.c_name},')
        for fixed in (" NULL);", "}", ""):
            print(fixed)

    if a.out_args:
        print("static inline void")
        print(f"{fn_name}_action_set (GUPnPServiceAction *action,")
        final = len(a.out_args) - 1
        for pos, out_arg in enumerate(a.out_args):
            closer = ")" if pos == final else ","
            print(f"{pad}const {out_arg.related_var.ctype}out_{out_arg.c_name}{closer}")
        print("{")
        print(" gupnp_service_action_set (action,")
        for out_arg in a.out_args:
            print(f' "{out_arg.name}", {out_arg.related_var.gtype}, out_{out_arg.c_name},')
        for fixed in (" NULL);", "}", ""):
            print(fixed)

    pad = " " * (17 + len(fn_name))
    print("static inline gulong")
    print(f"{fn_name}_action_connect (GUPnPService *service,")
    print(f"{pad}GCallback callback,")
    print(f"{pad}gpointer userdata)")
    print("{")
    print(" return g_signal_connect (service,")
    print(f' "action-invoked::{a.name}",')
    for fixed in (" callback,", " userdata);", "}", ""):
        print(fixed)
def PrintServerVariableNotifyBinding(v):
    """Print the server-side "<name>_variable_notify" function for *v*.

    The generated function passes the new value of the state variable to
    gupnp_service_notify.
    """
    prefixed = v.c_prefixed_name
    pad = " " * (18 + len(prefixed))

    print("void")
    print(f"{prefixed}_variable_notify (GUPnPService *service,")
    print(f"{pad}const {v.ctype}{v.c_name})")
    print("{")
    print(" gupnp_service_notify (service,")
    print(f' "{v.name}", {v.gtype}, {v.c_name},')
    for fixed in (" NULL);", "}", ""):
        print(fixed)
def parseSCPD(scpd, prefix):
    """
    Parse the scpd file into lists of Action and Variable objects.

    A non-empty prefix is lower-cased and given a trailing underscore
    before it is handed to the parsers. Returns an (actions, variables)
    tuple.
    """
    ns = "{urn:schemas-upnp-org:service-1-0}"
    c_prefix = prefix if prefix == "" else prefix.lower() + "_"

    action_elements = scpd.findall(ns + "actionList/" + ns + "action")
    var_elements = scpd.findall(ns + "serviceStateTable/" + ns + "stateVariable")

    # Variables are parsed first because actions refer to them.
    variables = getVariables(var_elements, c_prefix)
    actions = getActions(action_elements, c_prefix, variables)
    return (actions, variables)
def printClientBindings(binding_name, actions, variables):
    """Print the complete client-side C header to standard output.

    binding_name -- suffix of the include-guard macro
    actions      -- Action objects; each gets a sync and an async binding
    variables    -- Variable objects; each one that is neither a dummy nor
                    non-notified gets a change-notification binding
    """
    guard = "GUPNP_GENERATED_CLIENT_BINDING_%s" % binding_name

    for fixed in ("/* Generated by gupnp-binding-tool */",
                  "",
                  "#include <libgupnp/gupnp.h>",
                  ""):
        print(fixed)
    print(f"#ifndef {guard}")
    print(f"#define {guard}")
    for fixed in ("",
                  "G_BEGIN_DECLS",
                  "\n#ifndef __GUPNPASYNCDATA_TYPE__",
                  "#define __GUPNPASYNCDATA_TYPE__",
                  "typedef struct {GCallback cb; gpointer userdata; } GUPnPAsyncData;",
                  "#endif"):
        print(fixed)

    for action in actions:
        print(f"\n/* action {action.name} */\n")
        printClientSyncActionBinding(action)
        printClientAsyncActionBinding(action)

    for variable in variables:
        if variable.dummy or not variable.notified:
            continue
        print(f"\n/* state variable {variable.name} */\n")
        printClientVariableNotifyBinding(variable)

    print("")
    print("G_END_DECLS")
    print("")
    print(f"#endif /* {guard} */")
def printServerBindings(binding_name, actions, variables):
    """Print the complete server-side C header to standard output.

    binding_name -- suffix of the include-guard macro
    actions      -- Action objects; each gets its server action helpers
    variables    -- Variable objects; each non-dummy one gets a query
                    binding, plus a notify binding when it is notified
    """
    # The guard is distinct from the one the client header uses, so that a
    # client and a server header generated with the same binding name can
    # both be included in one translation unit.
    define = "GUPNP_GENERATED_SERVER_BINDING_%s" % binding_name

    print("/* Generated by gupnp-binding-tool */")
    print("")
    print("#include <libgupnp/gupnp.h>")
    print("")
    print("#ifndef %s" % define)
    print("#define %s" % define)
    print("")
    print("G_BEGIN_DECLS")
    print("\n#ifndef __GUPNPASYNCDATA_TYPE__")
    print("#define __GUPNPASYNCDATA_TYPE__")
    print("typedef struct {GCallback cb; gpointer userdata; } GUPnPAsyncData;")
    print("#endif")

    # Destroy notifier referenced by the generated *_query_connect functions.
    print("static void")
    print("_free_cb_data (gpointer data, GClosure *closure)")
    print("{")
    print(" GUPnPAsyncData *cbdata = (GUPnPAsyncData *) data;")
    print(" g_slice_free1 (sizeof (*cbdata), cbdata);")
    print("}")
    print("")

    for a in actions:
        print("\n/* action %s */\n" % a.name)
        printServerActionBinding(a)

    for v in variables:
        if not v.dummy:
            print("\n/* state variable %s */\n" % v.name)
            printServerVariableQueryBinding(v)
            if v.notified:
                PrintServerVariableNotifyBinding(v)

    print("")
    print("G_END_DECLS")
    print("")
    print("#endif /* %s */" % define)
def main ():
    """Command-line entry point: parse options and print the bindings.

    Takes exactly one positional argument, the service description file,
    and writes client or server bindings (per --mode) to standard output.
    Raises Exception if the document yields neither actions nor variables.
    """
    parser = OptionParser("Usage: %prog [options] service-filename")
    parser.add_option("-p", "--prefix", dest="prefix",
                      metavar="PREFIX", default="",
                      help="set prefix for generated function names")
    parser.add_option("-m", "--mode", type="choice", dest="mode",
                      metavar="MODE", default="client",
                      choices=("client", "server"),
                      help="select generation mode, 'client' or 'server'")

    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error("Expected 1 argument, got %d" % len(args))
    scpd_path = args[0]

    # get a name for header ifdef
    if options.prefix == "":
        file_name = os.path.basename(scpd_path)
        binding_name = re.sub("[^a-zA-Z0-9]", "_", file_name).upper()
    else:
        binding_name = options.prefix.upper()

    # parse scpd file, extract action list and state variable list
    scpd = ET.parse(scpd_path)
    actions, variables = parseSCPD (scpd, options.prefix)
    if not actions and not variables:
        raise Exception ("No actions or variables found in document")

    # generate bindings
    if options.mode == "client":
        emit_bindings = printClientBindings
    else:
        emit_bindings = printServerBindings
    emit_bindings(binding_name, actions, variables)


# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()