Programmatic API¶
jaeger.can¶
- class jaeger.can.CANnetInterface(*args, **kwargs)[source]¶
Bases:
jaeger.can.JaegerCAN[jaeger.interfaces.cannet.CANNetBus]
An interface class specifically for the CAN@net 200/420 device.
This class behaves as JaegerCAN but allows communication with the device itself and tracks its status.
- handle_device_message(msg)[source]¶
Handles a reply from the device (i.e., not from the CAN network).
- Parameters
msg (can.message.Message) –
- property device_status¶
Returns a dictionary with the status of the device.
- class jaeger.can.JaegerCAN(interface_type, channels, *args, fps=None, **kwargs)[source]¶
Bases:
Generic[jaeger.can.Bus_co]
A CAN interface with a command queue and reply handling.
Provides support for multi-channel CAN networks, with each channel being able to host more than one bus. In general, a new instance of JaegerCAN is created via the from_profile classmethod.
- Parameters
interface_type – One of INTERFACES. Defines the python-can interface to use.
channels – A list of channels to be used to instantiate the interfaces.
fps – The focal plane system.
args – Arguments and keyword arguments to pass to the interfaces when initialising them (e.g., port, baudrate, etc).
kwargs – Arguments and keyword arguments to pass to the interfaces when initialising them (e.g., port, baudrate, etc).
- Variables
command_queue (asyncio.Queue) – Queue of messages to be sent to the bus. The messages are sent as soon as the bus has finished processing any commands with the same command_id and positioner_id.
listener (JaegerReaderCallback) – A JaegerReaderCallback instance that runs a callback when a new message is received from the bus.
multibus (bool) – Whether the interfaces are multibus.
notifier (can.Notifier) – A can.Notifier instance that processes messages from the list of buses, asynchronously.
- classmethod from_profile(profile=None, **kwargs)[source]¶
Creates a new bus interface from a configuration profile.
- static print_profiles()[source]¶
Prints interface profiles and returns a list of profile names.
- Return type
List[str]
- send_to_interfaces(message, interfaces=None, bus=None)[source]¶
Sends the message to the appropriate interface and bus.
- Parameters
message (jaeger.commands.base.Message) –
interfaces (Optional[List[jaeger.can.Bus_co]]) –
bus (Optional[Any]) –
- interfaces: List[Bus_co]¶
A list of python-can interfaces, one for each of the channels.
- Type
- class jaeger.can.JaegerReaderCallback(callback, loop=None)[source]¶
Bases:
can.listener.Listener
A message reader that triggers a callback on message received.
- Parameters
callback (Callable[[can.Message], Any]) – The function to run when a new message is received.
loop (Optional[asyncio.AbstractEventLoop]) – If an asyncio event loop is given, the callback is scheduled with call_soon; otherwise it is called immediately.
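The dispatch rule described for loop can be sketched without python-can. The class below is a hypothetical stand-in, not jaeger's implementation: the names ReaderCallbackSketch and demo are assumptions made for this example, and on_message_received only mirrors the hook that python-can listeners implement.

```python
import asyncio
from typing import Any, Callable, Optional


class ReaderCallbackSketch:
    """Hypothetical listener that forwards each message to a callback."""

    def __init__(self, callback: Callable[[Any], Any],
                 loop: Optional[asyncio.AbstractEventLoop] = None):
        self.callback = callback
        self.loop = loop

    def on_message_received(self, msg: Any) -> None:
        if self.loop is not None:
            # Schedule the callback for the next iteration of the event loop.
            self.loop.call_soon(self.callback, msg)
        else:
            # No loop given: run the callback right away.
            self.callback(msg)


async def demo() -> list:
    received = []
    reader = ReaderCallbackSketch(received.append,
                                  loop=asyncio.get_running_loop())
    reader.on_message_received("frame-1")
    before_yield = list(received)   # the callback is scheduled, not yet run
    await asyncio.sleep(0)          # yield so the loop can run the callback
    return [before_yield, received]
```

With a loop, the message only reaches the callback once control returns to the event loop; without one, the callback has already run when on_message_received returns.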
- jaeger.can.INTERFACES = {'cannet': {'class': <class 'jaeger.interfaces.cannet.CANNetBus'>, 'multibus': True}, 'slcan': {'class': <class 'can.interfaces.slcan.slcanBus'>, 'multibus': False}, 'socketcan': {'class': <class 'can.interfaces.socketcan.socketcan.SocketcanBus'>, 'multibus': False}, 'virtual': {'class': <class 'can.interfaces.virtual.VirtualBus'>, 'multibus': False}}¶
Accepted CAN interfaces and whether they are multibus.
jaeger.fps¶
- class jaeger.fps.BaseFPS(layout=None, positioner_class=<class 'jaeger.positioner.Positioner'>)[source]¶
Bases:
dict
A class describing the Focal Plane System.
This class includes methods to read the layout and construct positioner objects and can be used by the real FPS class or the VirtualFPS.
- Parameters
layout (Optional[str | pathlib.Path]) – The path to the layout describing the position of the robots on the focal plane.
positioner_class (Type[Any]) – The class to be used to create a new positioner. In principle this will be Positioner but it may be different if the positioners are created for a VirtualFPS.
- class jaeger.fps.FPS(can=None, layout=None, can_profile=None, ieb=True, loop=None, engineering_mode=False)[source]¶
Bases:
jaeger.fps.BaseFPS
A class describing the Focal Plane System.
- Parameters
can (Optional[str]) – The CAN bus to use.
layout (Optional[str]) – The file path to the layout describing the position of the robots on the focal plane. If None, the default layout will be used.
can_profile (Optional[str]) – The configuration profile for the CAN interface, or None to use the default one. Ignored if can is passed.
ieb (IEBArg) – If True or None, connects the Instrument Electronics Box PLC controller using the path to the IEB configuration file stored in jaeger’s configuration. Can also be an IEB instance, the path to a custom configuration file used to load one, or a dictionary with the configuration itself.
loop (Optional[asyncio.AbstractEventLoop]) – The asyncio event loop. If None, uses asyncio.get_event_loop to get a valid loop.
engineering_mode (bool) – If True, disables most safety checks to enable debugging. This may result in hardware damage so it must not be used lightly.
Examples
After instantiating a new FPS object it is necessary to call initialise to retrieve the positioner layout and the status of the connected positioners. Note that initialise is a coroutine which needs to be awaited:
>>> fps = FPS(can_profile='default')
>>> await fps.initialise()
>>> fps.positioners[4].status
<Positioner (id=4, status='SYSTEM_INITIALIZED|DISPLACEMENT_COMPLETED|ALPHA_DISPLACEMENT_COMPLETED|BETA_DISPLACEMENT_COMPLETED')>
- async initialise(allow_unknown=True, start_pollers=True)[source]¶
Initialises all positioners with status and firmware version.
- Parameters
- Return type
- async lock(stop_trajectories=True)[source]¶
Locks the FPS and prevents commands from being sent.
- Parameters
stop_trajectories (bool) – Whether to stop trajectories when locking.
- async report_status()[source]¶
Returns a dict with the position and status of each positioner.
- Return type
Dict[str, Any]
- send_command(command, positioner_id=0, data=bytearray(b''), interface=None, bus=None, broadcast=False, override=False, safe=False, synchronous=False, **kwargs)[source]¶
Sends a command to the bus.
- Parameters
command (str | int | CommandID | Command) – The ID of the command, either as the integer value, a string, or the CommandID flag. Alternatively, the Command to send.
positioner_id (int) – The positioner ID to command, or zero for broadcast.
data (bytearray) – The bytes to send.
interface (Optional[BusABC]) – The index in the interface list for the interface to use. Only relevant in case of a multibus interface. If None, the positioner to bus map will be used.
bus (Optional[int]) – The bus within the interface to be used. Only relevant in case of a multibus interface. If None, the positioner to bus map will be used.
broadcast (bool) – If True, sends the command to all the buses.
override (bool) – If another instance of this command_id with the same positioner_id is running, cancels it and schedules this one immediately. Otherwise the command is queued until the first one finishes.
safe (bool) – Whether the command is safe to send to a locked FPS.
synchronous (bool) – If True, the command is sent to the CAN network immediately, skipping the command queue. No tracking is done for this command. It should only be used for shutdown commands.
kwargs – Extra arguments to be passed to the command.
- Returns
command – The command sent to the bus. The command needs to be awaited before it is considered done.
- Return type
- async send_to_all(command, positioners=None, data=None, **kwargs)[source]¶
Sends a command to multiple positioners and awaits completion.
- Parameters
command (str | int | CommandID | Command) – The ID of the command, either as the integer value, a string, or the CommandID flag. Alternatively, the Command to send.
positioners (Optional[List[int]]) – The list of positioner_id of the positioners to command. If None, sends the command to all the positioners in the FPS that are not disabled.
data (Optional[List[bytearray]]) – The payload to send. If None, no payload is sent. If the value is a list with a single value, the same payload is sent to all the positioners. Otherwise the list length must match the number of positioners.
kwargs – Keyword arguments to pass to the command.
- Returns
commands – A list with the command instances executed.
- Return type
List[Command]
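The rule for the data argument of send_to_all (no payload, one shared payload, or one payload per positioner) can be illustrated with a small helper. This is only a sketch of the rule as documented; expand_payloads is a hypothetical name and is not part of jaeger.

```python
from typing import List, Optional


def expand_payloads(positioner_ids: List[int],
                    data: Optional[List[bytearray]]) -> List[Optional[bytearray]]:
    """Return one payload per positioner, following the documented rule."""
    n = len(positioner_ids)
    if data is None:
        return [None] * n        # no payload for any positioner
    if len(data) == 1:
        return [data[0]] * n     # the same payload for every positioner
    if len(data) != n:
        raise ValueError("data must have one element or one per positioner")
    return list(data)            # one payload per positioner, in order
```

For three positioners, a single-element list is repeated three times, while a two-element list raises ValueError because it matches neither case.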
- async send_trajectory(*args, **kwargs)[source]¶
Sends a set of trajectories to the positioners.
See the documentation for send_trajectory.
- set_interface(command, interface=None, bus=None)[source]¶
Sets the interface and bus to which to send a command.
- Parameters
command (jaeger.commands.base.Command) –
interface (Optional[can.bus.BusABC]) –
bus (Optional[int]) –
- async stop_trajectory(positioners=None, clear_flags=True, timeout=0)[source]¶
Stops all the positioners.
- Parameters
positioners (Optional[List[int]]) – The list of positioners to abort. If None, abort all positioners.
clear_flags (bool) – If True, in addition to sending TRAJECTORY_TRANSMISSION_ABORT sends STOP_TRAJECTORY which clears all the collision and warning flags.
timeout (float) – How long to wait before timing out the command. By default, just sends the command and does not wait for replies.
- async update_firmware_version(positioner_ids=None, timeout=2)[source]¶
Updates the firmware version of connected positioners.
- Parameters
- Return type
- pollers¶
Position and status pollers
jaeger.positioner¶
- class jaeger.positioner.Positioner(positioner_id, fps=None, centre=(None, None))[source]¶
Bases:
Generic[jaeger.utils.helpers.Status_co]
Represents the status and parameters of a positioner.
- Parameters
positioner_id – The ID of the positioner.
fps – The FPS instance to which this positioner is linked.
centre – The \((x_{\rm focal}, y_{\rm focal})\) coordinates of the central axis of the positioner.
sextant – The id of the sextant to which this positioner is connected.
- async goto(alpha, beta, speed=None, relative=False, force=False)[source]¶
Moves positioner to a given position.
- Parameters
alpha (float) – The position where to move the alpha arm, in degrees.
beta (float) – The position where to move the beta arm, in degrees.
speed (Optional[Tuple[float, float]]) – The speed of the (alpha, beta) arms, in RPM on the input.
relative – Whether the movement is absolute or relative to the current position.
force – Allows setting position and speed limits outside the normal range.
- Returns
result – True if both arms have reached the desired position, False if a problem was found.
- Return type
Examples
# Move alpha and beta at the currently set speed
>>> await goto(alpha=100, beta=10)
# Set the speed of the alpha and beta arms
>>> await goto(alpha=100, beta=10, speed=(1000, 500))
- async home()[source]¶
Homes the positioner.
Zeroes the positioner by counter-clockwise rotating alpha and beta until they hit the hardstops. Blocks until the move is complete.
- async send_command(command, error=None, **kwargs)[source]¶
Sends and awaits a command to the FPS for this positioner.
- Parameters
error (Optional[str]) –
- async set_loop(motor='both', loop='closed', collisions=True)[source]¶
Sets the control loop for a motor.
These parameters are cleared after a restart. The motors revert to closed loop with collision detection.
- Parameters
motor – The motor to which these changes apply, either 'alpha', 'beta', or 'both'.
loop – The type of control loop, either 'open' or 'closed'.
collisions – Whether the firmware should automatically detect collisions and stop the positioner.
- async set_precise_move(mode, alpha=True, beta=True)[source]¶
Switches the precise moves on alpha and beta.
- async update_position(position=None, timeout=1)[source]¶
Updates the position of the alpha and beta arms.
- async update_status(status=None, timeout=1.0)[source]¶
Updates the status of the positioner.
- Parameters
status (maskbits.PositionerStatus | int) –
- async wait_for_status(status, delay=1, timeout=None)[source]¶
Polls the status until it reaches a certain value.
- Parameters
status (List[jaeger.maskbits.PositionerStatusV4_1]) – The status to wait for. Can be a list in which case it will wait until all the statuses in the list have been reached.
delay – Time, in seconds, to wait between position updates.
timeout (Optional[float]) – How many seconds to wait for the status to reach the desired value before aborting.
- Returns
result – Returns True if the status has been reached or False if the timeout limit was reached.
- Return type
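The polling behaviour of wait_for_status (check, sleep for delay, give up after timeout) can be sketched with plain asyncio. Everything below is an assumption for illustration: DemoStatus stands in for the real maskbits, get_status for whatever refreshes the positioner status, and wait_for_flags is not jaeger's implementation.

```python
import asyncio
import enum
from typing import Callable, List, Optional


class DemoStatus(enum.Flag):
    """Hypothetical maskbits used only for this example."""
    SYSTEM_INITIALIZED = 1
    DISPLACEMENT_COMPLETED = 2


async def wait_for_flags(get_status: Callable[[], DemoStatus],
                         wanted: List[DemoStatus],
                         delay: float = 1,
                         timeout: Optional[float] = None) -> bool:
    """Poll get_status() until every flag in wanted is set, or time out."""

    async def poll() -> None:
        # Keep polling until all requested flags are present in the status.
        while not all(flag in get_status() for flag in wanted):
            await asyncio.sleep(delay)

    try:
        # timeout=None waits indefinitely.
        await asyncio.wait_for(poll(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
```

Passing a list of flags waits until all of them are set at the same time, which matches the documented behaviour for list inputs.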
- property initialised¶
Returns True if the system and datums have been initialised.
- property move_time¶
Returns the move time.
- property position¶
Returns a tuple with the (alpha, beta) position.
jaeger.helpers¶
- class jaeger.utils.helpers.AsyncQueue(callback=None)[source]¶
Bases:
asyncio.queues.Queue
Provides an asyncio.Queue object with a watcher.
- Parameters
callback (Optional[Callable]) – A function to call when a new item is received from the queue. It can be a coroutine.
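A queue with a watcher can be sketched as an asyncio.Queue subclass that starts a background task to pull items and hand them to the callback. The class below is a minimal illustration under that assumption; WatchedQueue, watcher and demo are hypothetical names and the real AsyncQueue may differ.

```python
import asyncio
from typing import Any, Callable, Optional


class WatchedQueue(asyncio.Queue):
    """Hypothetical queue that passes every retrieved item to a callback."""

    def __init__(self, callback: Optional[Callable[[Any], Any]] = None):
        super().__init__()
        self.callback = callback
        # Must be created while an event loop is running.
        self.watcher = asyncio.create_task(self._watch())

    async def _watch(self) -> None:
        while True:
            item = await self.get()
            if self.callback is None:
                continue
            result = self.callback(item)
            if asyncio.iscoroutine(result):
                await result       # the callback may be a coroutine function


async def demo() -> list:
    seen = []
    queue = WatchedQueue(seen.append)
    await queue.put("a")
    await queue.put("b")
    await asyncio.sleep(0.01)      # give the watcher time to drain the queue
    queue.watcher.cancel()
    return seen
```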
- class jaeger.utils.helpers.AsyncioExecutor[source]¶
Bases:
concurrent.futures._base.Executor
An executor to run coroutines from a normal function.
Copied from http://bit.ly/2IYmqzN.
To use, do
with AsyncioExecutor() as executor:
    future = executor.submit(asyncio.sleep, 1)
- shutdown(wait=True)[source]¶
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Parameters
wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
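One common way to build such an executor is to run a private event loop in a background thread and submit coroutines to it with asyncio.run_coroutine_threadsafe. The sketch below follows that pattern; it is not the code of AsyncioExecutor, and LoopThreadExecutor is a hypothetical name.

```python
import asyncio
import threading
from concurrent.futures import Executor


class LoopThreadExecutor(Executor):
    """Hypothetical executor that runs coroutines on a private loop thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever,
                                        daemon=True)
        self._thread.start()

    def submit(self, fn, *args, **kwargs):
        # fn must be a coroutine function. The result is a
        # concurrent.futures.Future usable from synchronous code.
        return asyncio.run_coroutine_threadsafe(fn(*args, **kwargs), self._loop)

    def shutdown(self, wait=True, **kwargs):
        if self._loop.is_closed():
            return                      # already shut down, nothing to do
        self._loop.call_soon_threadsafe(self._loop.stop)
        if wait:
            self._thread.join()
            self._loop.close()
```

Used as a context manager, the executor stops its loop on exit; the future returned by submit can be resolved with future.result() from ordinary, non-async code.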
- class jaeger.utils.helpers.Poller(name, callback, delay=1, loop=None)[source]¶
Bases:
object
A task that runs a callback periodically.
- Parameters
- class jaeger.utils.helpers.PollerList(pollers=[])[source]¶
Bases:
list
A list of Poller instances to be managed jointly.
- start(delay=None)[source]¶
Starts all the pollers.
- Parameters
delay (float) – The delay between calls to the callback. If not specified, uses the default delays for each poller.
- property names¶
List the poller names.
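The idea behind Poller and PollerList, a callback run periodically as an asyncio task and a list that starts several of them together, can be sketched as follows. PeriodicTask, PeriodicTaskList and demo are hypothetical names, and the stop method is an assumption added so the example can terminate; none of this is jaeger's code.

```python
import asyncio
from typing import Any, Callable, Optional


class PeriodicTask:
    """Hypothetical poller: calls callback every delay seconds."""

    def __init__(self, name: str, callback: Callable[[], Any], delay: float = 1):
        self.name = name
        self.callback = callback
        self.delay = delay
        self._task: Optional[asyncio.Task] = None

    async def _run(self) -> None:
        while True:
            result = self.callback()
            if asyncio.iscoroutine(result):
                await result
            await asyncio.sleep(self.delay)

    def start(self, delay: Optional[float] = None) -> None:
        if delay is not None:
            self.delay = delay          # override the default delay
        if self._task is None:
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


class PeriodicTaskList(list):
    """Hypothetical list of pollers that are started and stopped together."""

    @property
    def names(self):
        return [poller.name for poller in self]

    def start(self, delay: Optional[float] = None) -> None:
        for poller in self:
            poller.start(delay)

    async def stop(self) -> None:
        for poller in self:
            await poller.stop()


async def demo():
    ticks = []
    pollers = PeriodicTaskList([
        PeriodicTask("position", lambda: ticks.append("p"), delay=0.01),
        PeriodicTask("status", lambda: ticks.append("s"), delay=0.01),
    ])
    pollers.start()
    await asyncio.sleep(0.05)
    await pollers.stop()
    return pollers.names, ticks
```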
- class jaeger.utils.helpers.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]¶
Bases:
Generic[jaeger.utils.helpers.Status_co]
A mixin that provides status tracking with callbacks.
Provides a status property that executes a list of callbacks when the status changes.
- Parameters
maskbit_flags – A class containing the available statuses as a series of maskbit flags. Usually a subclass of enum.Flag.
initial_status – The initial status.
callback_func – The function to call if the status changes.
call_now – Whether the callback function should be called when initialising.
- Variables
callbacks – A list of the callback functions to call.
- property flags¶
Gets the flags associated to this status.
- property status: Status_co | None¶
Returns the status.
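A status property that triggers callbacks on change can be sketched with an enum.Flag and a property setter. The example below is only an illustration of the pattern: DemoFlags and StatusTracker are hypothetical names, and calling the callbacks without arguments is an assumption.

```python
import enum
from typing import Callable, List, Optional


class DemoFlags(enum.Flag):
    """Hypothetical maskbits used only for this example."""
    UNKNOWN = 1
    READY = 2
    MOVING = 4


class StatusTracker:
    """Hypothetical status holder that notifies callbacks on change."""

    def __init__(self, maskbit_flags, initial_status=None,
                 callback_func: Optional[Callable] = None,
                 call_now: bool = False):
        self.flags = maskbit_flags
        self.callbacks: List[Callable] = []
        if callback_func is not None:
            self.callbacks.append(callback_func)
        self._status = initial_status
        if call_now:
            self._notify()

    def _notify(self) -> None:
        for func in self.callbacks:
            func()

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, value) -> None:
        value = self.flags(value)   # accept integers as well as flag members
        if value != self._status:   # notify only when the status changes
            self._status = value
            self._notify()
```

Assigning the same status twice triggers the callbacks only once, because the setter compares the new value with the stored one before notifying.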
jaeger.utils¶
- jaeger.utils.utils.bytes_to_int(bytes, dtype='u4', byteorder='little')[source]¶
Returns the integer from a bytearray representation.
- Parameters
bytes (bytearray) – The bytearray representing the integer.
dtype (numpy.dtype or str) – The numpy.dtype of the byte representation for the integer, or a type code that can include the endianness. See get_dtype_str to understand how dtype and byteorder will be parsed.
byteorder (str) – Either 'big' for big endian representation or 'little' for little endian. '>' and '<' are also accepted, respectively.
- Returns
integer (int) – An integer represented by bytes.
Examples
>>> bytes_to_int(b'\x00\x05', dtype=numpy.uint16, byteorder='big')
5
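For unsigned integers, the same conversion can be reproduced with the standard library alone, which is handy for checking payloads without numpy. The helper below is a sketch, not the implementation of bytes_to_int; bytes_to_int_sketch is a hypothetical name and the width is taken from the length of the input rather than from a dtype.

```python
def bytes_to_int_sketch(data: bytes, byteorder: str = "little") -> int:
    """Decode an unsigned integer from raw bytes using int.from_bytes."""
    # Accept the '>' and '<' shorthands for big and little endian.
    order = {">": "big", "<": "little"}.get(byteorder, byteorder)
    return int.from_bytes(data, byteorder=order, signed=False)
```

The two bytes 0x00 0x05 decode to 5 when read as big endian and to 1280 when read as little endian, which shows why the byte order has to match the sender.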
- jaeger.utils.utils.convert_kaiju_trajectory(path, speed=None, step_size=0.03, invert=True)[source]¶
Converts a raw kaiju trajectory to a jaeger trajectory format.
- Parameters
- Returns
trajectory (dict) – A dictionary with the trajectory in a format understood by send_trajectory.
- jaeger.utils.utils.get_dtype_str(dtype, byteorder='little')[source]¶
Parses dtype and byte order to return a type string code.
- Parameters
dtype (numpy.dtype or str) – Either a dtype (e.g., numpy.uint32) or a string with the type code ('>u4'). If dtype is a string type code whose first character indicates the byte order ('>' for big, '<' for little endian), byteorder will be ignored. The type code refers to bytes, while the dtype classes refer to bits, i.e., 'u2' is equivalent to numpy.uint16.
byteorder (str) – Either 'big' for big endian representation or 'little' for little endian. '>' and '<' are also accepted, respectively.
- Returns
type_code (str) – The type code for the input dtype and byte order.
Examples
>>> get_dtype_str(numpy.uint32, byteorder='big')
'>u4'
>>> get_dtype_str('u2', byteorder='>')
'>u2'
>>> get_dtype_str('<u2', byteorder='big')
'<u2'
- jaeger.utils.utils.get_goto_move_time(move, speed=None)[source]¶
Returns the approximate time needed for a given move, in seconds.
The move time is calculated as \(\dfrac{60 \alpha r}{360 v}\) where \(\alpha\) is the angle, \(r\) is the reduction ratio, and \(v\) is the speed in the input in RPM. It adds 0.25s due to deceleration; this value is not exact but it’s a good approximation for most situations.
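The formula can be checked numerically with a few lines of Python. This is a worked sketch of the expression above, not the library function: goto_move_time_sketch is a hypothetical name and the default reduction ratio of 1024 is an assumed value chosen only for illustration.

```python
def goto_move_time_sketch(angle_deg: float, speed_rpm: float,
                          reduction_ratio: float = 1024.0,
                          deceleration_s: float = 0.25) -> float:
    """Evaluate 60 * alpha * r / (360 * v) plus the deceleration allowance."""
    # reduction_ratio=1024 is an assumption, not a documented value.
    return 60.0 * angle_deg * reduction_ratio / (360.0 * speed_rpm) + deceleration_s
```

With these assumptions, a 180 degree move at 2000 RPM on the input takes 60 × 180 × 1024 / (360 × 2000) = 15.36 s, or about 15.61 s once the 0.25 s deceleration allowance is added.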
- jaeger.utils.utils.get_identifier(positioner_id, command_id, uid=0, response_code=0)[source]¶
Returns a 29-bit identifier with the correct format.
The CAN identifier format for the positioners uses an extended frame with 29-bit encoding so that the 11 higher bits correspond to the positioner ID, the 8 middle bits are the command number, the following 6 bits are the unique identifier, and the 4 lower bits are the response code.
- Parameters
- Returns
identifier (int) – The decimal integer corresponding to the 29-bit identifier.
Examples
>>> get_identifier(5, 17, uid=5)
1328208
>>> bin(1328208)
'0b101000100010001010000'
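The bit layout described above (11 bits of positioner ID, 8 bits of command number, 6 bits of UID, 4 bits of response code) can be written out with shifts. The function below is a sketch derived from that description only; pack_identifier is a hypothetical name and is not claimed to match the library's implementation.

```python
def pack_identifier(positioner_id: int, command_id: int,
                    uid: int = 0, response_code: int = 0) -> int:
    """Pack the four fields into a 29-bit integer, highest field first."""
    fields = [("positioner_id", positioner_id, 11),
              ("command_id", command_id, 8),
              ("uid", uid, 6),
              ("response_code", response_code, 4)]
    identifier = 0
    for name, value, width in fields:
        if not 0 <= value < 2 ** width:
            raise ValueError(f"{name} does not fit in {width} bits")
        # Make room for the field, then place it in the lowest bits.
        identifier = (identifier << width) | value
    return identifier
```

Under this layout, positioner 5, command 17, UID 5 and response code 0 pack to 1328208, while the same positioner and command with UID 0 pack to 1328128.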
- jaeger.utils.utils.int_to_bytes(value, dtype='u4', byteorder='little')[source]¶
Returns a bytearray with the representation of an integer.
- Parameters
value (int) – The integer to convert to bytes.
dtype (numpy.dtype or str) – The numpy.dtype of the byte representation for the integer, or a type code that can include the endianness. See get_dtype_str to understand how dtype and byteorder will be parsed.
byteorder (str) – Either 'big' for big endian representation or 'little' for little endian. '>' and '<' are also accepted, respectively.
- Returns
bytes (bytearray) – A bytearray with the representation for the input integer.
Examples
>>> int_to_bytes(5, dtype=numpy.uint16, byteorder='big')
bytearray(b'\x00\x05')
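The inverse conversion can also be sketched with the standard library for unsigned values. As before, this is an illustration rather than the implementation of int_to_bytes: int_to_bytes_sketch is a hypothetical name and the width is given in bytes (nbytes) instead of as a dtype.

```python
def int_to_bytes_sketch(value: int, nbytes: int = 4,
                        byteorder: str = "little") -> bytearray:
    """Encode an unsigned integer into a fixed number of bytes."""
    # Accept the '>' and '<' shorthands for big and little endian.
    order = {">": "big", "<": "little"}.get(byteorder, byteorder)
    return bytearray(value.to_bytes(nbytes, byteorder=order, signed=False))
```

A value that does not fit in nbytes raises OverflowError, which is a useful guard when building fixed-width CAN payloads.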
- jaeger.utils.utils.motor_steps_to_angle(alpha, beta, motor_steps=None, inverse=False)[source]¶
Converts motor steps to angles or vice-versa.
- Parameters
- Returns
angles (tuple) – A tuple with the alpha and beta angles associated to the input motor steps. If inverse=True, alpha and beta are considered to be angles and the associated motor steps are returned.
- jaeger.utils.utils.parse_identifier(identifier)[source]¶
Parses an extended frame identifier and returns its components.
The 29-bit extended frame identifier is composed of a positioner id, a command id, a command UID, and a response code. This function parses an identifier and returns the value of each element.
- Parameters
identifier (int) – The identifier returned by the CAN bus.
- Returns
components (tuple) – A tuple with the components of the identifier. The first element is the positioner id, the second the command id, the third is the command UID, and the last one is the response flag as an instance of ResponseCode.
Examples
>>> parse_identifier(1328208)
(5, 17, 5, <ResponseCode.COMMAND_ACCEPTED: 0>)
>>> parse_identifier(1328210)
(5, 17, 5, <ResponseCode.INVALID_TRAJECTORY: 2>)
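Splitting an identifier back into its fields is a matter of shifting and masking. The sketch below assumes the 11/8/6/4-bit layout documented for get_identifier and returns the response code as a plain integer instead of a ResponseCode member; unpack_identifier is a hypothetical name, not the library function.

```python
from typing import Tuple


def unpack_identifier(identifier: int) -> Tuple[int, int, int, int]:
    """Split a 29-bit identifier into (positioner_id, command_id, uid, response)."""
    positioner_id = (identifier >> 18) & 0x7FF   # 11 highest bits
    command_id = (identifier >> 10) & 0xFF       # next 8 bits
    uid = (identifier >> 4) & 0x3F               # next 6 bits
    response_code = identifier & 0xF             # 4 lowest bits
    return positioner_id, command_id, uid, response_code
```

Under this layout, 1328208 splits into positioner 5, command 17, UID 5 and response code 0, and changing only the lowest bits to 2 (identifier 1328210) changes only the response code.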
jaeger.interfaces¶
- class jaeger.interfaces.cannet.CANNetBus(channel, port=None, bitrate=None, buses=[1], timeout=5, **kwargs)[source]¶
Bases:
can.bus.BusABC
Interface for Ixxat CAN@net NT 200/420.
- Parameters
channel (str) – The IP address of the remote device (e.g. 192.168.1.1, …).
port (int) – The port of the device.
bitrate (int) – Bitrate in bit/s.
buses (list) – The buses to open in the device. Messages that do not specify a bus will be sent to all the open buses.
timeout (float) – Timeout for connection.
- send(msg, bus=None, timeout=None)[source]¶
Transmit a message to the CAN bus.
Override this method to enable the transmit path.
- Parameters
msg (can.Message) – A message object.
timeout (float or None) – If > 0, wait up to this many seconds for message to be ACK’ed or for transmit queue to be ready depending on driver implementation. If timeout is exceeded, an exception will be raised. Might not be supported by all interfaces. None blocks indefinitely.
- Raises
can.CanError – if the message could not be sent
- class jaeger.interfaces.cannet.CANNetMessage(timestamp=0.0, arbitration_id=0, is_extended_id=None, is_remote_frame=False, is_error_frame=False, channel=None, dlc=None, data=None, is_fd=False, bitrate_switch=False, error_state_indicator=False, extended_id=None, check=False)[source]¶
Bases:
can.message.Message