#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-10-07
# @Filename: positioner.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import asyncio
import logging
from distutils.version import StrictVersion
import numpy.testing
from jaeger import config, log, maskbits
from jaeger.commands import CommandID
from jaeger.exceptions import JaegerError, PositionerError
from jaeger.utils import StatusMixIn, bytes_to_int
__ALL__ = ['Positioner', 'VirtualPositioner']
[docs]class Positioner(StatusMixIn):
r"""Represents the status and parameters of a positioner.
Parameters
----------
positioner_id : int
The ID of the positioner
fps : `~jaeger.fps.FPS`
The `~jaeger.fps.FPS` instance to which this positioner is linked to.
centre : tuple
The :math:`(x_{\rm focal}, y_{\rm focal})` coordinates of the
central axis of the positioner.
"""
def __init__(self, positioner_id, fps, centre=(None, None)):
self.fps = fps
self.positioner_id = positioner_id
self.centre = centre
self.alpha = None
self.beta = None
self.speed = (None, None)
self.firmware = None
self._move_time = None
super().__init__(maskbit_flags=maskbits.PositionerStatus,
initial_status=maskbits.PositionerStatus.UNKNOWN)
@property
def position(self):
"""Returns a tuple with the ``(alpha, beta)`` position."""
return (self.alpha, self.beta)
@property
def collision(self):
"""Returns `True` if the positioner is collided."""
if not self.status:
return False
return self.status.collision
@property
def moving(self):
"""Returns `True` if the positioner is moving."""
if self.status.DISPLACEMENT_COMPLETED not in self.status:
return True
return False
@property
def move_time(self):
"""Returns the move time."""
if not self.moving:
self._move_time = None
return self._move_time
@move_time.setter
def move_time(self, value):
"""Sets the move time."""
self._move_time = value
@property
def initialised(self):
"""Returns ``True`` if the system and datums have been initialised."""
if self.status is None:
return False
if self.is_bootloader():
if self.status != maskbits.BootloaderStatus.UNKNOWN:
return True
return False
if (not self.status.initialised or
self.flags.DATUM_ALPHA_INITIALIZED not in self.status or
self.flags.DATUM_BETA_INITIALIZED not in self.status):
return False
return True
[docs] def reset(self):
"""Resets positioner values and statuses."""
self.alpha = None
self.beta = None
self.status = self.flags.UNKNOWN
self.firmware = None
def _log(self, message, level=logging.DEBUG):
"""Logs a message."""
log.log(level, f'Positioner {self.positioner_id}: {message}')
[docs] async def send_command(self, command, error=None, **kwargs):
"""Sends and awaits a command to the FPS for this positioner."""
if not self.fps:
raise PositionerError('FPS is not set.')
command = await self.fps.send_command(command,
positioner_id=self.positioner_id,
**kwargs)
if error and (command.status.failed or command.status.timed_out):
raise PositionerError(error)
return command
[docs] async def update_position(self, position=None, timeout=1):
"""Updates the position of the alpha and beta arms."""
if position is None:
command = await self.send_command(CommandID.GET_ACTUAL_POSITION,
timeout=timeout,
silent_on_conflict=True,
override=True)
if command.status.failed:
self.alpha = self.beta = None
raise PositionerError('failed updating position')
try:
position = command.get_positions()
except ValueError:
raise PositionerError('cannot parse current position.')
self.alpha, self.beta = position
self._log(f'at position ({self.alpha:.2f}, {self.beta:.2f})')
[docs] async def update_status(self, status=None, timeout=1.):
"""Updates the status of the positioner."""
# Need to update the firmware to make sure we get the right flags.
await self.update_firmware_version()
if not status:
command = await self.send_command(CommandID.GET_STATUS,
timeout=timeout,
silent_on_conflict=True,
error='cannot get status.')
n_replies = len(command.replies)
if n_replies == 1:
status = int(bytes_to_int(command.replies[0].data))
else:
self.status = self.flags.UNKNOWN
raise PositionerError(f'GET_STATUS received {n_replies} '
'replies.')
if not self.is_bootloader():
self.flags = self.get_positioner_flags()
else:
self.flags = maskbits.BootloaderStatus
self.status = self.flags(status)
# Checks if the positioner is collided. If so, locks the FPS.
if not self.is_bootloader() and self.collision and not self.fps.locked:
await self.fps.lock()
raise PositionerError('collision detected. Locking the FPS.')
return True
[docs] async def wait_for_status(self, status, delay=1, timeout=None):
"""Polls the status until it reaches a certain value.
Parameters
----------
status : `~jaeger.maskbits.PositionerStatus`
The status to wait for. Can be a list in which case it will wait
until all the statuses in the list have been reached.
delay : float
Time, in seconds, to wait between position updates.
timeout : float
How many seconds to wait for the status to reach the desired value
before aborting.
Returns
-------
result : `bool`
Returns `True` if the status has been reached or `False` if the
timeout limit was reached.
"""
if self.is_bootloader():
raise JaegerError('wait_for_status cannot be scheduled '
'in bootloader mode.')
if not isinstance(status, (list, tuple)):
status = [status]
async def status_waiter(wait_for_status):
while True:
await self.update_status()
# Check all statuses in the list
all_reached = True
for ss in wait_for_status:
if ss not in self.status:
all_reached = False
break
if all_reached:
return
await asyncio.sleep(delay)
wait_for_status = [self.flags(ss) for ss in status]
try:
await asyncio.wait_for(status_waiter(wait_for_status), timeout)
except asyncio.TimeoutError:
return False
return True
[docs] async def initialise(self):
"""Initialises the position watcher."""
# Resets all.
self.reset()
await self.update_firmware_version()
await self.update_status()
# Exits if we are in bootloader mode.
if self.is_bootloader():
return True
if not self.initialised:
raise PositionerError('failed inisialising.')
# Sets the default speed
await self.set_speed(alpha=config['positioner']['motor_speed'],
beta=config['positioner']['motor_speed'])
self._log('initialisation complete.')
return True
[docs] async def update_firmware_version(self):
"""Updates the firmware version."""
command = await self.send_command(CommandID.GET_FIRMWARE_VERSION,
error='failed retrieving '
'firmware version.')
self.firmware = command.get_firmware()
self.flags = self.get_positioner_flags()
self._log(f'firmware {self.firmware}')
return True
[docs] def get_positioner_flags(self):
"""Returns the correct position maskbits from the firmware version."""
assert self.firmware, 'Firmware is not set.'
if self.is_bootloader():
return maskbits.BootloaderStatus
if StrictVersion(self.firmware) < StrictVersion('04.01.00'):
return maskbits.PositionerStatusV4_0
else:
return maskbits.PositionerStatus
[docs] def is_bootloader(self):
"""Returns True if we are in bootloader mode."""
if self.firmware is None:
return None
return self.firmware.split('.')[1] == '80'
[docs] async def set_position(self, alpha, beta):
"""Sets the internal position of the motors."""
set_position_command = await self.send_command(
CommandID.SET_ACTUAL_POSITION,
alpha=float(alpha),
beta=float(beta),
error='failed setting position.')
self._log(f'position set to ({alpha:.2f}, {beta:.2f})')
return set_position_command
[docs] async def set_speed(self, alpha, beta, force=False):
"""Sets motor speeds.
Parameters
----------
alpha : float
The speed of the alpha arm, in RPM on the input.
beta : float
The speed of the beta arm, in RPM on the input.
force : bool
Allows to set speed limits outside the normal range.
"""
MIN_SPEED = 0
MAX_SPEED = 5000
if (alpha < MIN_SPEED or alpha > MAX_SPEED or
beta < MIN_SPEED or beta > MAX_SPEED) and not force:
raise PositionerError('speed out of limits.')
speed_command = await self.send_command(CommandID.SET_SPEED,
alpha=float(alpha),
beta=float(beta),
error='failed setting speed.')
self.speed = (alpha, beta)
self._log(f'speed set to ({alpha:.2f}, {beta:.2f})')
return speed_command
def _can_move(self):
"""Returns `True` if the positioner can be moved."""
if self.moving or self.collision or not self.initialised:
return False
if self.flags == maskbits.PositionerStatusV4_0:
return True
else:
PS = maskbits.PositionerStatus
invalid_bits = [PS.COLLISION_DETECT_ALPHA_DISABLE,
PS.COLLISION_DETECT_BETA_DISABLE]
for b in invalid_bits:
if b in self.status:
self._log(f'canot move; found status bit {b.name}.',
logging.ERROR)
return False
if (PS.CLOSED_LOOP_ALPHA not in self.status or
PS.CLOSED_LOOP_BETA not in self.status):
self._log('canot move; positioner not in closed loop mode.',
logging.ERROR)
return False
return True
async def _goto_position(self, alpha, beta, relative=False):
"""Go to a position."""
if relative:
command_id = CommandID.GO_TO_RELATIVE_POSITION
else:
command_id = CommandID.GO_TO_ABSOLUTE_POSITION
return await self.send_command(command_id,
alpha=float(alpha),
beta=float(beta),
error='failed going to position.')
[docs] async def goto(self, alpha, beta, speed=None, relative=False, force=False):
"""Moves positioner to a given position.
Parameters
----------
alpha : float
The position where to move the alpha arm, in degrees.
beta : float
The position where to move the beta arm, in degrees.
speed : tuple
The speed of the ``(alpha, beta)`` arms, in RPM on the input.
relative : bool
Whether the movement is absolute or relative to the current
position.
force : bool
Allows to set position and speed limits outside the normal range.
Returns
-------
result : `bool`
`True` if both arms have reached the desired position, `False` if
a problem was found.
Examples
--------
::
# Move alpha and beta at the currently set speed
>>> await goto(alpha=100, beta=10)
# Set the speed of the alpha arm
>>> await goto(speed=(1000, 500))
"""
if self.moving:
raise PositionerError('positioner is already moving.')
if force is False and not self._can_move():
raise PositionerError('positioner is not in a movable state.')
ALPHA_MAX = 360
BETA_MAX = 360
ALPHA_MIN = -ALPHA_MAX if relative else 0
BETA_MIN = -BETA_MAX if relative else 0
if (alpha < ALPHA_MIN or alpha > ALPHA_MAX or
beta < BETA_MIN or beta > BETA_MAX) and not force:
raise PositionerError('position out of limits.')
if not self.initialised:
raise PositionerError('not initialised.')
original_speed = self.speed
try: # Wrap in try-except to restore speed if something fails.
# Set the speed
if speed and all(speed):
await self.set_speed(*speed, force=force)
self._log(f'goto {"relative" if relative else "absolute"} '
f'position ({alpha:.3f}, {beta:.3f}) degrees.')
goto_command = await self._goto_position(alpha, beta,
relative=relative)
# Sleeps for the time the firmware believes it's going to take
# to get to the desired position.
alpha_time, beta_time = goto_command.get_move_time()
# Update status as soon as we start moving. This clears any
# possible DISPLACEMENT_COMPLETED.
await asyncio.sleep(0.1)
await self.update_status()
if not self.moving:
if not relative:
goto_position = (alpha, beta)
else:
goto_position = (alpha + self.position[0],
beta + self.position[1])
try:
numpy.testing.assert_allclose(self.position,
goto_position,
atol=0.001)
self._log('position reached (did not move).', logging.INFO)
return True
except AssertionError:
raise PositionerError('positioner is not '
'moving when it should.')
self.move_time = max([alpha_time, beta_time])
self._log(f'the move will take {self.move_time:.2f} seconds',
logging.INFO)
await asyncio.sleep(self.move_time)
# Blocks until we're sure both arms at at the position.
result = await self.wait_for_status(
self.flags.DISPLACEMENT_COMPLETED, delay=0.1, timeout=3)
if result is False:
raise PositionerError('failed to reach commanded position.')
self._log('position reached.', logging.INFO)
except BaseException:
raise
finally:
if self.speed != original_speed:
await self.set_speed(*original_speed, force=force)
return True
[docs] async def home(self):
"""Homes the positioner.
Zeroes the positioner by counter-clockwise rotating alpha and beta
until they hit the hardstops. Blocks until the move is complete.
"""
if self.moving:
raise PositionerError('positioner is already moving.')
if not self.fps:
raise PositionerError('the positioner is not '
'linked to a FPS instance.')
await self.send_command('GO_TO_DATUMS', error='failed while sending '
'GO_TO_DATUMS command.')
self._log('waiting to home.')
await self.wait_for_status(self.flags.DISPLACEMENT_COMPLETED)
self._log('homed.', logging.INFO)
[docs] async def set_loop(self, motor='both', loop='closed', collisions=True):
"""Sets the control loop for a motor.
These parameters are cleared after a restart. The motors revert to
closed loop with collision detection.
Parameters
----------
motor : str
The motor to which these changes apply, either ``'alpha`'``,
``'beta'``, or ``'both'``.
loop : str
The type of control loop, either ``'open'`` or ``'closed'``.
collisions : bool
Whether the firmware should automatically detect collisions and
stop the positioner.
"""
if motor == 'both':
motors = ['alpha', 'beta']
else:
motors = [motor]
for motor in motors:
command_name = motor.upper() + '_' + loop.upper() + '_LOOP'
if collisions:
command_name += '_COLLISION_DETECTION'
else:
command_name += '_WITHOUT_COLLISION_DETECTION'
await self.send_command(command_name,
error=f'failed setting loop for {motor}.')
self._log(f'set motor={motor!r}, loop={loop!r}, '
f'detect_collision={collisions}')
return True
def __repr__(self):
return (f'<Positioner (id={self.positioner_id}, '
f'status={self.status!s}, initialised={self.initialised})>')