#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-11-28
# @Filename: kaiju.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import time
import warnings
from typing import TYPE_CHECKING, Literal, Optional, Sequence, cast
import numpy
from matplotlib.figure import Figure
from jaeger import config, log
from jaeger.exceptions import JaegerError, JaegerUserWarning, TrajectoryError
from jaeger.utils.helpers import run_in_executor
if TYPE_CHECKING:
from kaiju import RobotGridCalib
from jaeger.fps import FPS
__all__ = [
"get_robot_grid",
"dump_robot_grid",
"decollide",
"get_path_pair",
"get_snapshot",
"unwind",
"explode",
"get_path_pair_in_executor",
"decollide_in_executor",
"check_trajectory",
]
ArmTrajectoryType = Sequence[tuple[float, float]]
TrajectoryType = dict[int, dict[Literal["alpha", "beta"], ArmTrajectoryType]] | None
PathPairReturnType = tuple[TrajectoryType, TrajectoryType, bool, list[int]]
def warn(message):
warnings.warn(message, JaegerUserWarning)
[docs]
def get_robot_grid(fps: FPS | None, seed: int | None = None, collision_buffer=None):
"""Returns a new robot grid with the destination set to the lattice position.
If an initialised instance of the FPS is available, disabled robots will be
set offline in the grid at their current positions.
"""
from kaiju.robotGrid import RobotGridCalib
if seed is None:
t = 1000 * time.time()
seed = int(int(t) % 2**32 / 1000)
kaiju_config = config["kaiju"]
ang_step = kaiju_config["ang_step"]
collision_buffer = collision_buffer or kaiju_config["collision_buffer"]
alpha0, beta0 = kaiju_config["lattice_position"]
epsilon = ang_step * kaiju_config["epsilon_factor"]
if collision_buffer < 1.5:
raise JaegerError("Invalid collision buffer < 1.5.")
log.debug(f"Creating RobotGridCalib with stepSize={ang_step}, epsilon={epsilon}.")
robot_grid = RobotGridCalib(stepSize=ang_step, epsilon=epsilon, seed=seed)
robot_grid.setCollisionBuffer(collision_buffer)
# TODO: This is a bit hacky. Kaiju doesn't have a collisionBuffer anymore
# as collision buffers are per robot, but I want to keep this information
# for when I dump and reload robot grids.
robot_grid.collisionBuffer = collision_buffer
for robot in robot_grid.robotDict.values():
if fps is not None and robot.id in fps and fps[robot.id].disabled is True:
positioner = fps[robot.id]
robot.setDestinationAlphaBeta(positioner.alpha, positioner.beta)
robot.setAlphaBeta(positioner.alpha, positioner.beta)
robot.isOffline = True
else:
robot.setDestinationAlphaBeta(alpha0, beta0)
robot.setAlphaBeta(alpha0, beta0)
return robot_grid
[docs]
def dump_robot_grid(robot_grid: RobotGridCalib) -> dict:
"""Dump the information needed to restore a robot grid into a dictionary."""
data = {}
data["collision_buffer"] = robot_grid.collisionBuffer
data["grid"] = {}
for robot in robot_grid.robotDict.values():
alpha = robot.alpha
beta = robot.beta
destinationAlpha = robot.destinationAlpha
destinationBeta = robot.destinationBeta
data["grid"][robot.id] = (
alpha,
beta,
destinationAlpha,
destinationBeta,
robot.isOffline,
)
return data
def load_robot_grid(data: dict, set_destination: bool = True) -> RobotGridCalib:
"""Restores a robot grid from a dump."""
collision_buffer = data["collision_buffer"]
robot_grid = get_robot_grid(None, collision_buffer=collision_buffer)
for robot in robot_grid.robotDict.values():
data_robot = data["grid"][robot.id]
robot.setAlphaBeta(data_robot[0], data_robot[1])
if set_destination:
robot.setDestinationAlphaBeta(data_robot[2], data_robot[3])
if data_robot[4] is True:
robot.isOffline = True
return robot_grid
[docs]
def decollide(
robot_grid: Optional[RobotGridCalib] = None,
data: Optional[dict] = None,
simple: bool = False,
decollide_grid_fallback: bool = False,
priority_order: list[int] = [],
) -> tuple[RobotGridCalib | dict, list[int]]:
"""Decollides a potentially collided grid. Raises on fail.
Parameters
----------
robot_grid
The Kaiju ``RobotGridCalib`` instance to decollide.
data
A dictionary of data that can be used to reload a Kaiju robot grid
using `.load_robot_grid`. This is useful if the function is being
run in an executor.
simple
Runs ``decollideGrid()`` and returns.
decollide_grid_fallback
If `True`, runs ``decollideGrid()`` if the positioner-by-positioner
decollision fails.
priority_list
A sorted list of positioner IDs with the order of which positioners to
try to keep at their current positions. Positioners earlier in the list
will be decollided last. Ignore in case of ``simple=True``.
Returns
-------
grid,decollided
If ``robot_grid`` is passed, returns the same grid instance after decollision.
If ``data`` is passed, returns a dictionary describing the decollided grid
that can be used to recreate a grid using `.load_robot_grid`. Also returns
the list of robots that have been decollided.
"""
if robot_grid is not None and data is not None:
raise JaegerError("robot_grid and data are mutually exclusive.")
if data is not None:
robot_grid = load_robot_grid(data)
assert robot_grid is not None
if simple:
collided = robot_grid.getCollidedRobotList()
robot_grid.decollideGrid()
if len(robot_grid.getCollidedRobotList()) > 0:
raise JaegerError("Failed decolliding grid.")
if data is not None:
return dump_robot_grid(robot_grid), collided
else:
return robot_grid, collided
# First pass. If collided, decollide each robot one by one.
collided = robot_grid.getCollidedRobotList()
# Sort collided robots by priority order.
collided = sorted(
collided,
key=lambda x: (
len(priority_order) - priority_order.index(x) if x in priority_order else -1
),
)
decollided = []
if len(collided) > 0:
for robot_id in collided:
if robot_grid.isCollided(robot_id):
if robot_grid.robotDict[robot_id].isOffline:
continue
robot_grid.decollideRobot(robot_id)
decollided.append(robot_id) # Even if we failed it may have moved.
if robot_grid.isCollided(robot_id):
raise JaegerError(f"Failed decolliding positioner {robot_id}.")
# Second pass. If still collided, try a grid decollision.
if len(robot_grid.getCollidedRobotList()) > 0:
if decollide_grid_fallback:
warn("Grid is still colliding. Attempting full grid decollision.")
robot_grid.decollideGrid()
if robot_grid.getCollidedRobotList() is not False:
raise JaegerError("Failed decolliding grid.")
# We don't know which robots were decollided so assume all collided
# robots have moved.
decollided = collided
else:
raise JaegerError("Failed decolliding grid.")
if data is not None:
return dump_robot_grid(robot_grid), decollided
else:
return robot_grid, decollided
[docs]
def get_path_pair(
robot_grid: Optional[RobotGridCalib] = None,
data: Optional[dict] = None,
path_generation_mode: str | None = None,
ignore_did_fail: bool = False,
explode_deg: float = 5,
explode_positioner_id: int | None = None,
speed=None,
smooth_points=None,
path_delay=None,
collision_shrink=None,
greed: float | None = None,
phobia: float | None = None,
stop_if_deadlock: bool = False,
ignore_initial_collisions: bool = False,
) -> PathPairReturnType:
"""Runs path generation and returns the to and from destination paths.
Parameters
----------
robot_grid
The Kaiju ``RobotGridCalib`` instance to decollide.
data
A dictionary of data that can be used to reload a Kaiju robot grid
using `.load_robot_grid`. This is useful if the function is being
run in an executor.
path_generation_mode
Defines the path generation algorithm to use.
Either ``greedy``, ``mdp``, ``explode`` or ``explode_one``. If
`None`, defaults to ``kaiju.default_path_generator``.
ignore_did_fail
Generate paths even if path generation failed (i.e., deadlocks).
explode_deg
Degrees for ``pathGenExplode``.
explode_positioner_id
The positioner to explode.
speed, smooth_points, path_delay, collision_shrink
Kaiju parameters to pass to ``getPathPair``. Otherwise uses the default
configuration values.
phobia, greed
Parameters for ``pathGenMDP``. If not set uses ``kaiju`` configuration
values.
stop_if_deadlock
If `True`, detects deadlocks early in the path and returns shorter
trajectories (at the risk of some false positive deadlocks).
ignore_initial_collisions
If `True`, does not fail if the initial state is collided. To be used
only for offsets.
Returns
-------
paths
A tuple with the to destination path, from destination path, whether path
generation failed, and the list of deadlocks
"""
if path_generation_mode is None:
path_generation_mode = cast(str, config["kaiju"]["default_path_generator"])
if robot_grid is not None and data is not None:
raise JaegerError("robot_grid and data are mutually exclusive.")
if data is not None:
set_destination = False if path_generation_mode == "explode" else True
robot_grid = load_robot_grid(data, set_destination=set_destination)
assert robot_grid is not None
deadlocks = []
if path_generation_mode == "explode":
log.debug(f"Running pathGenExplode with explode_deg={explode_deg}.")
robot_grid.pathGenExplode(explode_deg)
elif path_generation_mode == "explode_one":
log.debug(
f"Running pathGenExplodeOne with explode_deg={explode_deg}, "
f"explode_positioner_id={explode_positioner_id}."
)
robot_grid.pathGenExplodeOne(explode_deg, explode_positioner_id)
elif path_generation_mode == "greedy":
log.debug(f"Running pathGenGreedy with stopIfDeadlock={stop_if_deadlock}.")
robot_grid.pathGenGreedy(
stopIfDeadlock=stop_if_deadlock,
ignoreInitialCollisions=ignore_initial_collisions,
)
elif path_generation_mode == "mdp":
greed = greed or config["kaiju"]["greed"]
phobia = phobia or config["kaiju"]["phobia"]
log.debug(f"Running pathGenMDP with phobia={phobia}, greed={greed}.")
robot_grid.pathGenMDP2(
greed=greed,
phobia=phobia,
ignoreInitialCollisions=ignore_initial_collisions,
)
else:
raise ValueError(f"Invalid path_generation_mode={path_generation_mode!r}.")
if path_generation_mode in ["greedy", "mdp"]:
# Check for deadlocks.
deadlocks = robot_grid.deadlockedRobots()
if robot_grid.didFail and ignore_did_fail is False:
return (None, None, robot_grid.didFail, deadlocks)
speed = speed or config["kaiju"]["speed"]
smooth_points = smooth_points or config["kaiju"]["smooth_points"]
collision_shrink = collision_shrink or config["kaiju"]["collision_shrink"]
path_delay = path_delay or config["kaiju"]["path_delay"]
log.debug(
f"Running getPathPair with speed={speed}, smoothPoints={smooth_points}, "
f"collisionShrink={collision_shrink}, pathDelay={path_delay}."
)
to_destination, from_destination = robot_grid.getPathPair(
speed=speed,
smoothPoints=smooth_points,
collisionShrink=collision_shrink,
pathDelay=path_delay,
)
return (
to_destination,
from_destination,
robot_grid.didFail,
deadlocks,
)
[docs]
async def get_path_pair_in_executor(
robot_grid: RobotGridCalib,
**kwargs,
) -> PathPairReturnType:
"""Calls `.get_path_pair` with a process executor."""
data = dump_robot_grid(robot_grid)
traj_data = await run_in_executor(
get_path_pair,
data=data,
executor="process",
**kwargs,
)
return traj_data
[docs]
async def decollide_in_executor(
robot_grid: RobotGridCalib, **kwargs
) -> tuple[RobotGridCalib, list[int]]:
"""Calls `.decollide` with a process executor."""
data = dump_robot_grid(robot_grid)
decollided_data, collided = await run_in_executor(
decollide,
data=data,
executor="process",
**kwargs,
)
return load_robot_grid(decollided_data), collided
[docs]
async def unwind(
current_positions: dict[int, tuple[float | None, float | None]],
collision_buffer: float | None = None,
disabled: list[int] = [],
force: bool = False,
):
"""Folds all the robots to the lattice position.
This coroutine uses a process pool executor to run Kaiju routines.
"""
alpha0, beta0 = config["kaiju"]["lattice_position"]
# We create the data directly since it's simple. This should be a bit faster
# than creating a grid and dumping it.
data = {"collision_buffer": collision_buffer, "grid": {}}
for pid, (alpha, beta) in current_positions.items():
data["grid"][int(pid)] = (alpha, beta, alpha0, beta0, pid in disabled)
(to_destination, _, did_fail, deadlocks) = await run_in_executor(
get_path_pair,
data=data,
path_generation_mode="greedy" if force is True else None,
ignore_did_fail=force,
stop_if_deadlock=force,
executor="process",
)
if did_fail:
if force is False:
raise TrajectoryError(
"Failed generating a valid unwind trajectory. "
f"{len(deadlocks)} deadlocks were found ({deadlocks})."
)
else:
log.warning("Deadlocks found in unwind but proceeding anyway.")
return to_destination
[docs]
async def explode(
current_positions: dict[int, tuple[float | None, float | None]],
explode_deg=20.0,
collision_buffer: float | None = None,
disabled: list[int] = [],
positioner_id: int | None = None,
):
"""Explodes the grid by a number of degrees.
This coroutine uses a process pool executor to run Kaiju routines.
"""
alpha0, beta0 = config["kaiju"]["lattice_position"]
data = {"collision_buffer": collision_buffer, "grid": {}}
for pid, (alpha, beta) in current_positions.items():
data["grid"][int(pid)] = (alpha, beta, alpha0, beta0, pid in disabled)
if positioner_id is not None:
path_generation_mode = "explode_one"
else:
path_generation_mode = "explode"
(to_destination, *_) = await run_in_executor(
get_path_pair,
data=data,
path_generation_mode=path_generation_mode,
explode_deg=explode_deg,
explode_positioner_id=positioner_id,
ignore_did_fail=False,
executor="process",
)
return to_destination
def get_snapshot_async(
path: str,
robot_grid: Optional[RobotGridCalib] = None,
data: Optional[dict] = None,
highlight: int | None = None,
title: str | None = None,
):
"""Creates an FPS snapshot and saves it to disk. To be used with an executor.
Parameters
----------
path
The path where to save the file.
robot_grid
The Kaiju ``RobotGridCalib`` instance to plot.
data
A dictionary of data that can be used to reload a Kaiju robot grid
using `.load_robot_grid`. This is useful if the function is being
run in an executor.
highlight
Robot to highlight.
title
A title for the plot.
"""
import matplotlib.pyplot as plt
if robot_grid is not None and data is not None:
raise JaegerError("robot_grid and data are mutually exclusive.")
if data is not None:
robot_grid = load_robot_grid(data)
assert robot_grid is not None
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".+array interface is deprecated.+")
ax = robot_grid.plot_state(highlightRobot=highlight)
assert ax is not None
if title is not None:
ax.set_title(title)
plt.tight_layout()
figure = ax.figure
assert isinstance(figure, Figure)
figure.savefig(path)
plt.close("all")
[docs]
async def get_snapshot(
path: str,
fps: FPS | None = None,
positions: dict | None = None,
collision_buffer: float | None = None,
highlight: int | list | None = None,
title: str | None = None,
show_disabled: bool = True,
):
"""Plots a snapshot of the FPS and saves it to disk."""
from jaeger.fps import FPS
fps = fps or FPS.get_instance()
if fps.initialised is False:
await fps.initialise()
data = {"collision_buffer": collision_buffer, "grid": {}}
if positions is None:
await fps.update_position()
if len(fps.positioners) == 0:
raise ValueError("No positioners connected.")
for pid in fps.positioners.keys():
data["grid"][int(pid)] = (
fps[pid].alpha,
fps[pid].beta,
0,
0,
fps[pid].disabled if show_disabled else False,
)
else:
for pid in positions:
data["grid"][int(pid)] = (
positions[pid]["alpha"],
positions[pid]["beta"],
0,
0,
fps[pid].disabled if show_disabled else False,
)
await run_in_executor(
get_snapshot_async,
path,
data=data,
highlight=highlight,
title=title,
executor="process",
)
return True
[docs]
async def check_trajectory(
trajectory: dict,
current_positions: dict | None = None,
fps: FPS | None = None,
atol=1.0,
) -> bool:
"""Checks that the current position matches the starting point of a trajectory.
Parameters
----------
trajectory
The dictionary with the trajectory to check.
current_positions
A mapping of positioner ID to ``(alpha, beta)`` with the current arrangement
of the FPS array.
fps
If ``current_positions`` is not passed, the `.FPS` instance is used to
determine the current arrangement.
"""
if current_positions is None:
if fps:
await fps.update_position()
current_positions = fps.get_positions_dict(ignore_disabled=True)
else:
raise RuntimeError("Either current_positions or fps must be passed.")
if len(current_positions) == 0:
return False
if len(current_positions) != len(trajectory):
warn("Mismatch between number of positioners and trajectory.")
return False
array_current = numpy.zeros((len(current_positions), 2), dtype=numpy.float32)
array_trajectory = numpy.zeros((len(current_positions), 2), dtype=numpy.float32)
for ii, pid in enumerate(current_positions):
array_current[ii, :] = current_positions[pid]
if pid not in trajectory:
warn(f"Positioner {pid} is not in the trajectory.")
return False
alpha0 = trajectory[pid]["alpha"][0][0]
beta0 = trajectory[pid]["beta"][0][0]
array_trajectory[ii, :] = (alpha0, beta0)
if numpy.allclose(array_current, array_trajectory, atol=atol):
return True
else:
warn("Trajectory start and current positions do not match.")
return False