Programmatic API#
jaeger.alerts#
- class jaeger.alerts.AlertsBot(fps)[source]#
Bases:
BaseBot
Monitors values and raises alerts.
- Parameters:
fps (FPS) –
- set_keyword(keyword, new_value)[source]#
Sets the value of an alert keyword and outputs it to the actor.
Returns a boolean indicating whether the value has changed.
jaeger.can#
- class jaeger.can.CANnetInterface(interface_type, channels, fps=None, interface_args=<factory>, status_interval=5)[source]#
-
An interface class specifically for the CAN@net 200/420 device.
This class bahaves as
JaegerCAN
but allows communication with the device itself and tracks its status.- Parameters:
- handle_device_message(msg)[source]#
Handles a reply from the device (i.e., not from the CAN network).
- Parameters:
msg (Message) –
- property device_status#
Returns a dictionary with the status of the device.
- class jaeger.can.JaegerCAN(interface_type, channels, fps=None, interface_args=<factory>)[source]#
Bases:
Generic
[Bus_co
]A CAN interface with a command queue and reply handling.
Provides support for multi-channel CAN networks, with each channel being able to host more than one bus. The recommended way to instantiate a new
JaegerCAN
object is using thecreate
classmethodcan = await JaegerCAN.create(...)
which is equivalent to
can = JaegerCAN(...) await can.start()
- Parameters:
interface_type (str) – One of
INTERFACES
.channels (list | tuple) – A list of channels to be used to instantiate the interfaces.
fps (jaeger.fps.FPS | None) – The focal plane system.
interface_args (Dict[str, Any]) – Keyword arguments to pass to the interfaces when initialising it (e.g., port, baudrate, etc).
- async classmethod create(profile=None, fps=None, interface_type=None, channels=[], interface_args={})[source]#
Create and initialise a new bus interface from a configuration profile.
This is the preferred method to initialise a new
JaegerCAN
instance and is equivalent to callingJaegerCAN
and thenstart
.- Parameters:
profile (Optional[str]) – The name of the profile that defines the bus interface, or
None
to use the default configuration.fps (FPS | None) – The focal plane system.
interface_type (Optional[str]) – One of
INTERFACES
. Cannot be used withprofile
.channels (list | tuple) – A list of channels to be used to instantiate the interfaces.
interface_args (Dict[str, Any]) – Keyword arguments to pass to the interfaces when initialising it (e.g., port, baudrate, etc).
- Return type:
- jaeger.can.INTERFACES = {'cannet': {'class': <class 'jaeger.interfaces.cannet.CANNetBus'>, 'multibus': True}, 'slcan': {'class': None, 'multibus': False}, 'socketcan': {'class': None, 'multibus': False}, 'virtual': {'class': <class 'jaeger.interfaces.virtual.VirtualBus'>, 'multibus': False}}#
Accepted CAN interfaces and whether they are multibus.
jaeger.chiller#
jaeger.fps#
- class jaeger.fps.BaseFPS(*args, **kwargs)[source]#
Bases:
Dict
[int
,Positioner
],Generic
[FPS_T
]A class describing the Focal Plane System.
This class includes methods to read the layout and construct positioner objects and can be used by the real
FPS
class or theVirtualFPS
.BaseFPS
instances are singletons in the sense that one cannot instantiate more than one. An error is raise if__new__
is called with an existing instance. To retrieve the running instance, useget_instance
.- Variables:
positioner_class (ClassVar[Type[jaeger.positioner.Positioner]]) – The class to be used to create a new positioner. In principle this will be
Positioner
but it may be different if the positioners are created for aVirtualFPS
.
- positioner_class#
alias of
Positioner
- add_positioner(positioner, centre=(None, None))[source]#
Adds a new positioner to the list, and checks for duplicates.
- Parameters:
positioner (int | Positioner) –
- Return type:
- class jaeger.fps.FPS(*args, **kwargs)[source]#
Bases:
BaseFPS
[FPS]A class describing the Focal Plane System.
The recommended way to instantiate a new
FPS
object is to use thecreate
classmethodfps = await FPS.create(...)
which is equivalent to
fps = FPS(...) await fps.initialise()
- Parameters:
can (jaeger.can.JaegerCAN | str | None) – A
JaegerCAN
instance to use to communicate with the CAN network, or the CAN profile from the configuration to use, orNone
to use the default one.ieb (bool | jaeger.ieb.IEB | dict | str | pathlib.Path | None) – If
True
orNone
, connects the Instrument Electronics Box PLC controller using the path to the IEB configuration file stored in jaeger’s configuration. Can also be anIEB
instance, the path to a custom configuration file used to load one, or a dictionary with the configuration itself.status (FPSStatus) –
Examples
After instantiating a new
FPS
object it is necessary to callinitialise
to retrieve the positioner layout and the status of the connected positioners. Note thatinitialise
is a coroutine which needs to be awaited>>> fps = FPS(can='default') >>> await fps.initialise() >>> fps.positioners[4].status <Positioner (id=4, status='SYSTEM_INITIALIZED| DISPLACEMENT_COMPLETED|ALPHA_DISPLACEMENT_COMPLETED| BETA_DISPLACEMENT_COMPLETED')>
- abort()[source]#
Aborts trajectories and stops positioners. Alias for
stop_trajectory
.
- add_positioner(positioner, centre=(None, None), interface=None, bus=None)[source]#
Adds a new positioner to the list, and checks for duplicates.
- Parameters:
positioner (int | Positioner) –
bus (int | None) –
- Return type:
- async classmethod create(can=None, ieb=None, initialise=True, start_pollers=None, enable_low_temperature=True)[source]#
Starts the CAN bus and .
- get_positions(ignore_disabled=False)[source]#
Returns the alpha and beta positions as an array.
- Return type:
- get_positions_dict(ignore_disabled=False)[source]#
Returns the alpha and beta positions as a dictionary.
- async goto(new_positions, speed=None, relative=False, use_sync_line=None, go_cowboy=False)[source]#
Sends a list of positioners to a given position.
- Parameters:
new_positions (dict[int, tuple[float, float]]) – The new positions as a dictionary of positioner ID to a tuple of new alpha and beta angles. Positioners not specified will be kept on the same positions.
speed (float | None) – The speed to use.
relative – If
True
,alpha
andbeta
are considered relative angles.use_sync_line (bool | None) – Whether to use the SYNC line to start the trajectories.
go_cowboy (bool) – If set, does not create a
kaiju
-safe trajectory. Use at your own risk.
- async initialise(start_pollers=None, enable_low_temperature=True, keep_disabled=True, skip_fibre_assignments_check=False)[source]#
Initialises all positioners with status and firmware version.
- Parameters:
- Return type:
T
- async lock(stop_trajectories=True, by=None, do_warn=True, snapshot=True)[source]#
Locks the
FPS
and prevents commands to be sent.
- async save_snapshot(path=None, collision_buffer=None, positions=None, highlight=None, show_disabled=True, write_to_actor=True)[source]#
Creates a plot with the current arrangement of the FPS array.
- Parameters:
path (Optional[str | pathlib.Path]) – The path where to save the plot. Defaults to
/data/logs/jaeger/snapshots/MJD/fps_snapshot_<SEQ>.pdf
.collision_buffer (float | None) – The collision buffer.
positions (dict | None) – A dictionary of positioner_id to a mapping of
"alpha"
and"beta"
positions ({124: {"alpha": 223.4, "beta": 98.1}, ...}
). If not provided, the internal FPS positions will be used.show_disabled (bool) – If
True
, greys out disabled positioners.write_to_actor (bool) – If
True
, writes the name of the snapshot to the actor users.
- Return type:
str | Axes
- send_command(command, positioner_ids=None, data=None, now=False, **kwargs)[source]#
Sends a command to the bus.
- Parameters:
command (str | int | CommandID | Command) – The ID of the command, either as the integer value, a string, or the
CommandID
flag. Alternatively, theCommand
to send.positioner_ids (int | List[int] | None) – The positioner IDs to command, or zero for broadcast. If
None
, sends the command to all FPS non-disabled positioners.data (Any | None) – The bytes to send. See
Command
for details on the format.interface – The index in the interface list for the interface to use. Only relevant in case of a multibus interface. If
None
, the positioner to bus map will be used.bus – The bus within the interface to be used. Only relevant in case of a multibus interface. If
None
, the positioner to bus map will be used.now (bool) – If
True
, the command is sent to the CAN network immediately, skipping the command queue. No tracking is done for this command. It should only be used for emergency and shutdown commands.kwargs – Extra arguments to be passed to the command.
- Returns:
command – The command sent to the bus. The command needs to be awaited before it is considered done.
- Return type:
- async send_to_all(*args, **kwargs)[source]#
Sends a command to all connected positioners.
This method has been deprecated. Use
send_command
with a list forpositioner_ids
instead.
- async send_trajectory(*args, **kwargs)[source]#
Sends a set of trajectories to the positioners.
See the documentation for
send_trajectory
.- Returns:
trajectory – The
Trajectory
object.- Raises:
TajectoryError – You can inspect the
Trajectory
object by capturing the error and accessingerror.trajectory
.
- async stop_trajectory(clear_flags=False)[source]#
Stops all the positioners without clearing collided flags.
- Parameters:
clear_flags – If
True
, sendsSTOP_TRAJECTORY
which clears collided flags. Otherwise sendsSEND_TRAJECTORY_ABORT
.
- async update_firmware_version(timeout=2, is_retry=False)[source]#
Updates the firmware version of all connected positioners.
- async update_position(positioner_ids=None, timeout=2, is_retry=False)[source]#
Updates positions.
- Parameters:
positioner_ids (int | List[int] | None) – The list of positioners to update. If
None
, update all initialised positioners.timeout (float) – How long to wait before timing out the command.
is_retry (bool) – A flag to determine whether the function is being called as a retry if the previous command timed out.
- Return type:
- async update_status(positioner_ids=None, timeout=2, is_retry=False)[source]#
Update statuses for all positioners.
- Parameters:
- Return type:
- property configuration#
Returns the configuration.
jaeger.fvc#
- class jaeger.fvc.FVC(site, command=None)[source]#
Bases:
object
Focal View Camera class.
- async apply_correction(offsets=None)[source]#
Applies the offsets. Fails if the trajectory is collided or deadlock.
- Parameters:
offsets (DataFrame | None) –
- calculate_offsets(reported_positions, fibre_data=None, k=None, max_correction=None)[source]#
Determines the offset to apply to the currently reported positions.
Measured wok positions from the fibre data are converted to positioner coordinates. An alpha/beta offset is calculated with respect to the expected positions. The offsets is then applied to the current positions as self-reported by the positioners. Optionally, the offset can be adjusted using a PID loop.
- Parameters:
reported_positions (ndarray) – Reported positions for the positioners as a numpy array. Usually the output of
FPS.get_positions
.fibre_data (DataFrame | None) – The fibre data table. Only the metrology entries are used. Must have the
xwok_measured
andywok_measured
column populated. IfNone
, uses the data frame calculated whenprocess_fvc_image
last run.k (float | None) – The fraction of the correction to apply.
max_correction (float | None) – The maximum offset allowed per robot and arm, in degrees. Corrections larger than
max_offset
are clipped.
- Returns:
new_positions – The new alpha and beta positions as a Pandas dataframe indexed by positions ID. If
None
, uses the valuefvc.k
from the configuration.- Return type:
DataFrame
- async expose(exposure_time=5.0, stack=1, use_tron_fallback=True)[source]#
Takes an exposure with the FVC and blocks until the exposure is complete.
Returns the path to the new image. If
use_tron_fallback=True
and the command has not been set, creates a Tron client to command the FVC.
- log(msg, level=20, to_log=True, to_command=True, broadcast=False)[source]#
Logs a message, including to the command if present.
- process_fvc_image(path, positioner_coords, configuration=None, fibre_data=None, fibre_type='Metrology', centroid_method=None, use_new_invkin=True, polids=None, plot=False, outdir=None, loop=None)[source]#
Processes a raw FVC image.
- Parameters:
path (pathlib.Path | str) – The path to the raw FVC image.
positioner_coords (dict) – A dictionary of positioner ID to
(alpha, beta)
with the current positions of the robots.configuration (Optional[BaseConfiguration]) – A configuration object to use for processing. If
None
, defaults to the currentFPS
loaded configuration.fibre_data (Optional[pandas.DataFrame]) – A Pandas data frame with the expected coordinates of the targets. It is expected the data frame will have columns
positioner_id
,hole_id
,fibre_type
,xwok
, andywok
. This frame is appended to the processed image. Normally this parameters is left empty and the fibre table from the configuration loaded into the FPS instace is used.fibre_type (str) – The
fibre_type
rows infibre_data
to use. Defaults tofibre_type='Metrology'
.centroid_method (str | None) – The centroid method to use, one of
"nudge"
,"sep"
,"winpos"
, or"simple"
. Defaults to"nudge"
.use_new_invkin (bool) – Use new inverse kinnematic to calculate alpha/beta.
polids (list[int] | None) – The list of ZB polynomial orders to use. If
None
defaults to the coordioFVCTransform
orders.plot (bool | str) – Whether to save additional debugging plots along with the processed image. If
plot
is a string, it will be used as the directory to which to save the plots.loop (asyncio.AbstractEventLoop | None) – The running event loop. Used to schedule the plotting of the FVC transform fit as a task.
outdir (str | None) –
- Returns:
result – A tuple with the read raw image HDU (with columns flipped) as the first argument and the expected coordinates, as a data frame, as the second. The data frame is the same as the input target coordinates but with the columns
xwok_measured
andywok_measured
appended.- Return type:
tuple[fits.ImageHDU, pandas.DataFrame, pandas.DataFrame | None]
jaeger.ieb#
jaeger.interfaces#
- class jaeger.interfaces.BusABC(*args, **kwargs)[source]#
Bases:
object
A base CAN bus.
- async open(*args, **kwargs)[source]#
Starts the bus.
This method call the
_open_internal
method in the subclass bus, if present. It’s meant mainly to initialise any process that needs to be run as a coroutine. Must returnTrue
if the connection was successful,False
or an error otherwise.- Return type:
- class jaeger.interfaces.CANNetBus(channel, port=None, bitrate=None, buses=[1], timeout=5, **kwargs)[source]#
Bases:
BusABC
Interface for Ixxat CAN@net NT 200/420.
- Parameters:
channel (str) – The IP address of the remote device (e.g.
192.168.1.1
, …).port (int) – The port of the device.
bitrate (int) – Bitrate in bit/s.
buses (list) – The buses to open in the device. Messages that do not specify a bus will be sent to all the open buses.
timeout (float) – Timeout for connection.
- class jaeger.interfaces.Message(timestamp=0.0, arbitration_id=0, is_extended_id=None, is_remote_frame=False, is_error_frame=False, channel=None, dlc=None, data=None, is_fd=False, bitrate_switch=False, error_state_indicator=False, extended_id=None, check=False)[source]#
Bases:
object
The
Message
object is used to represent CAN messages for sending, receiving and other purposes like converting between different logging formats.Messages can use extended identifiers, be remote or error frames, contain data and may be associated to a channel.
Messages are always compared by identity and never by value, because that may introduce unexpected behaviour. See also
equals()
.copy()
/deepcopy()
is supported as well.Messages do not support “dynamic” attributes, meaning any others than the documented ones, since it uses
__slots__
.This class is copied directly from
python-can
.
- class jaeger.interfaces.Notifier(listeners=[], buses=[])[source]#
Bases:
object
Notifier class to report bus messages to multiple listeners.
- Parameters:
listeners (List[Listener_co]) –
buses (List[Bus_co]) –
jaeger.kaiju#
- async jaeger.kaiju.check_trajectory(trajectory, current_positions=None, fps=None, atol=1.0)[source]#
Checks that the current position matches the starting point of a trajectory.
- Parameters:
trajectory (dict) – The dictionary with the trajectory to check.
current_positions (dict | None) – A mapping of positioner ID to
(alpha, beta)
with the current arrangement of the FPS array.fps (FPS | None) – If
current_positions
is not passed, theFPS
instance is used to determine the current arrangement.
- Return type:
- jaeger.kaiju.decollide(robot_grid=None, data=None, simple=False, decollide_grid_fallback=False, priority_order=[])[source]#
Decollides a potentially collided grid. Raises on fail.
- Parameters:
robot_grid (Optional[RobotGridCalib]) – The Kaiju
RobotGridCalib
instance to decollide.data (Optional[dict]) – A dictionary of data that can be used to reload a Kaiju robot grid using
load_robot_grid
. This is useful if the function is being run in an executor.simple (bool) – Runs
decollideGrid()
and returns.decollide_grid_fallback (bool) – If
True
, runsdecollideGrid()
if the positioner-by-positioner decollision fails.priority_list – A sorted list of positioner IDs with the order of which positioners to try to keep at their current positions. Positioners earlier in the list will be decollided last. Ignore in case of
simple=True
.
- Returns:
grid,decollided – If
robot_grid
is passed, returns the same grid instance after decollision. Ifdata
is passed, returns a dictionary describing the decollided grid that can be used to recreate a grid usingload_robot_grid
. Also returns the list of robots that have been decollided.- Return type:
- async jaeger.kaiju.decollide_in_executor(robot_grid, **kwargs)[source]#
Calls
decollide
with a process executor.
- jaeger.kaiju.dump_robot_grid(robot_grid)[source]#
Dump the information needed to restore a robot grid into a dictionary.
- Parameters:
robot_grid (RobotGridCalib) –
- Return type:
- async jaeger.kaiju.explode(current_positions, explode_deg=20.0, collision_buffer=None, disabled=[], positioner_id=None)[source]#
Explodes the grid by a number of degrees.
This coroutine uses a process pool executor to run Kaiju routines.
- jaeger.kaiju.get_path_pair(robot_grid=None, data=None, path_generation_mode=None, ignore_did_fail=False, explode_deg=5, explode_positioner_id=None, speed=None, smooth_points=None, path_delay=None, collision_shrink=None, greed=None, phobia=None, stop_if_deadlock=False, ignore_initial_collisions=False)[source]#
Runs path generation and returns the to and from destination paths.
- Parameters:
robot_grid (Optional[RobotGridCalib]) – The Kaiju
RobotGridCalib
instance to decollide.data (Optional[dict]) – A dictionary of data that can be used to reload a Kaiju robot grid using
load_robot_grid
. This is useful if the function is being run in an executor.path_generation_mode (str | None) – Defines the path generation algorithm to use. Either
greedy
,mdp
,explode
orexplode_one
. IfNone
, defaults tokaiju.default_path_generator
.ignore_did_fail (bool) – Generate paths even if path generation failed (i.e., deadlocks).
explode_deg (float) – Degrees for
pathGenExplode
.explode_positioner_id (int | None) – The positioner to explode.
speed – Kaiju parameters to pass to
getPathPair
. Otherwise uses the default configuration values.smooth_points – Kaiju parameters to pass to
getPathPair
. Otherwise uses the default configuration values.path_delay – Kaiju parameters to pass to
getPathPair
. Otherwise uses the default configuration values.collision_shrink – Kaiju parameters to pass to
getPathPair
. Otherwise uses the default configuration values.phobia (float | None) – Parameters for
pathGenMDP
. If not set useskaiju
configuration values.greed (float | None) – Parameters for
pathGenMDP
. If not set useskaiju
configuration values.stop_if_deadlock (bool) – If
True
, detects deadlocks early in the path and returns shorter trajectories (at the risk of some false positive deadlocks).ignore_initial_collisions (bool) – If
True
, does not fail if the initial state is collided. To be used only for offsets.
- Returns:
paths – A tuple with the to destination path, from destination path, whether path generation failed, and the list of deadlocks
- Return type:
- async jaeger.kaiju.get_path_pair_in_executor(robot_grid, **kwargs)[source]#
Calls
get_path_pair
with a process executor.- Parameters:
robot_grid (RobotGridCalib) –
- jaeger.kaiju.get_robot_grid(fps, seed=None, collision_buffer=None)[source]#
Returns a new robot grid with the destination set to the lattice position.
If an initialised instance of the FPS is available, disabled robots will be set offline in the grid at their current positions.
- async jaeger.kaiju.get_snapshot(path, fps=None, positions=None, collision_buffer=None, highlight=None, title=None, show_disabled=True)[source]#
Plots a snapshot of the FPS and saves it to disk.
jaeger.positioner#
- class jaeger.positioner.Positioner(positioner_id, fps=None, centre=(None, None))[source]#
Bases:
StatusMixIn
Represents the status and parameters of a positioner.
- Parameters:
positioner_id (int) – The ID of the positioner
fps (jaeger.FPS | None) – The
FPS
instance to which this positioner is linked to.centre (Tuple[Optional[float], Optional[float]]) – The \((x_{\rm focal}, y_{\rm focal})\) coordinates of the central axis of the positioner.
sextant – The id of the sextant to which this positioner is connected.
- async get_number_trajectories()[source]#
Returns the number of trajectories executed by the positioner.
Will return
None
if the firmware does not support theGET_NUMBER_TRAJECTORIES
.- Return type:
int | None
- async home(alpha=True, beta=True)[source]#
Homes the positioner.
Zeroes the positioner by counter-clockwise rotating alpha and beta until they hit the hardstops. Blocks until the move is complete.
- async send_command(command, error=None, **kwargs)[source]#
Sends and awaits a command to the FPS for this positioner.
- Parameters:
error (str | None) –
- async set_loop(motor='both', loop='closed', collisions=True)[source]#
Sets the control loop for a motor.
These parameters are cleared after a restart. The motors revert to closed loop with collision detection.
- Parameters:
motor – The motor to which these changes apply, either
'alpha`'
,'beta'
, or'both'
.loop – The type of control loop, either
'open'
or'closed'
.collisions – Whether the firmware should automatically detect collisions and stop the positioner.
- async set_precise_move(mode, alpha=True, beta=True)[source]#
Switches the precise moves on alpha and beta.
- async update_position(position=None, timeout=1)[source]#
Updates the position of the alpha and beta arms.
- async update_status(status=None, timeout=1.0)[source]#
Updates the status of the positioner.
- Parameters:
status (PositionerStatusV4_1 | int | None) –
- async wait_for_status(status, delay=1, timeout=None)[source]#
Polls the status until it reaches a certain value.
- Parameters:
status (List[PositionerStatusV4_1]) – The status to wait for. Can be a list in which case it will wait until all the statuses in the list have been reached.
delay – Time, in seconds, to wait between position updates.
timeout (float | None) – How many seconds to wait for the status to reach the desired value before aborting.
- Returns:
result – Returns
True
if the status has been reached orFalse
if the timeout limit was reached.- Return type:
- property initialised#
Returns
True
if the system and datums have been initialised.
- property position#
Returns a tuple with the
(alpha, beta)
position.
jaeger.utils#
- jaeger.utils.utils.bytes_to_int(bytes, dtype='u4', byteorder='little')[source]#
Returns the integer from a bytearray representation.
- Parameters:
bytes (
bytearray
) – The bytearray representing the integer.dtype (
numpy.dtype
orstr
) – Thenumpy.dtype
of the byte representation for the integer, or a type code that can include the endianess. Seeget_dtype_str
to understand howdtype
andbyteorder
will be parsed.byteorder (str) – Either
'big'
for big endian representation or'little'
for little end.'>'
and'<'
are also accepted, respectively.
- Returns:
integer (int) – A integer represented by
bytes
.
Examples
>>> bytes_to_int(b'\x00\x05', dtype=numpy.uint16, byteorder='big') 5
- jaeger.utils.utils.get_dtype_str(dtype, byteorder='little')[source]#
Parses dtype and byte order to return a type string code.
- Parameters:
dtype (
numpy.dtype
orstr
) – Either a dtype (e.g.,numpy.uint32
) or a string with the type code ('>u4'
). If a string type code and the first character indicates the byte order ('>'
for big,'<'
for little endian),byteorder
will be ignored. The type code refers to bytes, while the dtype classes refer to bits, i.e.,'u2'
is equivalent tobyteorder (str) – Either
'big'
for big endian representation or'little'
for little end.'>'
and'<'
are also accepted, respectively.
- Returns:
type_code (
str
) – The type code for the input dtype and byte order.
Examples
>>> get_dtype_str(numpy.uint32, byteorder='big') '>u4' >>> get_dtype_str('u2', byteorder='>') '>u2' >>> get_dtype_str('<u2', byteorder='big') '<u2'
- jaeger.utils.utils.get_goto_move_time(move, speed=None)[source]#
Returns the approximate time need for a given move, in seconds.
The move time is calculated as \(\dfrac{60 \alpha r}{360 v}\) where \(\alpha\) is the angle, \(r\) is the reduction ratio, and \(v\) is the speed in the input in RPM. It adds 0.25s due to deceleration; this value is not exact but it’s a good approximation for most situations.
- jaeger.utils.utils.get_identifier(positioner_id, command_id, uid=0, response_code=0)[source]#
Returns a 29 bits identifier with the correct format.
The CAN identifier format for the positioners uses an extended frame with 29-bit encoding so that the 11 higher bits correspond to the positioner ID, the 8 middle bits are the command number, the following 6 bits are the unique identifier, and the 4 lower bits are the response code.
- Parameters:
- Returns:
identifier (
int
) – The decimal integer corresponding to the 29-bit identifier.
Examples
>>> get_identifier(5, 17, uid=5) 1328128 >>> bin(1328128) '0b101000100010000000000'
- jaeger.utils.utils.get_sjd(observatory=None)[source]#
Returns the SDSS Julian Date as an integer based on the observatory.
- jaeger.utils.utils.int_to_bytes(value, dtype='u4', byteorder='little')[source]#
Returns a bytearray with the representation of an integer.
- Parameters:
value (int) – The integer to convert to bytes.
dtype (
numpy.dtype
orstr
) – Thenumpy.dtype
of the byte representation for the integer, or a type code that can include the endianess. Seeget_dtype_str
to understand howdtype
andbyteorder
will be parsed.byteorder (str) – Either
'big'
for big endian representation or'little'
for little end.'>'
and'<'
are also accepted, respectively.
- Returns:
bytes (
bytearray
) – Abytearray
with the representation for the input integer.
Examples
>>> int_to_bytes(5, dtype=numpy.uint16, byteorder='big') bytearray(b'\x00\x05')
- jaeger.utils.utils.motor_steps_to_angle(alpha, beta, motor_steps=None, inverse=False)[source]#
Converts motor steps to angles or vice-versa.
- Parameters:
- Returns:
angles (
tuple
) – A tuple with the alpha and beta angles associated to the input motor steps. Ifinverse=True
,alpha
andbeta
are considered to be angles and the associated motor steps are returned.
- jaeger.utils.utils.parse_identifier(identifier)[source]#
Parses an extended frame identifier and returns its components.
The 29-bit extended frame identifier is composed of a positioner id, a command id, and a response code. This function parses an identifier and returns the value of each element.
- Parameters:
identifier (int) – The identifier returned by the CAN bus.
- Returns:
components – A tuple with the components of the identifier. The first element is the positioner id, the second the command id, the third is the command UID, and the last one is the response flag as an instance of
ResponseCode
.- Return type:
Tuple[int, int, int, ResponseCode]
Examples
>>> parse_identifier(1315072) (5, 17, <ResponseCode.COMMAND_ACCEPTED: 0>) >>> parse_identifier(1315074) (5, 17, <ResponseCode.INVALID_TRAJECTORY: 2>)
- class jaeger.utils.helpers.AsyncQueue(callback=None)[source]#
Bases:
Queue
Provides an
asyncio.Queue
object with a watcher.- Parameters:
callback (Optional[Callable]) – A function to call when a new item is received from the queue. It can be a coroutine.
- class jaeger.utils.helpers.AsyncioExecutor[source]#
Bases:
Executor
An executor to run coroutines from a normal function.
Copied from http://bit.ly/2IYmqzN.
To use, do
with AsyncioExecutor() as executor: future = executor.submit(asyncio.sleep, 1)
- shutdown(wait=True)[source]#
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Parameters:
wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
- class jaeger.utils.helpers.BaseBot(fps)[source]#
Bases:
object
A class that monitors a subsystem.
- Parameters:
fps (FPS) –
- class jaeger.utils.helpers.Poller(name, callback, delay=1.0, loop=None)[source]#
Bases:
object
A task that runs a callback periodically.
- Parameters:
- class jaeger.utils.helpers.PollerList(pollers=[])[source]#
Bases:
list
A list of
Poller
to be managed jointly.- start(delay=None)[source]#
Starts all the pollers.
- Parameters:
delay (float) – The delay between calls to the callback. If not specified, uses the default delays for each poller.
- property names#
List the poller names.
- class jaeger.utils.helpers.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]#
Bases:
Generic
[Status_co
]A mixin that provides status tracking with callbacks.
Provides a status property that executes a list of callbacks when the status changes.
- Parameters:
maskbit_flags (Type[Status_co]) – A class containing the available statuses as a series of maskbit flags. Usually as subclass of
enum.Flag
.initial_status (Optional[Status_co]) – The initial status.
callback_func (Optional[Callable]) – The function to call if the status changes.
call_now (bool) – Whether the callback function should be called when initialising.
- Variables:
callbacks – A list of the callback functions to call.
- property flags#
Gets the flags associated to this status.
- property status: Status_co#
Returns the status.
- async jaeger.utils.helpers.run_in_executor(fn, *args, catch_warnings=False, executor='thread', **kwargs)[source]#
Runs a function in an executor.
In addition to streamlining the use of the executor, this function catches any warning issued during the execution and reissues them after the executor is done. This is important when using the actor log handler since inside the executor there is no loop that CLU can use to output the warnings.
In general, note that the function must not try to do anything with the actor since they run on different loops.