Source code for jaeger.positioner

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-10-07
# @Filename: positioner.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging

from typing import List, Optional, Tuple

from packaging.version import Version

import jaeger
from jaeger import config, log, maskbits
from jaeger.can import JaegerCAN
from jaeger.commands import CommandID
from jaeger.commands.bootloader import GetFirmwareVersion
from jaeger.commands.status import GetActualPosition
from jaeger.exceptions import JaegerError, PositionerError
from jaeger.utils import StatusMixIn, bytes_to_int


__all__ = ["Positioner"]


[docs] class Positioner(StatusMixIn): r"""Represents the status and parameters of a positioner. Parameters ---------- positioner_id The ID of the positioner fps The `~jaeger.fps.FPS` instance to which this positioner is linked to. centre The :math:`(x_{\rm focal}, y_{\rm focal})` coordinates of the central axis of the positioner. sextant The id of the sextant to which this positioner is connected. """ def __init__( self, positioner_id: int, fps: jaeger.FPS | None = None, centre: Tuple[Optional[float], Optional[float]] = (None, None), ): self.fps = fps self.positioner_id = positioner_id self.centre = centre self.alpha = None self.beta = None self.speed = (None, None) self.firmware: str | None = None self.disabled = False self.offline = False self.precise_moves = True super().__init__( maskbit_flags=maskbits.PositionerStatus, initial_status=maskbits.PositionerStatus.UNKNOWN, ) @property def position(self): """Returns a tuple with the ``(alpha, beta)`` position.""" return (self.alpha, self.beta) @property def collision(self): """Returns `True` if the positioner is collided.""" if not self.status: return False return self.status.collision @property def moving(self): """Returns `True` if the positioner is moving.""" if self.status.DISPLACEMENT_COMPLETED not in self.status: return True return False @property def initialised(self): """Returns ``True`` if the system and datums have been initialised.""" if self.offline: return True if self.status is None: return False if self.is_bootloader(): if self.status != maskbits.BootloaderStatus.UNKNOWN: return True return False if ( not self.status.initialised or self.flags.DATUM_ALPHA_INITIALIZED not in self.status or self.flags.DATUM_BETA_INITIALIZED not in self.status ): return False return True
[docs] def reset(self): """Resets positioner values and statuses.""" self.alpha = None self.beta = None self.status = self.flags.UNKNOWN self.firmware = None
def _log(self, message, level=logging.DEBUG): """Logs a message.""" log.log(level, f"Positioner {self.positioner_id}: {message}")
[docs] def get_bus(self) -> Tuple[int, int | None]: """Returns the interface index and bus.""" if self.fps is None: raise PositionerError("FPS is not defined.") if not isinstance(self.fps.can, JaegerCAN): raise PositionerError("CAN bus not found.") if not self.fps.can.multibus: return (0, None) iface, bus = self.fps.positioner_to_bus[self.positioner_id] iface_idx = self.fps.can.interfaces.index(iface) return (iface_idx, bus)
[docs] async def send_command(self, command, error: Optional[str] = None, **kwargs): """Sends and awaits a command to the FPS for this positioner.""" if not self.fps: raise PositionerError("FPS is not set.") command = await self.fps.send_command( command, positioner_ids=self.positioner_id, **kwargs, ) if error and (command.status.failed or command.status.timed_out): raise PositionerError(error) return command
[docs] async def update_position( self, position: Tuple[float, float] | None = None, timeout=1, ): """Updates the position of the alpha and beta arms.""" if position is None: command = await self.send_command( CommandID.GET_ACTUAL_POSITION, timeout=timeout, ) assert isinstance(command, GetActualPosition) if command.status.failed: self.alpha = self.beta = None raise PositionerError("failed updating position") try: position = command.get_positions()[self.positioner_id] except ValueError: raise PositionerError("cannot parse current position.") self.alpha, self.beta = position
[docs] async def update_status( self, status: maskbits.PositionerStatus | int | None = None, timeout=1.0, ): """Updates the status of the positioner.""" assert self.fps, "FPS is not set." # Need to update the firmware to make sure we get the right flags. if not self.firmware: await self.update_firmware_version() if not status: command = await self.send_command( CommandID.GET_STATUS, timeout=timeout, error="cannot get status.", ) n_replies = len(command.replies) if n_replies == 1: status = int(bytes_to_int(command.replies[0].data)) else: self.status = self.flags.UNKNOWN raise PositionerError(f"GET_STATUS received {n_replies} replies.") if not self.is_bootloader(): self.flags = self.get_positioner_flags() else: self.flags = maskbits.BootloaderStatus self.status = self.flags(int(status)) # Checks if the positioner is collided. If so, locks the FPS. # if not self.is_bootloader() and self.collision and not self.fps.locked: # await self.fps.lock() # raise PositionerError("collision detected. Locking the FPS.") return True
[docs] async def wait_for_status( self, status: List[maskbits.PositionerStatus], delay=1, timeout: Optional[float] = None, ) -> bool: """Polls the status until it reaches a certain value. Parameters ---------- status The status to wait for. Can be a list in which case it will wait until all the statuses in the list have been reached. delay Time, in seconds, to wait between position updates. timeout How many seconds to wait for the status to reach the desired value before aborting. Returns ------- result Returns `True` if the status has been reached or `False` if the timeout limit was reached. """ if self.is_bootloader(): raise JaegerError("wait_for_status cannot be scheduled in bootloader mode.") if not isinstance(status, (list, tuple)): status = [status] async def status_waiter(wait_for_status): while True: await self.update_status() # Check all statuses in the list all_reached = True for ss in wait_for_status: if ss not in self.status: all_reached = False break if all_reached: return await asyncio.sleep(delay) wait_for_status = [self.flags(int(ss)) for ss in status] try: await asyncio.wait_for(status_waiter(wait_for_status), timeout) except asyncio.TimeoutError: return False return True
[docs] async def initialise(self, disable_precise_moves=False): """Initialises the position watcher.""" if self.offline is True: return True # Resets all. self.reset() await self.update_firmware_version() await self.update_status() # Exits if we are in bootloader mode. if self.is_bootloader(): return True if not self.initialised: raise PositionerError("failed initialising.") # Update position only if it's not bootloader. await self.update_position() # Sets the default speed and disable precise moves if not self.disabled: await self.set_speed( alpha=config["positioner"]["motor_speed"], beta=config["positioner"]["motor_speed"], ) if self.firmware and Version(self.firmware) < Version("04.01.17"): self._log("Disabling precise moves requires >=04.01.17", logging.DEBUG) else: await self.set_precise_move(mode=not disable_precise_moves) else: self.precise_moves = not disable_precise_moves self._log("initialisation complete.") return True
[docs] async def update_firmware_version(self): """Updates the firmware version.""" command = await self.send_command( CommandID.GET_FIRMWARE_VERSION, error="failed retrieving firmware version.", ) assert isinstance(command, GetFirmwareVersion) firmware = command.get_firmware()[self.positioner_id] assert firmware is not None and isinstance(firmware, str) self.firmware = firmware self.flags = self.get_positioner_flags() self._log(f"firmware {self.firmware}") return True
[docs] def get_positioner_flags(self): """Returns the correct position maskbits from the firmware version.""" assert self.firmware, "Firmware is not set." if self.is_bootloader(): return maskbits.BootloaderStatus if Version(self.firmware) < Version("04.01.00"): return maskbits.PositionerStatusV4_0 else: return maskbits.PositionerStatus
[docs] def is_bootloader(self): """Returns True if we are in bootloader mode.""" if self.offline: return False if self.firmware is None: return None return self.firmware.split(".")[1] == "80"
[docs] async def set_position(self, alpha: float, beta: float): """Sets the internal position of the motors.""" set_position_command = await self.send_command( CommandID.SET_ACTUAL_POSITION, alpha=float(alpha), beta=float(beta), error="failed setting position.", ) self._log(f"position set to ({alpha:.2f}, {beta:.2f})") return set_position_command
[docs] async def set_speed(self, alpha: float, beta: float, force=False): """Sets motor speeds. Parameters ---------- alpha The speed of the alpha arm, in RPM on the input. beta The speed of the beta arm, in RPM on the input. force Allows to set speed limits outside the normal range. """ MIN_SPEED = 0 MAX_SPEED = 5000 if ( alpha < MIN_SPEED or alpha > MAX_SPEED or beta < MIN_SPEED or beta > MAX_SPEED ) and not force: raise PositionerError("speed out of limits.") speed_command = await self.send_command( CommandID.SET_SPEED, alpha=float(alpha), beta=float(beta), error="failed setting speed.", ) self.speed = (alpha, beta) self._log(f"speed set to ({alpha:.2f}, {beta:.2f})") return speed_command
[docs] async def set_precise_move(self, mode, alpha=True, beta=True): """Switches the precise moves on alpha and beta.""" cmds = [] if not alpha and not beta: raise PositionerError("positioneralpha or beta need to be True.") if alpha: if mode is True: cmds.append(self.send_command(CommandID.SWITCH_ON_PRECISE_MOVE_ALPHA)) else: cmds.append(self.send_command(CommandID.SWITCH_OFF_PRECISE_MOVE_ALPHA)) if beta: if mode is True: cmds.append(self.send_command(CommandID.SWITCH_ON_PRECISE_MOVE_BETA)) else: cmds.append(self.send_command(CommandID.SWITCH_OFF_PRECISE_MOVE_BETA)) cmds = await asyncio.gather(*cmds) if any([cmd.status != maskbits.CommandStatus.DONE for cmd in cmds]): raise PositionerError("failed switching precise moves.") self.precise_moves = mode return True
def _can_move(self): """Returns `True` if the positioner can be moved.""" if self.moving or self.collision or not self.initialised or not self.status: return False if self.flags == maskbits.PositionerStatusV4_0: return True else: PS = maskbits.PositionerStatus invalid_bits = [ PS.COLLISION_DETECT_ALPHA_DISABLE, PS.COLLISION_DETECT_BETA_DISABLE, ] for b in invalid_bits: if b in self.status: self._log(f"canot move; found status bit {b.name}.", logging.ERROR) return False if ( PS.CLOSED_LOOP_ALPHA not in self.status or PS.CLOSED_LOOP_BETA not in self.status ): self._log( "canot move; positioner not in closed loop mode.", logging.ERROR, ) return False return True async def _goto_position(self, alpha: float, beta: float, relative=False): """Go to a position.""" if relative: command_id = CommandID.GO_TO_RELATIVE_POSITION else: command_id = CommandID.GO_TO_ABSOLUTE_POSITION return await self.send_command( command_id, alpha=float(alpha), beta=float(beta), error="failed going to position.", )
[docs] async def home(self, alpha: bool = True, beta: bool = True): """Homes the positioner. Zeroes the positioner by counter-clockwise rotating alpha and beta until they hit the hardstops. Blocks until the move is complete. Parameters ---------- alpha Home the alpha arm. beta Home the beta arm. """ if self.moving: raise PositionerError("positioner is already moving.") if not self.fps: raise PositionerError("the positioner is not linked to a FPS instance.") if alpha and beta: await self.send_command( "GO_TO_DATUMS", error="failed while sending GO_TO_DATUMS command.", ) elif alpha: await self.send_command( "GO_TO_DATUM_ALPHA", error="failed while sending GO_TO_DATUMS_ALPHA command.", ) elif beta: await self.send_command( "GO_TO_DATUM_BETA", error="failed while sending GO_TO_DATUMS_BETA command.", ) self._log("waiting to home.") await self.wait_for_status(self.flags.DISPLACEMENT_COMPLETED) self._log("homed.", logging.INFO)
[docs] async def set_loop(self, motor="both", loop="closed", collisions=True): """Sets the control loop for a motor. These parameters are cleared after a restart. The motors revert to closed loop with collision detection. Parameters ---------- motor The motor to which these changes apply, either ``'alpha`'``, ``'beta'``, or ``'both'``. loop The type of control loop, either ``'open'`` or ``'closed'``. collisions Whether the firmware should automatically detect collisions and stop the positioner. """ if motor == "both": motors = ["alpha", "beta"] else: motors = [motor] for motor in motors: command_name = motor.upper() + "_" + loop.upper() + "_LOOP" if collisions: command_name += "_COLLISION_DETECTION" else: command_name += "_WITHOUT_COLLISION_DETECTION" await self.send_command( command_name, error=f"failed setting loop for {motor}." ) self._log( f"set motor={motor!r}, loop={loop!r}, " f"detect_collision={collisions}", ) return True
def __repr__(self): return ( f"<Positioner (id={self.positioner_id}, " f"status={self.status!s}, initialised={self.initialised})>" )
[docs] async def get_number_trajectories(self) -> int | None: """Returns the number of trajectories executed by the positioner. Will return `None` if the firmware does not support the ``GET_NUMBER_TRAJECTORIES``. """ if not self.firmware: return None if Version(self.firmware) < Version("04.01.21"): return None assert self.fps cmd = await self.fps.send_command( CommandID.GET_NUMBER_TRAJECTORIES, positioner_ids=[self.positioner_id], timeout=1, ) n_traj = cmd.get_replies()[self.positioner_id] return n_traj