API Reference

jaeger.can

class jaeger.can.CANnetInterface(*args, **kwargs)[source]

Bases: jaeger.can.JaegerCAN[jaeger.interfaces.cannet.CANNetBus]

An interface class specifically for the CAN@net 200/420 device.

This class bahaves as JaegerCAN but allows communication with the device itself and tracks its status.

handle_device_message(msg: can.message.Message)[source]

Handles a reply from the device (i.e., not from the CAN network).

property device_status

Returns a dictionary with the status of the device.

class jaeger.can.JaegerCAN(interface_type: str, channels: list | tuple, *args, fps: Optional[jaeger.FPS] = None, **kwargs)[source]

Bases: Generic[jaeger.can.Bus_co]

A CAN interface with a command queue and reply handling.

Provides support for multi-channel CAN networks, with each channel being able to host more than one bus. In general, a new instance of JaegerCAN is create via the from_profile classmethod.

Parameters
  • interface_type – One of INTERFACES. Defines the python-can interface to use.

  • channels – A list of channels to be used to instantiate the interfaces.

  • fps – The focal plane system.

  • args – Arguments and keyword arguments to pass to the interfaces when initialising it (e.g., port, baudrate, etc).

  • kwargs – Arguments and keyword arguments to pass to the interfaces when initialising it (e.g., port, baudrate, etc).

Variables
  • command_queue (asyncio.Queue) – Queue of messages to be sent to the bus. The messages are sent as soon as the bus has finished processing any commands with the same command_id and positioner_id.

  • listener (JaegerReaderCallback) – A JaegerReaderCallback instance that runs a callback when a new message is received from the bus.

  • multibus (bool) – Whether the interfaces are multibus.

  • notifier (can.Notifier) – A can.Notifier instance that processes messages from the list of buses, asynchronously.

classmethod from_profile(profile: Optional[str] = None, **kwargs)jaeger.can.JaegerCAN[source]

Creates a new bus interface from a configuration profile.

Parameters

profile – The name of the profile that defines the bus interface, or None to use the default configuration.

static print_profiles()List[str][source]

Prints interface profiles and returns a list of profile names.

send_to_interfaces(message: jaeger.commands.base.Message, interfaces: Optional[List[Bus_co]] = None, bus: Optional[Any] = None)[source]

Sends the message to the appropriate interface and bus.

interfaces: List[Bus_co]

A list of python-can interfaces, one for each of the channels.

Type

list

running_commands

Currently running commands.

Type

list

