#!/usr/bin/python from __future__ import absolute_import, print_function from optparse import OptionParser, make_option import dbus import dbus.service import dbus.mainloop.glib import bluezutils import time import os import logging import signal from gi.repository import GLib AGENT_INTERFACE = 'org.bluez.Agent1' gadapter = None gdiscovering = False gdevices = {} g_devices_seen = {} logging.basicConfig(filename='/var/log/bluetooth-agent.log', level=logging.DEBUG, format='%(asctime)s %(message)s') def bool2str(val, valiftrue, valiffalse): if val: return valiftrue else: return valiffalse def logging_status(msg): with open("/var/run/bt_status", "w") as file: file.write(msg + "\n") def connect_device(path, address, properties, forceConnect, typeInput, typeAudio): global gdiscovering devName = "" trusted = False connected = False # avoid devices without interesting information if "Trusted" not in properties: return if "Connected" not in properties: return rproperties = {} # Get the adapter name and bt mac for key, value in properties.items(): if type(value) is dbus.String: value = value.encode('ascii', 'replace') rproperties[key] = value trusted = rproperties["Trusted"] paired = rproperties["Paired"] devName = getDevName(rproperties) shortDevName = getShortDevName(rproperties) connected = rproperties["Connected"] # skip non input devices if "Icon" not in properties: logging.info("Skipping device {} (no type)".format(getDevName(rproperties))); return if not ( (typeInput and properties["Icon"].startswith("input")) or (typeAudio and properties["Icon"].startswith("audio-card")) ): logging.info("Skipping device {} because of type {}".format(getDevName(rproperties), properties["Icon"])); return blacklistAddr, blacklistName = getBlacklistAddrAndName(properties) # add device to the seen list if blacklistAddr not in g_devices_seen: g_devices_seen[blacklistAddr] = { "name": blacklistName } updateKnownDevicesList(g_devices_seen) # skip blacklisted devices if blacklistAddr is not None and isDeviceBlacklisted(blacklistAddr): logging.info("Skipping blacklisted device {}".format(blacklistName)) return logging.info("event for " + devName + "(paired=" + bool2str(paired, "paired", "not paired") + ", trusted=" + bool2str(trusted, "trusted", "untrusted") + ", connected=" + bool2str(connected, "connected", "disconnected") + ")") # skipping connected devices if paired and trusted and connected: logging.info("Skipping already connected device {}".format(getDevName(rproperties))); return if not paired: if connected == False and gdiscovering: doParing(address, devName, shortDevName) return # now it is paired if not trusted and (gdiscovering or forceConnect): logging.info("Trusting (" + devName + ")") logging_status("Trusting " + shortDevName + "...") bluezProps = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") bluezProps.Set("org.bluez.Device1", "Trusted", True) # now it is paired and trusted # Connect if Trusted and paired if not connected or forceConnect: doConnect(address, devName, shortDevName) def doParing(address, devName, shortDevName): logging.info("Pairing... (" + devName + ")") logging_status("Pairing " + shortDevName + "...") device = bluezutils.find_device(address) try: device.Pair() except Exception as e: logging.info("Pairing failed (" + devName + ")") logging_status("Pairing failed (" + shortDevName + ")") def doConnect(address, devName, shortDevName): global gadapter global gdiscovering try: # discovery stopped during connection to help some devices if gdiscovering: logging.info("Stop discovery") gadapter.StopDiscovery() device = bluezutils.find_device(address) ntry=5 while ntry > 0: ntry = ntry -1 try: logging.info("Connecting... (" + devName + ")") logging_status("Connecting " + shortDevName + "...") device.Connect() logging.info("Connected successfully (" + devName + ")") logging_status("Connected successfully (" + shortDevName + ")") if gdiscovering: logging.info("Start discovery") gadapter.StartDiscovery() return except dbus.exceptions.DBusException as err: logging.info("dbus: " + err.get_dbus_message()) time.sleep(1) except Exception as err: logging.info("Connection failed (" + devName + ")") time.sleep(1) logging.info("Connection failed. Give up. (" + devName + ")") logging_status("Connection failed. Give up. (" + shortDevName + ")") if gdiscovering: logging.info("Start discovery") gadapter.StartDiscovery() except Exception as e: if gdiscovering: logging.info("Start discovery") gadapter.StartDiscovery() # don't raise, while startdiscovery doesn't like it #raise e def getDevName(properties): #devName = properties["Name"] + " (" + properties["Address"] + ", " + properties["Icon"] + ")" #devStatus = "Trusted=" + str(properties["Trusted"]) + ", paired=" + str(properties["Paired"]) + ", connected=" + str(properties["Connected"]), ", blocked=" + str(properties["Blocked"]) #devTech = "legacyPairing: " + str(properties["LegacyPairing"]) # + ", RSSI: " + properties["RSSI"] if "Name" in properties and "Address" in properties and "Icon" in properties: return str(properties["Name"]) + " (" + str(properties["Address"]) + ", " + str(properties["Icon"]) + ")" if "Name" in properties and "Address" in properties: return str(properties["Name"]) + " (" + str(properties["Address"]) + ")" if "Name" in properties and "Icon" in properties: return str(properties["Name"]) + " (" + str(properties["Icon"]) + ")" if "Name" in properties: return str(properties["Name"]) if "Address" in properties and "Icon" in properties: return str(properties["Address"]) + " (" + str(properties["Icon"]) + ")" if "Address" in properties: return str(properties["Address"]) if "Icon" in properties: return str(properties["Icon"]) return "unknown" def updateKnownDevicesList(g_devices_seen): f = open("/var/run/bluetooth_seen", "w") for key in g_devices_seen: f.write("{} {}\n".format(key, g_devices_seen[key]["name"])) f.close() def isDeviceBlacklisted(blacklistAddr): datafile = "/var/lib/bluetooth/bluetooth_blacklisted" # reread the file each time in case somebody changed it if not os.path.isfile(datafile): return False with open(datafile) as file: lines = file.readlines() for line in lines: if line[0:len(blacklistAddr)] == blacklistAddr: return True return False def getBlacklistAddrAndName(properties): if "Address" not in properties: return None address = str(properties["Address"]) name = "" if "Name" in properties: name = str(properties["Name"]) elif "Icon" in properties: name = str(properties["Icon"]) else: name = "unknown" return address, name def getShortDevName(properties): if "Name" in properties: return str(properties["Name"]) if "Address" in properties: return str(properties["Address"]) if "Icon" in properties: return str(properties["Icon"]) return "unknown" def getBluetoothWantedTypes(): if not os.path.isfile("/var/run/bt_types"): logging.info("no bt_types file") return True, True btype = "" with open("/var/run/bt_types", "r") as file: btype = file.read().strip() logging.info("bt_type: {}".format(btype)) if btype == "": return True, True if btype == "pad": return True, False if btype == "audio": return False, True return False, False def interfaces_added(path, interfaces): global gdevices if "org.bluez.Device1" not in interfaces: return if not interfaces["org.bluez.Device1"]: return properties = interfaces["org.bluez.Device1"] if path in gdevices: gdevices[path] = merge2dicts(gdevices[path], properties) else: gdevices[path] = properties if "Address" in gdevices[path]: typeInput, typeAudio = getBluetoothWantedTypes() connect_device(path, properties["Address"], gdevices[path], False, typeInput, typeAudio) def properties_changed(interface, changed, invalidated, path): global gdevices if interface != "org.bluez.Device1": return if path in gdevices: gdevices[path] = merge2dicts(gdevices[path], changed) else: gdevices[path] = changed #logging.info("Properties changed:") #logging.info(changed) #logging.info(invalidated) if "Paired" in changed and changed["Paired"] == True: # ok, do as in simple-agent, trust and connect typeInput, typeAudio = getBluetoothWantedTypes() connect_device(path, gdevices[path]["Address"], gdevices[path], True, typeInput, typeAudio) return # ok, it is now connected, what else ? if "Connected" in changed and changed["Connected"] == True: return if "Connected" in changed and changed["Connected"] == False: logging.info("Skipping (property Connected changed to False)"); return if "Address" in gdevices[path]: typeInput, typeAudio = getBluetoothWantedTypes() connect_device(path, gdevices[path]["Address"], gdevices[path], False, typeInput, typeAudio) def merge2dicts(d1, d2): res = d1.copy() res.update(d2) return res def user_signal_start_discovery(signum, frame): global gdiscovering global gadapter try: if gdiscovering == False: gdiscovering = True logging.info("Start discovery (signal)") gadapter.StartDiscovery() except: pass def user_signal_stop_discovery(signum, frame): global gdiscovering global gadapter try: if gdiscovering: gdiscovering = False logging.info("Stop discovery (signal)") gadapter.StopDiscovery() except: pass class Agent(dbus.service.Object): exit_on_release = True def set_exit_on_release(self, exit_on_release): self.exit_on_release = exit_on_release @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Release(self): logging.info("agent: Release") if self.exit_on_release: mainloop.quit() @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def AuthorizeService(self, device, uuid): logging.info("agent: AuthorizeService") return @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def RequestPinCode(self, device): logging.info("RequestPinCode (%s)" % (device)) return "0000" @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="u") def RequestPasskey(self, device): logging.info("RequestPasskey (%s)" % (device)) return 0 @dbus.service.method(AGENT_INTERFACE, in_signature="ouq", out_signature="") def DisplayPasskey(self, device, passkey, entered): logging.info("agent: DisplayPasskey (%s, %06u entered %u)" % (device, passkey, entered)) @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def DisplayPinCode(self, device, pincode): logging.info("agent: DisplayPinCode (%s, %s)" % (device, pincode)) @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="") def RequestConfirmation(self, device, passkey): logging.info("agent: RequestConfirmation") return @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="") def RequestAuthorization(self, device): logging.info("agent: RequestAuthorization") return @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): logging.info("agent: Cancel") def do_main_loop(dev_id): global gadapter # adapter try: adapter = bluezutils.find_adapter(dev_id) except: # try to find any adapter adapter = bluezutils.find_adapter(None) logging.info("adapter found") gadapter = adapter adapters = {} om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = om.GetManagedObjects() for path, interfaces in objects.items(): if "org.bluez.Device1" in interfaces: gdevices[path] = interfaces["org.bluez.Device1"] if "org.bluez.Adapter1" in interfaces: adapters[path] = interfaces["org.bluez.Adapter1"] adapter_props = adapters[adapter.object_path] logging.info(adapter_props["Name"] + "(" + adapter_props["Address"] + "), powered=" + str(adapter_props["Powered"])) # power on adapter if needed if adapter_props["Powered"] == 0: try: logging.info("powering on adapter ("+ adapter_props["Address"] +")") adapterSetter = dbus.Interface(bus.get_object("org.bluez", adapter.object_path), "org.freedesktop.DBus.Properties") adapterSetter.Set("org.bluez.Adapter1", "Powered", True) except: pass # hum, not nice gdiscovering = False # events # use events while i manage to stop discovery only from the process having started it signal.signal(signal.SIGUSR1, user_signal_start_discovery) signal.signal(signal.SIGUSR2, user_signal_stop_discovery) mainloop = GLib.MainLoop() mainloop.run() if __name__ == '__main__': # options option_list = [ make_option("-i", "--device", action="store", type="string", dest="dev_id") ] parser = OptionParser(option_list=option_list) (options, args) = parser.parse_args() # initialize the seen devices file updateKnownDevicesList(g_devices_seen) # register dbus dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() bus.add_signal_receiver(interfaces_added, dbus_interface = "org.freedesktop.DBus.ObjectManager", signal_name = "InterfacesAdded") bus.add_signal_receiver(properties_changed, dbus_interface = "org.freedesktop.DBus.Properties", signal_name = "PropertiesChanged", arg0 = "org.bluez.Device1", path_keyword = "path") # register the agent agentpath = "/storage/.cache/agent" obj = bus.get_object("org.bluez", "/org/bluez") manager = dbus.Interface(obj, "org.bluez.AgentManager1") manager.RegisterAgent(agentpath, "NoInputNoOutput") manager.RequestDefaultAgent(agentpath) agent = Agent(bus, agentpath) logging.info("agent registered") # run the agent, allows some tries while hardware can take time to initiate time.sleep(5) try: do_main_loop(options.dev_id) except Exception as e: logging.error("agent fails") logging.error(e, exc_info=True) raise logging.error("agent gave up")