Blame test/test-hfp

Packit 34410b
#!/usr/bin/python
Packit 34410b
Packit 34410b
from __future__ import absolute_import, print_function, unicode_literals
Packit 34410b
Packit 34410b
from optparse import OptionParser, make_option
Packit 34410b
import os
Packit 34410b
from socket import SOCK_SEQPACKET, socket
Packit 34410b
import sys
Packit 34410b
import dbus
Packit 34410b
import dbus.service
Packit 34410b
import dbus.mainloop.glib
Packit 34410b
import glib
Packit 34410b
try:
Packit 34410b
  from gi.repository import GObject
Packit 34410b
except ImportError:
Packit 34410b
  import gobject as GObject
Packit 34410b
Packit 34410b
# Main loop instance. It is assigned in the __main__ section and used by
# HfpProfile.Release() to stop the program.
mainloop = None

# Cleared below when the socket module lacks the Bluetooth constants.
audio_supported = True

try:
	from socket import AF_BLUETOOTH, BTPROTO_SCO
except ImportError:
	# Only a missing constant means "no Bluetooth support"; anything else
	# (KeyboardInterrupt, SystemExit, ...) must not be swallowed here.
	print("WARNING: python compiled without Bluetooth support"
					" - audio will not be available")
	audio_supported = False

# Maximum number of bytes read from the RFCOMM fd per I/O callback
BUF_SIZE = 1024

# Address the listening SCO socket is bound to
BDADDR_ANY = '00:00:00:00:00:00'

# Hands-Free (HF) feature bits, combined into HF_FEATURES below
HF_NREC			= 0x0001
HF_3WAY			= 0x0002
HF_CLI			= 0x0004
HF_VOICE_RECOGNITION	= 0x0008
HF_REMOTE_VOL		= 0x0010
HF_ENHANCED_STATUS	= 0x0020
HF_ENHANCED_CONTROL	= 0x0040
HF_CODEC_NEGOTIATION	= 0x0080

# Audio Gateway (AG) feature bits, tested against the value parsed from
# the +BRSF response
AG_3WAY			= 0x0001
AG_NREC			= 0x0002
AG_VOICE_RECOGNITION	= 0x0004
AG_INBAND_RING		= 0x0008
AG_VOICE_TAG		= 0x0010
AG_REJECT_CALL		= 0x0020
AG_ENHANCED_STATUS	= 0x0040
AG_ENHANCED_CONTROL	= 0x0080
AG_EXTENDED_RESULT	= 0x0100
AG_CODEC_NEGOTIATION	= 0x0200

# Feature mask sent with AT+BRSF and registered with the profile.
# HF_NREC is defined above but deliberately not part of this mask.
HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
			HF_REMOTE_VOL | HF_ENHANCED_STATUS |
			HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)

