# Source code for jaeger.utils.utils

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-09-11
# @Filename: utils.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import os

from typing import Optional, Tuple

import numpy
from astropy.time import Time

from jaeger import config
from jaeger.exceptions import JaegerError
from jaeger.maskbits import ResponseCode


# Names exported by ``from jaeger.utils.utils import *``; every entry is a
# function defined in this module.
__all__ = [
    "get_dtype_str",
    "int_to_bytes",
    "bytes_to_int",
    "get_identifier",
    "parse_identifier",
    "motor_steps_to_angle",
    "get_goto_move_time",
    "get_sjd",
]


# Default number of motor steps, read once at import time from the
# ``positioner.motor_steps`` configuration entry. `motor_steps_to_angle` uses
# it as the number of steps in 360 degrees when no value is passed explicitly.
MOTOR_STEPS = config["positioner"]["motor_steps"]


def get_dtype_str(dtype, byteorder="little"):
    """Parses dtype and byte order to return a type string code.

    Parameters
    ----------
    dtype : `numpy.dtype` or `str`
        Either a dtype (a scalar class such as ``numpy.uint32`` or a
        `numpy.dtype` instance) or a string with the type code (``'>u4'``).
        If a string type code and the first character indicates the byte
        order (``'>'`` for big, ``'<'`` for little endian), ``byteorder``
        will be ignored. For dtype objects the byte order is always taken
        from ``byteorder``. The type code refers to bytes, while the dtype
        classes refer to bits, i.e., ``'u2'`` is equivalent to
        ``numpy.uint16``.
    byteorder : str
        Either ``'big'`` for big endian representation or ``'little'`` for
        little end. ``'>'`` and ``'<'`` are also accepted, respectively.

    Returns
    -------
    type_code : `str`
        The type code for the input dtype and byte order.

    Raises
    ------
    ValueError
        If ``byteorder`` is not one of the accepted values, or if ``dtype``
        is an empty string or a type code that starts with ``'='``.

    Examples
    --------
    ::

        >>> get_dtype_str(numpy.uint32, byteorder='big')
        '>u4'
        >>> get_dtype_str('u2', byteorder='>')
        '>u2'
        >>> get_dtype_str('<u2', byteorder='big')
        '<u2'
        >>> get_dtype_str(numpy.dtype('u4'), byteorder='big')
        '>u4'

    """

    # The byte order is validated first, even if a string type code ends up
    # overriding it, so that invalid values never pass silently.
    if byteorder == "big":
        byteorder = ">"
    elif byteorder == "little":
        byteorder = "<"
    elif byteorder in [">", "<"]:
        pass
    else:
        raise ValueError(f"invalid byteorder {byteorder}")

    if isinstance(dtype, str):
        if not dtype:
            # Indexing an empty string would raise an unhelpful IndexError.
            raise ValueError("invalid empty dtype string.")
        if dtype[0] in [">", "<"]:
            return dtype
        elif dtype[0] == "=":
            raise ValueError("invalid byte order =. Please, use a specific endianess.")
        else:
            return byteorder + dtype

    # numpy.dtype() normalises both scalar classes (numpy.uint32) and dtype
    # instances (numpy.dtype('u4')). Its ``str`` starts with a one-character
    # byte-order flag that is replaced with the requested one.
    dtype_str = numpy.dtype(dtype).str

    return byteorder + dtype_str[1:]
def int_to_bytes(value, dtype="u4", byteorder="little"):
    r"""Encodes an integer as a bytearray.

    Parameters
    ----------
    value : int
        The integer to convert to bytes.
    dtype : `numpy.dtype` or `str`
        The `numpy.dtype` of the byte representation for the integer, or a
        type code that can include the endianess. See `.get_dtype_str` to
        understand how ``dtype`` and ``byteorder`` will be parsed.
    byteorder : str
        Either ``'big'`` for big endian representation or ``'little'`` for
        little end. ``'>'`` and ``'<'`` are also accepted, respectively.

    Returns
    -------
    bytes : `bytearray`
        A `bytearray` with the representation for the input integer.

    Examples
    --------
    ::

        >>> int_to_bytes(5, dtype=numpy.uint16, byteorder='big')
        bytearray(b'\x00\x05')

    """

    # Resolve the numpy type code (size and endianess) and let numpy do the
    # actual packing.
    code = get_dtype_str(dtype, byteorder=byteorder)
    raw = numpy.array(value, dtype=code).tobytes()

    return bytearray(raw)
