Source code for jaeger.interfaces.virtual
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-07-11
# @Filename: virtual.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
from typing import Dict, List
from jaeger.interfaces.bus import BusABC
from .message import Message
queues: Dict[str, List[asyncio.Queue]] = {}
[docs]
class VirtualBus(BusABC):
"""A class implementing a virtual CAN bus that listens to messages on a channel."""
def __init__(self, channel: str):
self.channel = channel
self.queue: asyncio.Queue[Message] = asyncio.Queue()
if self.channel not in queues:
queues[self.channel] = [self.queue]
else:
queues[self.channel].append(self.queue)
[docs]
def send(self, msg: Message):
"""Send message to the virtual bus (self does not receive a copy)."""
for queue in queues[self.channel]:
if queue is self.queue:
continue
queue.put_nowait(msg)
[docs]
async def get(self):
"""Get messages from the bus."""
msg = await self.queue.get()
return msg