Programmatic API

jaeger.can

class jaeger.can.CANnetInterface(interface_type, channels, fps=None, interface_args=<factory>, status_interval=5)[source]

Bases: jaeger.can.JaegerCAN[jaeger.interfaces.cannet.CANNetBus]

An interface class specifically for the CAN@net 200/420 device.

This class bahaves as JaegerCAN but allows communication with the device itself and tracks its status.

handle_device_message(msg)[source]

Handles a reply from the device (i.e., not from the CAN network).

Parameters

msg (jaeger.interfaces.message.Message) –

async start()[source]

Starts CAN@net connection.

stop()[source]

Stops the interfaces.

property device_status

Returns a dictionary with the status of the device.

class jaeger.can.JaegerCAN(interface_type, channels, fps=None, interface_args=<factory>)[source]

Bases: Generic[jaeger.can.Bus_co]

A CAN interface with a command queue and reply handling.

Provides support for multi-channel CAN networks, with each channel being able to host more than one bus. The recommended way to instantiate a new JaegerCAN object is using the create classmethod

can = await JaegerCAN.create(...)

which is equivalent to

can = JaegerCAN(...)
await can.start()
Parameters
  • interface_type (str) – One of INTERFACES.

  • channels (list | tuple) – A list of channels to be used to instantiate the interfaces.

  • fps (Optional[jaeger.fps.FPS]) – The focal plane system.

  • interface_args (Dict[str, Any]) – Keyword arguments to pass to the interfaces when initialising it (e.g., port, baudrate, etc).

async classmethod create(profile=None, fps=None, interface_type=None, channels=[], interface_args={})[source]

Create and initialise a new bus interface from a configuration profile.

This is the preferred method to initialise a new JaegerCAN instance and is equivalent to calling JaegerCAN and then start.

Parameters
  • profile (Optional[str]) – The name of the profile that defines the bus interface, or None to use the default configuration.

  • fps (FPS) – The focal plane system.

  • interface_type (Optional[str]) – One of INTERFACES. Cannot be used with profile.

  • channels (list | tuple) – A list of channels to be used to instantiate the interfaces.

  • interface_args (Dict[str, Any]) – Keyword arguments to pass to the interfaces when initialising it (e.g., port, baudrate, etc).

Return type

JaegerCAN

static print_profiles()[source]

Prints interface profiles and returns a list of profile names.

Return type

List[str]

refresh_running_commands()[source]

Clears completed commands.

send_messages(cmd)[source]

Sends messages to the interface.

This method exists separate from _process_queue so that it can be used to send command messages to the interface synchronously.

Parameters

cmd (jaeger.commands.base.Command) –

stop()[source]

Stops the interfaces.

jaeger.can.INTERFACES = {'cannet': {'class': <class 'jaeger.interfaces.cannet.CANNetBus'>, 'multibus': True}, 'slcan': {'class': None, 'multibus': False}, 'socketcan': {'class': None, 'multibus': False}, 'virtual': {'class': <class 'jaeger.interfaces.virtual.VirtualBus'>, 'multibus': False}}

Accepted CAN interfaces and whether they are multibus.

jaeger.fps

class jaeger.fps.BaseFPS(*args, **kwargs)[source]

Bases: Dict[int, jaeger.positioner.Positioner], Generic[jaeger.fps.FPS_T]

A class describing the Focal Plane System.

This class includes methods to read the layout and construct positioner objects and can be used by the real FPS class or the VirtualFPS.

BaseFPS instances are singletons in the sense that one cannot instantiate more than one. An error is raise if __new__ is called with an existing instance. To retrieve the running instance, use get_instance.

Variables

positioner_class (ClassVar[Type[Positioner]]) – The class to be used to create a new positioner. In principle this will be Positioner but it may be different if the positioners are created for a VirtualFPS.

positioner_class

alias of jaeger.positioner.Positioner

add_positioner(positioner, centre=(None, None))[source]

Adds a new positioner to the list, and checks for duplicates.

Parameters

positioner (int | Positioner) –

Return type

Positioner

classmethod get_instance(*args, **kwargs)[source]

Returns the running instance.

Return type

jaeger.fps.FPS_T

property positioners

