Source code for jaeger.interfaces.bus
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-07-12
# @Filename: bus.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import abc
from .message import Message
[docs]
class BusABC(object, metaclass=abc.ABCMeta):
"""A base CAN bus."""
def __init__(self, *args, **kwargs):
pass
[docs]
async def open(self, *args, **kwargs) -> bool:
"""Starts the bus.
This method call the ``_open_internal`` method in the subclass bus, if
present. It's meant mainly to initialise any process that needs to be
run as a coroutine. Must return `True` if the connection was successful,
`False` or an error otherwise.
"""
return await self._open_internal(*args, **kwargs)
async def _open_internal(self):
return True
[docs]
@abc.abstractmethod
async def get(self):
"""Receives messages from the bus."""
pass
[docs]
@abc.abstractmethod
def send(self, msg: Message, **kwargs):
"""Sends a message to the bus."""
pass