Source code for jaeger.commands.base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-08-27
# @Filename: base.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import binascii
import collections
import logging
import sys
import time
import warnings

from typing import Any, Callable, Dict, List, Optional, Union

from jaeger import can_log, config, log, maskbits
from jaeger.exceptions import CommandError, JaegerError, JaegerUserWarning
from jaeger.interfaces import BusABC, Message
from jaeger.maskbits import CommandStatus, ResponseCode
from jaeger.utils import StatusMixIn, get_identifier, parse_identifier

from . import CommandID


__all__ = ["SuperMessage", "Command", "EmptyPool"]


# A pool of UIDs that can be assigned to each message for a given command_id.
# The format of the pool is UID_POOL[command_id][positioner_id] -> set of free
# UIDs. Entries are created lazily the first time a command is instantiated for
# a positioner, as set(range(1, 2**uid_bits)), where uid_bits is read from
# config["positioner"]["uid_bits"]. UID=0 is always reserved for broadcasts and
# lives under positioner_id=0.
UID_POOL = collections.defaultdict(dict)

# Starting value for the command UID. This counter is incremented every time
# a Command is instantiated, so that each command instance gets its own
# command_uid (used to tag log messages).
COMMAND_UID = 0


class SuperMessage(Message):
    """An extended CAN ``Message`` class.

    Expands the ``Message`` class to handle custom arbitration IDs for
    extended frames.

    Parameters
    ----------
    command
        The command associated with this message.
    data
        Payload to pass to ``Message``. If `None` (the default), an empty
        ``bytearray`` is used.
    positioner_id
        The positioner to which the message will be sent (0 for broadcast).
    uid
        The unique identifier for this message. Must be lower than
        ``2**uid_bits``, with ``uid_bits`` read from the configuration.
    response_code
        The response code to encode in the arbitration ID. Only used when
        ``extended_id=True``.
    extended_id
        Whether the id is an 11 bit (False) or 29 bit (True) address.

    """

    def __init__(
        self,
        command: Command,
        data: Optional[bytearray] = None,
        positioner_id: int = 0,
        uid: int = 0,
        response_code: int = 0,
        extended_id: bool = True,
    ):
        # Build a fresh payload per message instead of sharing a single
        # mutable default object across all calls.
        if data is None:
            data = bytearray()

        self.command = command
        self.positioner_id = positioner_id
        self.uid = uid

        uid_bits = config["positioner"]["uid_bits"]
        max_uid = 2**uid_bits
        assert self.uid < max_uid, f"UID must be < {max_uid}."

        if extended_id:
            # Extended frames pack positioner, command, UID and response code
            # in the arbitration ID.
            arbitration_id = get_identifier(
                positioner_id,
                int(command.command_id),
                uid=self.uid,
                response_code=response_code,
            )
        else:
            arbitration_id = positioner_id

        Message.__init__(
            self,
            data=data,
            arbitration_id=arbitration_id,
            is_extended_id=extended_id,
        )
class Reply:
    """Wraps a received message and decodes its arbitration ID.

    Parameters
    ----------
    message
        The received message.
    command
        The `.Command` to which this message is replying, if known. When
        given, its ``command_id`` must match the one encoded in the message.

    """

    def __init__(self, message: Message, command: Optional[Command] = None):
        assert isinstance(message, Message), "invalid message"

        #: The command for which this reply is intended.
        self.command = command

        #: The raw ``Message``.
        self.message = message

        #: The data from the message.
        self.data = message.data

        # Split the arbitration ID into its components.
        fields = parse_identifier(message.arbitration_id)
        positioner_id, reply_cmd_id, uid, response_code = fields

        self.positioner_id = positioner_id

        #: The UID of the message this reply is for.
        self.uid: int = uid

        #: The `~.maskbits.ResponseCode` bit returned by the reply.
        self.response_code: maskbits.ResponseCode = response_code

        if command is not None:
            assert command.command_id == reply_cmd_id, (
                f"Command command_id={command.command_id} and "
                f"reply command_id={reply_cmd_id} do not match"
            )

        self.command_id = CommandID(reply_cmd_id)

    def __repr__(self):
        if self.command:
            command_name = self.command.command_id.name
        else:
            command_name = "NONE"

        pieces = [
            f"<Reply (command_id={command_name!r}, ",
            f"positioner_id={self.positioner_id}, ",
            f"response_code={self.response_code.name!r})>",
        ]
        return "".join(pieces)
