Source code for jaeger.fvc

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-11-01
# @Filename: fvc.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging
import os
import pathlib
import warnings
from functools import partial

from typing import TYPE_CHECKING, Any, Mapping, Optional

import numpy
import polars
from astropy.io import fits
from astropy.table import Table

from clu.command import Command
from clu.legacy.tron import TronConnection
from coordio import transforms
from coordio.defaults import calibration

from jaeger import config, log
from jaeger.exceptions import FVCError, JaegerUserWarning, TrajectoryError
from jaeger.fps import FPS
from jaeger.ieb import IEB
from jaeger.kaiju import get_path_pair_in_executor, get_robot_grid
from jaeger.plotting import plot_fvc_distances
from jaeger.target import Configuration, Design, read_confSummary, wok_to_positioner
from jaeger.target.tools import get_wok_data
from jaeger.utils import run_in_executor


if TYPE_CHECKING:
    import pandas

    from coordio.transforms import FVCTransformAPO, FVCTransformLCO

    from jaeger.actor import JaegerActor
    from jaeger.target.assignment import NewPositionsType
    from jaeger.target.configuration import BaseConfiguration


__all__ = ["FVC"]


FVC_CONFIG = config["fvc"]


# Create a coroutine out of the original plotFVCResults.
async def plotFVCResultsCo(*args, **kwargs):
    transforms.plotFVCResults(*args, **kwargs)


def plotFVCResultsMP(loop: asyncio.AbstractEventLoop, *args, **kwargs):
    loop.create_task(plotFVCResultsCo(*args, **kwargs))  # type: ignore


def get_transform(observatory: str):
    """Returns the correct coordio FVC transform class for the observatory."""

    if observatory.upper() == "APO":
        from coordio.transforms import FVCTransformAPO

        return FVCTransformAPO

    elif observatory.upper() == "LCO":
        from coordio.transforms import FVCTransformLCO

        return FVCTransformLCO

    else:
        raise ValueError(f"Invalid observatory {observatory}.")

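# Illustrative usage sketch (editor-added, not part of jaeger): get_transform()
# simply selects the coordio transform class for the current site, e.g.
#
#   FVCTransform = get_transform("APO")   # -> coordio.transforms.FVCTransformAPO
#
# The class is instantiated later in FVC.process_fvc_image() with the image data,
# the reported positioner coordinates, and the rotator angle.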

class FVC:
    """Focal View Camera class."""

    configuration: Optional[BaseConfiguration]
    fibre_data: Optional[polars.DataFrame]
    measurements: Optional[polars.DataFrame]
    centroids: Optional[polars.DataFrame]
    offsets: Optional[polars.DataFrame]
    fvc_transform: Optional[FVCTransformAPO | FVCTransformLCO]
    image_path: Optional[str]
    proc_image_path: Optional[str]
    raw_hdu: Optional[fits.ImageHDU]
    proc_hdu: Optional[fits.ImageHDU]
    centroid_method: str | None
    fitrms: float
    k: float
    ieb_data: dict[str, Any] = {}

    def __init__(self, site: str, command: Optional[Command[JaegerActor]] = None):
        if len(calibration.positionerTable) == 0:
            warnings.warn(
                "FPS calibrations not loaded or the array is empty.",
                JaegerUserWarning,
            )

        self.site = site

        self.correction_applied: bool = False

        self.command = command
        self.fps = FPS.get_instance()

        # To be updated manually by the actor command.
        self.iteration: int = 1

        self.reset()

    def reset(self):
        """Resets the instance."""

        self.configuration = None
        self.fvc_transform: FVCTransformAPO | FVCTransformLCO | None = None

        self.fibre_data = None
        self.centroids = None
        self.offsets = None

        self.image_path = None
        self.proc_image_path = None

        self.raw_hdu = None
        self.proc_hdu = None

        self.k = 1
        self.fitrms = -9.99
        self.perc_90 = -9.99
        self.fvc_percent_reached = -9.99

        self.centroid_method = config["fvc"]["centroid_method"]

    def set_command(self, command: Command[JaegerActor]):
        """Sets the command."""

        if command.status.is_done:
            raise FVCError("Command is done.")

        self.command = command

    def log(
        self,
        msg: str,
        level: logging._Level = logging.INFO,
        to_log: bool = True,
        to_command: bool = True,
        broadcast: bool = False,
    ):
        """Logs a message, including to the command if present."""

        level = int(level)

        msg = "[FVC]: " + msg

        if log and to_log:
            log.log(level, msg)

        if self.command and to_command:
            if level == logging.DEBUG:
                self.command.debug(msg, broadcast=broadcast)
            elif level == logging.INFO:
                self.command.info(msg, broadcast=broadcast)
            elif level == logging.WARNING:
                self.command.warning(msg, broadcast=broadcast)
            elif level == logging.ERROR:
                self.command.error(msg, broadcast=broadcast)

    async def expose(
        self,
        exposure_time: float = 5.0,
        stack: int = 1,
        use_tron_fallback=True,
    ) -> pathlib.Path:  # pragma: no cover
        """Takes an exposure with the FVC and blocks until the exposure is complete.

        Returns the path to the new image. If ``use_tron_fallback=True`` and the
        command has not been set, creates a Tron client to command the FVC.

        """

        if self.command is None:
            if use_tron_fallback is False:
                raise FVCError("Command must be set.")
        else:
            if self.command.status.is_done:
                raise FVCError("Command is done.")

        self.log(
            f"Taking {stack} x {exposure_time} seconds FVC exposure.",
            to_command=False,
        )

        tron = None

        if stack <= 1:
            cmd_str = f"talk -c fvc expose {exposure_time}"
        else:
            cmd_str = f"talk -c fvc expose --stack {stack} {exposure_time}"

        if self.command:
            expose_command = self.command.send_command("fliswarm", cmd_str)
        else:
            tron = TronConnection(
                "jaeger.jaeger",
                config["actor"]["tron_host"],
                config["actor"]["tron_port"],
            )
            await tron.start()
            expose_command = tron.send_command("fliswarm", cmd_str)

        assert isinstance(expose_command, Command)

        await expose_command

        if tron:
            tron.stop()

        if expose_command.status.did_fail:
            raise FVCError("The FVC exposure failed.")

        try:
            filename = expose_command.replies.get("filename")[-1]
            self.log(f"FVC raw image is {filename}.", to_command=False)
            return pathlib.Path(filename)
        except KeyError:
            raise FVCError("The exposure succeeded but did not output the filename.")

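    # Illustrative usage sketch (editor-added, not part of jaeger; values are
    # examples): from an actor command one would typically do
    #
    #   fvc = FVC(config["observatory"], command=command)
    #   raw_path = await fvc.expose(exposure_time=5.0, stack=3)
    #
    # which commands fliswarm (directly or through the Tron fallback) and returns
    # the path to the raw FVC frame.
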
    def process_fvc_image(
        self,
        path: pathlib.Path | str,
        positioner_coords: Mapping[int, tuple[float, float]],
        configuration: Optional[BaseConfiguration] = None,
        fibre_data: Optional[polars.DataFrame] = None,
        fibre_type: str = "Metrology",
        centroid_method: str | None = None,
        use_new_invkin: bool = True,
        polids: list[int] | None = None,
        plot: bool | str = False,
        outdir: str | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> tuple[fits.ImageHDU, polars.DataFrame, polars.DataFrame | None]:
        """Processes a raw FVC image.

        Parameters
        ----------
        path
            The path to the raw FVC image.
        positioner_coords
            A dictionary of positioner ID to ``(alpha, beta)`` with the current
            positions of the robots.
        configuration
            A configuration object to use for processing. If `None`, defaults
            to the current `.FPS` loaded configuration.
        fibre_data
            A Polars data frame with the expected coordinates of the targets.
            It is expected the data frame will have columns ``positioner_id``,
            ``hole_id``, ``fibre_type``, ``xwok``, and ``ywok``. This frame is
            appended to the processed image. Normally this parameter is left
            empty and the fibre table from the configuration loaded into the
            FPS instance is used.
        fibre_type
            The ``fibre_type`` rows in ``fibre_data`` to use. Defaults to
            ``fibre_type='Metrology'``.
        centroid_method
            The centroid method to use, one of ``"nudge"``, ``"sep"``,
            ``"winpos"``, or ``"simple"``. Defaults to ``"nudge"``.
        use_new_invkin
            Use the new inverse kinematics to calculate alpha/beta.
        polids
            The list of ZB polynomial orders to use. If `None`, defaults to the
            coordio ``FVCTransform`` orders.
        plot
            Whether to save additional debugging plots along with the processed
            image. If ``plot`` is a string, it will be used as the directory to
            which to save the plots.
        outdir
            The directory where the processed image will be written. If `None`,
            uses the directory of the raw image.
        loop
            The running event loop. Used to schedule the plotting of the FVC
            transform fit as a task.

        Returns
        -------
        result
            A tuple with the read raw image HDU (with columns flipped) as the
            first element and the expected coordinates, as a data frame, as the
            second. The data frame is the same as the input target coordinates
            but with the columns ``xwok_measured`` and ``ywok_measured``
            appended. The third element is the data frame of extracted
            centroids.

        """

        # Reset the instance.
        self.reset()

        centroid_method = centroid_method or self.centroid_method

        path = str(path)
        if not os.path.exists(path):
            raise FVCError(f"FVC image {path} does not exist.")

        self.configuration = configuration or self.fps.configuration

        if fibre_data is None:
            if configuration is None:
                raise FVCError("No fibre data and no configuration has been loaded.")
            fibre_data = configuration.fibre_data

        fdata = fibre_data.clone().sort("positioner_id")

        # Set fibre_data initially so that if FVCTransform fails we have something.
        self.fibre_data = fdata.sort(["hole_id", "fibre_type"])

        self.log(f"Processing raw image {path}")

        dirname, base = os.path.split(path)
        dirname = outdir or dirname

        proc_path_root = os.path.join(dirname, "proc-" + base[0 : base.find(".fit")])

        if plot is True:
            plot_path_root = proc_path_root
        elif plot is False or plot is None:
            plot_path_root = None
        elif isinstance(plot, str):
            plot_path_root = os.path.join(plot, "proc-" + base[0 : base.find(".fit")])
        else:
            plot_path_root = ""

        hdus = fits.open(path)

        self.image_path = path
        self.raw_hdu = hdus[1].copy()

        image_data = hdus[1].data.astype(numpy.float32)
        header = hdus[1].header

        # If we are using a dark frame, subtract it now.
        dark_image: str | bool = config["fvc"].get("dark_image", False)
        if dark_image:
            if not os.path.exists(dark_image):
                self.log(
                    f"Dark frame {dark_image} not found. "
                    "Skipping dark correction.",
                    level=logging.WARNING,
                )
            else:
                dark_data = fits.getdata(dark_image).astype(numpy.float32)
                image_data -= dark_data

        # Invert columns at APO.
        if self.fps.observatory == "APO":
            image_data = image_data[:, ::-1]

        self.log(f"Max counts in image: {numpy.max(image_data)}", level=logging.INFO)

        # Get the rotator angle so that we can derotate the centroids to the
        # x/ywok-aligned configuration.
        rotpos = hdus[1].header.get("IPA", 135.4)
        if rotpos is None:
            raise FVCError("IPA keyword not found in the header.")
        rotpos = float(rotpos) % 360.0

        positioner_df = polars.DataFrame(
            [(kk, *vv) for kk, vv in positioner_coords.items()],
            schema={
                "positionerID": polars.Int32,
                "alphaReport": polars.Float64,
                "betaReport": polars.Float64,
            },
        ).sort("positionerID")

        FVCTransform = get_transform(self.fps.observatory)
        self.log(f"Using FVC transform class {FVCTransform.__name__!r}.")

        if loop:
            # Monkeypatch plotFVCResults to use a task and do plotting asynchronously.
            # It's important to override it here, when a loop already exists.
            transforms.plotFVCResults = partial(plotFVCResultsMP, loop)

        # ZB polynomial orders to use.
        polids = polids or config["fvc"].get("polids", None)
        if polids is None:
            self.log("Using coordio default ZB polynomial orders.")
        else:
            self.log(f"Using ZB polynomial orders: {polids}")

        fvc_transform = FVCTransform(
            image_data,
            positioner_df.to_pandas(),
            rotpos,
            polids=polids,
            plotPathPrefix=plot_path_root,
        )

        self.centroids = polars.from_pandas(fvc_transform.extractCentroids())

        fvc_transform.fit(centType=centroid_method, newInvKin=use_new_invkin)

        self.centroid_method = fvc_transform.centType
        self.log(f"Centroid method: {self.centroid_method}.")
        if self.command:
            self.command.info(fvc_centroid_method=self.centroid_method)

        assert fvc_transform.positionerTableMeas is not None

        positionerTableMeas = fvc_transform.positionerTableMeas.copy()
        measured = polars.from_pandas(positionerTableMeas.drop("index", axis=1))
        measured = measured.sort("positionerID")

        n_dubious = measured["wokErrWarn"].sum()
        if n_dubious > 0:
            self.log(
                f"Found {n_dubious} positioners with dubious centroid matches.",
                level=logging.WARNING,
            )

        # Create a column to mark positioners with dubious matches.
        dubious_pid = measured.filter(polars.col.wokErrWarn)["positionerID"]
        fdata = fdata.with_columns(
            dubious=polars.when(polars.col.positioner_id.is_in(dubious_pid))
            .then(True)
            .otherwise(False)
        )

        metrology_data = fdata.clone()  # Sorted by positioner_id, same as "measured"
        metrology_data = metrology_data.filter(polars.col.fibre_type == fibre_type)

        wok_measured = measured[["xWokMeasMetrology", "yWokMeasMetrology"]]

        fvc_fibre_idx = (fdata["fibre_type"] == fibre_type).arg_true()
        fdata[fvc_fibre_idx, "xwok_measured"] = wok_measured["xWokMeasMetrology"]
        fdata[fvc_fibre_idx, "ywok_measured"] = wok_measured["yWokMeasMetrology"]

        # Only use online, assigned robots for final RMS. First get groups of fibres
        # with an assigned robot, that are not offline or dubious.
        if fdata["assigned"].sum() > 0:
            assigned = fdata.filter(
                (
                    polars.col.assigned.any()
                    & polars.col.offline.not_().all()
                    & polars.col.dubious.not_().all()
                ).over("positioner_id")
            )
        else:
            self.log("No assigned fibres found. Using all matched fibres.")
            assigned = fdata.filter(
                polars.col.offline.not_().all(),
                polars.col.dubious.not_().all(),
            )

        # Now get the metrology fibre from those groups.
        assigned_met = assigned.filter(polars.col.fibre_type == fibre_type)

        # Calculate RMS from assigned fibres.
        dx = (assigned_met["xwok"] - assigned_met["xwok_measured"]).to_numpy()
        dy = (assigned_met["ywok"] - assigned_met["ywok_measured"]).to_numpy()
        self.fitrms = float(numpy.round(numpy.sqrt(numpy.mean(dx**2 + dy**2)), 5))
        self.log(f"RMS full fit {self.fitrms * 1000:.3f} um.")

        # Also calculate 90% percentile and percentage of targets below threshold.
        distance = numpy.sqrt(dx**2 + dy**2)
        self.perc_90 = float(numpy.round(numpy.percentile(distance, 90), 4))

        n_reached = numpy.sum(distance <= (config["fvc"]["target_distance"] / 1000))
        self.fvc_percent_reached = float(numpy.round(n_reached / len(dx) * 100, 1))

        # FITRMS is the RMS of measured - expected for assigned, non-disabled
        # robots. This is different from FVC_RMS reported by
        # FVCTransformAPO.getMetadata() that is measured - reported for all
        # positioners.
        header["FVCITER"] = (self.iteration, "FVC iteration")
        header["FITRMS"] = (self.fitrms * 1000, "RMS full fit [um]")
        header["PERC90"] = (self.perc_90 * 1000, "90% percentile [um]")
        header["FVCREACH"] = (
            self.fvc_percent_reached,
            "Targets that have reached their goal [%]",
        )
        header["DARKFILE"] = (str(dark_image) or "", "Dark frame image")

        fdata = fdata.sort(["hole_id", "fibre_type"])

        self.fibre_data = fdata
        self.fvc_transform = fvc_transform

        self.proc_hdu = fits.CompImageHDU(data=image_data, header=header)

        self.log(f"Finished processing {path}", level=logging.DEBUG)

        return (self.proc_hdu, self.fibre_data, self.centroids)

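    # Illustrative usage sketch (editor-added, not part of jaeger): the current
    # positions reported by the FPS are typically used as ``positioner_coords``:
    #
    #   positions = fvc.fps.get_positions()  # rows of (positioner_id, alpha, beta)
    #   positioner_coords = {int(p): (a, b) for p, a, b in positions}
    #   proc_hdu, fibre_data, centroids = fvc.process_fvc_image(
    #       raw_path,
    #       positioner_coords,
    #       plot=True,
    #   )
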
    async def update_ieb_info(self):
        """Update the IEB data dictionary."""

        # Add IEB information.
        ieb_keys = config["fvc"]["ieb_keys"]
        ieb_data = {key: -999.0 for key in ieb_keys}

        for key in ieb_keys:
            device_name = ieb_keys[key]
            if self.fps and isinstance(self.fps.ieb, IEB):
                try:
                    device = self.fps.ieb.get_device(device_name)
                    ieb_data[key] = (await device.read())[0]
                except Exception as err:
                    self.log(f"Failed getting IEB information: {err}", logging.WARNING)
                    break

        self.ieb_data = ieb_data

    def calculate_offsets(
        self,
        reported_positions: numpy.ndarray,
        fibre_data: Optional[polars.DataFrame] = None,
        k: Optional[float] = None,
        max_correction: Optional[float] = None,
    ) -> polars.DataFrame:
        """Determines the offset to apply to the currently reported positions.

        Measured wok positions from the fibre data are converted to positioner
        coordinates. An alpha/beta offset is calculated with respect to the
        expected positions. The offset is then applied to the current positions
        as self-reported by the positioners. Optionally, the offset can be
        adjusted using a PID loop.

        Parameters
        ----------
        reported_positions
            Reported positions for the positioners as a numpy array. Usually
            the output of `.FPS.get_positions`.
        fibre_data
            The fibre data table. Only the metrology entries are used. Must
            have the ``xwok_measured`` and ``ywok_measured`` columns populated.
            If `None`, uses the data frame calculated when `.process_fvc_image`
            was last run.
        k
            The fraction of the correction to apply. If `None`, uses the value
            ``fvc.k`` from the configuration.
        max_correction
            The maximum offset allowed per robot and arm, in degrees.
            Corrections larger than ``max_correction`` are clipped.

        Returns
        -------
        new_positions
            The new alpha and beta positions as a Polars data frame indexed by
            positioner ID.

        """

        self.log("Calculating offsets from FVC image and model fit.")

        site = config["observatory"]

        self.k = k or config["fvc"]["k"]
        max_offset: float = max_correction or config["fvc"]["max_correction"]

        if fibre_data is None and self.fibre_data is None:
            raise FVCError("No fibre data passed or stored in the instance.")

        if fibre_data is None:
            fibre_data = self.fibre_data

        assert fibre_data is not None
        fibre_data = fibre_data.clone()

        met = fibre_data.filter(polars.col.fibre_type == "Metrology")

        # TODO: deal with missing data.
        invalid = (met["xwok_measured"] == -999.0) | (met["ywok_measured"] == -999.0)
        if invalid.any():
            raise FVCError("Some metrology fibres have not been measured.")

        wok_data = get_wok_data(self.site)

        # Calculate alpha/beta from measured wok coordinates.
        _measured = []
        first = True
        for row in met.iter_rows(named=True):
            (alpha_measured, beta_measured), _ = wok_to_positioner(
                row["hole_id"],
                site,
                "Metrology",
                row["xwok_measured"],
                row["ywok_measured"],
                wok_data=wok_data,
            )

            if "alpha" in row:
                alpha_expected = row["alpha"]
                beta_expected = row["beta"]
            else:
                if first:
                    self.log(
                        "Fibre data does not include the expected alpha/beta "
                        "positions. Using reported alpha/beta.",
                        logging.WARNING,
                    )
                reported_pid = reported_positions[:, 0]
                prow = reported_positions[reported_pid == row["positioner_id"]]
                alpha_expected = prow[0][1]
                beta_expected = prow[0][2]

            # If beta >= 180, we would need a left handed configuration. For now we
            # invalidate these values.
            if beta_expected >= 180.0:
                alpha_measured = numpy.nan
                beta_measured = numpy.nan

            xwok_distance = row["xwok_measured"] - row["xwok"]
            ywok_distance = row["ywok_measured"] - row["ywok"]

            _measured.append(
                (
                    row["hole_id"],
                    row["positioner_id"],
                    xwok_distance,
                    ywok_distance,
                    alpha_expected,
                    beta_expected,
                    alpha_measured,
                    beta_measured,
                )
            )

            first = False

        measured = polars.DataFrame(
            _measured,
            schema={
                "hole_id": polars.String,
                "positioner_id": polars.Int32,
                "xwok_distance": polars.Float64,
                "ywok_distance": polars.Float64,
                "alpha_expected": polars.Float64,
                "beta_expected": polars.Float64,
                "alpha_measured": polars.Float64,
                "beta_measured": polars.Float64,
            },
        ).sort("positioner_id")

        # Merge the reported positions.
        reported = polars.DataFrame(
            reported_positions.tolist(),
            schema={
                "positioner_id": polars.Int32,
                "alpha_reported": polars.Float64,
                "beta_reported": polars.Float64,
            },
        ).sort("positioner_id")

        offsets = reported.join(measured, on="positioner_id", how="left")

        # If there are measured alpha/beta that are NaN, replace those with the
        # previous value.
        offsets = offsets.with_columns(transformation_valid=True)

        test_col = "alpha_measured"
        pos_na = offsets[test_col].is_null() | offsets[test_col].is_nan()
        if pos_na.sum() > 0:
            self.log(
                "Failed to calculate corrected positioner coordinates for "
                f"{pos_na.sum()} positioners.",
                level=logging.WARNING,
            )

            # For now set these values to the expected because we'll use them to
            # calculate the offset (which will be zero for invalid conversions).
            idx = pos_na.arg_true()
            expected = offsets[idx, ["alpha_expected", "beta_expected"]]
            offsets[idx, "alpha_measured"] = expected["alpha_expected"]
            offsets[idx, "beta_measured"] = expected["beta_expected"]
            offsets[idx, "transformation_valid"] = False

        # Calculate offset between expected and measured.
        alpha_offset = offsets["alpha_expected"] - offsets["alpha_measured"]
        beta_offset = offsets["beta_expected"] - offsets["beta_measured"]

        # If alpha measured and alpha reported lie on either side of the
        # wrap, adjust the offset accordingly.
        wrap1 = (alpha_offset > 360).arg_true()
        alpha_offset[wrap1] = alpha_offset[wrap1] - 360

        wrap2 = (alpha_offset < -360).arg_true()
        alpha_offset[wrap2] = alpha_offset[wrap2] + 360

        # Clip very large offsets and apply a proportional term.
        alpha_offset_c = numpy.clip(self.k * alpha_offset, -max_offset, max_offset)
        beta_offset_c = numpy.clip(self.k * beta_offset, -max_offset, max_offset)

        # Add new columns to DF.
        offsets = offsets.with_columns(
            alpha_offset=alpha_offset,
            beta_offset=beta_offset,
            alpha_offset_corrected=polars.Series(alpha_offset_c, dtype=polars.Float64),
            beta_offset_corrected=polars.Series(beta_offset_c, dtype=polars.Float64),
        )

        # Calculate new alpha and beta by applying the offset to the reported
        # positions (i.e., the ones the positioners believe they are at).
        alpha_new = offsets["alpha_reported"] + offsets["alpha_offset_corrected"]
        beta_new = offsets["beta_reported"] + offsets["beta_offset_corrected"]

        # Get new positions that are out of range and clip them.
        alpha_oor = ((alpha_new < 0.0) | (alpha_new > 360.0)).arg_true()
        beta_oor = ((beta_new < 0.0) | (beta_new > 180.0)).arg_true()

        alpha_new[alpha_oor] = numpy.clip(alpha_new[alpha_oor], 0, 360)
        beta_new[beta_oor] = numpy.clip(beta_new[beta_oor], 0, 180)

        # For the values out of range, recompute the corrected offsets.
        alpha_corr = alpha_new[alpha_oor] - offsets[alpha_oor, "alpha_reported"]
        offsets[alpha_oor, "alpha_offset_corrected"] = alpha_corr

        beta_corr = beta_new[beta_oor] - offsets[beta_oor, "beta_reported"]
        offsets[beta_oor, "beta_offset_corrected"] = beta_corr

        # Final check. If alpha/beta_new are NaNs, replace with reported values.
        alpha_new_nan = alpha_new.is_nan().arg_true()
        alpha_new[alpha_new_nan] = offsets[alpha_new_nan, "alpha_reported"]

        beta_new_nan = beta_new.is_nan().arg_true()
        beta_new[beta_new_nan] = offsets[beta_new_nan, "beta_reported"]

        # Save new positions.
        offsets = offsets.with_columns(
            alpha_new=alpha_new,
            beta_new=beta_new,
        )

        # Set the invalid alpha/beta_measured back to NaN.
        conds = [
            polars.when(polars.col.transformation_valid.not_())
            .then(float("nan"))
            .otherwise(polars.col(column))
            .alias(column)
            for column in [
                "alpha_measured",
                "beta_measured",
                "alpha_offset",
                "beta_offset",
                "alpha_offset_corrected",
                "beta_offset_corrected",
            ]
        ]
        offsets = offsets.with_columns(*conds)

        self.offsets = offsets

        self.log("Finished calculating offsets.", level=logging.DEBUG)

        return offsets

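    # Worked example of the proportional correction above (editor-added; the k and
    # max_correction values are illustrative): with k = 0.7 and max_correction = 8
    # deg, a raw alpha offset of 0.5 deg becomes 0.7 * 0.5 = 0.35 deg, while an
    # offset of 20 deg is scaled to 14 deg and then clipped to 8 deg before being
    # added to the reported position.
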
    async def write_proc_image(
        self,
        new_filename: Optional[str | pathlib.Path] = None,
        broadcast: bool = False,
    ) -> fits.HDUList:
        """Writes the processed image along with additional table data.

        If ``new_filename`` is not passed, defaults to adding the prefix
        ``proc-`` to the last processed image file path.

        """

        if self.image_path is None or self.proc_hdu is None:
            raise FVCError(
                "No current image. Take and process "
                "an image before calling write_proc_image()."
            )

        if new_filename is None:
            image_path = pathlib.Path(self.image_path)
            new_filename = image_path.with_name("proc-" + image_path.name)

        if self.fibre_data is None or self.raw_hdu is None or self.proc_hdu is None:
            raise FVCError("Need to run process_fvc_image before writing the image.")

        proc_hdus = fits.HDUList([fits.PrimaryHDU(), self.proc_hdu])

        if self.configuration:
            proc_hdus[1].header["CONFIGID"] = self.configuration.configuration_id
        else:
            proc_hdus[1].header["CONFIGID"] = -999.0

        proc_hdus[1].header["CAPPLIED"] = self.correction_applied

        # These are Pandas dataframes. We leave them be for convenience.
        positionerTable = calibration.positionerTable
        wokCoords = calibration.wokCoords
        fiducialCoords = calibration.fiducialCoords

        dfs: list[tuple[str, pandas.DataFrame]] = [
            ("POSITIONERTABLE", positionerTable.reset_index()),
            ("WOKCOORDS", wokCoords.reset_index()),
            ("FIDUCIALCOORDS", fiducialCoords.reset_index()),
        ]

        # The FVCTransform tables are also Pandas.
        if self.fvc_transform is not None:
            if self.fvc_transform.positionerTableMeas is not None:
                pos_table_meas = self.fvc_transform.positionerTableMeas
                pos_table_meas = pos_table_meas.drop(columns=["index"])
                dfs.append(("POSITIONERTABLEMEAS", pos_table_meas))

            if self.fvc_transform.fiducialCoordsMeas is not None:
                fid_coords_meas = self.fvc_transform.fiducialCoordsMeas
                fid_coords_meas = fid_coords_meas.drop(columns=["index"])
                dfs.append(("FIDUCIALCOORDSMEAS", fid_coords_meas))

        for name, df in dfs:
            rec = Table.from_pandas(df).as_array()
            table = fits.BinTableHDU(rec, name=name)
            proc_hdus.append(table)

        # fibre_data is Polars.
        fibre_data = self.fibre_data.clone().sort("positioner_id")
        fibre_data_rec = Table.from_pandas(fibre_data.to_pandas()).as_array()
        proc_hdus.append(fits.BinTableHDU(fibre_data_rec, name="FIBERDATA"))

        for key, val in self.ieb_data.items():
            proc_hdus[1].header[key] = val

        # Add header keywords from coordio.FVCTransformAPO.
        if self.fvc_transform:
            try:
                proc_hdus[1].header.extend(self.fvc_transform.getMetadata())
            except Exception as err:
                self.log(
                    f"Cannot get FVCTransform metadata: {err}",
                    logging.WARNING,
                    broadcast=broadcast,
                )

        posangles = None
        if self.fps:
            await self.fps.update_position()
            positions = self.fps.get_positions()

            current_positions = polars.DataFrame(
                {
                    "positionerID": positions[:, 0].astype(int),
                    "alphaReport": positions[:, 1],
                    "betaReport": positions[:, 2],
                }
            )
            current_positions = current_positions.with_columns(
                cmdAlpha=polars.lit(numpy.nan, dtype=polars.Float64),
                cmdBeta=polars.lit(numpy.nan, dtype=polars.Float64),
                startAlpha=polars.lit(numpy.nan, dtype=polars.Float64),
                startBeta=polars.lit(numpy.nan, dtype=polars.Float64),
            )

            if self.configuration:
                robot_grid = self.configuration.robot_grid

                _cmd_alpha = []
                _cmd_beta = []
                _start_alpha = []
                _start_beta = []

                if len(list(robot_grid.robotDict.values())[0].alphaPath) > 0:
                    for pid in current_positions["positionerID"]:
                        robot = robot_grid.robotDict[pid]
                        _cmd_alpha.append(robot.alphaPath[0][1])
                        _cmd_beta.append(robot.betaPath[0][1])
                        _start_alpha.append(robot.alphaPath[-1][1])
                        _start_beta.append(robot.betaPath[-1][1])

                    current_positions = current_positions.with_columns(
                        cmdAlpha=polars.lit(_cmd_alpha, dtype=polars.Float64),
                        cmdBeta=polars.lit(_cmd_beta, dtype=polars.Float64),
                        startAlpha=polars.lit(_start_alpha, dtype=polars.Float64),
                        startBeta=polars.lit(_start_beta, dtype=polars.Float64),
                    )

            posangles = Table.from_pandas(current_positions.to_pandas()).as_array()

        proc_hdus.append(fits.BinTableHDU(posangles, name="POSANGLES"))

        if self.centroids is not None:
            rec = Table.from_pandas(self.centroids.to_pandas()).as_array()
        else:
            rec = None
        proc_hdus.append(fits.BinTableHDU(rec, name="CENTROIDS"))

        offsets = None
        if self.offsets is not None:
            offsets = Table.from_pandas(self.offsets.to_pandas()).as_array()
        proc_hdus.append(fits.BinTableHDU(offsets, name="OFFSETS"))

        await run_in_executor(proc_hdus.writeto, new_filename, checksum=True)
        self.log(f"Processed HDU written to {new_filename}", broadcast=broadcast)

        self.proc_image_path = os.path.abspath(new_filename)

        return proc_hdus

    async def apply_correction(self, offsets: Optional[polars.DataFrame] = None):
        """Applies the offsets. Fails if the trajectory is collided or deadlocked."""

        if self.fps.locked:
            raise FVCError("The FPS is locked. Cannot apply corrections.")

        self.log("Preparing correction trajectory.")

        if self.offsets is None and offsets is None:
            raise FVCError("Offsets not set or passed. Cannot apply correction.")

        if offsets is None:
            offsets = self.offsets

        assert offsets is not None

        await self.fps.update_position()

        target_distance = config["fvc"]["target_distance"]

        # Setup robot grid.
        grid = get_robot_grid(self.fps)
        for robot in grid.robotDict.values():
            if robot.isOffline:
                continue

            positioner = self.fps[robot.id]
            robot.setAlphaBeta(positioner.alpha, positioner.beta)
            robot.setDestinationAlphaBeta(positioner.alpha, positioner.beta)

            offset_data = offsets.filter(polars.col.positioner_id == robot.id)
            if len(offset_data) == 0 or len(offset_data) > 1:
                raise ValueError(f"Invalid offset data for positioner {robot.id}.")

            # Skip robots for which the wok-to-positioner transformation failed.
            if not offset_data[0, "transformation_valid"]:
                continue

            new = offset_data[["alpha_new", "beta_new"]]
            dist = offset_data[["xwok_distance", "ywok_distance"]] * 1000.0

            meas_distance = numpy.hypot(dist["xwok_distance"], dist["ywok_distance"])[0]

            if meas_distance > target_distance:
                robot.setDestinationAlphaBeta(new[0, "alpha_new"], new[0, "beta_new"])
            else:
                # Mark robot offline to indicate that we won't move it.
                robot.isOffline = True

        # Check for collisions. If robots are collided just leave them there.
        collided = grid.getCollidedRobotList()
        n_coll = len(collided)
        if n_coll > 0:
            for pid in collided:
                positioner = self.fps[pid]
                alpha = positioner.alpha
                beta = positioner.beta
                grid.robotDict[pid].setAlphaBeta(alpha, beta)
                grid.robotDict[pid].setDestinationAlphaBeta(alpha, beta)

        # Generate trajectories.
        (to_destination, _, did_fail, deadlocks) = await get_path_pair_in_executor(
            grid,
            path_generation_mode="greedy",
            ignore_did_fail=True,
            stop_if_deadlock=True,
            ignore_initial_collisions=True,
        )
        if did_fail:
            log.warning(
                f"Found {len(deadlocks)} deadlocks but applying correction anyway."
            )

        self.log("Sending correction trajectory.")

        try:
            await self.fps.send_trajectory(to_destination, command=self.command)
        except TrajectoryError as err:
            raise FVCError(f"Failed executing the correction trajectory: {err}")

        self.correction_applied = True

        self.log("Correction applied.")

    def write_summary_F(
        self,
        path: str | pathlib.Path | None = None,
        plot: bool = True,
        extra_headers: dict = {},
        broadcast: bool = False,
    ):
        """Updates data with the last measured positions and writes confSummaryF."""

        if self.configuration is None:
            raise FVCError("write_summary_F can only be called with a configuration.")

        if self.fibre_data is None:
            raise FVCError("No fibre data.")

        self.log("Updating coordinates.", level=logging.DEBUG, broadcast=broadcast)

        fdata = self.fibre_data.clone().sort(["positioner_id", "fibre_type"])

        keep_cols = ["positioner_id", "hole_id", "xwok_measured", "ywok_measured"]
        measured = (
            fdata.filter(polars.col.fibre_type == "Metrology")
            .select(polars.col(keep_cols))
            .fill_nan(None)
            .drop_nulls()
        )

        new_alpha_beta: NewPositionsType = {}

        wok_data = get_wok_data(self.fps.observatory)

        for row in measured.iter_rows(named=True):
            (alpha, beta), _ = wok_to_positioner(
                row["hole_id"],
                self.fps.observatory,
                "Metrology",
                row["xwok_measured"],
                row["ywok_measured"],
                wok_data=wok_data,
            )

            if not numpy.isnan(alpha) and alpha is not None:
                new_alpha_beta[row["positioner_id"]] = {"alpha": alpha, "beta": beta}

        configuration_copy = self.configuration.copy()
        configuration_copy.assignment.fibre_table = fdata.clone()

        # Update alpha/beta and upstream coordinates.
        configuration_copy.assignment.update_positioner_coordinates(new_alpha_beta)

        if self.proc_hdu and "IPA" in self.proc_hdu.header:
            rotator_angle = round(self.proc_hdu.header["IPA"], 2)
        else:
            rotator_angle = -999.0

        headers = {
            "rotator_angle": rotator_angle,
            "fvc_centroid_method": self.centroid_method or "?",
            "fvc_rms": self.fitrms,
            "fvc_90_perc": self.perc_90,
            "fvc_percent_reached": self.fvc_percent_reached,
            "fvc_image_path": self.proc_image_path if self.proc_image_path else "",
            "temperature": self.ieb_data.get("TEMPT3", -999.0),
        }
        headers.update(extra_headers)

        configuration_copy.write_summary(
            path=path,
            flavour="F",
            headers=headers,
            overwrite=True,
        )

        # Plot analysis of FVC loop.
        if plot and self.proc_image_path:
            if self.configuration.assignment.boresight is None:
                self.log(
                    "Configuration does not have boresight set. "
                    "Cannot produce FVC plots.",
                    level=logging.WARNING,
                    broadcast=broadcast,
                )
            else:
                self.log("Creating FVC plots", level=logging.DEBUG, broadcast=broadcast)
                outpath = str(self.proc_image_path).replace(".fits", "_distances.pdf")
                plot_fvc_distances(
                    self.configuration,
                    configuration_copy.fibre_data,
                    path=outpath,
                )


async def reprocess_configuration(
    configuration_id: int,
    path: pathlib.Path | str | None = None,
    centroid_method: str | None = None,
    use_suffix: bool = True,
):  # pragma: no cover
    """Reprocesses the FVC image of a configuration with a different centroid method.

    Outputs a new ``confSummaryF`` file.

    Parameters
    ----------
    configuration_id
        The configuration ID for which to reprocess data. Must have an existing
        ``confSummaryF`` file in ``$SDSSCORE_DIR``.
    path
        The path where to write the new ``confSummaryF`` file. If `None`,
        defaults to ``$SDSSCORE_DIR``.
    centroid_method
        The centroid method to use, one of ``"nudge"``, ``"sep"``, ``"winpos"``,
        or ``"simple"``.
    use_suffix
        If `True`, the new ``confSummaryF`` file path will have a suffix
        including the centroid method used.

    Returns
    -------
    path
        The path to the new ``confSummaryF`` file.

    """

    site = config["observatory"]

    confSummaryF_path = Configuration._get_summary_file_path(
        configuration_id,
        site,
        "F",
    )
    if not os.path.exists(confSummaryF_path):
        raise FileNotFoundError(
            f"Cannot find a confSummaryF file for {configuration_id}."
        )

    header, _ = read_confSummary(confSummaryF_path)

    design = Design(
        header["design_id"],
        epoch=header["epoch"],
        scale=header["focal_scale"],
    )

    fps = FPS.get_instance()
    assert not fps.can, "This function cannot be called on a running FPS instance."

    configuration = design.configuration
    configuration.configuration_id = configuration_id

    fvc = FVC(site)

    proc_fimg = header["fvc_image_path"]
    fimg = proc_fimg.replace("proc-", "")

    posangles = fits.getdata(proc_fimg, "POSANGLES")
    fiber_data = polars.DataFrame(fits.getdata(proc_fimg, "FIBERDATA"))

    positioner_coords = {}
    for row in posangles:
        positioner_coords[row["positionerID"]] = (row["alphaReport"], row["betaReport"])

    fvc.process_fvc_image(
        fimg,
        positioner_coords,
        configuration=configuration,
        fibre_data=fiber_data,
        centroid_method=centroid_method,
        plot=False,
    )

    if path is None:
        path = confSummaryF_path

    path = str(path)
    if use_suffix:
        path = path.replace(".par", f"_{fvc.centroid_method}.par")

    fvc.write_summary_F(
        path=path,
        plot=False,
        extra_headers={
            "MJD": header["MJD"],
            "obstime": header["obstime"],
            "temperature": header["temperature"],
        },
    )

    return path
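
# ---------------------------------------------------------------------------
# Illustrative sketch (editor-added, not part of jaeger): one iteration of the
# FVC measure/correct loop using the public API defined above. Exposure time
# and the lack of a command are example choices; a real actor command would
# normally drive this sequence.
async def _example_fvc_iteration():
    fps = FPS.get_instance()
    fvc = FVC(config["observatory"])

    # Expose and process the raw frame against the loaded configuration.
    raw_path = await fvc.expose(exposure_time=5.0)
    positions = fps.get_positions()  # array of (positioner_id, alpha, beta) rows
    positioner_coords = {int(p): (a, b) for p, a, b in positions}
    fvc.process_fvc_image(raw_path, positioner_coords)

    # Compute alpha/beta offsets and send the correction trajectory.
    fvc.calculate_offsets(positions)
    await fvc.apply_correction()

    # Save the processed image and the updated confSummaryF file.
    await fvc.update_ieb_info()
    await fvc.write_proc_image()
    fvc.write_summary_F()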