Source code for jaeger.commands.calibration

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-07-15
# @Filename: calibration.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import struct

from typing import TYPE_CHECKING, Dict, List, Tuple

import numpy

from jaeger import config, log
from jaeger.commands import Command, CommandID
from jaeger.exceptions import JaegerError
from jaeger.maskbits import PositionerStatus as PS
from jaeger.utils import bytes_to_int, int_to_bytes, motor_steps_to_angle


if TYPE_CHECKING:
    from jaeger import FPS


__all__ = [
    "calibrate_positioners",
    "StartDatumCalibration",
    "StartMotorCalibration",
    "StartCoggingCalibration",
    "SaveInternalCalibration",
    "HallOn",
    "HallOff",
]


MOTOR_STEPS = config["positioner"]["motor_steps"]


[docs] async def calibrate_positioners( fps: FPS, axis: str, positioner_ids: int | list[int], motors: bool = True, datums: bool = True, cogging: bool = True, ): """Runs the calibration process and saves it to the internal memory. Parameters ---------- fps The instance of `.FPS` that will receive the trajectory. axis The axis to calibrate, either `"alpha"`, `"beta"`, or `"both"`. positioner_id The ID of the positioner(s) to calibrate. motors Whether to perform the motor calibration. datums Whether to perform the datums calibration. cogging Whether to perform the cogging calibration (may take more than one hour). Raises ------ JaegerError If encounters a problem during the process. Examples -------- :: >>> fps = FPS() >>> await fps.initialise() # Calibrate positioner 31. >>> await calibrate_positioner(fps, 31) """ log.info(f"Calibrating positioner(s) {positioner_ids}.") if axis not in ["alpha", "beta", "both"]: raise JaegerError(f"Invalid axis {axis!r}.") if isinstance(positioner_ids, int): positioner_ids = [positioner_ids] for positioner_id in positioner_ids: if positioner_id not in fps.positioners: raise JaegerError(f"Positioner {positioner_id} not found.") if fps.pollers.running: log.debug("Stopping pollers") await fps.pollers.stop() if motors: log.info("Starting motor calibration.") if axis == "alpha": motor_command = CommandID.START_MOTOR_CALIBRATION_ALPHA elif axis == "beta": motor_command = CommandID.START_MOTOR_CALIBRATION_BETA else: motor_command = CommandID.START_MOTOR_CALIBRATION cmd = await fps.send_command(motor_command, positioner_ids=positioner_ids) if cmd.status.failed: raise JaegerError("Motor calibration failed.") await asyncio.sleep(1) statuses = [ PS.DISPLACEMENT_COMPLETED, PS.MOTOR_ALPHA_CALIBRATED, PS.MOTOR_BETA_CALIBRATED, ] await _wait_status(fps, positioner_ids, statuses) else: log.warning("Skipping motor calibration.") if datums: log.info("Starting datum calibration.") if axis == "alpha": datums_command = CommandID.START_DATUM_CALIBRATION_ALPHA elif axis == "beta": datums_command = CommandID.START_DATUM_CALIBRATION_BETA else: datums_command = CommandID.START_DATUM_CALIBRATION cmd = await fps.send_command(datums_command, positioner_ids=positioner_ids) if cmd.status.failed: raise JaegerError("Datum calibration failed.") await asyncio.sleep(1) statuses = [ PS.DISPLACEMENT_COMPLETED, PS.DATUM_ALPHA_CALIBRATED, PS.DATUM_BETA_CALIBRATED, ] await _wait_status(fps, positioner_ids, statuses) else: log.warning("Skipping datum calibration.") if cogging: log.info("Starting cogging calibration.") if axis == "alpha": cogging_command = CommandID.START_COGGING_CALIBRATION_ALPHA elif axis == "beta": cogging_command = CommandID.START_COGGING_CALIBRATION_BETA else: cogging_command = CommandID.START_COGGING_CALIBRATION cmd = await fps.send_command(cogging_command, positioner_ids=positioner_ids) if cmd.status.failed: raise JaegerError("Cogging calibration failed.") await asyncio.sleep(1) statuses = [PS.COGGING_ALPHA_CALIBRATED, PS.COGGING_BETA_CALIBRATED] await _wait_status(fps, positioner_ids, statuses) else: log.warning("Skipping cogging calibration.") if motors or datums or cogging: log.info("Saving calibration.") cmd = await fps.send_command( CommandID.SAVE_INTERNAL_CALIBRATION, positioner_ids=positioner_ids, ) if cmd.status.failed: raise JaegerError("Saving calibration failed.") log.info(f"Positioners {positioner_ids} have been calibrated.") return
async def _wait_status(fps: FPS, positioner_ids: list[int], statuses: list[PS]): """Waits for status.""" wait_for_status_tasks = [] for positioner_id in positioner_ids: positioner = fps.positioners[positioner_id] wait_for_status_tasks.append(positioner.wait_for_status(statuses)) await asyncio.gather(*wait_for_status_tasks)
[docs] class StartDatumCalibration(Command): """Indicates that the transmission for the trajectory has ended.""" command_id = CommandID.START_DATUM_CALIBRATION broadcastable = False move_command = True
[docs] class StartMotorCalibration(Command): """Aborts sending a trajectory.""" command_id = CommandID.START_MOTOR_CALIBRATION broadcastable = False move_command = True
[docs] class StartCoggingCalibration(Command): """Starts the trajectories.""" command_id = CommandID.START_COGGING_CALIBRATION broadcastable = False move_command = True
[docs] class SaveInternalCalibration(Command): """Stop the trajectories.""" command_id = CommandID.SAVE_INTERNAL_CALIBRATION broadcastable = False move_command = False
class GetOffset(Command): """Gets the motor offsets.""" command_id = CommandID.GET_OFFSETS broadcastable = False safe = True def get_replies(self) -> Dict[int, numpy.ndarray]: return self.get_offsets() def get_offsets(self) -> Dict[int, numpy.ndarray]: """Returns the alpha and beta offsets, in degrees. Raises ------ ValueError If no reply has been received or the data cannot be parsed. """ offsets = {} for reply in self.replies: pid = reply.positioner_id data = reply.data alpha = bytes_to_int(data[0:4], dtype="i4") beta = bytes_to_int(data[4:], dtype="i4") offsets[pid] = numpy.array(motor_steps_to_angle(alpha, beta)) return offsets class SetOffsets(Command): """Sets the motor offsets.""" command_id = CommandID.SET_OFFSETS broadcastable = False safe = True move_command = False def __init__( self, positioner_ids: int | List[int], alpha=None, beta=None, **kwargs, ): if alpha is not None and beta is not None: alpha_steps, beta_steps = motor_steps_to_angle(alpha, beta, inverse=True) data = int_to_bytes(int(alpha_steps)) + int_to_bytes(int(beta_steps)) kwargs["data"] = data super().__init__(positioner_ids, **kwargs)
[docs] class HallOn(Command): """Turns hall sensors ON.""" command_id = CommandID.HALL_ON broadcastable = False move_command = False safe = True
[docs] class HallOff(Command): """Turns hall sensors ON.""" command_id = CommandID.HALL_OFF broadcastable = False move_command = False safe = True
class SetHoldingCurrents(Command): """Sets the motors holding currents.""" command_id = CommandID.SET_HOLDING_CURRENT broadcastable = False safe = True move_command = False def __init__(self, positioner_ids, alpha=None, beta=None, **kwargs): if alpha is not None and beta is not None: data = int_to_bytes(int(alpha)) + int_to_bytes(int(beta)) kwargs["data"] = data super().__init__(positioner_ids, **kwargs) class GetHoldingCurrents(Command): """Gets the motor offsets.""" command_id = CommandID.GET_HOLDING_CURRENT broadcastable = False safe = True def get_replies(self) -> Dict[int, numpy.ndarray]: return self.get_holding_currents() def get_holding_currents(self) -> Dict[int, numpy.ndarray]: """Returns the alpha and beta holding currents, in percent. Raises ------ ValueError If no reply has been received or the data cannot be parsed. """ currents = {} for reply in self.replies: data = reply.data alpha = bytes_to_int(data[0:4], dtype="i4") beta = bytes_to_int(data[4:], dtype="i4") currents[reply.positioner_id] = numpy.array([alpha, beta]) return currents class PreciseMoveAlphaOn(Command): """Turns precise move on alpha ON.""" command_id = CommandID.SWITCH_ON_PRECISE_MOVE_ALPHA broadcastable = False move_command = False safe = True class PreciseMoveAlphaOff(Command): """Turns precise move on alpha OFF.""" command_id = CommandID.SWITCH_OFF_PRECISE_MOVE_ALPHA broadcastable = False move_command = False safe = True class PreciseMoveBetaOn(Command): """Turns precise move on beta ON.""" command_id = CommandID.SWITCH_ON_PRECISE_MOVE_BETA broadcastable = False move_command = False safe = True class PreciseMoveBetaOff(Command): """Turns precise move on beta OFF.""" command_id = CommandID.SWITCH_OFF_PRECISE_MOVE_BETA broadcastable = False move_command = False safe = True class SetIncreaseCollisionMargin(Command): """Sets the buffer for collision margin.""" command_id = CommandID.SET_INCREASE_COLLISION_MARGIN broadcastable = False move_command = False safe = False def __init__(self, positioner_ids, margin: int, **kwargs): data = int_to_bytes(int(margin), dtype="i4") kwargs["data"] = data super().__init__(positioner_ids, **kwargs) class GetAlphaHallCalibration(Command): command_id = CommandID.GET_ALPHA_HALL_CALIB broadcastable = False move_command = False safe = True def get_replies(self) -> Dict[int, Tuple[int, int, int, int]]: return self.get_values() def get_values(self) -> dict[int, Tuple[int, int, int, int]]: """Returns the ``maxA, maxB, minA, minB`` values.""" values = {} for reply in self.replies: values[reply.positioner_id] = struct.unpack("HHHH", reply.data) return values class GetBetaHallCalibration(Command): command_id = CommandID.GET_BETA_HALL_CALIB broadcastable = False move_command = False safe = True def get_replies(self) -> Dict[int, Tuple[int, int, int, int]]: return self.get_values() def get_values(self) -> dict[int, Tuple[int, int, int, int]]: """Returns the ``maxA, maxB, minA, minB`` values.""" values = {} for reply in self.replies: values[reply.positioner_id] = struct.unpack("HHHH", reply.data) return values class GetHallCalibrationError(Command): command_id = CommandID.GET_HALL_CALIB_ERROR broadcastable = False move_command = False safe = True def get_replies(self) -> Dict[int, Tuple[int, int]]: return self.get_values() def get_values(self) -> dict[int, Tuple[int, int]]: """Returns the alpha and beta error values.""" values = {} for reply in self.replies: values[reply.positioner_id] = struct.unpack("ii", reply.data) return values