Blob Blame History Raw
#!/usr/bin/python

from __future__ import absolute_import, print_function, unicode_literals

from optparse import OptionParser, make_option
import os
from socket import SOCK_SEQPACKET, socket
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
import glib
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject

mainloop = None
audio_supported = True

try:
	from socket import AF_BLUETOOTH, BTPROTO_SCO
except:
	print("WARNING: python compiled without Bluetooth support"
					" - audio will not be available")
	audio_supported = False

BUF_SIZE = 1024

BDADDR_ANY = '00:00:00:00:00:00'

HF_NREC			= 0x0001
HF_3WAY			= 0x0002
HF_CLI			= 0x0004
HF_VOICE_RECOGNITION	= 0x0008
HF_REMOTE_VOL		= 0x0010
HF_ENHANCED_STATUS	= 0x0020
HF_ENHANCED_CONTROL	= 0x0040
HF_CODEC_NEGOTIATION	= 0x0080

AG_3WAY			= 0x0001
AG_NREC			= 0x0002
AG_VOICE_RECOGNITION	= 0x0004
AG_INBAND_RING		= 0x0008
AG_VOICE_TAG		= 0x0010
AG_REJECT_CALL		= 0x0020
AG_ENHANCED_STATUS	= 0x0040
AG_ENHANCED_CONTROL	= 0x0080
AG_EXTENDED_RESULT	= 0x0100
AG_CODEC_NEGOTIATION	= 0x0200

HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
			HF_REMOTE_VOL | HF_ENHANCED_STATUS |
			HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)

AVAIL_CODECS = "1,2"

class HfpConnection:
	slc_complete = False
	fd = None
	io_id = 0
	version = 0
	features = 0
	pending = None

	def disconnect(self):
		if (self.fd >= 0):
			os.close(self.fd)
			self.fd = -1
			glib.source_remove(self.io_id)
			self.io_id = 0

	def slc_completed(self):
		print("SLC establisment complete")
		self.slc_complete = True

	def slc_next_cmd(self, cmd):
		if not cmd:
			self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
		elif (cmd.startswith("AT+BRSF")):
			if (self.features & AG_CODEC_NEGOTIATION and
					HF_FEATURES & HF_CODEC_NEGOTIATION):
				self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
			else:
				self.send_cmd("AT+CIND=?")
		elif (cmd.startswith("AT+BAC")):
			self.send_cmd("AT+CIND=?")
		elif (cmd.startswith("AT+CIND=?")):
			self.send_cmd("AT+CIND?")
		elif (cmd.startswith("AT+CIND?")):
			self.send_cmd("AT+CMER=3,0,0,1")
		elif (cmd.startswith("AT+CMER=")):
			if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
				self.send_cmd("AT+CHLD=?")
			else:
				self.slc_completed()
		elif (cmd.startswith("AT+CHLD=?")):
			self.slc_completed()
		else:
			print("Unknown SLC command completed: %s" % (cmd))

	def io_cb(self, fd, cond):
		buf = os.read(fd, BUF_SIZE)
		buf = buf.strip()

		print("Received: %s" % (buf))

		if (buf == "OK" or buf == "ERROR"):
			cmd = self.pending
			self.pending = None

			if (not self.slc_complete):
				self.slc_next_cmd(cmd)

			return True

		parts = buf.split(':')

		if (parts[0] == "+BRSF"):
			self.features = int(parts[1])

		return True

	def send_cmd(self, cmd):
		if (self.pending):
			print("ERROR: Another command is pending")
			return

		print("Sending: %s" % (cmd))

		os.write(self.fd, cmd + "\r\n")
		self.pending = cmd

	def __init__(self, fd, version, features):
		self.fd = fd
		self.version = version
		self.features = features

		print("Version 0x%04x Features 0x%04x" % (version, features))

		self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)

		self.slc_next_cmd(None)

class HfpProfile(dbus.service.Object):
	sco_socket = None
	io_id = 0
	conns = {}

	def sco_cb(self, sock, cond):
		(sco, peer) = sock.accept()
		print("New SCO connection from %s" % (peer))

	def init_sco(self, sock):
		self.sco_socket = sock
		self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)

	def __init__(self, bus, path, sco):
		dbus.service.Object.__init__(self, bus, path)

		if sco:
			self.init_sco(sco)

	@dbus.service.method("org.bluez.Profile1",
					in_signature="", out_signature="")
	def Release(self):
		print("Release")
		mainloop.quit()

	@dbus.service.method("org.bluez.Profile1",
					in_signature="", out_signature="")
	def Cancel(self):
		print("Cancel")

	@dbus.service.method("org.bluez.Profile1",
				in_signature="o", out_signature="")
	def RequestDisconnection(self, path):
		conn = self.conns.pop(path)
		conn.disconnect()

	@dbus.service.method("org.bluez.Profile1",
				in_signature="oha{sv}", out_signature="")
	def NewConnection(self, path, fd, properties):
		fd = fd.take()
		version = 0x0105
		features = 0
		print("NewConnection(%s, %d)" % (path, fd))
		for key in properties.keys():
			if key == "Version":
				version = properties[key]
			elif key == "Features":
				features = properties[key]

		conn = HfpConnection(fd, version, features)

		self.conns[path] = conn

if __name__ == '__main__':
	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

	bus = dbus.SystemBus()

	manager = dbus.Interface(bus.get_object("org.bluez",
				"/org/bluez"), "org.bluez.ProfileManager1")

	option_list = [
			make_option("-p", "--path", action="store",
					type="string", dest="path",
					default="/bluez/test/hfp"),
			make_option("-n", "--name", action="store",
					type="string", dest="name",
					default=None),
			make_option("-C", "--channel", action="store",
					type="int", dest="channel",
					default=None),
			]

	parser = OptionParser(option_list=option_list)

	(options, args) = parser.parse_args()

	mainloop = GObject.MainLoop()

	opts = {
			"Version" : dbus.UInt16(0x0106),
			"Features" : dbus.UInt16(HF_FEATURES),
		}

	if (options.name):
		opts["Name"] = options.name

	if (options.channel is not None):
		opts["Channel"] = dbus.UInt16(options.channel)

	if audio_supported:
		sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
		sco.bind(BDADDR_ANY)
		sco.listen(1)
	else:
		sco = None

	profile = HfpProfile(bus, options.path, sco)

	manager.RegisterProfile(options.path, "hfp-hf", opts)

	print("Profile registered - waiting for connections")

	mainloop.run()