Source code for jaeger.interfaces.notifier
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-07-11
# @Filename: notifier.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, Any, Callable, Coroutine, List, TypeVar
from .message import Message
if TYPE_CHECKING:
from .bus import BusABC
__all__ = ["Notifier"]
Listener_co = Callable[..., Coroutine[Message, Any, Any]]
Bus_co = TypeVar("Bus_co", bound="BusABC")
[docs]
class Notifier:
"""Notifier class to report bus messages to multiple listeners."""
def __init__(self, listeners: List[Listener_co] = [], buses: List[Bus_co] = []):
self.loop = asyncio.get_running_loop()
self.listeners = listeners
self.tasks: list[asyncio.Task] = []
self.buses: List[BusABC] = []
for bus in buses:
self.add_bus(bus)
[docs]
def stop(self):
"""Stops the notifier."""
for task in self.tasks:
task.cancel()
self.buses = []
[docs]
def add_listener(self, callback: Listener_co):
"""Adds a listener."""
self.listeners.append(callback)
[docs]
def add_bus(self, bus):
"""Adds a bus to monitor."""
self.buses.append(bus)
self.tasks.append(asyncio.create_task(self._monitor_bus(bus)))
[docs]
def remove_notifier(self, callback: Listener_co):
"""Removes a listener."""
if callback in self.listeners:
self.listeners.remove(callback)
async def _monitor_bus(self, bus: BusABC):
"""Monitors buses and calls the listeners when a message is received."""
while True:
msg = await bus.get()
if msg is not None:
for listener in self.listeners:
asyncio.create_task(listener(msg))