Source code for jaeger.commands.trajectory

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: José Sánchez-Gallego (
# @Date: 2018-10-08
# @Filename:
# @License: BSD 3-clause (

from __future__ import annotations

import asyncio
import json
import os
import pathlib
import time
import warnings
from glob import glob

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast

import numpy
from astropy.time import Time

import drift
from sdsstools import read_yaml_file

from jaeger import config, log
from jaeger.commands import Command, CommandID
from jaeger.exceptions import JaegerUserWarning, TrajectoryError
from jaeger.ieb import IEB
from jaeger.maskbits import FPSStatus, ResponseCode
from jaeger.utils import int_to_bytes

    from clu.command import Command as CluCommand

    from jaeger import FPS
    from import JaegerActor

__all__ = [

MOTOR_STEPS = config["positioner"]["motor_steps"]
TIME_STEP = config["positioner"]["time_step"]

TrajectoryDataType = Dict[int, Dict[str, List[Tuple[float, float]]]]

[docs] async def send_trajectory( fps: FPS, trajectories: str | pathlib.Path | TrajectoryDataType, use_sync_line: bool | None = None, send_trajectory: bool = True, start_trajectory: bool = True, command: Optional[CluCommand[JaegerActor]] = None, dump: bool | str = True, extra_dump_data: dict[str, Any] = {}, ) -> Trajectory: """Sends a set of trajectories to the positioners. This is a low-level function that raises errors when a problem is encountered. Most users should use `.FPS.send_trajectory` instead. `.send_trajectory` automates `.Trajectory` by calling the different methods in order, but provides less control. Parameters ---------- fps The instance of `.FPS` that will receive the trajectory. trajectories Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the ``positioner_ids`` and each value is a dictionary containing two keys: ``alpha`` and ``beta``, each pointing to a list of tuples ``(position, time)``, where ``position`` is in degrees and ``time`` is in seconds. use_sync_line If `True`, the SYNC line will be used to synchronise the beginning of all trajectories. Otherwise a `.START_TRAJECTORY` command will be sent over the CAN network. If `None`, defaults to the configuration parameter. send_trajectory If `True`, sends the trajectory to the positioners and returns the `.Trajectory` instance. start_trajectory If `True`, runs the trajectory after sending it. Otherwise, returns the `.Trajectory` instance after sending the data. Ignored if `send_trajectory=False`. command A command to which to output informational messages. dump Whether to dump a JSON with the trajectory to disk. If `True`, the trajectory is stored in ``/data/logs/jaeger/trajectories/<MJD>`` with a unique sequence for each trajectory. A string can be passed with a custom path. The dump file is created only if `.Trajectory.start` is called, regardless of whether the trajectory succeeds. extra_dump_data A dictionary with additional parameters to add to the dump JSON. Raises ------ TrajectoryError If encounters a problem sending the trajectory. FPSLockedError If the FPS is locked. Examples -------- :: >>> fps = FPS() >>> await fps.initialise() # Send a trajectory with two points for positioner 4 >>> await fps.send_trajectory({4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}}) """ traj = Trajectory( fps, trajectories, dump=dump, extra_dump_data=extra_dump_data, ) if use_sync_line is None: use_sync_line = config["fps"]["use_sync_line"] assert isinstance(use_sync_line, bool) if use_sync_line: if not isinstance(fps.ieb, IEB) or fps.ieb.disabled: raise TrajectoryError("IEB is not connected. Cannot use SYNC line.", traj) if (await fps.ieb.get_device("sync").read())[0] == "closed": raise TrajectoryError("The SYNC line is on high.", traj) if send_trajectory is False: return traj msg = "Sending trajectory data." log.debug(msg) if command: command.debug(msg) try: await traj.send() except TrajectoryError as err: raise TrajectoryError( f"Something went wrong sending the trajectory: {err}", err.trajectory, ) msg = f"Trajectory sent in {traj.data_send_time:.1f} seconds." log.debug(msg) if command: command.debug(msg)"Expected time to complete trajectory: {traj.move_time:.2f} seconds.") if command and traj.move_time:, 2)) if start_trajectory is False: if traj.dump_file: traj.dump_trajectory() if command: command.debug(trajectory_dump_file=traj.dump_file) return traj msg = "Starting trajectory ..." if command: try: await traj.start(use_sync_line=use_sync_line) except TrajectoryError as err: if traj.start_time is not None and traj.end_time is not None: elapsed = traj.end_time - traj.start_time elapsed_msg = f" Trajectory failed {elapsed:.2f} seconds after start." else: elapsed_msg = "" raise TrajectoryError( f"Something went wrong during the trajectory: {err}.{elapsed_msg}", err.trajectory, ) finally: if traj.dump_file and command: command.debug(trajectory_dump_file=traj.dump_file) if command: fps.is_folded())) msg = "All positioners have reached their destinations." if command: return traj
[docs] class Trajectory(object): """Prepares and sends a trajectory to the FPS. Most user will prefer using `.FPS.send_trajectory`, which automates the process. This class provides fine-grain control over the process. Parameters ---------- fps The instance of `.FPS` that will receive the trajectory. trajectories Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the ``positioner_ids`` and each value is a dictionary containing two keys: ``alpha`` and ``beta``, each pointing to a list of tuples ``(position, time)``, where ``position`` is in degrees and ``time`` is in seconds. dump Whether to dump a JSON with the trajectory to disk. If `True`, the trajectory is stored in ``/data/logs/jaeger/trajectories/<MJD>`` with a unique sequence for each trajectory. A string can be passed with a custom path. The dump file is created only if `.Trajectory.start` is called, regardless of whether the trajectory succeeds. extra_dump_data A dictionary with additional parameters to add to the dump JSON. Raises ------ TrajectoryError If encounters a problem sending the trajectory. FPSLockedError If the FPS is locked. Examples -------- Given the following two-point trajectory for positioner 4 :: points = {4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}} the normal process to execute the trajectory is :: trajectory = Trajectory(fps, points) await trajectory.send() # Sends the trajectory but does not yet execute it. await trajectory.start() # This starts the trajectory (positioners move). """ def __init__( self, fps: FPS, trajectories: str | pathlib.Path | TrajectoryDataType, dump: bool | str = True, extra_dump_data: dict[str, Any] = {}, ): self.fps = fps self.trajectories: TrajectoryDataType if isinstance(trajectories, (str, pathlib.Path)): path = pathlib.Path(trajectories) if path.suffix == ".json": self.trajectories = json.loads(open(path, "r").read())["trajectory"] else: self.trajectories = cast(dict, read_yaml_file(trajectories)) elif isinstance(trajectories, dict): self.trajectories = trajectories else: raise TrajectoryError("invalid trajectory data.", self) # List of positioners that failed receiving the trajectory and reason. self.failed_positioners: dict[int, str] = {} self.validate() #: Number of points sent to each positioner as a tuple ``(alpha, beta)``. self.n_points = {} #: The time required to complete the trajectory. self.move_time: float | None = None #: How long it took to send the trajectory. self.data_send_time: float | None = None self.failed = False self.send_new_trajectory_failed = False # Commands that will be sent. Mostly for inspection if the trajectory fails. self.data_send_cmd: Command | None = None self.end_traj_cmds: Command | None = None self.start_time: float | None = None self.end_time: float | None = None self.use_sync_line: bool = True self._ready_to_start = False self.dump_data = { "start_time": time.time(), "success": False, "trajectory_send_time": None, "trajectory_start_time": None, "end_time": None, "use_sync_line": True, "extra": extra_dump_data, "trajectory": self.trajectories, "initial_positions": self.fps.get_positions_dict(), "final_positions": {}, } if dump is False: self.dump_file = None elif isinstance(dump, (str, pathlib.Path)): self.dump_file = str(dump) else: dirname = config["positioner"]["trajectory_dump_path"] mjd = str(int( dirname = os.path.join(dirname, mjd) files = list(sorted(glob(os.path.join(dirname, "*.json")))) if len(files) == 0: seq = 1 else: seq = int(files[-1].split("-")[-1].split(".")[0]) + 1 self.dump_file = os.path.join(dirname, f"trajectory-{mjd}-{seq:04d}.json")
[docs] def validate(self): """Validates the trajectory.""" if len(self.trajectories) == 0: raise TrajectoryError("trajectory is empty.", self) if len(self.trajectories) != len(numpy.unique(list(self.trajectories))): raise TrajectoryError("Duplicate positioner trajectories.", self) for pid in self.trajectories: trajectory = self.trajectories[pid] if "alpha" not in trajectory or "beta" not in trajectory: self.failed_positioners[pid] = "NO_DATA" raise TrajectoryError( f"Positioner {pid} missing alpha or beta data.", self, ) for arm in ["alpha", "beta"]: data = numpy.array(list(zip(*trajectory[arm]))[0]) if arm == "beta": if config.get("safe_mode", False): if config["safe_mode"] is True: min_beta = 160 else: min_beta = config["safe_mode"]["min_beta"] if numpy.any(data < min_beta): self.failed_positioners[pid] = "SAFE_MODE" raise TrajectoryError( f"Positioner {pid}: safe mode is on " f"and beta < {min_beta}.", self, )
[docs] async def send(self): """Sends the trajectory but does not start it.""" if self.fps.locked: raise TrajectoryError(f"FPS is locked by {self.fps.locked_by}.", self) self.move_time = 0.0 await self.fps.stop_trajectory() await self.fps.stop_trajectory(clear_flags=True) if not await self.fps.update_status(timeout=1.0): self.failed = True raise TrajectoryError("Some positioners did not respond.", self) if self.fps.moving: raise TrajectoryError("The FPS is moving. Cannot send trajectory.", self) # Check that all positioners are ready to receive a new trajectory. for pos_id in self.trajectories: positioner = self.fps.positioners[pos_id] status = positioner.status if positioner.disabled: raise TrajectoryError( f"positioner_id={pos_id} is disabled/offline but was " "included in the trajectory.", self, ) if ( positioner.flags.DATUM_ALPHA_INITIALIZED not in status or positioner.flags.DATUM_BETA_INITIALIZED not in status or positioner.flags.DISPLACEMENT_COMPLETED not in status ): self.failed = True self.failed_positioners[pos_id] = "NOT_READY" raise TrajectoryError( f"positioner_id={pos_id} is not ready to receive a trajectory.", self, ) traj_pos = self.trajectories[pos_id] self.n_points[pos_id] = (len(traj_pos["alpha"]), len(traj_pos["beta"])) # Gets maximum time for this positioner max_time_pos = max( [ max(list(zip(*traj_pos["alpha"]))[1]), max(list(zip(*traj_pos["beta"]))[1]), ] ) # Updates the global trajectory max time. if max_time_pos > self.move_time: self.move_time = max_time_pos new_traj_data = {} for pos_id in self.trajectories: data = SendNewTrajectory.get_data( self.n_points[pos_id][0], self.n_points[pos_id][1], ) new_traj_data[pos_id] = data # Starts trajectory new_traj_cmd = await self.fps.send_command( "SEND_NEW_TRAJECTORY", positioner_ids=list(self.trajectories), data=new_traj_data, ) if new_traj_cmd.status.failed or new_traj_cmd.status.timed_out: self.failed = True raise TrajectoryError("Failed sending SEND_NEW_TRAJECTORY.", self) start_trajectory_send_time = time.time() # How many points from the trajectory are we putting in each command. n_chunk = config["positioner"]["trajectory_data_n_points"] # Gets the maximum number of points for each arm for all positioners. max_points = numpy.max(list(self.n_points.values()), axis=0) max_points = {"alpha": max_points[0], "beta": max_points[1]} # Send chunks of size n_chunk to all the positioners in parallel. # Do alpha first, then beta. for arm in ["alpha", "beta"]: for jj in range(0, max_points[arm], n_chunk): data = {} send_trajectory_pids = [] for pos_id in self.trajectories: arm_chunk = self.trajectories[pos_id][arm][jj : jj + n_chunk] if len(arm_chunk) == 0: continue send_trajectory_pids.append(pos_id) positions = numpy.array(arm_chunk).astype(numpy.float64) data_pos = SendTrajectoryData.calculate_positions(positions) data[pos_id] = data_pos self.data_send_cmd = await self.fps.send_command( "SEND_TRAJECTORY_DATA", positioner_ids=send_trajectory_pids, data=data, ) status = self.data_send_cmd.status if status.failed or status.timed_out: for reply in self.data_send_cmd.replies: if reply.response_code != ResponseCode.COMMAND_ACCEPTED: code = self.failed_positioners[reply.positioner_id] = code self.failed = True raise TrajectoryError( "At least one SEND_TRAJECTORY_COMMAND failed.", self, ) # Finalise the trajectories self.end_traj_cmds = await self.fps.send_command( "TRAJECTORY_DATA_END", positioner_ids=list(self.trajectories.keys()), ) for cmd in self.end_traj_cmds: if cmd.status.failed: self.failed = True raise TrajectoryError("TRAJECTORY_DATA_END failed.", self) if ResponseCode.INVALID_TRAJECTORY in cmd.replies[0].response_code: self.failed = True raise TrajectoryError( f"positioner_id={cmd.positioner_id} got an " f"INVALID_TRAJECTORY reply.", self, ) self.data_send_time = time.time() - start_trajectory_send_time self._ready_to_start = True self.failed = False return True
[docs] async def start(self, use_sync_line: bool = True): """Starts the trajectory.""" if not self._ready_to_start or self.failed: raise TrajectoryError("The trajectory has not been sent.", self) if self.move_time is None: raise TrajectoryError("move_time not set.", self) self.use_sync_line = use_sync_line if use_sync_line: if not isinstance(self.fps.ieb, IEB) or self.fps.ieb.disabled: raise TrajectoryError( "IEB is not connected. Cannot use SYNC line.", self, ) if (await self.fps.ieb.get_device("sync").read())[0] == "closed": raise TrajectoryError("The SYNC line is on high.", self) sync = self.fps.ieb.get_device("sync") assert isinstance(sync, drift.Relay) # Set SYNC line to high. await sync.close() # Schedule reseting of SYNC line asyncio.get_event_loop().call_later(0.5, asyncio.create_task, else: # Start trajectories n_expected = len([pos for pos in self.fps.values() if not pos.offline]) command = await self.fps.send_command( "START_TRAJECTORY", positioner_ids=0, timeout=1, # All positioners reply, including those not in the trajectory but not # offline ones. n_positioners=n_expected, ) if command.status.failed: await self.fps.stop_trajectory() self.failed = True raise TrajectoryError("START_TRAJECTORY failed", self) restart_pollers = True if self.fps.pollers.running else False await self.fps.pollers.stop() self.start_time = time.time() try: while True: await asyncio.sleep(1) if self.fps.locked: raise TrajectoryError( "The FPS got locked during the trajectory.", self, ) await self.fps.update_status() if self.fps.status & FPSStatus.IDLE: self.failed = False break elapsed = time.time() - self.start_time if elapsed > (self.move_time + 3): raise TrajectoryError( "Some positioners did not complete the move.", self, ) # TODO: There seems to be bug in the firmware. Sometimes when a positioner # fails to start its trajectory, at the end of the trajectory time it # does believe it has reached the commanded position, although it's still # at the initial position. In those cases issuing a SEND_TRAJECTORY_ABORT # followed by a position update seems to return correct positions. await self.fps.stop_trajectory() # The FPS says they have all stopped moving but check that they are # actually at their positions. await self.fps.update_position() failed_reach = False for pid in self.trajectories: alpha = self.trajectories[pid]["alpha"][-1][0] beta = self.trajectories[pid]["beta"][-1][0] current_position = numpy.array(self.fps[pid].position) if not numpy.allclose(current_position, [alpha, beta], atol=0.1): warnings.warn( f"Positioner {pid} may not have reached its destination.", JaegerUserWarning, ) failed_reach = True if failed_reach: raise TrajectoryError( "Some positioners did not reach their destinations.", self, ) except BaseException: self.failed = True await self.fps.stop_trajectory() raise finally: # Not explicitely updating the positions here because save_snapshot() # will do that and no need to waste extra time. Do not wait for this. asyncio.create_task(self.fps.save_snapshot()) if self.dump_file: self.dump_trajectory() self.end_time = time.time() if restart_pollers: self.fps.pollers.start() return True
[docs] def dump_trajectory(self, path: str | None = None): """Dumps the trajectory to a JSON file.""" if path is None: if self.dump_file is None: return path = self.dump_file self.dump_data["success"] = not self.failed self.dump_data["trajectory_start_time"] = self.start_time self.dump_data["trajectory_send_time"] = self.data_send_time self.dump_data["use_sync_line"] = self.use_sync_line self.dump_data["end_time"] = time.time() self.dump_data["final_positions"] = self.fps.get_positions_dict() if not os.path.exists(os.path.dirname(path)): os.makedirs(os.path.dirname(path)) with open(path, "w") as f: f.write(json.dumps(self.dump_data, indent=2))
[docs] async def abort_trajectory(self): """Aborts the trajectory transmission.""" cmd = await self.fps.send_command( "SEND_TRAJECTORY_ABORT", positioner_ids=list(self.trajectories.keys()), ) if cmd.status.failed: raise TrajectoryError("Cannot abort trajectory transmission.", self)
[docs] class SendNewTrajectory(Command): """Starts a new trajectory and sends the number of points.""" command_id = CommandID.SEND_NEW_TRAJECTORY broadcastable = False move_command = True def __init__(self, positioner_ids, n_alpha=None, n_beta=None, **kwargs): if n_alpha is not None and n_beta is not None: kwargs["data"] = self.get_data(n_alpha, n_beta) else: assert "data" in kwargs, "n_alpha/n_beta or data need to be passed." super().__init__(positioner_ids, **kwargs)
[docs] @staticmethod def get_data(n_alpha, n_beta): """Returns the data bytearray from a pair for ``(n_alpha, n_beta)``.""" alpha_positions = int(n_alpha) beta_positions = int(n_beta) assert alpha_positions > 0 and beta_positions > 0 data = int_to_bytes(alpha_positions) + int_to_bytes(beta_positions) return data
[docs] class SendTrajectoryData(Command): """Sends a data point in the trajectory. This command sends multiple messages that represent a full trajectory. Parameters ---------- positions : list A list of tuples in which the first element is the angle, in degrees, and the second is the associated time, in seconds. Examples -------- >>> SendTrajectoryData([(90, 10), (88, 12), (80, 20)]) """ command_id = CommandID.SEND_TRAJECTORY_DATA broadcastable = False move_command = True def __init__(self, positioner_ids, positions=None, **kwargs): if positions is not None: kwargs["data"] = self.calculate_positions(positions) else: assert "data" in kwargs, "positions or data need to be passed." super().__init__(positioner_ids, **kwargs)
[docs] @staticmethod def calculate_positions(positions): """Converts angle-time posions to bytes data.""" positions = numpy.array(positions).astype(numpy.float64) positions[:, 0] = positions[:, 0] / 360.0 * MOTOR_STEPS positions[:, 1] /= TIME_STEP positions = positions.astype(numpy.int32) data = [] for angle, tt in positions: data.append(int_to_bytes(angle, dtype="i4") + int_to_bytes(tt, dtype="i4")) return data
[docs] class TrajectoryDataEnd(Command): """Indicates that the transmission for the trajectory has ended.""" command_id = CommandID.TRAJECTORY_DATA_END broadcastable = False move_command = True
[docs] class SendTrajectoryAbort(Command): """Aborts sending a trajectory.""" command_id = CommandID.SEND_TRAJECTORY_ABORT broadcastable = False move_command = False safe = True
[docs] class StartTrajectory(Command): """Starts the trajectories.""" command_id = CommandID.START_TRAJECTORY broadcastable = True move_command = True safe = True
[docs] class StopTrajectory(Command): """Stop the trajectories.""" command_id = CommandID.STOP_TRAJECTORY broadcastable = True safe = True