class EmptyPool(CommandError):
    """Raised when there are no UIDs left in the pool for a message."""
# Payload accepted by a command: nothing, one packet, or a list of packets.
data_co = Optional[Union[bytearray, List[bytearray]]]

# The version check below only subscripts ``asyncio.Future`` on Python 3.9+;
# older versions fall back to the bare class.
Future_co = (
    asyncio.Future["Command"] if sys.version_info >= (3, 9) else asyncio.Future
)
class Command(StatusMixIn[CommandStatus], Future_co):
    """A command to be sent to the CAN controller.

    Implements a base class to define CAN commands to interact with the
    positioner. Commands can be composed of single or multiple messages.
    When sending a command to the bus, the first message is written to,
    then asynchronously waits for a confirmation that the message has been
    received before sending the following message. If any of the messages
    returns an error code the command is failed.

    `.Command` subclasses from `.StatusMixIn` and `.status_callback` gets
    called when the status changes.

    `.Command` is a `~asyncio.Future` and must be awaited. The
    `~asyncio.Future` is done when `~.Command.finish_command` is called,
    which happens when the status is marked done or cancelled or when the
    command timeouts.

    Commands sent to a single positioner are marked done when a reply is
    received from the same positioner for the given command, or when it
    times out. Broadcast commands only get marked done by timing out or
    manually.

    Parameters
    ----------
    positioner_ids
        The id or list of ids of the robot(s) to which this command will be
        sent. Use ``positioner_ids=0`` to broadcast to all robots.
    timeout
        Time after which the command will be marked done when not all the
        positioners have replies. If `None`, the default timeout will be used.
        If timeout is a negative number, the command won't timeout until all
        the positioners have replied.
    done_callback
        A function or coroutine function to call, without arguments, when
        the command finishes with status ``DONE`` or ``TIMEDOUT``.
    n_positioners
        If the command is a broadcast, the number of positioners that should
        reply. If defined, the command will be done once as many positioners
        have replied. Ignored for non-broadcasts.
    data
        The data to pass to the messages. If a list, each element will be sent
        to each positioner as a message. It can also be a dictionary of lists
        in which the key is the positioner to which to send the data.
    ignore_unknown
        Ignores ``UNKNOWN_COMMAND`` replies from positioners that do not
        support this command.

    """

    #: The id of the command.
    command_id: CommandID
    #: Whether the command can be broadcast to all robots.
    broadcastable: bool = False
    #: The default timeout for this command.
    timeout: float = 5
    #: Whether it's safe to execute this command when the FPS is locked.
    safe = False
    #: Whether this command produces a positioner move.
    move_command = False
    #: Whether the command is safe to be issued in bootloader mode.
    bootloader = False

    def __init__(
        self,
        positioner_ids: int | List[int],
        timeout: Optional[float] = None,
        done_callback: Optional[Callable] = None,
        n_positioners: Optional[int] = None,
        data: Union[None, data_co, Dict[int, data_co]] = None,
        ignore_unknown: bool = True,
    ):
        global COMMAND_UID

        assert self.broadcastable is not None, "broadcastable not set"
        assert self.command_id is not None, "command_id not set"

        if isinstance(positioner_ids, (list, tuple)):
            self.positioner_ids = list(positioner_ids)
            if len(positioner_ids) != len(set(positioner_ids)):
                raise JaegerError("The list of positioner_ids must be unique.")
            if len(positioner_ids) > 1 and 0 in positioner_ids:
                raise JaegerError("Broadcasts cannot be mixed with other positioners.")
        else:
            self.positioner_ids = [positioner_ids]

        if self.is_broadcast and self.broadcastable is False:
            raise JaegerError(f"Command {self.command_id.name} cannot be broadcast.")

        #: The data payload for the messages to send. Normalised to a mapping
        #: of positioner_id to a list of packets.
        if data is None:
            self.data = {pid: [bytearray()] for pid in self.positioner_ids}
        elif isinstance(data, bytearray):
            self.data = {pid: [data] for pid in self.positioner_ids}
        elif isinstance(data, (list, tuple)):
            self.data = {pid: data for pid in self.positioner_ids}
        elif isinstance(data, dict):
            self.data = {}
            for pid, value in data.items():
                if value is None:
                    self.data[pid] = [bytearray()]
                elif isinstance(value, (list, tuple)):
                    self.data[pid] = value
                elif isinstance(value, bytearray):
                    self.data[pid] = [value]
                else:
                    raise ValueError(f"Invalid data {value!r}.")
        else:
            raise ValueError(f"Invalid data {data!r}.")

        if self.is_broadcast:
            if len(self.data[0]) > 1:
                raise CommandError("Broadcasts can only include a single data packet.")

        # Number of replies expected. For broadcasts this is only known if
        # n_positioners has been provided.
        if self.is_broadcast:
            if n_positioners:
                self._n_replies = len(self.data[0]) * n_positioners
            else:
                self._n_replies = None
        else:
            self._n_replies = sum(len(value) for value in self.data.values())

        # Only override the class-level default timeout if one was given.
        if timeout is not None:
            self.timeout = timeout

        if self.timeout < 0 and self._n_replies is None:
            raise CommandError(
                "In a broadcast a timeout is required unless n_positioners is set."
            )

        #: A list of messages with the responses to this command.
        self.replies: List[Reply] = []

        # Messages sent.
        self.messages = []
        self.message_uids = []

        # Generate a UID for this command.
        self.command_uid = COMMAND_UID
        COMMAND_UID += 1

        # Starting and end time
        self.start_time: float | None = None
        self.end_time: float | None = None

        # If this is the first time we run this command the pool will be empty.
        uid_bits = config["positioner"]["uid_bits"]
        command_pool = UID_POOL[self.command_id]
        if not self.is_broadcast:
            for pid in self.positioner_ids:
                if pid not in command_pool:
                    command_pool[pid] = set(range(1, 2**uid_bits))
        else:
            if 0 not in command_pool:
                command_pool[0] = {0}

        # What interface and bus this command should be sent to. Only relevant
        # for multibus interfaces. To be filled by the FPS class when queueing
        # the command.
        self._interfaces: Optional[List[BusABC]] = []
        self._bus: Optional[int] = None

        self._done_callback = done_callback

        self._timeout_handle = None

        self._ignore_unknown = ignore_unknown

        # The loop is used to schedule the timeout in status_callback.
        self.loop = asyncio.get_event_loop()

        StatusMixIn.__init__(
            self,
            maskbit_flags=CommandStatus,
            initial_status=CommandStatus.READY,
            callback_func=self.status_callback,
        )

        asyncio.Future.__init__(self)

    def __repr__(self):
        return (
            f"<Command {self.command_id.name} "
            f"(positioner_ids={self.positioner_ids!r}, "
            f"status={self.status.name!r})>"
        )

    def _log(
        self,
        msg,
        level=logging.DEBUG,
        command_id=None,
        positioner_ids=None,
        logs=None,
    ):
        """Logs a message, prefixed with the command name, positioners and UID.

        Parameters
        ----------
        msg
            The text to log.
        level
            The logging level to use.
        command_id
            Overrides the command id used in the prefix.
        positioner_ids
            Overrides the positioner ids used in the prefix.
        logs
            The loggers to which to send the message. Defaults to
            ``[can_log]``.

        """

        command_id = command_id or self.command_id
        c_name = command_id.name

        pid = positioner_ids or self.positioner_ids

        # Resolve the default here rather than in the signature to avoid a
        # shared mutable default argument.
        if logs is None:
            logs = [can_log]

        msg = f"[{c_name}, {pid}, {self.command_uid!s}]: " + msg

        for ll in logs:
            ll.log(level, msg)

    @property
    def is_broadcast(self):
        """Returns `True` if the command is a broadcast."""

        return self.positioner_ids == [0]

    @property
    def name(self):
        """Returns the name of this command."""

        return CommandID(self.command_id).name

    def _check_replies(self):
        """Checks if the UIDs of the replies match the messages.

        Returns
        -------
        result
            `None` if not all the expected replies have been received yet
            (or if the number of expected replies is unknown), `False` if
            there are more replies than expected or the UIDs do not match,
            and `True` otherwise.

        """

        sent_uids = self.message_uids
        replies_uids = [reply.uid for reply in self.replies]

        if self.is_broadcast:
            if self._n_replies is None:
                # This means it will timeout.
                return None
            else:
                # All the replies to a broadcast carry the reserved UID=0.
                sent_uids = [0] * self._n_replies

        assert self._n_replies, "_n_replies must be set."

        if len(self.replies) < self._n_replies:
            return None

        if len(self.replies) > self._n_replies:
            self._log(
                "command received more replies than messages. "
                "This should not be possible.",
                level=logging.ERROR,
            )
            return False

        # Compares each message-reply UID.
        if sorted(sent_uids) != sorted(replies_uids):
            self._log(
                "the UIDs of the messages and replies do not match.",
                level=logging.ERROR,
            )
            return False

        return True

    async def process_reply(self, reply_message):
        """Processes a reply message received for this command.

        Returns the UID of the reply to the pool, stores the reply and, once
        all the expected replies have arrived, finishes the command.

        Parameters
        ----------
        reply_message
            The raw message received from the bus.

        """

        reply = Reply(reply_message, command=self)

        # Return the UID to the pool. A reply from a positioner that has no
        # pool for this command is skipped here so that it reaches the
        # checks below instead of raising a KeyError.
        if not self.is_broadcast:
            pool = UID_POOL[self.command_id].get(reply.positioner_id)
            if pool is not None:
                pool.add(reply.uid)

        pid = reply.positioner_id

        if self.status == CommandStatus.TIMEDOUT:
            self._log(
                f"received a reply from {pid} but the command has already timed out.",
                level=logging.ERROR,
                logs=[can_log],
            )
            return
        elif self.status == CommandStatus.CANCELLED:
            return
        elif self.status != CommandStatus.RUNNING:
            self._log(
                f"received a reply from {pid} but command is not running",
                level=logging.ERROR,
                logs=[log, can_log],
            )
            return

        if not self.is_broadcast:
            if reply.positioner_id not in self.positioner_ids:
                self._log(
                    f"received a reply from {pid} from a non-commanded positioner.",
                    level=logging.ERROR,
                    logs=[log, can_log],
                )
                return

        self.replies.append(reply)

        data_hex = binascii.hexlify(reply.data).decode()
        self._log(
            f"positioner {reply.positioner_id} replied with "
            f"id={reply.message.arbitration_id}, "
            f"UID={reply.uid}, "
            f"code={reply.response_code.name!r}, "
            f"data={data_hex!r}"
        )

        code = reply.response_code
        COMMAND_ACCEPTED = ResponseCode.COMMAND_ACCEPTED
        UNKNOWN_COMMAND = ResponseCode.UNKNOWN_COMMAND

        if code != COMMAND_ACCEPTED:
            if not self._ignore_unknown or code != UNKNOWN_COMMAND:
                warnings.warn(
                    f"Positioner {reply.positioner_id} replied to {self.name} "
                    f"UID={self.command_uid} with {code.name!r}.",
                    JaegerUserWarning,
                )

        reply_status = self._check_replies()

        # If reply_status is True then all the expected replies have been
        # received. If they are all COMMAND_ACCEPTED, mark the command as done.
        # If some are not COMMAND_ACCEPTED, check the invalid replies are all
        # UNKNOWN_COMMAND. In that case, if we are ignoring those, still mark as
        # done. Otherwise finish as failed.

        # If reply_status is False, the replies do not match the messages sent
        # (too many replies or mismatching UIDs). This should not happen and
        # it's most likely a bug in the code.

        # If reply_status is None, we haven't yet matched the expected number of
        # replies and we just return.

        if reply_status is True:
            reply_codes = [reply.response_code for reply in self.replies]
            invalid = [rcode for rcode in reply_codes if rcode != COMMAND_ACCEPTED]

            if not invalid:
                self.finish_command(CommandStatus.DONE)
            elif self._ignore_unknown and all(
                inv == UNKNOWN_COMMAND for inv in invalid
            ):
                self.finish_command(CommandStatus.DONE)
            else:
                self.finish_command(CommandStatus.FAILED)

        elif reply_status is False:
            self.finish_command(CommandStatus.FAILED)

        else:
            return

    def finish_command(self, status: CommandStatus, silent: bool = False):
        """Finishes a command, marking the Future as done.

        Cancels the pending timeout, if any, sets the status and, if the
        Future is not already done, returns the UIDs to the pool, sets the
        result of the Future to the command itself, and calls the done
        callback when the final status is ``DONE`` or ``TIMEDOUT``.

        Parameters
        ----------
        status
            The status to set the command to.
        silent
            If `True`, issues error log messages as debug.

        """

        if self._timeout_handle:
            self._timeout_handle.cancel()

        # The status is set directly on the private attribute.
        self._status = status

        if not self.done():
            level = logging.WARNING if not silent else logging.DEBUG

            if not self.is_broadcast and self.status == CommandStatus.TIMEDOUT:
                self._log("this command timed out and it is not a broadcast.", level)
            elif self.status == CommandStatus.CANCELLED:
                self._log("command has been cancelled.", logging.DEBUG)
            elif self.status.failed:
                level = logging.ERROR if not silent else logging.DEBUG
                self._log(f"command finished with status {self.status.name!r}", level)
            elif self.status.timed_out and self._n_replies is not None:
                # Only report the timeout when a definite number of replies
                # was expected. A broadcast without n_positioners always
                # times out and that's expected.
                level = logging.ERROR if not silent else logging.DEBUG
                self._log("command timed out.", level)

            # For good measure we return all the UIDs
            if self.is_broadcast:
                UID_POOL[self.command_id][0].add(0)
            else:
                for message in self.messages:
                    UID_POOL[self.command_id][message.positioner_id].add(message.uid)

            self.set_result(self)

            self.end_time = time.time()

            is_done = self.status in [CommandStatus.TIMEDOUT, CommandStatus.DONE]
            if is_done and self._done_callback:
                if asyncio.iscoroutinefunction(self._done_callback):
                    asyncio.create_task(self._done_callback())
                else:
                    self._done_callback()

            self._log(f"finished command with status {self.status.name!r}")

    def status_callback(self):
        """Callback for change status.

        When the status gets set to `.CommandStatus.RUNNING` starts a timer
        that marks the command as done after `.timeout`.

        """

        self._log(f"status changed to {self.status.name}")

        if self.status == CommandStatus.RUNNING:
            self.start_time = time.time()
            if self.timeout < 0:
                # A negative timeout means the command never times out.
                pass
            elif self.timeout == 0:
                self.finish_command(CommandStatus.TIMEDOUT)
            else:
                self._timeout_handle = self.loop.call_later(
                    self.timeout,
                    self.finish_command,
                    CommandStatus.TIMEDOUT,
                )
        elif self.status.is_done and not self.done():
            self.finish_command(self.status)

    def _generate_messages_internal(self):
        """Generates the list of messages to send to the bus for this command.

        This method is called by `.get_messages` and can be overridden in
        subclasses. Do not override `.get_messages` directly.

        Raises
        ------
        EmptyPool
            If there are no UIDs available for one of the positioners. The
            UIDs already taken for this command are returned to the pools.

        """

        cid = self.command_id

        messages: List[SuperMessage] = []

        for pid, pid_data in self.data.items():
            for d in pid_data:
                try:
                    uid = UID_POOL[cid][pid].pop()
                except KeyError:
                    # Before failing, put back the UIDs of the other messages.
                    # Each UID goes back to the pool of the positioner it was
                    # taken from, which may not be the current one.
                    for message in messages:
                        UID_POOL[cid][message.positioner_id].add(message.uid)
                    raise EmptyPool("no UIDs left in the pool.")

                messages.append(SuperMessage(self, positioner_id=pid, uid=uid, data=d))

        return messages

    def get_messages(self, data=None):
        """Returns the list of messages associated with this command.

        Generates the messages with `._generate_messages_internal` and
        stores them, along with their UIDs. The ``data`` argument is not
        used by this implementation.

        Raises
        ------
        CommandError
            If the messages for this command have already been generated.

        """

        if len(self.messages) > 0:
            raise CommandError("Messages have already been sent.")

        messages = self._generate_messages_internal()

        self.messages = messages
        self.message_uids = [message.uid for message in messages]

        return messages

    def get_replies(self) -> Dict[int, Any]:
        """Returns the formatted replies as a dictionary.

        The values returned will depend on the specific command. The base
        implementation returns an empty dictionary.

        """

        return {}

    def cancel(self, silent=False, msg=None):
        """Cancels a command, stopping the reply queue watcher.

        Finishes the command with status ``CANCELLED``. The ``msg`` argument
        is accepted but not used. Returns `None`.

        """

        self.finish_command(CommandStatus.CANCELLED, silent=silent)