distribution/packages/jelos/sources/scripts/batocera-bluetooth-agent
Tom Pratt 5ad2cf4673 Bluetooth boot and sleep handling
Also reduce bluetooth agent sleep duration. Init dependencies mean it's not that sensitive.
2022-10-29 00:33:45 +02:00

461 lines
14 KiB
Python
Executable file

#!/usr/bin/python
from __future__ import absolute_import, print_function
from optparse import OptionParser, make_option
import dbus
import dbus.service
import dbus.mainloop.glib
import bluezutils
import time
import os
import logging
import signal
from gi.repository import GLib
# D-Bus interface name under which the Agent methods are exported.
AGENT_INTERFACE = 'org.bluez.Agent1'
# Adapter object; stays None until do_main_loop() assigns it.
gadapter = None
# Discovery flag toggled by the SIGUSR1 / SIGUSR2 handlers.
gdiscovering = False
# Cache of org.bluez.Device1 properties, keyed by D-Bus object path.
gdevices = {}
# Devices recorded by connect_device(): address -> {"name": ...}.
g_devices_seen = {}
# All messages (DEBUG and above) go to a fixed log file, prefixed with a timestamp.
logging.basicConfig(filename='/var/log/bluetooth-agent.log', level=logging.DEBUG, format='%(asctime)s %(message)s')
def bool2str(val, valiftrue, valiffalse):
    """Pick one of two labels according to the truthiness of ``val``.

    Returns ``valiftrue`` when ``val`` is truthy and ``valiffalse`` otherwise.
    """
    return valiftrue if val else valiffalse
def logging_status(msg):
    """Overwrite /var/run/bt_status with ``msg`` followed by a newline."""
    with open("/var/run/bt_status", "w") as status_file:
        status_line = msg + "\n"
        status_file.write(status_line)
def connect_device(path, address, properties, forceConnect, typeInput, typeAudio):
    """Pair, trust and/or connect a device depending on its current state.

    Args:
        path: D-Bus object path of the device (used to set ``Trusted``).
        address: device address handed to doParing() / doConnect().
        properties: mapping of the ``org.bluez.Device1`` properties known so far.
        forceConnect: when true, trust and connect even if discovery is not
            active or the device already reports itself as connected.
        typeInput: accept devices whose ``Icon`` starts with ``input``.
        typeAudio: accept devices whose ``Icon`` starts with ``audio-card``.

    Returns:
        None. The function silently returns as soon as the device is found to
        be uninteresting (missing properties, unwanted type, blacklisted,
        already connected).
    """
    global gdiscovering

    # avoid devices without interesting information; "Paired" is read below
    # as well, so it must be present too (it used to raise KeyError).
    for required in ("Trusted", "Connected", "Paired"):
        if required not in properties:
            return

    # ASCII-only copy of the properties, used for logging and status text.
    rproperties = {}
    for key, value in properties.items():
        if type(value) is dbus.String:
            # Replace non-ASCII characters, then decode again so the value
            # stays text; leaving it as bytes makes str() render "b'...'"
            # on Python 3.
            value = value.encode('ascii', 'replace').decode('ascii')
        rproperties[key] = value

    trusted = rproperties["Trusted"]
    paired = rproperties["Paired"]
    connected = rproperties["Connected"]
    devName = getDevName(rproperties)
    shortDevName = getShortDevName(rproperties)

    # skip devices that are neither a wanted input nor a wanted audio device
    if "Icon" not in properties:
        logging.info("Skipping device {} (no type)".format(devName))
        return
    icon = properties["Icon"]
    wanted = (typeInput and icon.startswith("input")) or (typeAudio and icon.startswith("audio-card"))
    if not wanted:
        logging.info("Skipping device {} because of type {}".format(devName, icon))
        return

    # The helper may report "no address" either as None or as (None, name);
    # accept both so the result is never blindly unpacked.
    blacklistInfo = getBlacklistAddrAndName(properties)
    if blacklistInfo is None:
        blacklistAddr, blacklistName = None, None
    else:
        blacklistAddr, blacklistName = blacklistInfo

    if blacklistAddr is not None:
        # add device to the seen list
        if blacklistAddr not in g_devices_seen:
            g_devices_seen[blacklistAddr] = {"name": blacklistName}
            updateKnownDevicesList(g_devices_seen)
        # skip blacklisted devices
        if isDeviceBlacklisted(blacklistAddr):
            logging.info("Skipping blacklisted device {}".format(blacklistName))
            return

    logging.info("event for " + devName + "(paired=" + bool2str(paired, "paired", "not paired") + ", trusted=" + bool2str(trusted, "trusted", "untrusted") + ", connected=" + bool2str(connected, "connected", "disconnected") + ")")

    # skipping connected devices
    if paired and trusted and connected:
        logging.info("Skipping already connected device {}".format(devName))
        return

    if not paired:
        # pairing is only attempted while discovery is active
        if not connected and gdiscovering:
            doParing(address, devName, shortDevName)
        return

    # now it is paired
    if not trusted and (gdiscovering or forceConnect):
        logging.info("Trusting (" + devName + ")")
        logging_status("Trusting " + shortDevName + "...")
        bluezProps = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties")
        bluezProps.Set("org.bluez.Device1", "Trusted", True)

    # now it is paired and trusted: connect
    if not connected or forceConnect:
        doConnect(address, devName, shortDevName)