Dictionary of positioner associated with this FPS.

This is just a wrapper around the BaseFPS instance which is a dictionary itself. May be deprecated in the future.

class jaeger.fps.FPS(*args, **kwargs)[source]

Bases: jaeger.fps.BaseFPS[FPS]

A class describing the Focal Plane System.

The recommended way to instantiate a new FPS object is to use the create classmethod

fps = await FPS.create(...)

which is equivalent to

fps = FPS(...)
await fps.initialise()
Parameters
  • can (JaegerCAN | str | None) – A JaegerCAN instance to use to communicate with the CAN network, or the CAN profile from the configuration to use, or None to use the default one.

  • ieb (Union[bool, IEB, dict, str, pathlib.Path, None]) – If True or None, connects the Instrument Electronics Box PLC controller using the path to the IEB configuration file stored in jaeger’s configuration. Can also be an IEB instance, the path to a custom configuration file used to load one, or a dictionary with the configuration itself.

Examples

After instantiating a new FPS object it is necessary to call initialise to retrieve the positioner layout and the status of the connected positioners. Note that initialise is a coroutine which needs to be awaited

>>> fps = FPS(can='default')
>>> await fps.initialise()
>>> fps.positioners[4].status
<Positioner (id=4, status='SYSTEM_INITIALIZED|
DISPLACEMENT_COMPLETED|ALPHA_DISPLACEMENT_COMPLETED|
BETA_DISPLACEMENT_COMPLETED')>
abort()[source]

Aborts trajectories and stops positioners. Alias for stop_trajectory.

add_positioner(positioner, centre=(None, None), interface=None, bus=None)[source]

Adds a new positioner to the list, and checks for duplicates.

Parameters
Return type

Positioner

async async_status()[source]

Generator that yields FPS status changes.

async classmethod create(can=None, ieb=None, initialise=True, start_pollers=None, enable_low_temperature=True, use_lock=True)[source]

Starts the CAN bus and .

Parameters
  • initialise – Whether to initialise the FPS.

  • start_pollers (bool | None) – Whether to initialise the pollers.

  • use_lock (bool) – Use a lock file to prevent multiple instances of FPS.

  • kwargs – Parameters to pass to FPS.

  • enable_low_temperature (bool) –

Return type

FPS

get_positions(ignore_disabled=False)[source]

Returns the alpha and beta positions as an array.

Return type

numpy.ndarray

async goto(positioner_ids, alpha, beta, speed=None, relative=False, force=False, use_sync_line=None)[source]

Sends a list of positioners to a given position.

Parameters
  • positioner_ids (int | List[int] | None) – The list of positioner_ids to command. If None, uses all conencted positioners.

  • alpha (float | list | numpy.ndarray) – The alpha angle. Can be an array with the same size of the list of positioner IDs. Otherwise sends all the positioners to the same angle.

  • beta (float | list | numpy.ndarray) – The beta angle.

  • speed (Optional[Tuple[float, float]]) – As a tuple, the alpha and beta speeds to use. If None, uses the default ones.

  • relative – If True, alpha and beta are considered relative angles.

  • force (bool) – If positioners_ids=None, force must be set to True to move the entire array.

  • use_sync_line (bool | None) – Whether to use the SYNC line to start the trajectories.

async initialise(start_pollers=None, enable_low_temperature=True, use_lock=True)[source]

Initialises all positioners with status and firmware version.

Parameters
  • start_pollers (bool | None) – Whether to initialise the pollers.

  • self (T) –

  • enable_low_temperature (bool) –

  • use_lock (bool) –

Return type

T

is_bootloader()[source]

Returns True if any positioner is in bootloader mode.

async lock(stop_trajectories=True, by=None, do_warn=True, snapshot=True)[source]

Locks the FPS and prevents commands to be sent.

Parameters
  • stop_trajectories (bool) – Whether to stop trajectories when locking. This will not clear any collided flags.

  • by (Optional[List[int]]) –

  • do_warn (bool) –

  • snapshot (bool) –

async report_status()[source]

Returns a dict with the position and status of each positioner.

Return type

Dict[str, Any]

async save_snapshot(path=None, collision_buffer=None, return_axes=False, highlight=None)[source]

Creates a plot with the current arrangement of the FPS array.

Parameters
  • path (Optional[str | pathlib.Path]) – The path where to save the plot. Defaults to /data/fps/snapshots/MJD/fps_snapshot_<SEQ>.pdf.

  • collision_buffer (float | None) – The collision buffer.

  • return_axes (bool) – If True, returns the matplotlib axes instead of saving the plot.

  • highlight (int | None) –

Return type

str | Axes

send_command(command, positioner_ids=None, data=None, now=False, **kwargs)[source]

Sends a command to the bus.

Parameters
  • command (str | int | CommandID | Command) – The ID of the command, either as the integer value, a string, or the CommandID flag. Alternatively, the Command to send.

  • positioner_ids (int | List[int] | None) – The positioner IDs to command, or zero for broadcast. If None, sends the command to all FPS non-disabled positioners.

  • data (Any) – The bytes to send. See Command for details on the format.

  • interface – The index in the interface list for the interface to use. Only relevant in case of a multibus interface. If None, the positioner to bus map will be used.

  • bus – The bus within the interface to be used. Only relevant in case of a multibus interface. If None, the positioner to bus map will be used.

  • now (bool) – If True, the command is sent to the CAN network immediately, skipping the command queue. No tracking is done for this command. It should only be used for emergency and shutdown commands.

  • kwargs – Extra arguments to be passed to the command.

Returns

command – The command sent to the bus. The command needs to be awaited before it is considered done.

Return type

Command

async send_to_all(*args, **kwargs)[source]

Sends a command to all connected positioners.

This method has been deprecated. Use send_command with a list for positioner_ids instead.

async send_trajectory(*args, **kwargs)[source]

Sends a set of trajectories to the positioners.

See the documentation for send_trajectory.

Returns

trajectory – The Trajectory object.

Raises

TajectoryError – You can inspect the Trajectory object by capturing the error and accessing error.trajectory.

set_status(status)[source]

Sets the status of the FPS.

Parameters

status (jaeger.maskbits.FPSStatus) –

async shutdown()[source]

Stops pollers and shuts down all remaining tasks.

async start_can(use_lock=True)[source]

Starts the JaegerCAN interface.

Parameters

use_lock (bool) –

async stop_trajectory(clear_flags=False)[source]

Stops all the positioners without clearing collided flags.

Parameters

clear_flags – If True, sends STOP_TRAJECTORY which clears collided flags. Otherwise sends SEND_TRAJECTORY_ABORT.

async unlock(force=False)[source]

Unlocks the FPS if all collisions have been resolved.

async update_firmware_version(positioner_ids=None, timeout=2)[source]

Updates the firmware version of connected positioners.

Parameters
  • positioner_ids (Optional[int | List[int]]) – The list of positioners to update. If None, update all positioners.

  • timeout (float) – How long to wait before timing out the command.

Return type

bool

async update_position(positioner_ids=None, timeout=1)[source]

Updates positions.

Parameters
  • positioner_ids (Optional[int | List[int]]) – The list of positioners to update. If None, update all initialised positioners.

  • timeout (float) – How long to wait before timing out the command.

Return type

numpy.ndarray | bool

async update_status(positioner_ids=None, timeout=1)[source]

Update statuses for all positioners.

Parameters
  • positioner_ids (Optional[int | List[int]]) – The list of positioners to update. If None, update all positioners.

  • timeout (float) – How long to wait before timing out the command.

Return type

bool

property locked

Returns True if the FPS is locked.

property moving

Returns True if any of the positioners is moving.

jaeger.positioner

class jaeger.positioner.Positioner(positioner_id, fps=None, centre=(None, None))[source]

Bases: jaeger.utils.helpers.StatusMixIn

Represents the status and parameters of a positioner.

Parameters
  • positioner_id – The ID of the positioner

  • fps – The FPS instance to which this positioner is linked to.

  • centre – The \((x_{\rm focal}, y_{\rm focal})\) coordinates of the central axis of the positioner.

  • sextant – The id of the sextant to which this positioner is connected.

get_bus()[source]

Returns the interface index and bus.

Return type

Tuple[int, int | None]

async get_number_trajectories()[source]

Returns the number of trajectories executed by the positioner.

Will return None if the firmware does not support the GET_NUMBER_TRAJECTORIES.

Return type

int | None

get_positioner_flags()[source]

Returns the correct position maskbits from the firmware version.

async goto(alpha, beta, speed=None, relative=False, force=False, use_trajectory=True, use_sync_line=None)[source]

Moves positioner to a given position.

Parameters
  • alpha (float) – The position where to move the alpha arm, in degrees.

  • beta (float) – The position where to move the beta arm, in degrees.

  • speed (Optional[Tuple[float, float]]) – The speed of the (alpha, beta) arms, in RPM on the input.

  • relative – Whether the movement is absolute or relative to the current position.

  • force – Allows to set position and speed limits outside the normal range.

  • use_trajectory – If True, uses a trajectory to reach the position.

  • use_sync_line – If use_trajectory=True, whether to use the SYNC line.

Returns

resultTrue if both arms have reached the desired position, False if a problem was found.

Return type

bool

Examples

# Move alpha and beta at the currently set speed
>>> await goto(alpha=100, beta=10)

# Set the speed of the alpha arm
>>> await goto(speed=(1000, 500))
async home()[source]

Homes the positioner.

Zeroes the positioner by counter-clockwise rotating alpha and beta until they hit the hardstops. Blocks until the move is complete.

async initialise(disable_precise_moves=False)[source]

Initialises the position watcher.

is_bootloader()[source]

Returns True if we are in bootloader mode.

reset()[source]

Resets positioner values and statuses.

async send_command(command, error=None, **kwargs)[source]

Sends and awaits a command to the FPS for this positioner.

Parameters

error (Optional[str]) –

async set_loop(motor='both', loop='closed', collisions=True)[source]

Sets the control loop for a motor.

These parameters are cleared after a restart. The motors revert to closed loop with collision detection.

Parameters
  • motor – The motor to which these changes apply, either 'alpha`', 'beta', or 'both'.

  • loop – The type of control loop, either 'open' or 'closed'.

  • collisions – Whether the firmware should automatically detect collisions and stop the positioner.

async set_position(alpha, beta)[source]

Sets the internal position of the motors.

Parameters
async set_precise_move(mode, alpha=True, beta=True)[source]

Switches the precise moves on alpha and beta.

async set_speed(alpha, beta, force=False)[source]

Sets motor speeds.

Parameters
  • alpha (float) – The speed of the alpha arm, in RPM on the input.

  • beta (float) – The speed of the beta arm, in RPM on the input.

  • force – Allows to set speed limits outside the normal range.

async update_firmware_version()[source]

Updates the firmware version.

async update_position(position=None, timeout=1)[source]

Updates the position of the alpha and beta arms.

Parameters

position (Optional[Tuple[float, float]]) –

async update_status(status=None, timeout=1.0)[source]

Updates the status of the positioner.

Parameters

status (maskbits.PositionerStatus | int) –

async wait_for_status(status, delay=1, timeout=None)[source]

Polls the status until it reaches a certain value.

Parameters
  • status (List[jaeger.maskbits.PositionerStatusV4_1]) – The status to wait for. Can be a list in which case it will wait until all the statuses in the list have been reached.

  • delay – Time, in seconds, to wait between position updates.

  • timeout (Optional[float]) – How many seconds to wait for the status to reach the desired value before aborting.

Returns

result – Returns True if the status has been reached or False if the timeout limit was reached.

Return type

bool

property collision

Returns True if the positioner is collided.

property initialised

Returns True if the system and datums have been initialised.

property moving

Returns True if the positioner is moving.

property position

Returns a tuple with the (alpha, beta) position.

jaeger.helpers

class jaeger.utils.helpers.AsyncQueue(callback=None)[source]

Bases: asyncio.queues.Queue

Provides an asyncio.Queue object with a watcher.

Parameters

callback (Optional[Callable]) – A function to call when a new item is received from the queue. It can be a coroutine.

class jaeger.utils.helpers.AsyncioExecutor[source]

Bases: concurrent.futures._base.Executor

An executor to run coroutines from a normal function.

Copied from http://bit.ly/2IYmqzN.

To use, do

with AsyncioExecutor() as executor:
    future = executor.submit(asyncio.sleep, 1)
shutdown(wait=True)[source]

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters

wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

submit(fn, *args, **kwargs)[source]

Submit a coroutine to the executor.

class jaeger.utils.helpers.Poller(name, callback, delay=1, loop=None)[source]

Bases: object

A task that runs a callback periodically.

Parameters
  • name (str) – The name of the poller.

  • callback (function or coroutine) – A function or coroutine to call periodically.

  • delay (float) – Initial delay between calls to the callback.

  • loop (event loop) – The event loop to which to attach the task.

async call_now()[source]

Calls the callback immediately.

async poller()[source]

The polling loop.

async set_delay(delay=None, immediate=False)[source]

Sets the delay for polling.

Parameters
  • delay (float) – The delay between calls to the callback. If None, restores the original delay.

  • immediate (bool) – If True, stops the currently running task and sets the new delay. Otherwise waits for the current task to complete.

start(delay=None)[source]

Starts the poller.

Parameters

delay (float) – The delay between calls to the callback. If not specified, restores the original delay used when the class was instantiated.

async stop()[source]

Cancel the poller.

property running

Returns True if the poller is running.

class jaeger.utils.helpers.PollerList(pollers=[])[source]

Bases: list

A list of Poller to be managed jointly.

append(poller)[source]

Adds a poller.

async set_delay(delay=None, immediate=False)[source]

Sets the delay for all the pollers.

Parameters
  • delay (float) – The delay between calls to the callback. If None, restores the original delay.

  • immediate (bool) – If True, stops the currently running tasks and sets the new delay. Otherwise waits for the current tasks to complete.

start(delay=None)[source]

Starts all the pollers.

Parameters

delay (float) – The delay between calls to the callback. If not specified, uses the default delays for each poller.

async stop()[source]

Cancels all the poller.

property names

List the poller names.

property running

Returns True if at least one poller is running.

class jaeger.utils.helpers.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]

