Source code for jaeger.commands.trajectory

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-10-08
# @Filename: trajectory.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import asyncio
import pathlib
import time

import numpy

from sdsstools import read_yaml_file

from jaeger import config, log, maskbits
from jaeger.commands import Command, CommandID
from jaeger.exceptions import FPSLockedError, TrajectoryError
from jaeger.utils import int_to_bytes


__ALL__ = ['send_trajectory', 'SendNewTrajectory', 'SendTrajectoryData',
           'TrajectoryDataEnd', 'TrajectoryTransmissionAbort',
           'StartTrajectory', 'StopTrajectory']


MOTOR_STEPS = config['positioner']['motor_steps']
TIME_STEP = config['positioner']['time_step']


[docs]async def send_trajectory(fps, trajectories, use_sync_line=True): """Sends a set of trajectories to the positioners. This is a low-level function that raises errors when a problem is encountered. Most users should use `.FPS.send_trajectory` instead. `.send_trajectory` automates `.Trajectory` by calling the different methods in order, but provides less control. Parameters ---------- fps : .FPS The instance of `.FPS` that will receive the trajectory. trajectories : `str` or `dict` Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the ``positioner_ids`` and each value is a dictionary containing two keys: ``alpha`` and ``beta``, each pointing to a list of tuples ``(position, time)``, where ``position`` is in degrees and ``time`` is in seconds. use_sync_line : bool If `True`, the SYNC line will be used to synchronise the beginning of all trajectories. Otherwise a `.START_TRAJECTORY` command will be sent over the CAN network. Raises ------ TrajectoryError If encounters a problem sending the trajectory. FPSLockedError If the FPS is locked. Examples -------- :: >>> fps = FPS() >>> await fps.initialise() # Send a trajectory with two points for positioner 4 >>> await fps.send_trajectory({4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}}) """ traj = Trajectory(fps, trajectories) if use_sync_line: if not fps.ieb or fps.ieb.disabled: raise TrajectoryError('IEB is not connected. ' 'Cannot use SYNC line.') if (await fps.ieb.get_device('sync').read())[0] == 'closed': raise TrajectoryError('The SYNC line is on high.') log.debug('sending trajectory data.') await traj.send() if traj.failed: raise TrajectoryError('something went wrong sending the trajectory.') log.info(f'trajectory successfully sent in {traj.data_send_time:1f} seconds.') log.info(f'expected time to complete trajectory: {traj.move_time:.2f} seconds.') log.info('starting trajectory ...') result = await traj.start(use_sync_line=use_sync_line) if traj.failed or not result: raise TrajectoryError('something went wrong starting the trajectory.') log.info('all positioners have successfully reached their positions.') return True
[docs]class Trajectory(object): """Prepares and sends a trajectory to the FPS. Most user will prefer using `.FPS.send_trajectory`, which automates the process. This class provides fine-grain control over the process. Parameters ---------- fps : .FPS The instance of `.FPS` that will receive the trajectory. trajectories : `str` or `dict` Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the ``positioner_ids`` and each value is a dictionary containing two keys: ``alpha`` and ``beta``, each pointing to a list of tuples ``(position, time)``, where ``position`` is in degrees and ``time`` is in seconds. Raises ------ TrajectoryError If encounters a problem sending the trajectory. FPSLockedError If the FPS is locked. Examples -------- Given the following two-point trajectory for positioner 4 :: points = {4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}} the normal process to execute the trajectory is :: trajectory = Trajectory(fps, points) await trajectory.send() # Sends the trajectory but does not yet execute it. await trajectory.start() # This starts the trajectory (positioners move). """ def __init__(self, fps, trajectories): self.fps = fps self.trajectories = trajectories if self.fps.locked: raise FPSLockedError('FPS is locked. Cannot send trajectories.') if self.fps.moving: raise TrajectoryError('the FPS is moving. Cannot send new trajectory.') if isinstance(self.trajectories, (str, pathlib.Path)): self.trajectories = read_yaml_file(self.trajectories) elif isinstance(self.trajectories, dict): pass else: raise TrajectoryError('invalid trajectory data.') #: Number of points sent to each positioner as a tuple ``(alpha, beta)``. self.n_points = {} #: The time required to complete the trajectory. self.move_time = None #: How long it took to send the trajectory. self.data_send_time = None self.failed = False self._ready_to_start = False
[docs] async def send(self): """Sends the trajectory but does not start it.""" self.move_time = 0.0 await self.abort_trajectory() if not await self.fps.update_status(positioner_ids=list(self.trajectories.keys()), timeout=1.): self.failed = True raise TrajectoryError('some positioners did not respond.') # Check that all positioners are ready to receive a new trajectory. for pos_id in self.trajectories: positioner = self.fps.positioners[pos_id] status = positioner.status if (positioner.flags.DATUM_ALPHA_INITIALIZED not in status or positioner.flags.DATUM_BETA_INITIALIZED not in status or positioner.flags.DISPLACEMENT_COMPLETED not in status): self.failed = True raise TrajectoryError(f'positioner_id={pos_id} is not ' 'ready to receive a trajectory.') traj_pos = self.trajectories[pos_id] self.n_points[pos_id] = (len(traj_pos['alpha']), len(traj_pos['beta'])) # Gets maximum time for this positioner max_time_pos = max([max(list(zip(*traj_pos['alpha']))[1]), max(list(zip(*traj_pos['beta']))[1])]) # Updates the global trajectory max time. if max_time_pos > self.move_time: self.move_time = max_time_pos # Starts trajectory new_traj_cmds = [self.fps.send_command('SEND_NEW_TRAJECTORY', positioner_id=pos_id, n_alpha=self.n_points[pos_id][0], n_beta=self.n_points[pos_id][1]) for pos_id in self.trajectories] await asyncio.gather(*new_traj_cmds) start_trajectory_send_time = time.time() # How many points from the trajectory are we putting in each command. n_chunk = config['positioner']['trajectory_data_n_points'] # Gets the maximum number of points for each arm for all positioners. max_points = numpy.max(list(self.n_points.values()), axis=0) max_points = {'alpha': max_points[0], 'beta': max_points[1]} # Send chunks of size n_chunk to all the positioners in parallel. # Do alpha first, then beta. for arm in ['alpha', 'beta']: for jj in range(0, max_points[arm], n_chunk): data_cmds = [] for pos_id in self.trajectories: arm_chunk = self.trajectories[pos_id][arm][jj:jj + n_chunk] if len(arm_chunk) == 0: continue data_cmds.append( self.fps.send_command('SEND_TRAJECTORY_DATA', positioner_id=pos_id, positions=arm_chunk)) await asyncio.gather(*data_cmds) for cmd in data_cmds: if cmd.status.failed or cmd.status.timed_out: self.failed = True raise TrajectoryError('at least one SEND_TRAJECTORY_COMMAND failed.') # Finalise the trajectories end_traj_cmds = await self.fps.send_to_all('TRAJECTORY_DATA_END', positioners=list(self.trajectories.keys())) for cmd in end_traj_cmds: if cmd.status.failed: self.failed = True raise TrajectoryError('TRAJECTORY_DATA_END failed.') if maskbits.ResponseCode.INVALID_TRAJECTORY in cmd.replies[0].response_code: self.failed = True raise TrajectoryError(f'positioner_id={cmd.positioner_id} got an ' f'INVALID_TRAJECTORY reply.') self.data_send_time = time.time() - start_trajectory_send_time self._ready_to_start = True self.failed = False return True
[docs] async def start(self, use_sync_line=True): """Starts the trajectory.""" if not self._ready_to_start or self.failed: raise TrajectoryError('the trajectory has not been sent.') if use_sync_line: if not self.fps.ieb or self.fps.ieb.disabled: raise TrajectoryError('IEB is not connected. ' 'Cannot use SYNC line.') if (await self.fps.ieb.get_device('sync').read())[0] == 'closed': raise TrajectoryError('The SYNC line is on high.') for positioner_id in list(self.trajectories.keys()): self.fps[positioner_id].move_time = self.move_time if use_sync_line: sync = self.fps.ieb.get_device('sync') # Set SYNC line to high. await sync.close() await asyncio.sleep(0.5) # Reset SYNC line. await sync.open() else: # Start trajectories command = await self.fps.send_command( 'START_TRAJECTORY', positioner_id=0, timeout=1, n_positioners=len(self.trajectories)) if command.status.failed: await self.fps.stop_trajectory() self.failed = True raise TrajectoryError('START_TRAJECTORY failed') try: await self.fps.pollers.set_delay(1) use_pollers = True except Exception as ee: use_pollers = False log.error(f'failed setting poller delay: {ee}.') # Wait approximate time before starting to poll for status await asyncio.sleep(0.95 * self.move_time) remaining_time = self.move_time - 0.95 * self.move_time # Wait until all positioners have completed. wait_status = [self.fps.positioners[pos_id].wait_for_status( self.fps.positioners[pos_id].flags.DISPLACEMENT_COMPLETED, timeout=remaining_time + 3, delay=0.1) for pos_id in self.trajectories] results = await asyncio.gather(*wait_status) if not all(results): if use_pollers: await self.fps.pollers.set_delay() self.failed = True raise TrajectoryError('some positioners did not complete the move.') # Restore default polling time if use_pollers: await self.fps.pollers.set_delay() return True
[docs] async def abort_trajectory(self): """Aborts the trajectory transmission.""" if not await self.fps.send_to_all('TRAJECTORY_TRANSMISSION_ABORT', positioners=list(self.trajectories.keys())): raise TrajectoryError('cannot abort trajectory transmission.')
[docs]class SendNewTrajectory(Command): """Starts a new trajectory and sends the number of points.""" command_id = CommandID.SEND_NEW_TRAJECTORY broadcastable = False move_command = True def __init__(self, n_alpha, n_beta, **kwargs): alpha_positions = int(n_alpha) beta_positions = int(n_beta) assert alpha_positions > 0 and beta_positions > 0 data = int_to_bytes(alpha_positions) + int_to_bytes(beta_positions) kwargs['data'] = data super().__init__(**kwargs)
[docs]class SendTrajectoryData(Command): """Sends a data point in the trajectory. This command sends multiple messages that represent a full trajectory. Parameters ---------- positions : list A list of tuples in which the first element is the angle, in degrees, and the second is the associated time, in seconds. Examples -------- >>> SendTrajectoryData([(90, 10), (88, 12), (80, 20)]) """ command_id = CommandID.SEND_TRAJECTORY_DATA broadcastable = False move_command = True def __init__(self, positions, **kwargs): positions = numpy.array(positions).astype(numpy.float64) positions[:, 0] = positions[:, 0] / 360. * MOTOR_STEPS positions[:, 1] /= TIME_STEP self.trajectory_points = positions.astype(numpy.int) data = [] for angle, tt in self.trajectory_points: data.append(int_to_bytes(angle, dtype='i4') + int_to_bytes(tt, dtype='i4')) kwargs['data'] = data super().__init__(**kwargs)
[docs]class TrajectoryDataEnd(Command): """Indicates that the transmission for the trajectory has ended.""" command_id = CommandID.TRAJECTORY_DATA_END broadcastable = False move_command = True
[docs]class TrajectoryTransmissionAbort(Command): """Aborts sending a trajectory.""" command_id = CommandID.TRAJECTORY_TRANSMISSION_ABORT broadcastable = False move_command = False safe = True
[docs]class StartTrajectory(Command): """Starts the trajectories.""" command_id = CommandID.START_TRAJECTORY broadcastable = True move_command = True
[docs]class StopTrajectory(Command): """Stop the trajectories.""" command_id = CommandID.STOP_TRAJECTORY broadcastable = True safe = True