Source code for jaeger.fps

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-09-06
# @Filename: fps.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import asyncio
import os
import pathlib
import warnings

import numpy

from drift import Drift, DriftError

from jaeger import config, log, start_file_loggers
from jaeger.can import JaegerCAN
from jaeger.commands import Command, CommandID, send_trajectory
from jaeger.exceptions import (FPSLockedError, JaegerError,
                               JaegerUserWarning, TrajectoryError)
from jaeger.positioner import Positioner
from jaeger.utils import Poller, PollerList, bytes_to_int


# try:
#     from sdssdb.peewee.sdss5db import targetdb
# except ImportError:
#     targetdb = False


__ALL__ = ['BaseFPS', 'FPS', 'IEB']


[docs]class IEB(Drift): """Thing wrapper around a :class:`~drift.drift.Drift` class. Allows additional features such as disabling the interface. """ def __init__(self, *args, **kwargs): self.disabled = False super().__init__(*args, **kwargs) async def __aenter__(self): if self.disabled: raise DriftError('IEB is disabled.') try: await Drift.__aenter__(self) except DriftError: self.disabled = True warnings.warn('Failed connecting to the IEB. Disabling it.', JaegerUserWarning) async def __aexit__(self, *args): await Drift.__aexit__(self, *args)
[docs]class BaseFPS(dict): """A class describing the Focal Plane System. This class includes methods to read the layout and construct positioner objects and can be used by the real `FPS` class or the `~jaeger.testing.VirtualFPS`. Parameters ---------- layout : str The layout describing the position of the robots on the focal plane. If `None`, the default layout will be used. Can be either a layout name to be recovered from the database, or a file path to the layout configuration. positioner_class : class The class to be used to create a new positioner. In principle this will be `.Positioner` but it may be different if the positioners are created for a `~jaeger.testing.VirtualFPS`. """ def __init__(self, layout=None, positioner_class=Positioner): self._class_name = self.__class__.__name__ self.layout = layout or config['fps']['default_layout'] dict.__init__(self, {}) self._positioner_class = positioner_class # Loads the positioners from the layout self._load_layout(self.layout) @property def positioners(self): """Dictionary of positioner associated with this FPS. This is just a wrapper around the `.BaseFPS` instance which is a dictionary itself. May be deprecated in the future. """ return self def _load_layout(self, layout): """Loads positioner information from a layout file or DB. Parameters ---------- layout : `str` or `pathlib.Path` Either the path to a layout file or a string with the layout name to be retrieved from the database. If ``layout=None``, retrieves the default layout as defined in the config from the DB. If no DB is present and no layout file is provided, loads an empty layout to which connected positioners will be added but without spatial information. """ if isinstance(layout, pathlib.Path) or os.path.exists(layout): log.info(f'{self._class_name}: reading layout from file {layout!s}.') data = numpy.loadtxt(layout, dtype=[('id', int), ('row', int), ('pos', int), ('x', float), ('y', float), ('type', 'U10')]) for row in data: if row['type'].lower() == 'fiducial': continue self.add_positioner(row['id'], centre=(row['x'], row['y'])) n_pos = len(self.positioners) # elif targetdb and targetdb.database.connected: # log.info(f'{self._class_name}: reading profile {layout!r} from database.') # if not targetdb.database.connected: # targetdb.database.connect() # assert targetdb.database.connected, \ # f'{self._class_name}: database is not connected.' # positioners_db = targetdb.Actuator.select().join( # targetdb.FPSLayout).switch(targetdb.Actuator).join( # targetdb.ActuatorType).filter( # targetdb.FPSLayout.label == layout, # targetdb.ActuatorType.label == 'Robot') # for pos in positioners_db: # self.add_positioner(pos.id, centre=(pos.xcen, pos.ycen)) # n_pos = positioners_db.count() else: n_pos = 0 warnings.warn(f'cannot retrieve layout {layout!r} from the database. ' 'targetdb may be down. Loading an empty FPS.', JaegerUserWarning) log.debug(f'{self._class_name}: loaded positions for {n_pos} positioners.')
[docs] def add_positioner(self, positioner_id, centre=(None, None)): """Adds a new positioner to the list, and checks for duplicates.""" if positioner_id in self.positioners: raise JaegerError(f'{self._class_name}: there is already a ' f'positioner in the list with positioner_id ' f'{positioner_id}.') self.positioners[positioner_id] = self._positioner_class(positioner_id, self, centre=centre)
[docs] def report_status(self): """Returns a dict with the position and status of each positioner.""" status = {} for positioner in self.positioners.values(): pos_status = positioner.status pos_firmware = positioner.firmware pos_alpha = positioner.alpha pos_beta = positioner.beta status[positioner.positioner_id] = {'position': [pos_alpha, pos_beta], 'status': pos_status, 'firmware': pos_firmware} try: status['devices'] = self.can.device_status except AttributeError: pass return status
[docs]class FPS(BaseFPS): """A class describing the Focal Plane System. Parameters ---------- can : `.JaegerCAN` instance The CAN bus to use. layout : str The layout describing the position of the robots on the focal plane. If `None`, the default layout will be used. Can be either a layout name to be recovered from the database, or a file path to the layout configuration. can_profile : str or None The configuration profile for the CAN interface, or `None` to use the default one. Ignored if ``can`` is passed. ieb : bool or .IEB instance If `True`, connects the Instrument Electronics Box PLC controller. loop : event loop or `None` The asyncio event loop. If `None`, uses `asyncio.get_event_loop` to get a valid loop. engineering_mode : bool If `True`, disables most safety checks to enable debugging. This may result in hardware damage so it must not be used lightly. Examples -------- After instantiating a new `.FPS` object it is necessary to call `~.FPS.initialise` to retrieve the positioner layout and the status of the connected positioners. Note that `~.FPS.initialise` is a coroutine which needs to be awaited :: >>> fps = FPS(can_profile='default') >>> await fps.initialise() >>> fps.positioners[4].status <Positioner (id=4, status='SYSTEM_INITIALIZED| DISPLACEMENT_COMPLETED|ALPHA_DISPLACEMENT_COMPLETED| BETA_DISPLACEMENT_COMPLETED')> """ def __init__(self, can=None, layout=None, can_profile=None, ieb=None, loop=None, engineering_mode=False): # Start file logger start_file_loggers(start_log=True, start_can=False) if config.__CONFIG_FILE__: log.info(f'Using configuration from {config.__CONFIG_FILE__}') else: log.warning('Cannot find SDSSCORE or user configuration. ' 'Using default values.') self.engineering_mode = engineering_mode if engineering_mode: warnings.warn('Engineering mode enable. ' 'Please don\'t break anything.', JaegerUserWarning) self.loop = loop or asyncio.get_event_loop() self.loop.set_exception_handler(log.asyncio_exception_handler) #: dict: The mapping between positioners and buses. self.positioner_to_bus = {} if isinstance(can, JaegerCAN): #: The `.JaegerCAN` instance that serves as a CAN bus interface. self.can = can else: try: self.can = JaegerCAN.from_profile(can_profile, fps=self, loop=loop) except ConnectionRefusedError: raise self._locked = False #: .IEB: Connection to the instrument electronics box over Modbus. self.ieb = None if ieb is None or ieb is True: ieb = config['fps']['ieb'] if isinstance(ieb, IEB): self.ieb = ieb elif isinstance(ieb, (str, dict)): if isinstance(ieb, str): ieb = os.path.expanduser(os.path.expandvars(ieb)) try: self.ieb = IEB.from_config(ieb) except FileNotFoundError: warnings.warn(f'IEB configuration file {ieb} not found.', JaegerUserWarning) elif ieb is False: self.ieb = False else: raise ValueError(f'Invalid input value for ieb {ieb!r}.') super().__init__(layout=layout) #: Position and status pollers self.pollers = PollerList([ Poller('status', self.update_status, delay=config['fps']['status_poller_delay'], loop=self.loop), Poller('position', self.update_position, delay=config['fps']['position_poller_delay'], loop=self.loop) ]) async def _get_positioner_bus_map(self): """Creates the positioner-to-bus map. Only relevant if the bus interface is multichannel/multibus. """ if len(self.can.interfaces) == 1 and not self.can.multibus: self._is_multibus = False return self._is_multibus = True id_cmd = self.send_command(CommandID.GET_ID, timeout=config['fps']['initialise_timeouts']) await id_cmd # Parse the replies for reply in id_cmd.replies: self.positioner_to_bus[reply.positioner_id] = (reply.message.interface, reply.message.bus)
[docs] def send_command(self, command, positioner_id=0, data=[], interface=None, bus=None, broadcast=False, silent_on_conflict=False, override=False, safe=False, synchronous=False, **kwargs): """Sends a command to the bus. Parameters ---------- command : str, int, .CommandID, or .Command The ID of the command, either as the integer value, a string, or the `.CommandID` flag. Alternatively, the `.Command` to send. positioner_id : int The positioner ID to command, or zero for broadcast. data : bytearray The bytes to send. interface : int The index in the interface list for the interface to use. Only relevant in case of a multibus interface. If `None`, the positioner to bus map will be used. bus : int The bus within the interface to be used. Only relevant in case of a multibus interface. If `None`, the positioner to bus map will be used. broadcast : bool If `True`, sends the command to all the buses. silent_on_conflict : bool If set, does not issue a warning if at the time of queuing this command there is already a command for the same positioner id running. This is useful for example for poller when we change the delay and the previous command is still running. In those cases this option avoids annoying messages. override : bool If another instance of this command_id with the same positioner_id is running, cancels it and schedules this one immediately. Otherwise the command is queued until the first one finishes. safe : bool Whether the command is safe to send to a locked `.FPS`. synchronous : bool If `True`, the command is sent to the CAN network immediately, skipping the command queue. No tracking is done for this command. It should only be used for shutdown commands. kwargs : dict Extra arguments to be passed to the command. Returns ------- command : `.Command` The command sent to the bus. The command needs to be awaited before it is considered done. """ if positioner_id == 0: broadcast = True if not isinstance(command, Command): command_flag = CommandID(command) CommandClass = command_flag.get_command_class() command = CommandClass(positioner_id=positioner_id, loop=self.loop, data=data, **kwargs) command_name = command.name command_uid = command.command_uid header = f'({command_name}, {positioner_id}, {command_uid}): ' if not self.engineering_mode and self.locked: if command.safe or safe: log.debug(f'FPS is locked but {command_name} is safe.') else: command.cancel(silent=True) raise FPSLockedError('Solve the problem and unlock the FPS ' 'before sending commands.') elif not self.engineering_mode and command.move_command and self.moving: command.cancel(silent=True) raise JaegerError('Cannot send move command while the ' 'FPS is moving. Use FPS.stop_trajectory() ' 'to stop the FPS.') if command.status.is_done: raise JaegerError(header + 'trying to send a done command.') command._override = override command._silent_on_conflict = silent_on_conflict # By default a command will be sent to all interfaces and buses. # Normally we want to set the interface and bus to which the command # will be sent. if not broadcast: self.set_interface(command, bus=bus, interface=interface) if command.status == command.status.FAILED: return command if not synchronous: self.can.command_queue.put_nowait(command) log.debug(header + 'added command to CAN processing queue.') else: self.can._send_messages(command) log.debug(header + 'sent command to CAN synchronously.') return command
[docs] def set_interface(self, command, interface=None, bus=None): """Sets the interface and bus to which to send a command.""" # Don't do anything if the interface is not multibus if not self._is_multibus or command.positioner_id == 0: return if bus or interface: command._interface = interface command._bus = bus return if command.positioner_id not in self.positioner_to_bus: raise JaegerError(f'Positioner {command.positioner_id} ' 'has no assigned bus.') command.finish_command(command.status.FAILED) return interface, bus = self.positioner_to_bus[command.positioner_id] command._interface = interface command._bus = bus return
@property def locked(self): """Returns `True` if the `.FPS` is locked.""" return self._locked
[docs] async def lock(self, stop_trajectories=True): """Locks the `.FPS` and prevents commands to be sent. Parameters ---------- stop_trajectories : bool Whether to stop trajectories when locking. """ warnings.warn('Locking FPS.', JaegerUserWarning) self._locked = True if stop_trajectories: await self.stop_trajectory()
[docs] async def unlock(self, force=False): """Unlocks the `.FPS` if all collisions have been resolved.""" await self.update_status(timeout=0.1) for positioner in self.positioners.values(): if positioner.collision and not self.engineering_mode: self._locked = True raise JaegerError('Cannot unlock the FPS until all ' 'the collisions have been cleared.') self._locked = False return True
@property def moving(self): """Returns `True` if any of the positioners is moving.""" return any([pos.moving for pos in self.values() if pos.status != pos.flags.UNKNOWN])
[docs] async def initialise(self, allow_unknown=True, start_pollers=True): """Initialises all positioners with status and firmware version. Parameters ---------- allow_unknown : bool If `True`, allows to add positioners that are connected but not in the layout. """ unknwon_positioners = [] # Test IEB connection. This will issue a warning and set # self.ieb.disabled=True if the connection fails. if self.ieb: async with self.ieb: pass # Get the positioner-to-bus map await self._get_positioner_bus_map() # Resets all positioners for positioner in self.positioners.values(): await positioner.reset() # Stop poller in case they are running await self.pollers.stop() if len(self.positioners) > 0: n_expected_positioners = len(self.positioners) else: n_expected_positioners = None get_firmware_command = self.send_command( CommandID.GET_FIRMWARE_VERSION, positioner_id=0, timeout=config['fps']['initialise_timeouts'], n_positioners=n_expected_positioners) await get_firmware_command if get_firmware_command.status.failed: if not self.engineering_mode: raise JaegerError('Failed retrieving firmware version. ' 'Cannot initialise FPS.') else: warnings.warn('Failed retrieving firmware version. ' 'Continuing because engineering mode.', JaegerUserWarning) # Loops over each reply and set the positioner status to OK. If the # positioner was not in the list, adds it. Checks how many positioner # did not reply. for reply in get_firmware_command.replies: positioner_id = reply.positioner_id if positioner_id not in self.positioners: if allow_unknown: unknwon_positioners.append(positioner_id) self.add_positioner(positioner_id) else: raise JaegerError('Found positioner with ' f'ID={positioner_id} ' 'that is not in the layout.') positioner = self.positioners[positioner_id] positioner.firmware = get_firmware_command.get_firmware(positioner_id) if len(set([pos.firmware for pos in self.values()])) > 1: warnings.warn('Positioners with different firmware ' 'versions found.', JaegerUserWarning) # Stop positioners that are not in bootloader mode. await self.stop_trajectory() await self.update_status(timeout=config['fps']['initialise_timeouts']) if len(unknwon_positioners) > 0: warnings.warn(f'Found {len(unknwon_positioners)} unknown positioners ' f'with IDs {sorted(unknwon_positioners)!r}. ' 'They have been added to the layout.', JaegerUserWarning) n_did_not_reply = len([pos for pos in self.positioners if self[pos].status == self[pos].flags.UNKNOWN]) if n_did_not_reply > 0: warnings.warn(f'{n_did_not_reply} positioners did not respond to ' f'{CommandID.GET_STATUS.name!r}', JaegerUserWarning) n_non_initialised = len([pos for pos in self.positioners if (self[pos].status != self[pos].flags.UNKNOWN and not self[pos].initialised)]) if n_non_initialised > 0: warnings.warn(f'{n_non_initialised} positioners responded but ' 'have not been initialised.', JaegerUserWarning) if self.locked: log.info('FPS is locked. Trying to unlock it.') if not await self.unlock(): raise JaegerError('FPS cannot be unlocked. ' 'Initialisation failed.') else: log.info('FPS unlocked successfully.') # This may not be techincally necessary but it's just a few messages ... initialise_cmds = [positioner.initialise() for positioner in self.positioners.values() if positioner.status != positioner.flags.UNKNOWN] results = await asyncio.gather(*initialise_cmds) if False in results: if self.engineering_mode: warnings.warn('Some positioners failed to initialise. ' 'Continuing because engineering mode ...', JaegerUserWarning) else: raise JaegerError('Some positioners failed to initialise.') await self.update_position() # Start the pollers if start_pollers: self.pollers.start() return self
[docs] async def update_status(self, positioner_ids=None, timeout=1): """Update statuses for all positioners. Parameters ---------- positioner_ids : list The list of positioners to update. If `None`, update all positioners. ``positioner_ids=False`` ignores currently connected positioners and times out to receive all possible replies. timeout : float How long to wait before timing out the command. """ assert not positioner_ids or isinstance(positioner_ids, (list, tuple)) if positioner_ids: n_positioners = len(positioner_ids) else: # This is the max number that should reply. n_positioners = len(self) if len(self) > 0 else None await self.update_firmware_version(timeout=timeout) command = self.send_command(CommandID.GET_STATUS, positioner_id=0, n_positioners=n_positioners, timeout=timeout, override=True, silent_on_conflict=True) await command if command.status.failed: log.warning(f'Failed broadcasting {CommandID.GET_STATUS.name!r} ' 'during update status.') return False update_status_coros = [] for reply in command.replies: pid = reply.positioner_id if pid not in self.positioners or (positioner_ids and pid not in positioner_ids): continue positioner = self.positioners[pid] status_int = int(bytes_to_int(reply.data)) update_status_coros.append(positioner.update_status(status_int)) await asyncio.gather(*update_status_coros) return True
[docs] async def update_position(self, positioner_ids=None, timeout=1): """Updates positions. Parameters ---------- positioner_ids : list The list of positioners to update. If `None`, update all positioners. timeout : float How long to wait before timing out the command. """ assert not positioner_ids or isinstance(positioner_ids, (list, tuple)) if not positioner_ids: positioner_ids = [pid for pid in self.positioners if self[pid].initialised and not self[pid].is_bootloader()] if not positioner_ids: return True commands_all = self.send_to_all(CommandID.GET_ACTUAL_POSITION, positioners=positioner_ids, timeout=timeout) commands = await commands_all update_position_commands = [] for command in commands: pid = command.positioner_id if (not isinstance(command, Command) or (command.status.failed and self[pid].initialised)): log.warning(f'({CommandID.GET_ACTUAL_POSITION.name}, ' f'{command.positioner_id}): ' 'failed during update position.') continue try: position = command.get_positions() update_position_commands.append(self[pid].update_position(position)) except ValueError as ee: raise JaegerError('Failed updating position for ' f'positioner {pid}: {ee}') await asyncio.gather(*update_position_commands) return True
[docs] async def update_firmware_version(self, positioner_ids=None, timeout=2): """Updates the firmware version of connected positioners. Parameters ---------- positioner_ids : list The list of positioners to update. If `None`, update all positioners. ``positioner_ids=False`` ignores currently connected positioners and times out to receive all possible replies. timeout : float How long to wait before timing out the command. """ assert not positioner_ids or isinstance(positioner_ids, (list, tuple)) if positioner_ids: n_positioners = len(positioner_ids) else: n_positioners = len(self) if len(self) > 0 else None get_firmware_command = self.send_command(CommandID.GET_FIRMWARE_VERSION, positioner_id=0, timeout=timeout, n_positioners=n_positioners) await get_firmware_command if get_firmware_command.status.failed: raise JaegerError('Failed retrieving firmware version.') for reply in get_firmware_command.replies: pid = reply.positioner_id if pid not in self.positioners or (positioner_ids and pid not in positioner_ids): continue positioner = self.positioners[pid] positioner.firmware = get_firmware_command.get_firmware(pid) return True
[docs] async def stop_trajectory(self, positioners=None, clear_flags=True, timeout=0): """Stops all the positioners. Parameters ---------- positioners : list The list of positioners to abort. If `None`, abort all positioners. clear_flags : bool If `True`, in addition to sending ``TRAJECTORY_TRANSMISSION_ABORT`` sends ``STOP_TRAJECTORY`` which clears all the collision and warning flags. timeout : float How long to wait before timing out the command. By default, just sends the command and does not wait for replies. """ if positioners is None: positioners = [positioner_id for positioner_id in self.keys() if not self[positioner_id].is_bootloader()] if positioners == []: return await self.send_to_all('TRAJECTORY_TRANSMISSION_ABORT', positioners=positioners) if clear_flags: await self.send_command('STOP_TRAJECTORY', positioner_id=0, timeout=timeout)
[docs] async def send_trajectory(self, *args, **kwargs): """Sends a set of trajectories to the positioners. See the documentation for `.send_trajectory`. """ try: return await send_trajectory(self, *args, **kwargs) except TrajectoryError as ee: raise JaegerError(f'Sending trajectory failed with error: {ee}')
[docs] def abort(self): """Aborts trajectories and stops positioners.""" cmd = self.send_command(CommandID.STOP_TRAJECTORY, positioner_id=0) return asyncio.create_task(cmd)
[docs] async def send_to_all(self, command, positioners=None, data=None, **kwargs): """Sends a command to multiple positioners and awaits completion. Parameters ---------- command : str, int, .CommandID or .Command The ID of the command, either as the integer value, a string, or the `.CommandID` flag. Alternatively, the `.Command` to send. positioners : list The list of ``positioner_id`` of the positioners to command. If `None`, sends the command to all the positioners in the FPS. data : list The payload to send. If `None`, no payload is sent. If the value is a list with a single value, the same payload is sent to all the positioners. Otherwise the list length must match the number of positioners. kwargs : dict Keyword argument to pass to the command. Returns ------- commands : `list` A list with the command instances executed. """ positioners = positioners or list(self.positioners.keys()) if data is None or len(data) == 1: commands = [self.send_command(command, positioner_id=positioner_id, **kwargs) for positioner_id in positioners] else: commands = [self.send_command(command, positioner_id=positioner_id, data=data[ii], **kwargs) for ii, positioner_id in enumerate(positioners)] results = await asyncio.gather(*commands, return_exceptions=True) if any([isinstance(rr, FPSLockedError) for rr in results]): raise FPSLockedError('One or more of the commands failed ' 'because the FPS is locked.') return commands
[docs] async def shutdown(self): """Stops pollers and shuts down all remaining tasks.""" bootloader = all([positioner.is_bootloader() is True for positioner in self.values()]) if not bootloader: log.info('Stopping positioners') await self.stop_trajectory() log.info('Stopping all pollers.') await self.pollers.stop() await asyncio.sleep(1) log.info('Cancelling all pending tasks and shutting down.') tasks = [task for task in asyncio.all_tasks(loop=self.loop) if task is not asyncio.current_task(loop=self.loop)] list(map(lambda task: task.cancel(), tasks)) await asyncio.gather(*tasks, return_exceptions=True) self.loop.stop()
async def __aenter__(self): await self.initialise() return self async def __aexit__(self, *excinfo): await self.shutdown()