def bytes_to_int(bytes, dtype="u4", byteorder="little"):
    r"""Decodes the integer stored in a bytearray.

    Parameters
    ----------
    bytes : `bytearray`
        The bytearray representing the integer.
    dtype : `numpy.dtype` or `str`
        The `numpy.dtype` of the byte representation for the integer, or a
        type code that can include the endianess. See `.get_dtype_str` to
        understand how ``dtype`` and ``byteorder`` will be parsed.
    byteorder : str
        Either ``'big'`` for big endian representation or ``'little'`` for
        little end. ``'>'`` and ``'<'`` are also accepted, respectively.

    Returns
    -------
    integer : int
        The integer represented by ``bytes``. The value is the first element
        of the decoded buffer and is returned as a numpy integer scalar; if
        ``bytes`` encodes more than one value the rest are ignored.

    Examples
    --------
    ::

        >>> bytes_to_int(b'\x00\x05', dtype=numpy.uint16, byteorder='big')
        5

    """

    code = get_dtype_str(dtype, byteorder=byteorder)
    decoded = numpy.frombuffer(bytes, dtype=code)

    # Only the first decoded element is of interest.
    return decoded[0]
def get_identifier(positioner_id, command_id, uid=0, response_code=0):
    """Returns a 29 bits identifier with the correct format.

    The CAN identifier format for the positioners uses an extended frame with
    29-bit encoding so that the 11 higher bits correspond to the positioner
    ID, the 8 middle bits are the command number, the following 6 bits are the
    unique identifier, and the 4 lower bits are the response code.

    Parameters
    ----------
    positioner_id : int
        The Id of the positioner to command, or zero for broadcast.
    command_id : int
        The ID of the command to send.
    uid : int
        The unique identifier
    response_code : int
        The response code.

    Returns
    -------
    identifier : `int`
        The decimal integer corresponding to the 29-bit identifier.

    Raises
    ------
    ValueError
        If any of the fields is negative or does not fit in the number of
        bits reserved for it.

    Examples
    --------
    ::

        >>> get_identifier(5, 17, uid=5)
        1328208
        >>> bin(1328208)
        '0b101000100010001010000'

    """

    # (name, value, width in bits), from most to least significant.
    fields = (
        ("positioner_id", positioner_id, 11),
        ("command_id", command_id, 8),
        ("uid", uid, 6),
        ("response_code", int(response_code), 4),
    )

    # Validate explicitly instead of asserting on the final string length:
    # asserts are stripped under ``python -O`` and a negative value formats
    # to the expected width (the sign takes one character), so it would slip
    # through a length check.
    for name, value, width in fields:
        if not 0 <= value < (1 << width):
            raise ValueError(f"{name}={value} does not fit in {width} bits.")

    bits = "".join(format(value, f"0{width}b") for _, value, width in fields)

    return int(bits, 2)
def parse_identifier(identifier: int) -> Tuple[int, int, int, ResponseCode]:
    """Splits an extended frame identifier into its components.

    The 29-bit extended frame identifier is composed, from the most to the
    least significant bits, of a positioner id (11 bits), a command id
    (8 bits), a command UID (6 bits), and a response code (4 bits). This
    function extracts the value of each one of those fields.

    Parameters
    ----------
    identifier
        The identifier returned by the CAN bus.

    Returns
    -------
    components
        A tuple with the components of the identifier. The first element is
        the positioner id, the second the command id, the third is the command
        UID, and the last one is the response flag as an instance of
        `~jaeger.maskbits.ResponseCode`.

    Examples
    --------
    ::

        >>> parse_identifier(1328208)
        (5, 17, 5, <ResponseCode.COMMAND_ACCEPTED: 0>)
        >>> parse_identifier(1328210)
        (5, 17, 5, <ResponseCode.INVALID_TRAJECTORY: 2>)

    """

    # Each field is isolated by shifting it down to bit zero and masking off
    # its width; bits above the 29th are discarded by the positioner id mask.
    response_code = identifier & 0xF
    command_uid = (identifier >> 4) & 0x3F
    command_id = (identifier >> 10) & 0xFF
    positioner_id = (identifier >> 18) & 0x7FF

    return positioner_id, command_id, command_uid, ResponseCode(response_code)
