class Notifier:
    """Notifier class to report bus messages to multiple listeners.

    One monitoring task is started per bus. Each task awaits ``bus.get()``
    in a loop and, for every message that is not ``None``, schedules
    ``listener(msg)`` as its own task for each registered listener.
    Listeners must therefore return a coroutine when called.

    The notifier must be created while an event loop is running, because
    the constructor calls ``asyncio.get_running_loop()`` and
    ``asyncio.create_task()``.
    """

    def __init__(
        self,
        listeners: "List[Listener_co] | None" = None,
        buses: "List[Bus_co] | None" = None,
    ):
        """Create the notifier and start monitoring the given buses.

        Args:
            listeners: Initial listeners. When a list is passed it is stored
                as-is (not copied), so later ``add_listener`` calls are
                visible to the caller. Defaults to a fresh empty list.
            buses: Buses to monitor from the start. Defaults to none.

        Raises:
            RuntimeError: If no event loop is currently running.
        """
        self.loop = asyncio.get_running_loop()
        # A fresh list per instance: a shared ``[]`` default would leak
        # listeners between notifiers.
        self.listeners = listeners if listeners is not None else []
        self.tasks: list[asyncio.Task] = []
        # Strong references to in-flight listener calls; the event loop only
        # keeps weak references to tasks, so unreferenced ones may be
        # garbage-collected before they finish.
        self._dispatch_tasks: set = set()
        self.buses: List[BusABC] = []
        for bus in buses or ():
            self.add_bus(bus)

    def stop(self):
        """Stops the notifier.

        Cancels every bus-monitoring task and forgets the monitored buses.
        Listener calls that were already scheduled are left to finish.
        """
        for task in self.tasks:
            task.cancel()
        # Drop the cancelled tasks so they are neither kept alive nor
        # cancelled again by a later stop().
        self.tasks.clear()
        self.buses = []

    def add_listener(self, callback: Listener_co):
        """Adds a listener."""
        self.listeners.append(callback)

    def add_bus(self, bus):
        """Adds a bus to monitor and starts its monitoring task."""
        self.buses.append(bus)
        self.tasks.append(asyncio.create_task(self._monitor_bus(bus)))

    def remove_listener(self, callback: Listener_co):
        """Removes a listener; does nothing if it is not registered."""
        if callback in self.listeners:
            self.listeners.remove(callback)

    def remove_notifier(self, callback: Listener_co):
        """Removes a listener.

        Kept for backward compatibility; despite its name it removes a
        listener, exactly like ``remove_listener``.
        """
        self.remove_listener(callback)

    async def _monitor_bus(self, bus: BusABC):
        """Monitors a bus and calls the listeners when a message is received.

        Runs until cancelled. ``None`` results from ``bus.get()`` are skipped.
        """
        while True:
            msg = await bus.get()
            if msg is None:
                continue
            for listener in self.listeners:
                dispatch = asyncio.create_task(listener(msg))
                self._dispatch_tasks.add(dispatch)
                # Release the reference as soon as the listener call ends.
                dispatch.add_done_callback(self._dispatch_tasks.discard)