Bases: Generic[jaeger.utils.helpers.Status_co]

A mixin that provides status tracking with callbacks.

Provides a status property that executes a list of callbacks when the status changes.

Parameters
  • maskbit_flags – A class containing the available statuses as a series of maskbit flags. Usually as subclass of enum.Flag.

  • initial_status – The initial status.

  • callback_func – The function to call if the status changes.

  • call_now – Whether the callback function should be called when initialising.

Variables

callbacks – A list of the callback functions to call.

add_callback(cb)[source]

Adds a callback.

Parameters

cb (Callable) –

do_callbacks()[source]

Calls functions in callbacks.

remove_callback(cb)[source]

Removes a callback.

Parameters

cb (Callable) –

async wait_for_status(value)[source]

Awaits until the status matches value.

property flags

Gets the flags associated to this status.

property status: jaeger.utils.helpers.Status_co

Returns the status.

async jaeger.utils.helpers.run_in_executor(fn, *args, catch_warnings=False, executor='thread', **kwargs)[source]

Runs a function in an executor.

In addition to streamlining the use of the executor, this function catches any warning issued during the execution and reissues them after the executor is done. This is important when using the actor log handler since inside the executor there is no loop that CLU can use to output the warnings.

In general, note that the function must not try to do anything with the actor since they run on different loops.

