Source code for jaeger.utils.helpers

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-09-14
# @Filename: helpers.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import asyncio
from concurrent.futures import Executor
from contextlib import suppress
from threading import Thread


__ALL__ = ['AsyncQueue', 'StatusMixIn', 'PollerList', 'Poller', 'AsyncioExecutor']


[docs]class AsyncQueue(asyncio.Queue): """Provides an `asyncio.Queue` object with a watcher. Parameters ---------- loop : event loop or `None` The current event loop, or `asyncio.get_event_loop`. callback : callable A function to call when a new item is received from the queue. It can be a coroutine. """ def __init__(self, loop=None, callback=None): async def process_queue(loop): """Waits for the next item and sends it to the cb function.""" while True: item = await self.get() if callback: loop.call_soon_threadsafe(callback, item) super().__init__() loop = loop or asyncio.get_event_loop() self.watcher = loop.create_task(process_queue(loop))
[docs]class StatusMixIn(object): """A mixin that provides status tracking with callbacks. Provides a status property that executes a list of callbacks when the status changes. Parameters ---------- maskbit_flags : class A class containing the available statuses as a series of maskbit flags. Usually as subclass of `enum.Flag`. initial_status : str The initial status. callback_func : function The function to call if the status changes. call_now : bool Whether the callback function should be called when initialising. Attributes ---------- callbacks : list A list of the callback functions to call. """ def __init__(self, maskbit_flags, initial_status=None, callback_func=None, call_now=False): self._flags = maskbit_flags self.callbacks = [] self._status = initial_status self.watcher = None if callback_func is not None: self.callbacks.append(callback_func) if call_now is True: self.do_callbacks()
[docs] def add_callback(self, cb): """Adds a callback.""" self.callbacks.append(cb)
[docs] def remove_callback(self, cb): """Removes a callback.""" self.callbacks.remove(cb)
[docs] def do_callbacks(self): """Calls functions in ``callbacks``.""" assert hasattr(self, 'callbacks'), \ 'missing callbacks attribute. Did you call __init__()?' for func in self.callbacks: func()
@property def status(self): """Returns the status.""" return self._status @status.setter def status(self, value): """Sets the status.""" if value != self._status: self._status = self.flags(value) self.do_callbacks() if self.watcher is not None: self.watcher.set() @property def flags(self): """Gets the flags associated to this status.""" return self._flags @flags.setter def flags(self, value): """Sets the flags associated to this status.""" self._flags = value self._status = None
[docs] async def wait_for_status(self, value, loop=None): """Awaits until the status matches ``value``.""" if self.status == value: return if loop is None: if hasattr(self, 'loop') and self.loop is not None: loop = self.loop else: loop = asyncio.get_event_loop() self.watcher = asyncio.Event(loop=loop) while self.status != value: await self.watcher.wait() if self.watcher is not None: self.watcher.clear() self.watcher = None
[docs]class PollerList(list): """A list of `.Poller` to be managed jointly.""" def __init__(self, pollers=[]): names = [poller.name for poller in pollers] assert len(names) == len(set(names)), 'repeated names in poller list.' list.__init__(self, pollers) @property def names(self): """List the poller names.""" return [poller.name for poller in self]
[docs] def append(self, poller): """Adds a poller.""" assert isinstance(poller, Poller), 'not a poller.' names = [pp.name for pp in self] if poller.name in names: raise ValueError(f'a poller with name {poller.name} is ' 'already in the list.') list.append(self, poller)
def __getattr__(self, name): """Gets a poller by its name.""" for poller in self: if name == poller.name: return poller return list.__getitem__(self, name) def __getitem__(self, item): """Gets the poller by name.""" if isinstance(item, str): return self.__getattr__(item) return list.__getitem__(self, item)
[docs] async def set_delay(self, delay=None, immediate=False): """Sets the delay for all the pollers. Parameters ---------- delay : float The delay between calls to the callback. If `None`, restores the original delay. immediate : bool If `True`, stops the currently running tasks and sets the new delay. Otherwise waits for the current tasks to complete. """ delay_coros = [poller.set_delay(delay=delay, immediate=immediate) for poller in self] await asyncio.gather(*delay_coros)
[docs] def start(self, delay=None): """Starts all the pollers. Parameters ---------- delay : float The delay between calls to the callback. If not specified, uses the default delays for each poller. """ for poller in self: poller.start(delay=delay)
[docs] async def stop(self): """Cancels all the poller.""" stop_coros = [poller.stop() for poller in self] await asyncio.gather(*stop_coros)
@property def running(self): """Returns `True` if at least one poller is running.""" return any([poller.running for poller in self])
[docs]class Poller(object): """A task that runs a callback periodically. Parameters ---------- name : str The name of the poller. callback : function or coroutine A function or coroutine to call periodically. delay : float Initial delay between calls to the callback. loop : event loop The event loop to which to attach the task. """ def __init__(self, name, callback, delay=1, loop=None): self.name = name self.callback = callback self._orig_delay = delay self.delay = delay self.loop = loop or asyncio.get_event_loop() # Create two tasks, one for the sleep timer and another for the poller # itself. We do this because we want to be able to cancell the sleep # coroutine if we are going to change the delay. self._sleep_task = None self._task = None
[docs] async def poller(self): """The polling loop.""" while True: try: if asyncio.iscoroutinefunction(self.callback): await self.callback() else: self.callback() except Exception as ee: if ee.__class__ == asyncio.CancelledError: raise if not self._task.cancelled: self.loop.call_exception_handler({'message': 'failed running callback', 'exception': ee}) self._sleep_task = self.loop.create_task(asyncio.sleep(self.delay)) await self._sleep_task
[docs] async def set_delay(self, delay=None, immediate=False): """Sets the delay for polling. Parameters ---------- delay : float The delay between calls to the callback. If `None`, restores the original delay. immediate : bool If `True`, stops the currently running task and sets the new delay. Otherwise waits for the current task to complete. """ # Only change delay if the difference is significant. if delay and abs(self.delay - delay) < 1e-6: return if not self.running: return if immediate: await self.stop() self.start(delay) else: self.delay = delay or self._orig_delay
[docs] def start(self, delay=None): """Starts the poller. Parameters ---------- delay : float The delay between calls to the callback. If not specified, restores the original delay used when the class was instantiated. """ self.delay = delay or self._orig_delay if self.running: return self._task = self.loop.create_task(self.poller()) return self
[docs] async def stop(self): """Cancel the poller.""" if not self.running: return self._task.cancel() with suppress(asyncio.CancelledError): await self._task
[docs] async def call_now(self): """Calls the callback immediately.""" restart = False delay = self.delay if self.running: await self.stop() restart = True if asyncio.iscoroutinefunction(self.callback): await self.loop.create_task(self.callback()) else: self.callback() if restart: self.start(delay=delay)
@property def running(self): """Returns `True` if the poller is running.""" if self._task and not self._task.cancelled(): return True return False
[docs]class AsyncioExecutor(Executor): """An executor to run coroutines from a normal function. Copied from http://bit.ly/2IYmqzN. To use, do :: with AsyncioExecutor() as executor: future = executor.submit(asyncio.sleep, 1) """ def __init__(self): self._loop = asyncio.new_event_loop() self._thread = Thread(target=self._target) self._thread.start() def _target(self): asyncio.set_event_loop(self._loop) self._loop.run_forever()
[docs] def submit(self, fn, *args, **kwargs): """Submit a coroutine to the executor.""" coro = fn(*args, **kwargs) return asyncio.run_coroutine_threadsafe(coro, self._loop)
[docs] def shutdown(self, wait=True): self._loop.call_soon_threadsafe(self._loop.stop) if wait: self._thread.join()