def doParing(address, devName, shortDevName):
    """Try to pair with the device at ``address``.

    Args:
        address: device address passed to bluezutils.find_device().
        devName: descriptive device name used in the log file.
        shortDevName: short device name used in the status file.

    Failures (device lookup or the Pair() call itself) are logged together
    with their reason and reported in the status file; they are not raised.
    """
    logging.info("Pairing... (" + devName + ")")
    logging_status("Pairing " + shortDevName + "...")
    try:
        # The lookup is inside the try so that a vanished device is reported
        # as a pairing failure instead of escaping to the caller.
        device = bluezutils.find_device(address)
        device.Pair()
    except Exception as e:
        logging.info("Pairing failed (" + devName + "): " + str(e))
        logging_status("Pairing failed (" + shortDevName + ")")
def _resume_discovery():
    """Restart discovery if it is flagged as active; log instead of raising."""
    if not gdiscovering:
        return
    logging.info("Start discovery")
    try:
        gadapter.StartDiscovery()
    except Exception as err:
        logging.info("Start discovery failed: " + str(err))


def doConnect(address, devName, shortDevName):
    """Connect to the device at ``address``, retrying up to five times.

    Args:
        address: device address passed to bluezutils.find_device().
        devName: descriptive device name used in the log file.
        shortDevName: short device name used in the status file.

    Discovery is stopped while connecting and restarted afterwards, whatever
    the outcome. Errors are logged and never raised to the caller.
    """
    try:
        # discovery stopped during connection to help some devices
        if gdiscovering:
            logging.info("Stop discovery")
            gadapter.StopDiscovery()

        device = bluezutils.find_device(address)
        for _attempt in range(5):
            try:
                logging.info("Connecting... (" + devName + ")")
                logging_status("Connecting " + shortDevName + "...")
                device.Connect()
            except dbus.exceptions.DBusException as err:
                # str() because the D-Bus message may be absent
                logging.info("dbus: " + str(err.get_dbus_message()))
                time.sleep(1)
            except Exception:
                logging.info("Connection failed (" + devName + ")")
                time.sleep(1)
            else:
                # Success handling lives outside the retry handlers so that a
                # later failure cannot trigger another Connect() attempt.
                logging.info("Connected successfully (" + devName + ")")
                logging_status("Connected successfully (" + shortDevName + ")")
                return

        logging.info("Connection failed. Give up. (" + devName + ")")
        logging_status("Connection failed. Give up. (" + shortDevName + ")")
    except Exception as err:
        # deliberately not re-raised: this runs inside a D-Bus signal handler
        logging.info("Connection aborted (" + devName + "): " + str(err))
    finally:
        _resume_discovery()