jaeger.utils

jaeger.utils.utils.bytes_to_int(bytes, dtype='u4', byteorder='little')[source]

Returns the integer from a bytearray representation.

Parameters
  • bytes (bytearray) – The bytearray representing the integer.

  • dtype (numpy.dtype or str) – The numpy.dtype of the byte representation for the integer, or a type code that can include the endianess. See get_dtype_str to understand how dtype and byteorder will be parsed.

  • byteorder (str) – Either 'big' for big endian representation or 'little' for little end. '>' and '<' are also accepted, respectively.

Returns

integer (int) – A integer represented by bytes.

Examples

>>> bytes_to_int(b'\x00\x05', dtype=numpy.uint16, byteorder='big')
5
jaeger.utils.utils.convert_kaiju_trajectory(path, speed=None, step_size=0.03, invert=True)[source]

Converts a raw kaiju trajectory to a jaeger trajectory format.

Parameters
  • path (str) – The path to the raw trajectory.

  • speed (float) – The maximum speed, used to convert from kaiju steps to times, in degrees per second. If not set, assumes 1000 RPM.

  • step_size (float) – The step size in degrees per step.

  • invert (bool) – If True, inverts the order of the points.

Returns

trajectory (dict) – A dictionary with the trajectory in a format understood by send_trajectory.