# Codec list sent with AT+BAC
AVAIL_CODECS = "1,2"
Packit 34410b
Packit 34410b
class HfpConnection:
	"""One Hands-Free connection driven over a file descriptor.

	The constructor installs an I/O watch on the fd and immediately
	starts the Service Level Connection (SLC) command sequence with
	AT+BRSF.  Each final result ("OK" or "ERROR") received for the
	pending command triggers the next command of the sequence until
	slc_completed() is reached.
	"""

	slc_complete = False	# True once the SLC sequence has finished
	fd = None		# connection fd; None or -1 when not open
	io_id = 0		# glib watch id for fd, 0 when no watch is active
	version = 0		# profile version passed to the constructor
	features = 0		# AG feature mask (updated from +BRSF)
	pending = None		# command sent and still awaiting OK/ERROR

	def disconnect(self):
		"""Close the fd and remove its I/O watch.

		Does nothing when the connection has no open fd, so it is safe
		to call more than once.
		"""
		if self.fd is None or self.fd < 0:
			return

		os.close(self.fd)
		self.fd = -1

		# io_id is 0 when the watch is already being removed by io_cb
		# returning False.
		if self.io_id:
			glib.source_remove(self.io_id)
			self.io_id = 0

	def slc_completed(self):
		"""Mark the SLC sequence as finished."""
		print("SLC establisment complete")
		self.slc_complete = True

	def slc_next_cmd(self, cmd):
		"""Send the SLC command that follows the completed command cmd.

		cmd is the command whose final result was just received, or
		None to start the sequence.  The codec negotiation and three-way
		calling steps are only sent when both sides advertise the
		matching feature bit.
		"""
		if not cmd:
			self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
		elif cmd.startswith("AT+BRSF"):
			if (self.features & AG_CODEC_NEGOTIATION and
					HF_FEATURES & HF_CODEC_NEGOTIATION):
				self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
			else:
				self.send_cmd("AT+CIND=?")
		elif cmd.startswith("AT+BAC"):
			self.send_cmd("AT+CIND=?")
		# "AT+CIND=?" has to be tested before "AT+CIND?"
		elif cmd.startswith("AT+CIND=?"):
			self.send_cmd("AT+CIND?")
		elif cmd.startswith("AT+CIND?"):
			self.send_cmd("AT+CMER=3,0,0,1")
		elif cmd.startswith("AT+CMER="):
			if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
				self.send_cmd("AT+CHLD=?")
			else:
				self.slc_completed()
		elif cmd.startswith("AT+CHLD=?"):
			self.slc_completed()
		else:
			print("Unknown SLC command completed: %s" % (cmd))

	def _handle_response(self, line):
		"""Process one non-empty, stripped response line."""
		if line in ("OK", "ERROR"):
			# Final result: the pending command is done.  ERROR advances
			# the SLC sequence exactly like OK.
			cmd = self.pending
			self.pending = None

			if not self.slc_complete:
				self.slc_next_cmd(cmd)
			return

		(name, sep, value) = line.partition(':')

		if name == "+BRSF" and sep:
			try:
				self.features = int(value)
			except ValueError:
				print("Malformed +BRSF response: %s" % (line))

	def io_cb(self, fd, cond):
		"""I/O watch callback: read and process data available on fd.

		Returns True to keep the watch installed.  Returns False, after
		closing the connection, when the read fails or hits end of file.
		"""
		try:
			data = os.read(fd, BUF_SIZE)
		except OSError as err:
			print("Read failed: %s" % (err))
			data = b""

		if not data:
			# Without this the watch would fire forever on a closed
			# link.  Returning False makes glib drop the watch, so
			# disconnect() must not remove it a second time.
			print("Connection closed")
			self.io_id = 0
			self.disconnect()
			return False

		# os.read() yields bytes; compare and split as text.
		buf = data.decode("utf-8", "replace").strip()

		print("Received: %s" % (buf))

		# A single read may carry several responses, for example the
		# +BRSF line together with its final OK.
		for line in buf.splitlines():
			line = line.strip()
			if line:
				self._handle_response(line)

		return True

	def send_cmd(self, cmd):
		"""Write cmd followed by CR LF to the fd and mark it pending.

		Only one command may be outstanding; when another one is still
		pending an error is printed and nothing is sent.
		"""
		if self.pending:
			print("ERROR: Another command is pending")
			return

		print("Sending: %s" % (cmd))

		# os.write() needs bytes, not text.
		os.write(self.fd, (cmd + "\r\n").encode("utf-8"))
		self.pending = cmd

	def __init__(self, fd, version, features):
		"""Watch fd for input and start the SLC command sequence.

		fd       -- open file descriptor of the connection
		version  -- profile version, only stored and printed
		features -- initial AG feature mask; replaced when a +BRSF
		            response is received
		"""
		self.fd = fd
		self.version = version
		self.features = features

		print("Version 0x%04x Features 0x%04x" % (version, features))

		self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)

		self.slc_next_cmd(None)