def getDevName(properties):
    """Build a descriptive device label from Name, Address and Icon.

    The first available field (in that order) is the label; any remaining
    available fields follow in parentheses, separated by ", ". Returns
    "unknown" when none of the three fields is present.
    """
    parts = [str(properties[field]) for field in ("Name", "Address", "Icon") if field in properties]
    if not parts:
        return "unknown"
    label, details = parts[0], parts[1:]
    if not details:
        return label
    return label + " (" + ", ".join(details) + ")"
def updateKnownDevicesList(g_devices_seen):
    """Rewrite /var/run/bluetooth_seen from the given mapping.

    Args:
        g_devices_seen: mapping of key -> dict with a "name" entry; one
            "<key> <name>" line is written per item.

    The file is opened in a ``with`` block so the handle is closed even when
    a write (or a malformed entry) raises.
    """
    with open("/var/run/bluetooth_seen", "w") as seen_file:
        for address, info in g_devices_seen.items():
            seen_file.write("{} {}\n".format(address, info["name"]))
def isDeviceBlacklisted(blacklistAddr):
    """Tell whether any line of the blacklist file starts with ``blacklistAddr``.

    Returns False when the blacklist file does not exist.
    """
    blacklist_path = "/var/lib/bluetooth/bluetooth_blacklisted"
    # the file is read again on every call in case somebody changed it
    if not os.path.isfile(blacklist_path):
        return False
    with open(blacklist_path) as blacklist_file:
        entries = blacklist_file.readlines()
    return any(entry.startswith(blacklistAddr) for entry in entries)
def getBlacklistAddrAndName(properties):
    """Return the ``(address, name)`` pair used for the seen list and blacklist.

    Args:
        properties: mapping of device properties.

    Returns:
        A 2-tuple. ``address`` is ``str(properties["Address"])``, or None when
        no address is known. ``name`` is the "Name" property, else the "Icon"
        property, else "unknown".

    A tuple is returned in every case: callers unpack the result, so a bare
    None for the "no address" case would raise TypeError at the call site.
    """
    if "Name" in properties:
        name = str(properties["Name"])
    elif "Icon" in properties:
        name = str(properties["Icon"])
    else:
        name = "unknown"

    if "Address" not in properties:
        return None, name
    return str(properties["Address"]), name
def getShortDevName(properties):
    """Return the first available of Name, Address, Icon as a string.

    Falls back to "unknown" when none of them is present.
    """
    for field in ("Name", "Address", "Icon"):
        if field in properties:
            return str(properties[field])
    return "unknown"
def getBluetoothWantedTypes():
    """Read /var/run/bt_types and return ``(typeInput, typeAudio)``.

    A missing file or empty content selects both types, "pad" selects input
    only, "audio" selects audio only, and any other content selects neither.
    """
    types_path = "/var/run/bt_types"
    if not os.path.isfile(types_path):
        logging.info("no bt_types file")
        return True, True

    with open(types_path, "r") as types_file:
        wanted = types_file.read().strip()
    logging.info("bt_type: {}".format(wanted))

    selection = {
        "": (True, True),
        "pad": (True, False),
        "audio": (False, True),
    }
    return selection.get(wanted, (False, False))
def interfaces_added(path, interfaces):
    """Handle the ObjectManager ``InterfacesAdded`` signal.

    Args:
        path: D-Bus object path the interfaces were added to.
        interfaces: mapping of interface name -> properties.

    Only ``org.bluez.Device1`` entries with at least one property are
    considered. Their properties are merged into the ``gdevices`` cache and,
    once an address is known, the device is handed to connect_device().
    """
    global gdevices
    if "org.bluez.Device1" not in interfaces:
        return
    properties = interfaces["org.bluez.Device1"]
    if not properties:
        return

    if path in gdevices:
        gdevices[path] = merge2dicts(gdevices[path], properties)
    else:
        gdevices[path] = properties

    device = gdevices[path]
    if "Address" in device:
        typeInput, typeAudio = getBluetoothWantedTypes()
        # Read the address from the merged entry that was just tested: the
        # newly added properties alone are not guaranteed to carry it.
        connect_device(path, device["Address"], device, False, typeInput, typeAudio)