jaeger.utils.utils.get_dtype_str(dtype, byteorder='little')[source]

Parses dtype and byte order to return a type string code.

Parameters
  • dtype (numpy.dtype or str) – Either a dtype (e.g., numpy.uint32) or a string with the type code ('>u4'). If a string type code and the first character indicates the byte order ('>' for big, '<' for little endian), byteorder will be ignored. The type code refers to bytes, while the dtype classes refer to bits, i.e., 'u2' is equivalent to

  • byteorder (str) – Either 'big' for big endian representation or 'little' for little end. '>' and '<' are also accepted, respectively.

Returns

type_code (str) – The type code for the input dtype and byte order.

Examples

>>> get_dtype_str(numpy.uint32, byteorder='big')
'>u4'
>>> get_dtype_str('u2', byteorder='>')
'>u2'
>>> get_dtype_str('<u2', byteorder='big')
'<u2'
jaeger.utils.utils.get_goto_move_time(move, speed=None)[source]

Returns the approximate time need for a given move, in seconds.

The move time is calculated as \(\dfrac{60 \alpha r}{360 v}\) where \(\alpha\) is the angle, \(r\) is the reduction ratio, and \(v\) is the speed in the input in RPM. It adds 0.25s due to deceleration; this value is not exact but it’s a good approximation for most situations.

