Source code for jaeger.commands.status

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-08-27
# @Filename: status.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

from typing import Dict, List, Tuple

from jaeger.commands import Command, CommandID
from jaeger.utils import bytes_to_int, int_to_bytes, motor_steps_to_angle


__all__ = [
    "GetID",
    "GetStatus",
    "GetCurrent",
    "GetActualPosition",
    "GetNumberTrajectories",
    "SetNumberTrajectories",
    "SwitchLEDOn",
    "SwitchLEDOff",
    "GetTemperature",
]


[docs] class GetID(Command): """Commands the positioners to reply with their positioner id.""" command_id = CommandID.GET_ID broadcastable = True timeout = 1 safe = True bootloader = True
[docs] def get_ids(self) -> List: """Returns a list of positioners that replied back.""" return [reply.positioner_id for reply in self.replies]
[docs] class GetStatus(Command): """Gets the status bits for the positioner.""" command_id = CommandID.GET_STATUS broadcastable = True safe = True bootloader = True
[docs] def get_replies(self) -> Dict[int, int]: return self.get_positioner_status()
[docs] def get_positioner_status(self) -> Dict[int, int]: """Returns the status flag for each positioner that replied.""" return {reply.positioner_id: bytes_to_int(reply.data) for reply in self.replies}
[docs] class GetActualPosition(Command): """Gets the current position of the alpha and beta arms.""" command_id = CommandID.GET_ACTUAL_POSITION broadcastable = False safe = True
[docs] def get_replies(self) -> Dict[int, Tuple[float, float]]: return self.get_positions()
[docs] def get_positions(self) -> Dict[int, Tuple[float, float]]: """Returns the positions of alpha and beta in degrees. Raises ------ ValueError If no reply has been received or the data cannot be parsed. """ positions = {} for reply in self.replies: pid = reply.positioner_id data = reply.data beta = bytes_to_int(data[4:], dtype="i4") alpha = bytes_to_int(data[0:4], dtype="i4") positions[pid] = motor_steps_to_angle(alpha, beta) return positions
[docs] @staticmethod def encode(alpha, beta): """Returns the position as a bytearray in positioner units.""" alpha_motor, beta_motor = motor_steps_to_angle(alpha, beta, inverse=True) alpha_bytes = int_to_bytes(int(alpha_motor), "i4") beta_bytes = int_to_bytes(int(beta_motor), "i4") data = alpha_bytes + beta_bytes return data
class GetCurrent(Command): """Gets the current of the alpha and beta motors.""" command_id = CommandID.GET_CURRENT broadcastable = False safe = True def get_replies(self) -> Dict[int, Tuple[int, int]]: return self.get_currents() def get_currents(self) -> Dict[int, Tuple[int, int]]: """Returns a dictionary of current of alpha and beta for each positioner. Raises ------ ValueError If no reply has been received or the data cannot be parsed. """ currents = {} for reply in self.replies: data = reply.data alpha = bytes_to_int(data[0:4], dtype="i4") beta = bytes_to_int(data[4:], dtype="i4") currents[reply.positioner_id] = (alpha, beta) return currents class GetTemperature(Command): """Gets the temperature from the board temperature sensor, in C.""" command_id = CommandID.GET_RAW_TEMPERATURE broadcastable = False safe = True def get_replies(self) -> Dict[int, int]: return self.get_replies() def get_temperatures(self) -> Dict[int, int]: """Returns the temperature in Celsius. Raises ------ ValueError If no reply has been received or the data cannot be parsed. """ temperatures = {} for reply in self.replies: data = self.replies[0].data rawT = bytes_to_int(data, dtype="u4") # Raw temperature temperatures[reply.positioner_id] = rawT return temperatures class SwitchLEDOn(Command): """Switched the positioner LED on.""" command_id = CommandID.SWITCH_LED_ON broadcastable = False safe = True class SwitchLEDOff(Command): """Switched the positioner LED on.""" command_id = CommandID.SWITCH_LED_ON broadcastable = False safe = True class GetNumberTrajectories(Command): command_id = CommandID.GET_NUMBER_TRAJECTORIES broadcastable = False safe = True def get_replies(self): return self.get_number_trajectories() def get_number_trajectories(self) -> Dict[int, int | None]: """Returns the number of trajectories. Raises ------ ValueError If no reply has been received or the data cannot be parsed. """ number_trajectories = {} for reply in self.replies: data = reply.data if data == bytearray(): number_trajectories[reply.positioner_id] = None else: number_trajectories[reply.positioner_id] = bytes_to_int(data) return number_trajectories class SetNumberTrajectories(Command): command_id = CommandID.SET_NUMBER_TRAJECTORIES broadcastable = False safe = False def __init__( self, positioner_ids: int | List[int], moves: int | Dict[int, int], **kwargs, ): if isinstance(moves, int): data = int_to_bytes(moves) elif isinstance(moves, dict): data = {pos: int_to_bytes(moves[pos]) for pos in moves} else: raise TypeError("Invalid data type.") kwargs["data"] = data super().__init__(positioner_ids, **kwargs)