Command API¶
Base classes¶
- class jaeger.commands.base.Command(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
StatusMixIn
[CommandStatus
],Future
[Command]A command to be sent to the CAN controller.
Implements a base class to define CAN commands to interact with the positioner. Commands can be composed of single or multiple messages. When sending a command to the bus, the first message is written to, then asynchronously waits for a confirmation that the message has been received before sending the following message. If any of the messages returns an error code the command is failed.
Command
subclasses fromStatusMixIn
andstatus_callback
gets called when the status changes.Command
is aFuture
and must be awaited. TheFuture
is done whenfinish_command
is called, which happens when the status is marked done or cancelled or when the command timeouts.Commands sent to a single positioner are marked done when a reply is received from the same positioner for the given command, or when it times out. Broadcast commands only get marked done by timing out or manually.
- Parameters:
positioner_ids (int | List[int]) – The id or list of ids of the robot(s) to which this command will be sent. Use
positioner_ids=0
to broadcast to all robots.loop – The running event loop, or uses
get_event_loop
.timeout (float) – Time after which the command will be marked done when not all the positioners have replies. If
None
, the default timeout will be used. If timeout is a negative number, the command won’t timeout until all the positioners have replied.done_callback (Optional[Callable]) – A function to call when the command has been successfully completed.
n_positioners (Optional[int]) – If the command is a broadcast, the number of positioners that should reply. If defined, the command will be done once as many positioners have replied. Ignored for non-broadcasts.
data (Union[None, data_co, Dict[int, data_co]]) – The data to pass to the messages. If a list, each element will be sent to each positioner as a message. It can also be a dictionary of lists in which the key is the positioner to which to send the data.
ignore_unknown (bool) – Ignores
UNKNOWN_COMMAND
replies from positioners that do now support this command.
- get_messages(data=None)[source]¶
Returns the list of messages associated with this command.
Unless overridden, returns a single message with the associated data.
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- status_callback()[source]¶
Callback for change status.
When the status gets set to
CommandStatus.RUNNING
starts a timer that marks the command as done aftertimeout
.
- bootloader = False¶
Whether the command is safe to be issues in bootloader mode.
- move_command = False¶
Whether this command produces a positioner move.
- property name¶
Returns the name of this command.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- safe = False¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.base.SuperMessage(command, data=bytearray(b''), positioner_id=0, uid=0, response_code=0, extended_id=True)[source]¶
Bases:
Message
An extended CAN
Message
class.Expands the
Message
class to handle custom arbitration IDs for extended frames.- Parameters:
command (Command) – The command associated with this message.
data (bytearray) – Payload to pass to
Message
.positioner_id (int) – The positioner to which the message will be sent (0 for broadcast).
uid (int) – The unique identifier for this message.
extended_id (bool) – Whether the id is an 11 bit (False) or 29 bit (True) address.
response_code (int)
List of commands¶
- class jaeger.commands.CommandID(value)[source]¶
Bases:
IntEnum
IDs associated with commands.
- ALPHA_CLOSED_LOOP_COLLISION_DETECTION = 118¶
- ALPHA_CLOSED_LOOP_WITHOUT_COLLISION_DETECTION = 119¶
- ALPHA_OPEN_LOOP_COLLISION_DETECTION = 120¶
- ALPHA_OPEN_LOOP_WITHOUT_COLLISION_DETECTION = 121¶
- BETA_CLOSED_LOOP_COLLISION_DETECTION = 122¶
- BETA_CLOSED_LOOP_WITHOUT_COLLISION_DETECTION = 123¶
- BETA_OPEN_LOOP_COLLISION_DETECTION = 124¶
- BETA_OPEN_LOOP_WITHOUT_COLLISION_DETECTION = 125¶
- COLLISION_DETECTED = 18¶
- GET_ACTUAL_POSITION = 32¶
- GET_ALPHA_HALL_CALIB = 104¶
- GET_BETA_HALL_CALIB = 105¶
- GET_CURRENT = 56¶
- GET_FIRMWARE_VERSION = 2¶
- GET_HALL_CALIB_ERROR = 45¶
- GET_HOLDING_CURRENT = 113¶
- GET_ID = 1¶
- GET_NUMBER_TRAJECTORIES = 139¶
- GET_OFFSETS = 34¶
- GET_RAW_TEMPERATURE = 132¶
- GET_STATUS = 3¶
- GO_TO_ABSOLUTE_POSITION = 30¶
- GO_TO_DATUMS = 20¶
- GO_TO_DATUM_ALPHA = 21¶
- GO_TO_DATUM_BETA = 22¶
- GO_TO_RELATIVE_POSITION = 31¶
- HALL_OFF = 117¶
- HALL_ON = 116¶
- SAVE_INTERNAL_CALIBRATION = 53¶
- SEND_FIRMWARE_DATA = 201¶
- SEND_NEW_TRAJECTORY = 10¶
- SEND_TRAJECTORY_ABORT = 13¶
- SEND_TRAJECTORY_DATA = 11¶
- SET_ACTUAL_POSITION = 33¶
- SET_CURRENT = 41¶
- SET_HOLDING_CURRENT = 112¶
- SET_INCREASE_COLLISION_MARGIN = 111¶
- SET_NUMBER_TRAJECTORIES = 140¶
- SET_OFFSETS = 35¶
- SET_SPEED = 40¶
- START_COGGING_CALIBRATION = 47¶
- START_COGGING_CALIBRATION_ALPHA = 48¶
- START_COGGING_CALIBRATION_BETA = 49¶
- START_DATUM_CALIBRATION = 23¶
- START_DATUM_CALIBRATION_ALPHA = 24¶
- START_DATUM_CALIBRATION_BETA = 25¶
- START_FIRMWARE_UPGRADE = 200¶
- START_MOTOR_CALIBRATION = 26¶
- START_MOTOR_CALIBRATION_ALPHA = 26¶
- START_MOTOR_CALIBRATION_BETA = 27¶
- START_TRAJECTORY = 14¶
- STOP_TRAJECTORY = 15¶
- SWITCH_LED_OFF = 127¶
- SWITCH_LED_ON = 126¶
- SWITCH_OFF_PRECISE_MOVE_ALPHA = 129¶
- SWITCH_OFF_PRECISE_MOVE_BETA = 131¶
- SWITCH_ON_PRECISE_MOVE_ALPHA = 128¶
- SWITCH_ON_PRECISE_MOVE_BETA = 130¶
- TRAJECTORY_DATA_END = 12¶
- class jaeger.commands.GetID(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Commands the positioners to reply with their positioner id.
- Parameters:
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.GetFirmwareVersion(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
- Parameters:
- get_firmware(positioner_id=None)[source]¶
Returns the firmware version string.
- Parameters:
positioner_id (int) – The positioner for which to return the version. If
None
returns a dictionary with the firmware version of all the positioners that replied.- Returns:
firmware – A string or dictionary of string with the firmware version(s), with the format
'XX.YY.ZZ'
whereYY='80'
if the positioner is in bootloader mode.- Raises:
ValueError – If no positioner with
positioner_id
has replied.- Return type:
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.GetStatus(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Gets the status bits for the positioner.
- Parameters:
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.GotoAbsolutePosition(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
Command
Moves alpha and beta to absolute positions in degrees.
- get_move_time()[source]¶
Returns the time needed to move to the commanded position.
- Raises:
ValueError – If no reply has been received or the data cannot be parsed.
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.GotoRelativePosition(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
GotoAbsolutePosition
Moves alpha and beta a relative number of degrees.
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.GetActualPosition(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Gets the current position of the alpha and beta arms.
- Parameters:
- get_replies()[source]¶
Returns the formatted replies as a dictionary.
The values returned will depend on the specific command.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SetActualPosition(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
Command
Sets the current position of the alpha and beta arms.
- move_command = True¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SetSpeed(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
Command
Sets the speeds of the alpha and beta motors.
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SetCurrent(positioner_ids, alpha=None, beta=None, **kwargs)[source]¶
Bases:
Command
Sets the current of the alpha and beta motors.
- move_command = True¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.goto(fps, new_positions, speed=None, relative=False, use_sync_line=None, go_cowboy=False, force=False, command=None)[source]¶
Bases:
Send positioners to a given position using a trajectory with
kaiju
check.- Parameters:
new_positions (dict[int, tuple[float, float]]) – The new positions as a dictionary of positioner ID to a tuple of new alpha and beta angles. Positioners not specified will be kept on the same positions.
speed (Optional[float]) – The speed to use.
relative (bool) – If
True
,alpha
andbeta
are considered relative angles.use_sync_line (bool | None) – Whether to use the SYNC line to start the trajectories.
go_cowboy (bool) – If set, does not create a
kaiju
-safe trajectory. Use at your own risk.force (bool) – If
go_cowboy=False
and the trajectory is deadlocked, aTrajectoryError
will be raised. Useforce=True
to apply the trajectory anyway.command (CluCommand[JaegerActor] | None) – A command to pass to
send_trajectory
to output additional information.
Bootloader commands¶
- class jaeger.commands.StartFirmwareUpgrade(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
- Parameters:
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.SendFirmwareData(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
- Parameters:
- bootloader = True¶
Whether the command is safe to be issues in bootloader mode.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- async jaeger.commands.load_firmware(fps, firmware_file, positioners=None, messages_per_positioner=None, force=False, show_progressbar=False, progress_callback=None, stop_logging=True)[source]¶
Convenience function to run through the steps of loading a new firmware.
This function is a coroutine and not intendend for direct use. Use the
jaeger
CLI instead.- Parameters:
fps (FPS) –
FPS
instance to which the commands will be sent.firmware_file (str | pathlib.Path) – Binary file containing the firmware to load.
positioners (Optional[List[int]]) – A list of positioner ids whose firmware to update, or
None
to update all the positioners infps
.messages_per_positioner (Optional[int]) – How many messages to send to each positioner at once. This can improve the performance but also overflow the CAN bus buffer. With the default value of
None
, reverts to the configuration valuepositioner.firmware_messages_per_positioner
.force (bool) – Forces the firmware load to continue even if some positioners are not responding or are not in bootloader mode.
show_progressbar (bool) – Whether to show a progress bar.
progress_callback (Optional[Callable[[int, int], Any]]) – A function to call as data gets transferred to the positioners. The callback is called with
(current_chunk, n_chuck)
wherecurrent_chunk
is the number of the data chunk being sent andn_chunk
is the total number of chunks in the data package.stop_logging (bool) – Disable logging to file for the CAN logger to improve performance.
Trajectory commands¶
- class jaeger.commands.Trajectory(fps, trajectories, dump=True, extra_dump_data={})[source]¶
Bases:
object
Prepares and sends a trajectory to the FPS.
Most user will prefer using
FPS.send_trajectory
, which automates the process. This class provides fine-grain control over the process.- Parameters:
fps (FPS) – The instance of
FPS
that will receive the trajectory.trajectories (str | pathlib.Path | TrajectoryDataType) – Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the
positioner_ids
and each value is a dictionary containing two keys:alpha
andbeta
, each pointing to a list of tuples(position, time)
, whereposition
is in degrees andtime
is in seconds.dump (bool | str) – Whether to dump a JSON with the trajectory to disk. If
True
, the trajectory is stored in/data/logs/jaeger/trajectories/<MJD>
with a unique sequence for each trajectory. A string can be passed with a custom path. The dump file is created only ifTrajectory.start
is called, regardless of whether the trajectory succeeds.extra_dump_data (dict[str, Any]) – A dictionary with additional parameters to add to the dump JSON.
- Raises:
TrajectoryError – If encounters a problem sending the trajectory.
FPSLockedError – If the FPS is locked.
Examples
Given the following two-point trajectory for positioner 4
points = {4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}}
the normal process to execute the trajectory is
trajectory = Trajectory(fps, points) await trajectory.send() # Sends the trajectory but does not yet execute it. await trajectory.start() # This starts the trajectory (positioners move).
- dump_trajectory(path=None)[source]¶
Dumps the trajectory to a JSON file.
- Parameters:
path (str | None)
- n_points¶
Number of points sent to each positioner as a tuple
(alpha, beta)
.
- async jaeger.commands.send_trajectory(fps, trajectories, use_sync_line=None, send_trajectory=True, start_trajectory=True, command=None, dump=True, extra_dump_data={})[source]¶
Sends a set of trajectories to the positioners.
This is a low-level function that raises errors when a problem is encountered. Most users should use
FPS.send_trajectory
instead.send_trajectory
automatesTrajectory
by calling the different methods in order, but provides less control.- Parameters:
fps (FPS) – The instance of
FPS
that will receive the trajectory.trajectories (str | pathlib.Path | TrajectoryDataType) – Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the
positioner_ids
and each value is a dictionary containing two keys:alpha
andbeta
, each pointing to a list of tuples(position, time)
, whereposition
is in degrees andtime
is in seconds.use_sync_line (bool | None) – If
True
, the SYNC line will be used to synchronise the beginning of all trajectories. Otherwise aSTART_TRAJECTORY
command will be sent over the CAN network. IfNone
, defaults to the configuration parameter.send_trajectory (bool) – If
True
, sends the trajectory to the positioners and returns theTrajectory
instance.start_trajectory (bool) – If
True
, runs the trajectory after sending it. Otherwise, returns theTrajectory
instance after sending the data. Ignored ifsend_trajectory=False
.command (Optional[CluCommand[JaegerActor]]) – A command to which to output informational messages.
dump (bool | str) – Whether to dump a JSON with the trajectory to disk. If
True
, the trajectory is stored in/data/logs/jaeger/trajectories/<MJD>
with a unique sequence for each trajectory. A string can be passed with a custom path. The dump file is created only ifTrajectory.start
is called, regardless of whether the trajectory succeeds.extra_dump_data (dict[str, Any]) – A dictionary with additional parameters to add to the dump JSON.
- Raises:
TrajectoryError – If encounters a problem sending the trajectory.
FPSLockedError – If the FPS is locked.
- Return type:
Examples
>>> fps = FPS() >>> await fps.initialise() # Send a trajectory with two points for positioner 4 >>> await fps.send_trajectory({4: {'alpha': [(90, 0), (91, 3)], 'beta': [(20, 0), (23, 4)]}})
- class jaeger.commands.SendNewTrajectory(positioner_ids, n_alpha=None, n_beta=None, **kwargs)[source]¶
Bases:
Command
Starts a new trajectory and sends the number of points.
- static get_data(n_alpha, n_beta)[source]¶
Returns the data bytearray from a pair for
(n_alpha, n_beta)
.
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.SendTrajectoryData(positioner_ids, positions=None, **kwargs)[source]¶
Bases:
Command
Sends a data point in the trajectory.
This command sends multiple messages that represent a full trajectory.
- Parameters:
positions (list) – A list of tuples in which the first element is the angle, in degrees, and the second is the associated time, in seconds.
Examples
>>> SendTrajectoryData([(90, 10), (88, 12), (80, 20)])
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.TrajectoryDataEnd(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Indicates that the transmission for the trajectory has ended.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- class jaeger.commands.SendTrajectoryAbort(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Aborts sending a trajectory.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- class jaeger.commands.StartTrajectory(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Starts the trajectories.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
Calibration commands¶
- async jaeger.commands.calibration.calibrate_positioners(fps, axis, positioner_ids, motors=True, datums=True, cogging=True)[source]¶
Runs the calibration process and saves it to the internal memory.
- Parameters:
fps (FPS) – The instance of
FPS
that will receive the trajectory.axis (str) – The axis to calibrate, either
"alpha"
,"beta"
, or"both"
.positioner_id – The ID of the positioner(s) to calibrate.
motors (bool) – Whether to perform the motor calibration.
datums (bool) – Whether to perform the datums calibration.
cogging (bool) – Whether to perform the cogging calibration (may take more than one hour).
- Raises:
JaegerError – If encounters a problem during the process.
Examples
>>> fps = FPS() >>> await fps.initialise() # Calibrate positioner 31. >>> await calibrate_positioner(fps, 31)
- class jaeger.commands.calibration.StartDatumCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Indicates that the transmission for the trajectory has ended.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.StartMotorCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Aborts sending a trajectory.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.StartCoggingCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Starts the trajectories.
- Parameters:
- move_command = True¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.SaveInternalCalibration(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Stop the trajectories.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.HallOn(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Turns hall sensors ON.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- replies: List[Reply]¶
A list of messages with the responses to this command.
- class jaeger.commands.calibration.HallOff(positioner_ids, timeout=None, done_callback=None, n_positioners=None, data=None, ignore_unknown=True)[source]¶
Bases:
Command
Turns hall sensors ON.
- Parameters:
- move_command = False¶
Whether this command produces a positioner move.
- safe = True¶
Whether it’s safe to execute this command when the FPS is locked.
- replies: List[Reply]¶
A list of messages with the responses to this command.