Parameters
  • move (float) – The move, in degrees.

  • speed (float) – The speed of the motor for the move, in RPM on the input.

jaeger.utils.utils.get_identifier(positioner_id, command_id, uid=0, response_code=0)[source]

Returns a 29 bits identifier with the correct format.

The CAN identifier format for the positioners uses an extended frame with 29-bit encoding so that the 11 higher bits correspond to the positioner ID, the 8 middle bits are the command number, the following 6 bits are the unique identifier, and the 4 lower bits are the response code.

Parameters
  • positioner_id (int) – The Id of the positioner to command, or zero for broadcast.

  • command_id (int) – The ID of the command to send.

  • uid (int) – The unique identifier

  • response_code (int) – The response code.

Returns

identifier (int) – The decimal integer corresponding to the 29-bit identifier.

Examples

>>> get_identifier(5, 17, uid=5)
1328128
>>> bin(1328128)
'0b101000100010000000000'
jaeger.utils.utils.get_sjd(observatory=None)[source]

Returns the SDSS Julian Date as an integer based on the observatory.

Parameters

observatory (Optional[str]) – The current observatory, either APO or LCO. If None, uses $OBSERVATORY.

Return type

int

jaeger.utils.utils.int_to_bytes(value, dtype='u4', byteorder='little')[source]

Returns a bytearray with the representation of an integer.

Parameters
  • value (int) – The integer to convert to bytes.

  • dtype (numpy.dtype or str) – The numpy.dtype of the byte representation for the integer, or a type code that can include the endianess. See get_dtype_str to understand how dtype and byteorder will be parsed.

  • byteorder (str) – Either 'big' for big endian representation or 'little' for little end. '>' and '<' are also accepted, respectively.

Returns

bytes (bytearray) – A bytearray with the representation for the input integer.

Examples

>>> int_to_bytes(5, dtype=numpy.uint16, byteorder='big')
bytearray(b'\x00\x05')
jaeger.utils.utils.motor_steps_to_angle(alpha, beta, motor_steps=None, inverse=False)[source]

Converts motor steps to angles or vice-versa.

Parameters
  • alpha (float) – The alpha position.

  • beta (float) – The beta position.

  • motor_steps (int) – The number of steps in the motor. Defaults to the configuration value positioner.moter_steps.

  • inverse (bool) – If True, converts from angles to motor steps.

Returns

angles (tuple) – A tuple with the alpha and beta angles associated to the input motor steps. If inverse=True, alpha and beta are considered to be angles and the associated motor steps are returned.