def properties_changed(interface, changed, invalidated, path):
    """Handle the ``PropertiesChanged`` signal for BlueZ devices.

    Args:
        interface: interface whose properties changed; anything other than
            ``org.bluez.Device1`` is ignored.
        changed: mapping of changed property name -> new value.
        invalidated: invalidated property names (unused).
        path: D-Bus object path of the device.

    The changes are merged into the ``gdevices`` cache. A device that just
    became paired is trusted and connected (forced); a pure change of the
    connection state is ignored; any other change re-evaluates the device.
    """
    if interface != "org.bluez.Device1":
        return

    if path in gdevices:
        gdevices[path] = merge2dicts(gdevices[path], changed)
    else:
        gdevices[path] = changed
    device = gdevices[path]

    forceConnect = False
    if "Paired" in changed and changed["Paired"] == True:
        # ok, do as in simple-agent, trust and connect
        forceConnect = True
    elif "Connected" in changed:
        # connection state changes need no further action
        if not changed["Connected"]:
            logging.info("Skipping (property Connected changed to False)")
        return

    if "Address" not in device:
        # Nothing can be done without an address. The paired branch used to
        # read it unconditionally and raised KeyError for unknown devices.
        if forceConnect:
            logging.info("Paired device without known address, ignoring ({})".format(path))
        return

    typeInput, typeAudio = getBluetoothWantedTypes()
    connect_device(path, device["Address"], device, forceConnect, typeInput, typeAudio)
def merge2dicts(d1, d2):
    """Return a copy of ``d1`` overlaid with the entries of ``d2``.

    Neither argument is modified; on duplicate keys the value from ``d2`` wins.
    """
    merged = d1.copy()
    for key in d2:
        merged[key] = d2[key]
    return merged
def user_signal_start_discovery(signum, frame):
    """Signal handler: start discovery unless it is already flagged active.

    Args:
        signum: signal number (unused).
        frame: interrupted stack frame (unused).

    The ``gdiscovering`` flag is set before the adapter is asked to start
    discovery. Failures are logged and never raised out of the handler;
    SystemExit and KeyboardInterrupt are no longer swallowed.
    """
    global gdiscovering
    global gadapter
    try:
        if not gdiscovering:
            gdiscovering = True
            logging.info("Start discovery (signal)")
            gadapter.StartDiscovery()
    except Exception:
        logging.exception("Start discovery (signal) failed")
def user_signal_stop_discovery(signum, frame):
    """Signal handler: stop discovery if it is flagged active.

    Args:
        signum: signal number (unused).
        frame: interrupted stack frame (unused).

    The ``gdiscovering`` flag is cleared before the adapter is asked to stop
    discovery. Failures are logged and never raised out of the handler;
    SystemExit and KeyboardInterrupt are no longer swallowed.
    """
    global gdiscovering
    global gadapter
    try:
        if gdiscovering:
            gdiscovering = False
            logging.info("Stop discovery (signal)")
            gadapter.StopDiscovery()
    except Exception:
        logging.exception("Stop discovery (signal) failed")
class Agent(dbus.service.Object):
    """Pairing agent exported on D-Bus under AGENT_INTERFACE.

    Every request is accepted without user interaction: authorization and
    confirmation methods simply return, the PIN code is always "0000" and
    the passkey is always 0.
    """

    # When true, Release() stops the main loop.
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        """Choose whether Release() stops the main loop."""
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="")
    def Release(self):
        """Log the release and, if configured, quit the main loop."""
        logging.info("agent: Release")
        if not self.exit_on_release:
            return
        # The main loop is only reachable if it was published as a module
        # global; look it up defensively instead of raising NameError inside
        # the D-Bus call.
        loop = globals().get("mainloop")
        if loop is None:
            logging.info("agent: Release (no main loop to quit)")
            return
        loop.quit()

    @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        """Accept every service authorization request."""
        logging.info("agent: AuthorizeService")
        return

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        """Answer every PIN code request with "0000"."""
        logging.info("RequestPinCode (%s)" % (device))
        return "0000"

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="u")
    def RequestPasskey(self, device):
        """Answer every passkey request with 0."""
        logging.info("RequestPasskey (%s)" % (device))
        return 0

    @dbus.service.method(AGENT_INTERFACE, in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device, passkey, entered):
        """Log the passkey that would be displayed."""
        logging.info("agent: DisplayPasskey (%s, %06u entered %u)" % (device, passkey, entered))

    @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="")
    def DisplayPinCode(self, device, pincode):
        """Log the PIN code that would be displayed."""
        logging.info("agent: DisplayPinCode (%s, %s)" % (device, pincode))

    @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        """Confirm every passkey without checking it."""
        logging.info("agent: RequestConfirmation")
        return

    @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        """Accept every authorization request."""
        logging.info("agent: RequestAuthorization")
        return

    @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="")
    def Cancel(self):
        """Log that the pending request was cancelled."""
        logging.info("agent: Cancel")
