Command API¶
Base classes¶
- class jaeger.commands.base.Command(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
StatusMixIn[CommandStatus],Future[Command]A command to be sent to the CAN controller.
Implements a base class to define CAN commands to interact with the positioner. Commands can be composed of single or multiple messages. When sending a command to the bus, the first message is written to, then asynchronously waits for a confirmation that the message has been received before sending the following message. If any of the messages returns an error code the command is failed.
Commandsubclasses fromStatusMixInandstatus_callbackgets called when the status changes.Commandis aFutureand must be awaited. TheFutureis done whenfinish_commandis called, which happens when the status is marked done or cancelled or when the command timeouts.Commands sent to a single positioner are marked done when a reply is received from the same positioner for the given command, or when it times out. Broadcast commands only get marked done by timing out or manually.
- Parameters:
positioner_ids (int | List[int]) – The id or list of ids of the robot(s) to which this command will be sent. Use
positioner_ids=0to broadcast to all robots.loop – The running event loop, or uses
get_event_loop.timeout (float) – Time after which the command will be marked done when not all the positioners have replies. If
None, the default timeout will be used. If timeout is a negative number, the command won’t timeout until all the positioners have replied.done_callback (Optional[Callable]) – A function to call when the command has been successfully completed.
n_positioners (Optional[int]) – If the command is a broadcast, the number of positioners that should reply. If defined, the command will be done once as many positioners have replied. Ignored for non-broadcasts.
data (Union[None, data_co, Dict[int, data_co]]) – The data to pass to the messages. If a list, each element will be sent to each positioner as a message. It can also be a dictionary of lists in which the key is the positioner to which to send the data.
ignore_unknown (bool) – Ignores
UNKNOWN_COMMANDreplies from positioners that do now support this command.
- get_messages(data=None)[source]¶
Returns the list of messages associated with this command.
Unless overridden, returns a single message with the associated data.
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- status_callback()[source]¶
Callback for change status.
When the status gets set to
CommandStatus.RUNNINGstarts a timer that marks the command as done aftertimeout.
- bootloader = False¶
Whether the command is safe to be issues in bootloader mode.
- move_command = False¶
Whether this command produces a positioner move.
- property name¶
Returns the name of this command.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- safe = False¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.base.SuperMessage(command, data=bytearray(b''), positioner_id=0, uid=0, response_code=0, extended_id=True)[source]¶
Bases:
MessageAn extended CAN
Messageclass.Expands the
Messageclass to handle custom arbitration IDs for extended frames.- Parameters:
command (Command) – The command associated with this message.
data (bytearray) – Payload to pass to
Message.positioner_id (int) – The positioner to which the message will be sent (0 for broadcast).
uid (int) – The unique identifier for this message.
extended_id (bool) – Whether the id is an 11 bit (False) or 29 bit (True) address.
response_code (int)
List of commands¶
- class jaeger.commands.CommandID(value)[source]¶
Bases:
IntEnumIDs associated with commands.
- ALPHA_CLOSED_LOOP_COLLISION_DETECTION = 118¶
- ALPHA_CLOSED_LOOP_WITHOUT_COLLISION_DETECTION = 119¶
- ALPHA_OPEN_LOOP_COLLISION_DETECTION = 120¶
- ALPHA_OPEN_LOOP_WITHOUT_COLLISION_DETECTION = 121¶
- BETA_CLOSED_LOOP_COLLISION_DETECTION = 122¶
- BETA_CLOSED_LOOP_WITHOUT_COLLISION_DETECTION = 123¶
- BETA_OPEN_LOOP_COLLISION_DETECTION = 124¶
- BETA_OPEN_LOOP_WITHOUT_COLLISION_DETECTION = 125¶
- COLLISION_DETECTED = 18¶
- GET_ACTUAL_POSITION = 32¶
- GET_ALPHA_HALL_CALIB = 104¶
- GET_BETA_HALL_CALIB = 105¶
- GET_CURRENT = 56¶
- GET_FIRMWARE_VERSION = 2¶
- GET_HALL_CALIB_ERROR = 45¶
- GET_HOLDING_CURRENT = 113¶
- GET_ID = 1¶
- GET_NUMBER_TRAJECTORIES = 139¶
- GET_OFFSETS = 34¶
- GET_RAW_TEMPERATURE = 132¶
- GET_STATUS = 3¶
- GO_TO_ABSOLUTE_POSITION = 30¶
- GO_TO_DATUMS = 20¶
- GO_TO_DATUM_ALPHA = 21¶
- GO_TO_DATUM_BETA = 22¶
- GO_TO_RELATIVE_POSITION = 31¶
- HALL_OFF = 117¶
- HALL_ON = 116¶
- SAVE_INTERNAL_CALIBRATION = 53¶
- SEND_FIRMWARE_DATA = 201¶
- SEND_NEW_TRAJECTORY = 10¶
- SEND_TRAJECTORY_ABORT = 13¶
- SEND_TRAJECTORY_DATA = 11¶
- SET_ACTUAL_POSITION = 33¶
- SET_CURRENT = 41¶
- SET_HOLDING_CURRENT = 112¶
- SET_INCREASE_COLLISION_MARGIN = 111¶
- SET_NUMBER_TRAJECTORIES = 140¶
- SET_OFFSETS = 35¶
- SET_SPEED = 40¶
- START_COGGING_CALIBRATION = 47¶
- START_COGGING_CALIBRATION_ALPHA = 48¶
- START_COGGING_CALIBRATION_BETA = 49¶
- START_DATUM_CALIBRATION = 23¶
- START_DATUM_CALIBRATION_ALPHA = 24¶
- START_DATUM_CALIBRATION_BETA = 25¶
- START_FIRMWARE_UPGRADE = 200¶
- START_MOTOR_CALIBRATION = 26¶
- START_MOTOR_CALIBRATION_ALPHA = 26¶
- START_MOTOR_CALIBRATION_BETA = 27¶
- START_TRAJECTORY = 14¶
- STOP_TRAJECTORY = 15¶
- SWITCH_LED_OFF = 127¶
- SWITCH_LED_ON = 126¶
- SWITCH_OFF_PRECISE_MOVE_ALPHA = 129¶
- SWITCH_OFF_PRECISE_MOVE_BETA = 131¶
- SWITCH_ON_PRECISE_MOVE_ALPHA = 128¶
- SWITCH_ON_PRECISE_MOVE_BETA = 130¶
- TRAJECTORY_DATA_END = 12¶
- class jaeger.commands.GetID(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandCommands the positioners to reply with their positioner id.
- Parameters:
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.GetFirmwareVersion(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command- Parameters:
- get_firmware(positioner_id=None)[source]¶
Returns the firmware version string.
- Parameters:
positioner_id (int) – The positioner for which to return the version. If
Nonereturns a dictionary with the firmware version of all the positioners that replied.- Returns:
firmware – A string or dictionary of string with the firmware version(s), with the format
'XX.YY.ZZ'whereYY='80'if the positioner is in bootloader mode.- Raises:
ValueError – If no positioner with
positioner_idhas replied.- Return type:
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.GetStatus(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandGets the status bits for the positioner.
- Parameters:
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.GotoAbsolutePosition(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
CommandMoves alpha and beta to absolute positions in degrees.
- get_move_time()[source]¶
Returns the time needed to move to the commanded position.
- Raises:
ValueError – If no reply has been received or the data cannot be parsed.
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.GotoRelativePosition(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
GotoAbsolutePositionMoves alpha and beta a relative number of degrees.
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.GetActualPosition(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandGets the current position of the alpha and beta arms.
- Parameters:
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SetActualPosition(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
CommandSets the current position of the alpha and beta arms.
- move_command = True¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SetSpeed(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
CommandSets the speeds of the alpha and beta motors.
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SetCurrent(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
CommandSets the current of the alpha and beta motors.
- move_command = True¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.goto(fps, new_positions, speed=None, relative=False, use_sync_line=None, go_cowboy=False, force=False, command=None, save_snapshot=True)[source]¶
Bases:
Send positioners to a given position using a trajectory with
kaijucheck.- Parameters:
new_positions (dict[int, tuple[float, float]]) – The new positions as a dictionary of positioner ID to a tuple of new alpha and beta angles. Positioners not specified will be kept on the same positions.
speed (Optional[float]) – The speed to use.
relative (bool) – If
True,alphaandbetaare considered relative angles.use_sync_line (bool | None) – Whether to use the SYNC line to start the trajectories.
go_cowboy (bool) – If set, does not create a
kaiju-safe trajectory. Use at your own risk.force (bool) – If
go_cowboy=Falseand the trajectory is deadlocked, aTrajectoryErrorwill be raised. Useforce=Trueto apply the trajectory anyway.command (CluCommand[JaegerActor] | None) – A command to pass to
send_trajectoryto output additional information.save_snapshot (bool) – If
True, a snapshot image is saved at the end of the trajectory.
Bootloader commands¶
- class jaeger.commands.StartFirmwareUpgrade(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command- Parameters:
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SendFirmwareData(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command- Parameters:
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- async jaeger.commands.load_firmware(fps, firmware_file, positioners=None, messages_per_positioner=None, force=False, show_progressbar=False, progress_callback=None, stop_logging=True)[source]¶
Convenience function to run through the steps of loading a new firmware.
This function is a coroutine and not intendend for direct use. Use the
jaegerCLI instead.- Parameters:
fps (FPS) –
FPSinstance to which the commands will be sent.firmware_file (str | pathlib.Path) – Binary file containing the firmware to load.
positioners (Optional[List[int]]) – A list of positioner ids whose firmware to update, or
Noneto update all the positioners infps.messages_per_positioner (Optional[int]) – How many messages to send to each positioner at once. This can improve the performance but also overflow the CAN bus buffer. With the default value of
None, reverts to the configuration valuepositioner.firmware_messages_per_positioner.force (bool) – Forces the firmware load to continue even if some positioners are not responding or are not in bootloader mode.
show_progressbar (bool) – Whether to show a progress bar.
progress_callback (Optional[Callable[[int, int], Any]]) – A function to call as data gets transferred to the positioners. The callback is called with
(current_chunk, n_chuck)wherecurrent_chunkis the number of the data chunk being sent andn_chunkis the total number of chunks in the data package.stop_logging (bool) – Disable logging to file for the CAN logger to improve performance.
Trajectory commands¶
- class jaeger.commands.Trajectory(fps, trajectories, extra_dump_data={})[source]¶
Bases:
objectPrepares and sends a trajectory to the FPS.
Most user will prefer using
FPS.send_trajectory, which automates the process. This class provides fine-grain control over the process.- Parameters:
fps (FPS) – The instance of
FPSthat will receive the trajectory.trajectories (str | pathlib.Path | TrajectoryDataType) – Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the
positioner_idsand each value is a dictionary containing two keys:alphaandbeta, each pointing to a list of tuples(position, time), wherepositionis in degrees andtimeis in seconds.extra_dump_data (dict[str, Any]) – A dictionary with additional parameters to add to the dump JSON.
- Raises:
TrajectoryError – If encounters a problem sending the trajectory.
FPSLockedError – If the FPS is locked.
Examples
Given the following two-point trajectory for positioner 4
points = {4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}}
the normal process to execute the trajectory is
trajectory = Trajectory(fps, points) await trajectory.send() # Sends the trajectory but does not yet execute it. await trajectory.start() # This starts the trajectory (positioners move).
- dump_trajectory(path=None)[source]¶
Dumps the trajectory to a JSON file.
- Parameters:
path (str | None)
- n_points¶
Number of points sent to each positioner as a tuple
(alpha, beta).
- async jaeger.commands.send_trajectory(fps, trajectories, use_sync_line=None, send_trajectory=True, start_trajectory=True, command=None, dump=True, extra_dump_data={}, save_snapshot=True)[source]¶
Sends a set of trajectories to the positioners.
This is a low-level function that raises errors when a problem is encountered. Most users should use
FPS.send_trajectoryinstead.send_trajectoryautomatesTrajectoryby calling the different methods in order, but provides less control.- Parameters:
fps (FPS) – The instance of
FPSthat will receive the trajectory.trajectories (str | pathlib.Path | TrajectoryDataType) – Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the
positioner_idsand each value is a dictionary containing two keys:alphaandbeta, each pointing to a list of tuples(position, time), wherepositionis in degrees andtimeis in seconds.use_sync_line (bool | None) – If
True, the SYNC line will be used to synchronise the beginning of all trajectories. Otherwise aSTART_TRAJECTORYcommand will be sent over the CAN network. IfNone, defaults to the configuration parameter.send_trajectory (bool) – If
True, sends the trajectory to the positioners and returns theTrajectoryinstance.start_trajectory (bool) – If
True, runs the trajectory after sending it. Otherwise, returns theTrajectoryinstance after sending the data. Ignored ifsend_trajectory=False.command (Optional[CluCommand[JaegerActor]]) – A command to which to output informational messages.
dump (bool | str) – Whether to dump a JSON with the trajectory to disk. If
True, the trajectory is stored in/data/logs/jaeger/trajectories/<MJD>with a unique sequence for each trajectory. A string can be passed with a custom path. The dump file is created only ifTrajectory.startis called, regardless of whether the trajectory succeeds.extra_dump_data (dict[str, Any]) – A dictionary with additional parameters to add to the dump JSON.
save_snapshot (bool) – If
True, a snapshot image is saved at the end of the trajectory.
- Raises:
TrajectoryError – If encounters a problem sending the trajectory.
FPSLockedError – If the FPS is locked.
- Return type:
Examples
>>> fps = FPS() >>> await fps.initialise() # Send a trajectory with two points for positioner 4 >>> await fps.send_trajectory({4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}})
- class jaeger.commands.SendNewTrajectory(positioner_ids, n_alpha=None, n_beta=None, **kwargs)[source]¶
Bases:
CommandStarts a new trajectory and sends the number of points.
- static get_data(n_alpha, n_beta)[source]¶
Returns the data bytearray from a pair for
(n_alpha, n_beta).
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.SendTrajectoryData(positioner_ids, positions=None, **kwargs)[source]¶
Bases:
CommandSends a data point in the trajectory.
This command sends multiple messages that represent a full trajectory.
- Parameters:
positions (list) – A list of tuples in which the first element is the angle, in degrees, and the second is the associated time, in seconds.
Examples
>>> SendTrajectoryData([(90, 10), (88, 12), (80, 20)])
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.TrajectoryDataEnd(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandIndicates that the transmission for the trajectory has ended.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.SendTrajectoryAbort(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandAborts sending a trajectory.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.StartTrajectory(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandStarts the trajectories.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
Calibration commands¶
- async jaeger.commands.calibration.calibrate_positioners(fps, axis, positioner_ids, motors=True, datums=True, cogging=True)[source]¶
Runs the calibration process and saves it to the internal memory.
- Parameters:
fps (FPS) – The instance of
FPSthat will receive the trajectory.axis (str) – The axis to calibrate, either
"alpha","beta", or"both".positioner_id – The ID of the positioner(s) to calibrate.
motors (bool) – Whether to perform the motor calibration.
datums (bool) – Whether to perform the datums calibration.
cogging (bool) – Whether to perform the cogging calibration (may take more than one hour).
- Raises:
JaegerError – If encounters a problem during the process.
Examples
>>> fps = FPS() >>> await fps.initialise() # Calibrate positioner 31. >>> await calibrate_positioner(fps, 31)
- class jaeger.commands.calibration.StartDatumCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandIndicates that the transmission for the trajectory has ended.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.StartMotorCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandAborts sending a trajectory.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.StartCoggingCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandStarts the trajectories.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.SaveInternalCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandStop the trajectories.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.HallOn(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandTurns hall sensors ON.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.HallOff(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
CommandTurns hall sensors ON.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- replies: List[Reply]¶
A list of messages with the responses to this command.