Packit 34410b
Packit 34410b
class HfpProfile(dbus.service.Object):
	"""D-Bus object implementing the org.bluez.Profile1 interface.

	Keeps one HfpConnection per device object path and, when a
	listening SCO socket is supplied, reports incoming SCO connections.
	"""

	sco_socket = None	# listening SCO socket, None without audio
	io_id = 0		# glib watch id for sco_socket
	conns = None		# device path -> HfpConnection, set per instance

	def sco_cb(self, sock, cond):
		"""I/O watch callback: accept one incoming SCO connection."""
		# NOTE(review): the accepted socket is not stored anywhere, so
		# it is released as soon as this method returns - confirm that
		# this is the intended behaviour for the test tool.
		(sco, peer) = sock.accept()
		print("New SCO connection from %s" % (peer))

		# Keep the watch installed; without an explicit True the watch
		# is removed after the first connection.
		return True

	def init_sco(self, sock):
		"""Remember the listening SCO socket and watch it for input."""
		self.sco_socket = sock
		self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)

	def __init__(self, bus, path, sco):
		"""Export the object on bus at path.

		sco is a listening SCO socket, or a false value when audio is
		not available.
		"""
		dbus.service.Object.__init__(self, bus, path)

		# Per-instance map; a class-level dict would be shared by every
		# HfpProfile object.
		self.conns = {}

		if sco:
			self.init_sco(sco)

	@dbus.service.method("org.bluez.Profile1",
					in_signature="", out_signature="")
	def Release(self):
		"""Stop the main loop."""
		print("Release")
		mainloop.quit()

	@dbus.service.method("org.bluez.Profile1",
					in_signature="", out_signature="")
	def Cancel(self):
		"""Only logs the call."""
		print("Cancel")

	@dbus.service.method("org.bluez.Profile1",
				in_signature="o", out_signature="")
	def RequestDisconnection(self, path):
		"""Disconnect and forget the connection stored for path."""
		conn = self.conns.pop(path, None)
		if conn is None:
			print("No connection for %s" % (path))
			return

		conn.disconnect()

	@dbus.service.method("org.bluez.Profile1",
				in_signature="oha{sv}", out_signature="")
	def NewConnection(self, path, fd, properties):
		"""Create an HfpConnection for the fd handed over for path.

		"Version" and "Features" are taken from properties when present;
		otherwise 0x0105 and 0 are used.
		"""
		fd = fd.take()
		print("NewConnection(%s, %d)" % (path, fd))

		version = properties.get("Version", 0x0105)
		features = properties.get("Features", 0)

		# Do not leak the fd of an earlier connection for the same path.
		stale = self.conns.pop(path, None)
		if stale is not None:
			stale.disconnect()

		conn = HfpConnection(fd, version, features)

		self.conns[path] = conn
Packit 34410b
Packit 34410b
if __name__ == '__main__':
	# Use the GLib main loop as the default for D-Bus connections.
	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

	bus = dbus.SystemBus()

	manager = dbus.Interface(bus.get_object("org.bluez",
				"/org/bluez"), "org.bluez.ProfileManager1")

	option_list = [
			make_option("-p", "--path", action="store",
					type="string", dest="path",
					default="/bluez/test/hfp"),
			make_option("-n", "--name", action="store",
					type="string", dest="name",
					default=None),
			make_option("-C", "--channel", action="store",
					type="int", dest="channel",
					default=None),
			]

	parser = OptionParser(option_list=option_list)

	(options, args) = parser.parse_args()

	# Module-level name: HfpProfile.Release() quits this loop.
	mainloop = GObject.MainLoop()

	# Options passed to RegisterProfile; Name and Channel are only set
	# when given on the command line.
	opts = {
			"Version" : dbus.UInt16(0x0106),
			"Features" : dbus.UInt16(HF_FEATURES),
		}

	if (options.name):
		opts["Name"] = options.name

	if (options.channel is not None):
		opts["Channel"] = dbus.UInt16(options.channel)

	sco = None

	if audio_supported:
		# Python 3 only accepts the SCO bind address as bytes.
		bdaddr = BDADDR_ANY
		if not isinstance(bdaddr, bytes):
			bdaddr = bdaddr.encode("ascii")

		listener = None
		try:
			listener = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
			listener.bind(bdaddr)
			listener.listen(1)
			sco = listener
		except EnvironmentError as err:
			# Carry on without audio instead of aborting the tool, the
			# same way as when Bluetooth support is missing.
			print("WARNING: SCO socket setup failed (%s)"
					" - audio will not be available" % (err))
			if listener is not None:
				listener.close()

	profile = HfpProfile(bus, options.path, sco)

	manager.RegisterProfile(options.path, "hfp-hf", opts)

	print("Profile registered - waiting for connections")

	mainloop.run()