Source code for jaeger.utils.helpers

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-09-14
# @Filename: helpers.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import concurrent.futures
import enum
import logging
import warnings
from concurrent.futures import Executor
from contextlib import suppress
from functools import partial
from threading import Thread

from typing import TYPE_CHECKING, Callable, Generic, Optional, Type, TypeVar

from jaeger import log


if TYPE_CHECKING:
    from jaeger.actor import JaegerActor
    from jaeger.fps import FPS


__all__ = [
    "AsyncQueue",
    "StatusMixIn",
    "PollerList",
    "Poller",
    "AsyncioExecutor",
    "run_in_executor",
    "BaseBot",
]


[docs] class AsyncQueue(asyncio.Queue): """Provides an `asyncio.Queue` object with a watcher. Parameters ---------- callback A function to call when a new item is received from the queue. It can be a coroutine. """ def __init__(self, callback: Optional[Callable] = None): async def process_queue(): """Waits for the next item and sends it to the cb function.""" loop = asyncio.get_running_loop() while True: item = await self.get() if callback: loop.call_soon_threadsafe(callback, item) super().__init__() self.watcher = asyncio.create_task(process_queue())
Status_co = TypeVar("Status_co", bound=enum.Enum)
[docs] class StatusMixIn(Generic[Status_co]): """A mixin that provides status tracking with callbacks. Provides a status property that executes a list of callbacks when the status changes. Parameters ---------- maskbit_flags A class containing the available statuses as a series of maskbit flags. Usually as subclass of `enum.Flag`. initial_status The initial status. callback_func The function to call if the status changes. call_now Whether the callback function should be called when initialising. Attributes ---------- callbacks A list of the callback functions to call. """ def __init__( self, maskbit_flags: Type[Status_co], initial_status: Optional[Status_co] = None, callback_func: Optional[Callable] = None, call_now: bool = False, ): self._flags = maskbit_flags self.callbacks = [] self._status: Optional[Status_co] = initial_status self.watcher = None if callback_func is not None: self.callbacks.append(callback_func) if call_now is True: self.do_callbacks()
[docs] def add_callback(self, cb: Callable): """Adds a callback.""" self.callbacks.append(cb)
[docs] def remove_callback(self, cb: Callable): """Removes a callback.""" self.callbacks.remove(cb)
[docs] def do_callbacks(self): """Calls functions in ``callbacks``.""" assert hasattr( self, "callbacks" ), "missing callbacks attribute. Did you call __init__()?" for func in self.callbacks: func()
@property def status(self) -> Status_co: """Returns the status.""" if self._status is None: raise ValueError("status not set.") return self._status @status.setter def status(self, value: Status_co): """Sets the status.""" if value != self._status: self._status = self.flags(value) self.do_callbacks() if self.watcher is not None: self.watcher.set() @property def flags(self): """Gets the flags associated to this status.""" return self._flags @flags.setter def flags(self, value: Type[Status_co]): """Sets the flags associated to this status.""" self._flags = value self._status = None
[docs] async def wait_for_status( self, value, ): """Awaits until the status matches ``value``.""" if self.status == value: return self.watcher = asyncio.Event() while self.status != value: await self.watcher.wait() if self.watcher is not None: self.watcher.clear() self.watcher = None
[docs] class PollerList(list): """A list of `.Poller` to be managed jointly.""" def __init__(self, pollers=[]): names = [poller.name for poller in pollers] assert len(names) == len(set(names)), "repeated names in poller list." list.__init__(self, pollers) @property def names(self): """List the poller names.""" return [poller.name for poller in self]
[docs] def append(self, poller): """Adds a poller.""" assert isinstance(poller, Poller), "not a poller." names = [pp.name for pp in self] if poller.name in names: raise ValueError( f"a poller with name {poller.name} is already in the list." ) list.append(self, poller)
def __getattr__(self, name): """Gets a poller by its name.""" for poller in self: if name == poller.name: return poller return list.__getitem__(self, name) def __getitem__(self, item): """Gets the poller by name.""" if isinstance(item, str): return self.__getattr__(item) return list.__getitem__(self, item)
[docs] async def set_delay(self, delay=None, immediate=False): """Sets the delay for all the pollers. Parameters ---------- delay : float The delay between calls to the callback. If `None`, restores the original delay. immediate : bool If `True`, stops the currently running tasks and sets the new delay. Otherwise waits for the current tasks to complete. """ delay_coros = [ poller.set_delay(delay=delay, immediate=immediate) for poller in self ] await asyncio.gather(*delay_coros)
[docs] def start(self, delay=None): """Starts all the pollers. Parameters ---------- delay : float The delay between calls to the callback. If not specified, uses the default delays for each poller. """ for poller in self: poller.start(delay=delay)
[docs] async def stop(self): """Cancels all the poller.""" stop_coros = [poller.stop() for poller in self] await asyncio.gather(*stop_coros)
@property def running(self): """Returns `True` if at least one poller is running.""" return any([poller.running for poller in self])
[docs] class Poller(object): """A task that runs a callback periodically. Parameters ---------- name : str The name of the poller. callback : function or coroutine A function or coroutine to call periodically. delay : float Initial delay between calls to the callback. """ def __init__(self, name, callback, delay=1.0): self.name = name self.callback = callback self._orig_delay = delay self.delay = delay # Create two tasks, one for the sleep timer and another for the poller # itself. We do this because we want to be able to cancell the sleep # coroutine if we are going to change the delay. self._sleep_task = None self._task = None
[docs] async def poller(self): """The polling loop.""" if self._task is None: raise RuntimeError("Task is not running.") while True: try: if asyncio.iscoroutinefunction(self.callback): await self.callback() else: self.callback() except Exception as ee: if ee.__class__ == asyncio.CancelledError: raise if not self._task.cancelled: asyncio.get_running_loop().call_exception_handler( {"message": "failed running callback", "exception": ee} ) self._sleep_task = asyncio.create_task(asyncio.sleep(self.delay)) await self._sleep_task
[docs] async def set_delay(self, delay=None, immediate=False): """Sets the delay for polling. Parameters ---------- delay : float The delay between calls to the callback. If `None`, restores the original delay. immediate : bool If `True`, stops the currently running task and sets the new delay. Otherwise waits for the current task to complete. """ # Only change delay if the difference is significant. if delay and abs(self.delay - delay) < 1e-6: return if not self.running: return if immediate: await self.stop() self.start(delay) else: self.delay = delay or self._orig_delay
[docs] def start(self, delay=None): """Starts the poller. Parameters ---------- delay : float The delay between calls to the callback. If not specified, restores the original delay used when the class was instantiated. """ self.delay = delay or self._orig_delay if self.running: return self._task = asyncio.create_task(self.poller()) return self
[docs] async def stop(self): """Cancel the poller.""" if self._task is None or not self.running: return self._task.cancel() with suppress(asyncio.CancelledError): if self._task is not None: await self._task
[docs] async def call_now(self): """Calls the callback immediately.""" restart = False delay = self.delay if self.running: await self.stop() restart = True if asyncio.iscoroutinefunction(self.callback): await asyncio.create_task(self.callback()) else: self.callback() if restart: self.start(delay=delay)
@property def running(self): """Returns `True` if the poller is running.""" if self._task and not self._task.cancelled(): return True return False
[docs] class AsyncioExecutor(Executor): """An executor to run coroutines from a normal function. Copied from http://bit.ly/2IYmqzN. To use, do :: with AsyncioExecutor() as executor: future = executor.submit(asyncio.sleep, 1) """ def __init__(self): self._loop = asyncio.new_event_loop() self._thread = Thread(target=self._target) self._thread.start() def _target(self): asyncio.set_event_loop(self._loop) self._loop.run_forever()
[docs] def submit(self, fn, *args, **kwargs): """Submit a coroutine to the executor.""" coro = fn(*args, **kwargs) return asyncio.run_coroutine_threadsafe(coro, self._loop)
[docs] def shutdown(self, wait=True): self._loop.call_soon_threadsafe(self._loop.stop) if wait: self._thread.join()
[docs] async def run_in_executor(fn, *args, catch_warnings=False, executor="thread", **kwargs): """Runs a function in an executor. In addition to streamlining the use of the executor, this function catches any warning issued during the execution and reissues them after the executor is done. This is important when using the actor log handler since inside the executor there is no loop that CLU can use to output the warnings. In general, note that the function must not try to do anything with the actor since they run on different loops. """ fn = partial(fn, *args, **kwargs) if executor == "thread": executor = concurrent.futures.ThreadPoolExecutor elif executor == "process": executor = concurrent.futures.ProcessPoolExecutor else: raise ValueError("Invalid executor name.") if catch_warnings: with warnings.catch_warnings(record=True) as records: with executor() as pool: result = await asyncio.get_event_loop().run_in_executor(pool, fn) for ww in records: warnings.warn(ww.message, ww.category) else: with executor() as pool: result = await asyncio.get_running_loop().run_in_executor(pool, fn) return result
[docs] class BaseBot: """A class that monitors a subsystem.""" def __init__(self, fps: FPS): self.fps = fps self.ieb = fps.ieb self.actor: JaegerActor | None = None self._task: asyncio.Task | None = None def set_actor(self, actor: JaegerActor): self.actor = actor
[docs] async def start(self, delay: float | bool = False): """Stars the monitoring loop.""" await self.stop() if delay is not False and delay > 0: await asyncio.sleep(delay) self._task = asyncio.create_task(self._loop())
[docs] async def stop(self): """Stops the monitoring loop.""" if self._task: with suppress(asyncio.CancelledError): self._task.cancel() await self._task
[docs] async def restart(self): """Restarts the loop.""" await self.stop() await self.start(delay=1)
async def _loop(self): raise NotImplementedError()
[docs] def notify(self, message: str | dict, level=logging.WARNING): """Logs a message and outputs it to the actor.""" if isinstance(message, str): log.log(level, message) # No need to output to actor as well if this is a warning or above # because the ActorHandler already does it and we get duplicate messages. if level >= logging.WARNING: return if self.actor: message_code: str = "w" if level == logging.DEBUG: message_code = "d" elif level == logging.INFO: message_code = "i" elif level == logging.WARNING: message_code = "w" elif level == logging.ERROR: message_code = "e" if isinstance(message, str): message = {"text": message} self.actor.write(message_code, message=message)