class jaeger.can.JaegerReaderCallback(callback: Callable[[can.message.Message], Any], loop: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Bases: can.listener.Listener

A message reader that triggers a callback on message received.

Parameters
  • callback – The function to run when a new message is received.

  • loop – If an asyncio event loop, the callback will be called with call_soon, otherwise it will be called immediately.

on_message_received(msg)[source]

Calls the callback with the received message.

jaeger.can.INTERFACES = {'cannet': {'class': <class 'jaeger.interfaces.cannet.CANNetBus'>, 'multibus': True}, 'slcan': {'class': <class 'can.interfaces.slcan.slcanBus'>, 'multibus': False}, 'socketcan': {'class': <class 'can.interfaces.socketcan.socketcan.SocketcanBus'>, 'multibus': False}, 'virtual': {'class': <class 'can.interfaces.virtual.VirtualBus'>, 'multibus': False}}

Accepted CAN interfaces and whether they are multibus.

jaeger.fps

class jaeger.fps.BaseFPS(layout: Optional[str | pathlib.Path] = None, positioner_class: Type[Any] = <class 'jaeger.positioner.Positioner'>)[source]

Bases: dict

A class describing the Focal Plane System.

This class includes methods to read the layout and construct positioner objects and can be used by the real FPS class or the VirtualFPS.

Parameters
  • layout – The path to the layout describing the position of the robots on the focal plane.

  • positioner_class – The class to be used to create a new positioner. In principle this will be Positioner but it may be different if the positioners are created for a VirtualFPS.

add_positioner(positioner_id, centre=(None, None))jaeger.positioner.Positioner[source]

Adds a new positioner to the list, and checks for duplicates.

property positioners

Dictionary of positioner associated with this FPS.

This is just a wrapper around the BaseFPS instance which is a dictionary itself. May be deprecated in the future.

class jaeger.fps.FPS(can: Optional[str] = None, layout: Optional[str] = None, can_profile: Optional[str] = None, ieb: Union[bool, jaeger.ieb.IEB, dict, None, str, pathlib.Path] = True, loop: Optional[asyncio.events.AbstractEventLoop] = None, engineering_mode: bool = False)[source]

Bases: jaeger.fps.BaseFPS

A class describing the Focal Plane System.

Parameters
  • can – The CAN bus to use.

  • layout – The file path to the layout describing the position of the robots on the focal plane. If None, the default layout will be used.

  • can_profile – The configuration profile for the CAN interface, or None to use the default one. Ignored if can is passed.

  • ieb – If True or None, connects the Instrument Electronics Box PLC controller using the path to the IEB configuration file stored in jaeger’s configuration. Can also be an IEB instance, the path to a custom configuration file used to load one, or a dictionary with the configuration itself.

  • loop – The asyncio event loop. If None, uses asyncio.get_event_loop to get a valid loop.

  • engineering_mode – If True, disables most safety checks to enable debugging. This may result in hardware damage so it must not be used lightly.

Examples

After instantiating a new FPS object it is necessary to call initialise to retrieve the positioner layout and the status of the connected positioners. Note that initialise is a coroutine which needs to be awaited

>>> fps = FPS(can_profile='default')
>>> await fps.initialise()
>>> fps.positioners[4].status
<Positioner (id=4, status='SYSTEM_INITIALIZED|
DISPLACEMENT_COMPLETED|ALPHA_DISPLACEMENT_COMPLETED|
BETA_DISPLACEMENT_COMPLETED')>
abort()[source]

Aborts trajectories and stops positioners.

async initialise(allow_unknown: bool = True, start_pollers: bool = True)jaeger.fps.FPS[source]

Initialises all positioners with status and firmware version.

Parameters
  • allow_unknown – If True, allows to add positioners that are connected but not in the layout.

  • start_pollers – Whether to initialise the pollers.

is_bootloader()[source]

Returns True if any positioner is in bootloader mode.

async lock(stop_trajectories: bool = True)[source]

Locks the FPS and prevents commands to be sent.

Parameters

stop_trajectories – Whether to stop trajectories when locking.

async report_status()Dict[str, Any][source]

Returns a dict with the position and status of each positioner.

send_command(command: str | int | CommandID | Command, positioner_id: int = 0, data: bytearray = bytearray(b''), interface: Optional[BusABC] = None, bus: Optional[int] = None, broadcast: bool = False, override: bool = False, safe: bool = False, synchronous: bool = False, **kwargs)Command[source]

Sends a command to the bus.

Parameters
  • command – The ID of the command, either as the integer value, a string, or the CommandID flag. Alternatively, the Command to send.

  • positioner_id – The positioner ID to command, or zero for broadcast.

  • data – The bytes to send.

  • interface – The index in the interface list for the interface to use. Only relevant in case of a multibus interface. If None, the positioner to bus map will be used.

  • bus – The bus within the interface to be used. Only relevant in case of a multibus interface. If None, the positioner to bus map will be used.

  • broadcast – If True, sends the command to all the buses.

  • override – If another instance of this command_id with the same positioner_id is running, cancels it and schedules this one immediately. Otherwise the command is queued until the first one finishes.

  • safe – Whether the command is safe to send to a locked FPS.

  • synchronous – If True, the command is sent to the CAN network immediately, skipping the command queue. No tracking is done for this command. It should only be used for shutdown commands.

  • kwargs – Extra arguments to be passed to the command.

Returns

command – The command sent to the bus. The command needs to be awaited before it is considered done.

async send_to_all(command: str | int | CommandID | Command, positioners: Optional[List[int]] = None, data: Optional[List[bytearray]] = None, **kwargs)List[Command][source]

Sends a command to multiple positioners and awaits completion.

Parameters
  • command – The ID of the command, either as the integer value, a string, or the CommandID flag. Alternatively, the Command to send.

  • positioners – The list of positioner_id of the positioners to command. If None, sends the command to all the positioners in the FPS that are not disabled.

  • data – The payload to send. If None, no payload is sent. If the value is a list with a single value, the same payload is sent to all the positioners. Otherwise the list length must match the number of positioners.

  • kwargs – Keyword argument to pass to the command.

Returns

commands – A list with the command instances executed.

async send_trajectory(*args, **kwargs)[source]

Sends a set of trajectories to the positioners.

See the documentation for send_trajectory.

set_interface(command: jaeger.commands.base.Command, interface: Optional[can.bus.BusABC] = None, bus: Optional[int] = None)[source]

Sets the interface and bus to which to send a command.

async shutdown()[source]

Stops pollers and shuts down all remaining tasks.

async stop_trajectory(positioners: Optional[List[int]] = None, clear_flags: bool = True, timeout: float = 0)[source]

Stops all the positioners.

Parameters
  • positioners – The list of positioners to abort. If None, abort all positioners.

  • clear_flags – If True, in addition to sending TRAJECTORY_TRANSMISSION_ABORT sends STOP_TRAJECTORY which clears all the collision and warning flags.

  • timeout – How long to wait before timing out the command. By default, just sends the command and does not wait for replies.

async unlock(force=False)[source]

Unlocks the FPS if all collisions have been resolved.

async update_firmware_version(positioner_ids: Optional[List[int]] = None, timeout: float = 2)bool[source]

Updates the firmware version of connected positioners.

Parameters
  • positioner_ids – The list of positioners to update. If None, update all positioners. positioner_ids=False ignores currently connected positioners and times out to receive all possible replies.

  • timeout – How long to wait before timing out the command.

async update_position(positioner_ids: Optional[List[int]] = None, timeout: float = 1)bool[source]

Updates positions.

Parameters
  • positioner_ids – The list of positioners to update. If None, update all positioners.

  • timeout – How long to wait before timing out the command.

async update_status(positioner_ids: Optional[List[int]] = None, timeout: float = 1)bool[source]

Update statuses for all positioners.

Parameters
  • positioner_ids – The list of positioners to update. If None, update all positioners.

  • timeout – How long to wait before timing out the command.

can

The JaegerCAN instance that serves as a CAN bus interface.

ieb: IEB | None

Connection to the instrument electronics box over Modbus.

Type

IEB

property locked

Returns True if the FPS is locked.

property moving

Returns True if any of the positioners is moving.

pollers

Position and status pollers

positioner_to_bus: Dict[int, Tuple[BusABC, int | None]]

The mapping between positioners and buses.

jaeger.positioner

class jaeger.positioner.Positioner(positioner_id: int, fps: jaeger.FPS | None = None, centre: Tuple[Optional[float], Optional[float]] = (None, None))[source]

Bases: Generic[jaeger.utils.helpers.Status_co]

Represents the status and parameters of a positioner.

Parameters
  • positioner_id – The ID of the positioner

  • fps – The FPS instance to which this positioner is linked to.

  • centre – The \((x_{\rm focal}, y_{\rm focal})\) coordinates of the central axis of the positioner.

  • sextant – The id of the sextant to which this positioner is connected.

get_positioner_flags()[source]

Returns the correct position maskbits from the firmware version.

async goto(alpha: float, beta: float, speed: Optional[Tuple[float, float]] = None, relative=False, force=False)bool[source]

Moves positioner to a given position.

Parameters
  • alpha – The position where to move the alpha arm, in degrees.

  • beta – The position where to move the beta arm, in degrees.

  • speed – The speed of the (alpha, beta) arms, in RPM on the input.

  • relative – Whether the movement is absolute or relative to the current position.

  • force – Allows to set position and speed limits outside the normal range.

Returns

resultTrue if both arms have reached the desired position, False if a problem was found.

Examples

# Move alpha and beta at the currently set speed
>>> await goto(alpha=100, beta=10)

# Set the speed of the alpha arm
>>> await goto(speed=(1000, 500))
async home()[source]

Homes the positioner.

Zeroes the positioner by counter-clockwise rotating alpha and beta until they hit the hardstops. Blocks until the move is complete.

async initialise()[source]

Initialises the position watcher.

is_bootloader()[source]

Returns True if we are in bootloader mode.

reset()[source]

Resets positioner values and statuses.

async send_command(command, error: Optional[str] = None, **kwargs)[source]

Sends and awaits a command to the FPS for this positioner.

async set_loop(motor='both', loop='closed', collisions=True)[source]

Sets the control loop for a motor.

These parameters are cleared after a restart. The motors revert to closed loop with collision detection.

Parameters
  • motor – The motor to which these changes apply, either 'alpha`', 'beta', or 'both'.

  • loop – The type of control loop, either 'open' or 'closed'.

  • collisions – Whether the firmware should automatically detect collisions and stop the positioner.

async set_position(alpha: float, beta: float)[source]

Sets the internal position of the motors.

async set_speed(alpha: float, beta: float, force=False)[source]

Sets motor speeds.

Parameters
  • alpha – The speed of the alpha arm, in RPM on the input.

  • beta – The speed of the beta arm, in RPM on the input.

  • force – Allows to set speed limits outside the normal range.

async update_firmware_version()[source]

Updates the firmware version.

async update_position(position: Optional[Tuple[float, float]] = None, timeout=1)[source]

Updates the position of the alpha and beta arms.

async update_status(status: maskbits.PositionerStatus | int = None, timeout=1.0)[source]

Updates the status of the positioner.

async wait_for_status(status: List[jaeger.maskbits.PositionerStatusV4_1], delay=1, timeout: Optional[float] = None)bool[source]

Polls the status until it reaches a certain value.

Parameters
  • status – The status to wait for. Can be a list in which case it will wait until all the statuses in the list have been reached.

  • delay – Time, in seconds, to wait between position updates.

  • timeout – How many seconds to wait for the status to reach the desired value before aborting.

Returns

result – Returns True if the status has been reached or False if the timeout limit was reached.

property collision

Returns True if the positioner is collided.

property initialised

Returns True if the system and datums have been initialised.

property move_time

Returns the move time.

property moving

Returns True if the positioner is moving.

property position

Returns a tuple with the (alpha, beta) position.

jaeger.helpers

class jaeger.utils.helpers.AsyncQueue(callback: Optional[Callable] = None)[source]

Bases: asyncio.queues.Queue

Provides an asyncio.Queue object with a watcher.

Parameters

callback – A function to call when a new item is received from the queue. It can be a coroutine.

class jaeger.utils.helpers.AsyncioExecutor[source]

Bases: concurrent.futures._base.Executor

An executor to run coroutines from a normal function.

Copied from http://bit.ly/2IYmqzN.

To use, do

with AsyncioExecutor() as executor:
    future = executor.submit(asyncio.sleep, 1)
shutdown(wait=True)[source]

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters

wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

submit(fn, *args, **kwargs)[source]

Submit a coroutine to the executor.

class jaeger.utils.helpers.Poller(name, callback, delay=1, loop=None)[source]

Bases: object

A task that runs a callback periodically.

Parameters
  • name (str) – The name of the poller.

  • callback (function or coroutine) – A function or coroutine to call periodically.

  • delay (float) – Initial delay between calls to the callback.

  • loop (event loop) – The event loop to which to attach the task.

async call_now()[source]

Calls the callback immediately.

async poller()[source]

The polling loop.

async set_delay(delay=None, immediate=False)[source]

Sets the delay for polling.

Parameters
  • delay (float) – The delay between calls to the callback. If None, restores the original delay.

  • immediate (bool) – If True, stops the currently running task and sets the new delay. Otherwise waits for the current task to complete.

start(delay=None)[source]

Starts the poller.

Parameters

delay (float) – The delay between calls to the callback. If not specified, restores the original delay used when the class was instantiated.

async stop()[source]

Cancel the poller.

property running

Returns True if the poller is running.

class jaeger.utils.helpers.PollerList(pollers=[])[source]

Bases: list

A list of Poller to be managed jointly.

append(poller)[source]

Adds a poller.

async set_delay(delay=None, immediate=False)[source]

Sets the delay for all the pollers.

Parameters
  • delay (float) – The delay between calls to the callback. If None, restores the original delay.

  • immediate (bool) – If True, stops the currently running tasks and sets the new delay. Otherwise waits for the current tasks to complete.

start(delay=None)[source]

Starts all the pollers.

Parameters

delay (float) – The delay between calls to the callback. If not specified, uses the default delays for each poller.

async stop()[source]

Cancels all the poller.

property names

List the poller names.

property running

Returns True if at least one poller is running.

class jaeger.utils.helpers.StatusMixIn(maskbit_flags: Type[Status_co], initial_status: Optional[Status_co] = None, callback_func: Optional[Callable] = None, call_now: bool = False)[source]

Bases: Generic[jaeger.utils.helpers.Status_co]

A mixin that provides status tracking with callbacks.

Provides a status property that executes a list of callbacks when the status changes.

Parameters
  • maskbit_flags – A class containing the available statuses as a series of maskbit flags. Usually as subclass of enum.Flag.

  • initial_status – The initial status.

  • callback_func – The function to call if the status changes.

  • call_now – Whether the callback function should be called when initialising.

Variables

callbacks – A list of the callback functions to call.

add_callback(cb: Callable)[source]

Adds a callback.

do_callbacks()[source]

Calls functions in callbacks.

remove_callback(cb: Callable)[source]

Removes a callback.

async wait_for_status(value)[source]

Awaits until the status matches value.

property flags

Gets the flags associated to this status.

property status

Returns the status.

jaeger.utils

jaeger.utils.utils.bytes_to_int(bytes, dtype='u4', byteorder='little')[source]

Returns the integer from a bytearray representation.

Parameters
  • bytes (bytearray) – The bytearray representing the integer.

  • dtype (numpy.dtype or str) – The numpy.dtype of the byte representation for the integer, or a type code that can include the endianess. See get_dtype_str to understand how dtype and byteorder will be parsed.

  • byteorder (str) – Either 'big' for big endian representation or 'little' for little end. '>' and '<' are also accepted, respectively.

Returns

integer (int) – A integer represented by bytes.

Examples

>>> bytes_to_int(b'\x00\x05', dtype=numpy.uint16, byteorder='big')
5
jaeger.utils.utils.convert_kaiju_trajectory(path, speed=None, step_size=0.03, invert=True)[source]

Converts a raw kaiju trajectory to a jaeger trajectory format.

Parameters
  • path (str) – The path to the raw trajectory.

  • speed (float) – The maximum speed, used to convert from kaiju steps to times, in degrees per second. If not set, assumes 1000 RPM.

  • step_size (float) – The step size in degrees per step.

  • invert (bool) – If True, inverts the order of the points.

Returns

trajectory (dict) – A dictionary with the trajectory in a format understood by send_trajectory.

jaeger.utils.utils.get_dtype_str(dtype, byteorder='little')[source]

Parses dtype and byte order to return a type string code.

Parameters
  • dtype (numpy.dtype or str) – Either a dtype (e.g., numpy.uint32) or a string with the type code ('>u4'). If a string type code and the first character indicates the byte order ('>' for big, '<' for little endian), byteorder will be ignored. The type code refers to bytes, while the dtype classes refer to bits, i.e., 'u2' is equivalent to

  • byteorder (str) – Either 'big' for big endian representation or 'little' for little end. '>' and '<' are also accepted, respectively.

Returns

type_code (str) – The type code for the input dtype and byte order.

Examples

>>> get_dtype_str(numpy.uint32, byteorder='big')
'>u4'
>>> get_dtype_str('u2', byteorder='>')
'>u2'
>>> get_dtype_str('<u2', byteorder='big')
'<u2'
jaeger.utils.utils.get_goto_move_time(move, speed=None)[source]

Returns the approximate time need for a given move, in seconds.

The move time is calculated as \(\dfrac{60 \alpha r}{360 v}\) where \(\alpha\) is the angle, \(r\) is the reduction ratio, and \(v\) is the speed in the input in RPM. It adds 0.25s due to deceleration; this value is not exact but it’s a good approximation for most situations.

Parameters
  • move (float) – The move, in degrees.

  • speed (float) – The speed of the motor for the move, in RPM on the input.

jaeger.utils.utils.get_identifier(positioner_id, command_id, uid=0, response_code=0)[source]

Returns a 29 bits identifier with the correct format.

The CAN identifier format for the positioners uses an extended frame with 29-bit encoding so that the 11 higher bits correspond to the positioner ID, the 8 middle bits are the command number, the following 6 bits are the unique identifier, and the 4 lower bits are the response code.

Parameters
  • positioner_id (int) – The Id of the positioner to command, or zero for broadcast.

  • command_id (int) – The ID of the command to send.

  • uid (int) – The unique identifier

  • response_code (int) – The response code.

Returns

identifier (int) – The decimal integer corresponding to the 29-bit identifier.

Examples

>>> get_identifier(5, 17, uid=5)
1328128
>>> bin(1328128)
'0b101000100010000000000'
jaeger.utils.utils.int_to_bytes(value, dtype='u4', byteorder='little')[source]

Returns a bytearray with the representation of an integer.

Parameters
  • value (int) – The integer to convert to bytes.

  • dtype (numpy.dtype or str) – The numpy.dtype of the byte representation for the integer, or a type code that can include the endianess. See get_dtype_str to understand how dtype and byteorder will be parsed.

  • byteorder (str) – Either 'big' for big endian representation or 'little' for little end. '>' and '<' are also accepted, respectively.

Returns

bytes (bytearray) – A bytearray with the representation for the input integer.

Examples

>>> int_to_bytes(5, dtype=numpy.uint16, byteorder='big')
bytearray(b'\x00\x05')
jaeger.utils.utils.motor_steps_to_angle(alpha, beta, motor_steps=None, inverse=False)[source]

Converts motor steps to angles or vice-versa.

Parameters
  • alpha (float) – The alpha position.

  • beta (float) – The beta position.

  • motor_steps (int) – The number of steps in the motor. Defaults to the configuration value positioner.moter_steps.

  • inverse (bool) – If True, converts from angles to motor steps.

Returns

angles (tuple) – A tuple with the alpha and beta angles associated to the input motor steps. If inverse=True, alpha and beta are considered to be angles and the associated motor steps are returned.

jaeger.utils.utils.parse_identifier(identifier)[source]

Parses an extended frame identifier and returns its components.

The 29-bit extended frame identifier is composed of a positioner id, a command id, and a response code. This function parses an identifier and returns the value of each element.

Parameters

identifier (int) – The identifier returned by the CAN bus.

Returns

components (tuple) – A tuple with the components of the identifier. The first element is the positioner id, the second the command id, the third is the command UID, and the last one is the response flag as an instance of ResponseCode.

Examples

>>> parse_identifier(1315072)
(5, 17, <ResponseCode.COMMAND_ACCEPTED: 0>)
>>> parse_identifier(1315074)
(5, 17, <ResponseCode.INVALID_TRAJECTORY: 2>)

jaeger.interfaces

class jaeger.interfaces.cannet.CANNetBus(channel, port=None, bitrate=None, buses=[1], timeout=5, **kwargs)[source]

Bases: can.bus.BusABC

Interface for Ixxat CAN@net NT 200/420.

Parameters
  • channel (str) – The IP address of the remote device (e.g. 192.168.1.1, …).

  • port (int) – The port of the device.

  • bitrate (int) – Bitrate in bit/s.

  • buses (list) – The buses to open in the device. Messages that do not specify a bus will be sent to all the open buses.

  • timeout (float) – Timeout for connection.

send(msg, bus=None, timeout=None)[source]

Transmit a message to the CAN bus.

Override this method to enable the transmit path.

Parameters
  • msg (can.Message) – A message object.

  • timeout (float or None) – If > 0, wait up to this many seconds for message to be ACK’ed or for transmit queue to be ready depending on driver implementation. If timeout is exceeded, an exception will be raised. Might not be supported by all interfaces. None blocks indefinitely.

Raises

can.CanError – if the message could not be sent

shutdown()[source]

Called to carry out any interface specific cleanup required in shutting down a bus.

class jaeger.interfaces.cannet.CANNetMessage(timestamp=0.0, arbitration_id=0, is_extended_id=None, is_remote_frame=False, is_error_frame=False, channel=None, dlc=None, data=None, is_fd=False, bitrate_switch=False, error_state_indicator=False, extended_id=None, check=False)[source]

Bases: can.message.Message