Command API

Base classes

class jaeger.commands.base.Command(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

A command to be sent to the CAN controller.

Implements a base class to define CAN commands to interact with the positioner. Commands can be composed of single or multiple messages. When sending a command to the bus, the first message is written to, then asynchronously waits for a confirmation that the message has been received before sending the following message. If any of the messages returns an error code the command is failed.

Command subclasses from StatusMixIn and status_callback gets called when the status changes.

Command is a Future and must be awaited. The Future is done when finish_command is called, which happens when the status is marked done or cancelled or when the command timeouts.

Commands sent to a single positioner are marked done when a reply is received from the same positioner for the given command, or when it times out. Broadcast commands only get marked done by timing out or manually.

Parameters
  • positioner_id – The id or list of ids of the robot(s) to which this command will be sent. Use positioner_id=0 to broadcast to all robots.

  • loop – The running event loop, or uses get_event_loop.

  • timeout – Time after which the command will be marked done. Note that if the command is not a broadcast and it receives replies to each one of the messages it sends, the command will be marked done and the timer cancelled. If negative, the command runs forever or until all the replies have been received.

  • done_callback – A function to call when the command has been successfully completed.

  • n_positioners – If the command is a broadcast, the number of positioners that should reply. If defined, the command will be done once as many positioners have replied. Otherwise it waits for the command to time out.

  • data – The data to pass to the messages. It must be a list in which each element is the payload for a message. As many messages as data elements will be sent. If None, a single message without payload will be sent.

cancel(silent=False, msg=None)[source]

Cancels a command, stopping the reply queue watcher.

finish_command(status, silent=False)[source]

Cancels the queue watcher and removes the running command.

Parameters
get_messages(data=None)[source]

Returns the list of messages associated with this command.

Unless overridden, returns a single message with the associated data.

get_reply_for_positioner(positioner_id)[source]

Returns the reply for a given positioner_id.

In principle this method is only useful when the command is sent in broadcast mode and receives replies from multiples positioners.

process_reply(reply_message)[source]

Watches the reply queue.

status_callback()[source]

Callback for change status.

When the status gets set to CommandStatus.RUNNING starts a timer that marks the command as done after timeout.

bootloader = False

Whether the command is safe to be issues in bootloader mode.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: jaeger.commands.CommandID

The id of the command.

data

The data payload for the messages to send.

property is_broadcast

Returns True if the command is a broadcast.

move_command = False

Whether this command produces a positioner move.

property name

Returns the name of this command.

replies: List[jaeger.commands.base.Reply]

A list of messages with the responses to this command.

safe = False

Whether it’s safe to execute this command when the FPS is locked.

timeout = 5

The default timeout for this command.

class jaeger.commands.base.Message(command, data=bytearray(b''), positioner_id=0, uid=0, response_code=0, extended_id=True)[source]

Bases: can.message.Message

An extended can.Message class.

Expands the can.Message class to handle custom arbitration IDs for extended frames.

Parameters
  • command (Command) – The command associated with this message.

  • data (bytearray) – Payload to pass to can.Message.

  • positioner_id (int) – The positioner to which the message will be sent (0 for broadcast).

  • uid (int) – The unique identifier for this message.

  • extended_id (bool) – Whether the id is an 11 bit (False) or 29 bit (True) address.

  • response_code (int) –

List of commands

class jaeger.commands.CommandID(value)[source]

Bases: enum.IntEnum

IDs associated with commands.

Parameters

value (Union[str, int]) –

get_command_class()[source]

Returns the class associated with this command.

ALPHA_CLOSED_LOOP_COLLISION_DETECTION = 118
ALPHA_CLOSED_LOOP_WITHOUT_COLLISION_DETECTION = 119
ALPHA_OPEN_LOOP_COLLISION_DETECTION = 120
ALPHA_OPEN_LOOP_WITHOUT_COLLISION_DETECTION = 121
BETA_CLOSED_LOOP_COLLISION_DETECTION = 122
BETA_CLOSED_LOOP_WITHOUT_COLLISION_DETECTION = 123
BETA_OPEN_LOOP_COLLISION_DETECTION = 124
BETA_OPEN_LOOP_WITHOUT_COLLISION_DETECTION = 125
COLLISION_DETECTED = 18
GET_ACTUAL_POSITION = 32
GET_CURRENT = 56
GET_FIRMWARE_VERSION = 2
GET_HOLDING_CURRENT = 113
GET_ID = 1
GET_OFFSETS = 34
GET_RAW_TEMPERATURE = 132
GET_STATUS = 3
GO_TO_ABSOLUTE_POSITION = 30
GO_TO_DATUMS = 20
GO_TO_DATUM_ALPHA = 21
GO_TO_DATUM_BETA = 22
GO_TO_RELATIVE_POSITION = 31
HALL_OFF = 117
HALL_ON = 116
SAVE_INTERNAL_CALIBRATION = 53
SEND_FIRMWARE_DATA = 201
SEND_NEW_TRAJECTORY = 10
SEND_TRAJECTORY_DATA = 11
SET_ACTUAL_POSITION = 33
SET_CURRENT = 41
SET_HOLDING_CURRENT = 112
SET_OFFSETS = 35
SET_SPEED = 40
START_COGGING_CALIBRATION = 47
START_DATUM_CALIBRATION = 23
START_FIRMWARE_UPGRADE = 200
START_MOTOR_CALIBRATION = 26
START_TRAJECTORY = 14
STOP_TRAJECTORY = 15
SWITCH_OFF_PRECISE_MOVE_ALPHA = 129
SWITCH_OFF_PRECISE_MOVE_BETA = 131
SWITCH_ON_PRECISE_MOVE_ALPHA = 128
SWITCH_ON_PRECISE_MOVE_BETA = 130
TRAJECTORY_DATA_END = 12
TRAJECTORY_TRANSMISSION_ABORT = 13
class jaeger.commands.GetID(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Commands the positioners to reply with their positioner id.

get_ids()[source]

Returns a list of positioners that replied back.

bootloader = True

Whether the command is safe to be issues in bootloader mode.

broadcastable: bool = True

Whether the command can be broadcast to all robots.

command_id: CommandID = 1

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

timeout = 1

The default timeout for this command.

class jaeger.commands.GetFirmwareVersion(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

static encode(firmware)[source]

Returns the bytearray encoding the firmware version.

get_firmware(positioner_id=None)[source]

Returns the firmware version string.

Parameters

positioner_id (int) – The positioner for which to return the version. This parameter is ignored unless the command is a broadcast. If None and the command is a broadcast, returns a list with the firmware version of all the positioners, in the order of GetFirmwareVersion.replies.

Returns

firmware (str or list) – A string or list of string with the firmware version(s), with the format 'XX.YY.ZZ' where YY='80' if the positioner is in bootloader mode.

Raises

ValueError – If no positioner with positioner_id has replied.

bootloader = True

Whether the command is safe to be issues in bootloader mode.

broadcastable: bool = True

Whether the command can be broadcast to all robots.

command_id: CommandID = 2

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.GetStatus(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Gets the status bits for the positioner.

get_positioner_status()[source]

Returns the positioner status flag for each reply.

bootloader = True

Whether the command is safe to be issues in bootloader mode.

broadcastable: bool = True

Whether the command can be broadcast to all robots.

command_id: CommandID = 3

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.GotoAbsolutePosition(alpha=0.0, beta=0.0, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Moves alpha and beta to absolute positions in degrees.

static decode(data)[source]

Decodes message data into alpha and beta moves.

get_move_time()[source]

Returns the time needed to move to the commanded position.

Raises

ValueError – If no reply has been received or the data cannot be parsed.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 30

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

class jaeger.commands.GotoRelativePosition(alpha=0.0, beta=0.0, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Moves alpha and beta a relative number of degrees.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 31

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

class jaeger.commands.GetActualPosition(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Gets the current position of the alpha and beta arms.

static encode(alpha, beta)[source]

Returns the position as a bytearray in positioner units.

get_positions()[source]

Returns the positions of alpha and beta in degrees.

Raises

ValueError – If no reply has been received or the data cannot be parsed.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 32

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.SetActualPosition(alpha=0.0, beta=0.0, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Sets the current position of the alpha and beta arms.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 33

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.SetSpeed(alpha=0, beta=0, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Sets the speeds of the alpha and beta motors.

static encode(alpha, beta)[source]

Encodes the alpha and beta speed as bytes.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 40

The id of the command.

move_command = False

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.SetCurrent(alpha=0, beta=0, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Sets the current of the alpha and beta motors.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 41

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

Bootloader commands

class jaeger.commands.StartFirmwareUpgrade(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

bootloader = True

Whether the command is safe to be issues in bootloader mode.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 200

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.SendFirmwareData(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

bootloader = True

Whether the command is safe to be issues in bootloader mode.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 201

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

async jaeger.commands.load_firmware(fps, firmware_file, positioners=None, messages_per_positioner=None, force=False, show_progressbar=False, progress_callback=None, stop_logging=True)[source]

Convenience function to run through the steps of loading a new firmware.

This function is a coroutine and not intendend for direct use. Use the jaeger CLI instead.

Parameters
  • fps (FPS) – FPS instance to which the commands will be sent.

  • firmware_file (str | pathlib.Path) – Binary file containing the firmware to load.

  • positioners (Optional[List[int]]) – A list of positioner ids whose firmware to update, or None to update all the positioners in fps.

  • messages_per_positioner (Optional[int]) – How many messages to send to each positioner at once. This can improve the performance but also overflow the CAN bus buffer. With the default value of None, reverts to the configuration value positioner.firmware_messages_per_positioner.

  • force (bool) – Forces the firmware load to continue even if some positioners are not responding or are not in bootloader mode.

  • show_progressbar (bool) – Whether to show a progress bar.

  • progress_callback (Optional[Callable[[int, int], Any]]) – A function to call as data gets transferred to the positioners. The callback is called with (current_chunk, n_chuck) where current_chunk is the number of the data chunk being sent and n_chunk is the total number of chunks in the data package.

  • stop_logging (bool) – Disable logging to file for the CAN logger to improve performance.

Trajectory commands

class jaeger.commands.Trajectory(fps, trajectories)[source]

Bases: object

Prepares and sends a trajectory to the FPS.

Most user will prefer using FPS.send_trajectory, which automates the process. This class provides fine-grain control over the process.

Parameters
  • fps (FPS) – The instance of FPS that will receive the trajectory.

  • trajectories (str or dict) – Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the positioner_ids and each value is a dictionary containing two keys: alpha and beta, each pointing to a list of tuples (position, time), where position is in degrees and time is in seconds.

Raises
  • TrajectoryError – If encounters a problem sending the trajectory.

  • FPSLockedError – If the FPS is locked.

Examples

Given the following two-point trajectory for positioner 4

points = {4: {'alpha': [(90, 0), (91, 3)],
            'beta': [(20, 0), (23, 4)]}}

the normal process to execute the trajectory is

trajectory = Trajectory(fps, points)
await trajectory.send()    # Sends the trajectory but does not yet execute it.
await trajectory.start()   # This starts the trajectory (positioners move).
async abort_trajectory()[source]

Aborts the trajectory transmission.

async send()[source]

Sends the trajectory but does not start it.

async start(use_sync_line=True)[source]

Starts the trajectory.

validate()[source]

Validates the trajectory.

data_send_time

How long it took to send the trajectory.

move_time

The time required to complete the trajectory.

n_points

Number of points sent to each positioner as a tuple (alpha, beta).

async jaeger.commands.send_trajectory(fps, trajectories, use_sync_line=True)[source]

Sends a set of trajectories to the positioners.

This is a low-level function that raises errors when a problem is encountered. Most users should use FPS.send_trajectory instead. send_trajectory automates Trajectory by calling the different methods in order, but provides less control.

Parameters
  • fps (FPS) – The instance of FPS that will receive the trajectory.

  • trajectories (str | pathlib.Path | TrajectoryDataType) – Either a path to a YAML file to read or a dictionary with the trajectories. In either case the format must be a dictionary in which the keys are the positioner_ids and each value is a dictionary containing two keys: alpha and beta, each pointing to a list of tuples (position, time), where position is in degrees and time is in seconds.

  • use_sync_line – If True, the SYNC line will be used to synchronise the beginning of all trajectories. Otherwise a START_TRAJECTORY command will be sent over the CAN network.

Raises
  • TrajectoryError – If encounters a problem sending the trajectory.

  • FPSLockedError – If the FPS is locked.

Examples

>>> fps = FPS()
>>> await fps.initialise()

# Send a trajectory with two points for positioner 4
>>> await fps.send_trajectory({4: {'alpha': [(90, 0), (91, 3)],
                                   'beta': [(20, 0), (23, 4)]}})
class jaeger.commands.SendNewTrajectory(n_alpha, n_beta, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Starts a new trajectory and sends the number of points.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 10

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

class jaeger.commands.SendTrajectoryData(positions, **kwargs)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Sends a data point in the trajectory.

This command sends multiple messages that represent a full trajectory.

Parameters

positions (list) – A list of tuples in which the first element is the angle, in degrees, and the second is the associated time, in seconds.

Examples

>>> SendTrajectoryData([(90, 10), (88, 12), (80, 20)])
broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 11

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

class jaeger.commands.TrajectoryDataEnd(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Indicates that the transmission for the trajectory has ended.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 12

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

class jaeger.commands.TrajectoryTransmissionAbort(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Aborts sending a trajectory.

broadcastable: bool = False

Whether the command can be broadcast to all robots.

command_id: CommandID = 13

The id of the command.

move_command = False

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

class jaeger.commands.StartTrajectory(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Starts the trajectories.

broadcastable: bool = True

Whether the command can be broadcast to all robots.

command_id: CommandID = 14

The id of the command.

move_command = True

Whether this command produces a positioner move.

replies: List[Reply]

A list of messages with the responses to this command.

class jaeger.commands.StopTrajectory(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Stop the trajectories.

broadcastable: bool = True

Whether the command can be broadcast to all robots.

command_id: CommandID = 15

The id of the command.

replies: List[Reply]

A list of messages with the responses to this command.

safe = True

Whether it’s safe to execute this command when the FPS is locked.

Calibration commands

async jaeger.commands.calibration.calibrate_positioner(fps, positioner_id, motors=True, datums=True, cogging=True)[source]

Runs the calibration process and saves it to the internal memory.

Parameters
  • fps (FPS) – The instance of FPS that will receive the trajectory.

  • positioner_id (int) – The ID of the positioner to calibrate.

  • motors (bool) – Whether to perform the motor calibration.

  • datums (bool) – Whether to perform the datums calibration.

  • cogging (bool) – Whether to perform the cogging calibration (may take more than one hour).

Raises

JaegerError – If encounters a problem during the process.

Examples

>>> fps = FPS()
>>> await fps.initialise()

# Calibrate positioner 31.
>>> await calibrate_positioner(fps, 31)
class jaeger.commands.calibration.StartDatumCalibration(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Indicates that the transmission for the trajectory has ended.

class jaeger.commands.calibration.StartMotorCalibration(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Aborts sending a trajectory.

class jaeger.commands.calibration.StartCoggingCalibration(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Starts the trajectories.

class jaeger.commands.calibration.SaveInternalCalibration(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Stop the trajectories.

class jaeger.commands.calibration.HallOn(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Turns hall sensors ON.

class jaeger.commands.calibration.HallOff(positioner_id, loop=None, timeout=None, done_callback=None, n_positioners=None, data=None)[source]

Bases: jaeger.utils.helpers.StatusMixIn[jaeger.maskbits.CommandStatus], _asyncio.Future

Turns hall sensors ON.