def do_main_loop(dev_id):
    """Select the adapter, power it on if needed and run the GLib main loop.

    Args:
        dev_id: adapter identifier passed to bluezutils.find_adapter(); when
            that lookup fails, any available adapter is used instead.

    Side effects: sets the ``gadapter``, ``gdiscovering`` and ``mainloop``
    module globals, fills ``gdevices`` with the devices BlueZ already knows,
    and installs the SIGUSR1 / SIGUSR2 handlers that start / stop discovery.
    Returns when the main loop is quit.
    """
    global gadapter
    # Without these declarations the assignments below would only bind
    # locals: the discovery flag would not be reset and Agent.Release()
    # could not reach the main loop.
    global gdiscovering
    global mainloop

    # adapter
    try:
        adapter = bluezutils.find_adapter(dev_id)
    except Exception:
        # try to find any adapter
        adapter = bluezutils.find_adapter(None)
    logging.info("adapter found")
    gadapter = adapter

    adapters = {}
    om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()
    for path, interfaces in objects.items():
        if "org.bluez.Device1" in interfaces:
            gdevices[path] = interfaces["org.bluez.Device1"]
        if "org.bluez.Adapter1" in interfaces:
            adapters[path] = interfaces["org.bluez.Adapter1"]

    adapter_props = adapters[adapter.object_path]
    logging.info(adapter_props["Name"] + "(" + adapter_props["Address"] + "), powered=" + str(adapter_props["Powered"]))

    # power on adapter if needed
    if adapter_props["Powered"] == 0:
        try:
            logging.info("powering on adapter (" + adapter_props["Address"] + ")")
            adapterSetter = dbus.Interface(bus.get_object("org.bluez", adapter.object_path), "org.freedesktop.DBus.Properties")
            adapterSetter.Set("org.bluez.Adapter1", "Powered", True)
        except Exception as err:
            # best effort: keep going, but leave a trace in the log
            logging.info("powering on adapter failed: " + str(err))

    gdiscovering = False

    # events
    # discovery is driven by signals so that it is started and stopped from
    # the same process
    signal.signal(signal.SIGUSR1, user_signal_start_discovery)
    signal.signal(signal.SIGUSR2, user_signal_stop_discovery)

    mainloop = GLib.MainLoop()
    mainloop.run()
if __name__ == '__main__':
    # options
    option_list = [make_option("-i", "--device", action="store", type="string", dest="dev_id")]
    parser = OptionParser(option_list=option_list)
    (options, args) = parser.parse_args()

    # initialize the seen devices file
    updateKnownDevicesList(g_devices_seen)

    # register dbus; `bus` is a module global also used by connect_device()
    # and do_main_loop()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    bus.add_signal_receiver(interfaces_added, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesAdded")
    bus.add_signal_receiver(properties_changed, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", arg0="org.bluez.Device1", path_keyword="path")

    # Export the agent object first, then register its path with BlueZ, so
    # the object already exists when BlueZ learns about it.
    agentpath = "/storage/agent"
    agent = Agent(bus, agentpath)
    obj = bus.get_object("org.bluez", "/org/bluez")
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(agentpath, "NoInputNoOutput")
    manager.RequestDefaultAgent(agentpath)
    logging.info("agent registered")

    # give the hardware a moment to initialise before looking for the adapter
    time.sleep(2)
    try:
        do_main_loop(options.dev_id)
    except Exception as e:
        logging.error("agent fails")
        logging.error(e, exc_info=True)
        raise

    # only reached when the main loop returned without raising
    logging.error("agent gave up")