def motor_steps_to_angle(alpha, beta, motor_steps=None, inverse=False):
    """Converts motor steps to angles or vice-versa.

    Parameters
    ----------
    alpha : float
        The alpha position.
    beta : float
        The beta position.
    motor_steps : int
        The number of steps in the motor, i.e., the steps equivalent to 360
        degrees. Defaults to the configuration value
        ``positioner.motor_steps``.
    inverse : bool
        If `True`, converts from angles to motor steps.

    Returns
    -------
    angles : `tuple`
        A tuple with the alpha and beta angles, in degrees, associated to the
        input motor steps. If ``inverse=True``, ``alpha`` and ``beta`` are
        considered to be angles and the associated motor steps, rounded to
        integers, are returned.

    """

    # A falsy value (None or 0) selects the configured default.
    steps_per_turn = motor_steps or MOTOR_STEPS

    if not inverse:
        return alpha / steps_per_turn * 360.0, beta / steps_per_turn * 360.0

    # Angles to steps: scale and round to the nearest whole step.
    return tuple(
        int(numpy.round(angle * steps_per_turn / 360.0)) for angle in (alpha, beta)
    )
def get_goto_move_time(move, speed=None):
    r"""Estimates the time, in seconds, that a move will take.

    The move time is calculated as :math:`\dfrac{60 \alpha r}{360 v}` where
    :math:`\alpha` is the angle, :math:`r` is the reduction ratio, and
    :math:`v` is the speed in the input in RPM. It adds 0.25s due to
    deceleration; this value is not exact but it's a good approximation for
    most situations.

    Parameters
    ----------
    move : float
        The move, in degrees.
    speed : float
        The speed of the motor for the move, in RPM on the input. If not
        provided (or zero), the configuration value
        ``positioner.motor_speed`` is used.

    Returns
    -------
    move_time : float
        The approximate duration of the move, in seconds.

    """

    positioner_config = config["positioner"]

    speed = speed or positioner_config["motor_speed"]
    reduction_ratio = positioner_config["reduction_ratio"]

    # 60 / 360 simplifies to 1 / 6; the constant accounts for deceleration.
    travel_time = move * reduction_ratio / (6.0 * speed)

    return travel_time + 0.25
def get_sjd(observatory: Optional[str] = None) -> int:
    """Returns the SDSS Julian Date as an integer based on the observatory.

    The SJD is computed from the current MJD plus an observatory-dependent
    offset (0.3 days for APO, 0.4 days for LCO), truncated to an integer.

    Parameters
    ----------
    observatory
        The current observatory, either APO or LCO (case-insensitive). If
        `None`, uses ``$OBSERVATORY``.

    Returns
    -------
    sjd
        The SDSS Julian Date for the current time.

    Raises
    ------
    JaegerError
        If the observatory is not passed and ``$OBSERVATORY`` is not set, or
        if the observatory is not APO or LCO.

    """

    # Offset, in days, added to the MJD for each supported observatory.
    mjd_offsets = {"APO": 0.3, "LCO": 0.4}

    if observatory is None:
        try:
            observatory = os.environ["OBSERVATORY"]
        except KeyError:
            raise JaegerError("Observatory not passed and $OBSERVATORY is not set.")

    observatory = observatory.upper()
    if observatory not in mjd_offsets:
        raise JaegerError(f"Invalid observatory {observatory}.")

    return int(Time.now().mjd + mjd_offsets[observatory])