jaeger.utils.utils.parse_identifier(identifier)[source]

Parses an extended frame identifier and returns its components.

The 29-bit extended frame identifier is composed of a positioner id, a command id, and a response code. This function parses an identifier and returns the value of each element.

Parameters

identifier (int) – The identifier returned by the CAN bus.

Returns

components – A tuple with the components of the identifier. The first element is the positioner id, the second the command id, the third is the command UID, and the last one is the response flag as an instance of ResponseCode.

Return type

Tuple[int, int, int, jaeger.maskbits.ResponseCode]

Examples

>>> parse_identifier(1315072)
(5, 17, <ResponseCode.COMMAND_ACCEPTED: 0>)
>>> parse_identifier(1315074)
(5, 17, <ResponseCode.INVALID_TRAJECTORY: 2>)

jaeger.interfaces

class jaeger.interfaces.BusABC(*args, **kwargs)[source]

Bases: object

A base CAN bus.

abstract async get()[source]

Receives messages from the bus.

async open(*args, **kwargs)[source]

Starts the bus.

This method call the _open_internal method in the subclass bus, if present. It’s meant mainly to initialise any process that needs to be run as a coroutine. Must return True if the connection was successful, False or an error otherwise.

Return type

bool

abstract send(msg, **kwargs)[source]

Sends a message to the bus.

Parameters

msg (jaeger.interfaces.message.Message) –

class jaeger.interfaces.CANNetBus(channel, port=None, bitrate=None, buses=[1], timeout=5, **kwargs)[source]

Bases: jaeger.interfaces.bus.BusABC

Interface for Ixxat CAN@net NT 200/420.

Parameters
  • channel (str) – The IP address of the remote device (e.g. 192.168.1.1, …).

  • port (int) – The port of the device.

  • bitrate (int) – Bitrate in bit/s.

  • buses (list) – The buses to open in the device. Messages that do not specify a bus will be sent to all the open buses.

  • timeout (float) – Timeout for connection.

async get()[source]

Receives messages from the bus.

send(msg, bus=None)[source]

Sends a message to the bus.

class jaeger.interfaces.Message(timestamp=0.0, arbitration_id=0, is_extended_id=None, is_remote_frame=False, is_error_frame=False, channel=None, dlc=None, data=None, is_fd=False, bitrate_switch=False, error_state_indicator=False, extended_id=None, check=False)[source]

Bases: object

The Message object is used to represent CAN messages for sending, receiving and other purposes like converting between different logging formats.

Messages can use extended identifiers, be remote or error frames, contain data and may be associated to a channel.

Messages are always compared by identity and never by value, because that may introduce unexpected behaviour. See also equals().

copy()/deepcopy() is supported as well.

Messages do not support “dynamic” attributes, meaning any others than the documented ones, since it uses __slots__.

This class is copied directly from python-can.

equals(other, timestamp_delta=1e-06)[source]

Compares a given message with this one.

Parameters
  • other (Message) – the message to compare with

  • timestamp_delta (float or int or None) – the maximum difference at which two timestamps are still considered equal or None to not compare timestamps

Return type

bool

Returns

True iff the given message equals this one

class jaeger.interfaces.Notifier(listeners=[], buses=[])[source]

Bases: object

Notifier class to report bus messages to multiple listeners.

Parameters
  • listeners (List[Listener_co]) –

  • buses (List[Bus_co]) –

add_bus(bus)[source]

Adds a bus to monitor.

add_listener(callback)[source]

Adds a listener.

Parameters

callback (Callable[[...], Coroutine[jaeger.interfaces.message.Message, Any, Any]]) –

remove_notifier(callback)[source]

Removes a listener.

Parameters

callback (Callable[[...], Coroutine[jaeger.interfaces.message.Message, Any, Any]]) –

stop()[source]

Stops the notifier.

class jaeger.interfaces.VirtualBus(channel)[source]

Bases: jaeger.interfaces.bus.BusABC

A class implementing a virtual CAN bus that listens to messages on a channel.

Parameters

channel (str) –

async get()[source]

Get messages from the bus.

send(msg)[source]

Send message to the virtual bus (self does not receive a copy).

Parameters

msg (jaeger.interfaces.message.Message) –