Source code for jaeger.ieb
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-05-12
# @Filename: ieb.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import math
import os
import warnings
from typing import Any, Dict
from drift import Drift, DriftError
from jaeger import config
from jaeger.exceptions import JaegerUserWarning
# Names exported by a star import of this module.
# NOTE(review): ``Chiller`` is defined further down in this file but is not
# listed here, while the underscore-prefixed ``_get_category_data`` helper
# is -- confirm that this is intentional.
__all__ = ["IEB", "FVC_IEB", "_get_category_data"]
async def _get_category_data(command, category) -> list:
    """Reads every device listed under a schema category.

    The device list comes from
    ``command.actor.model.schema["properties"][category]["items"]``; each
    entry supplies the device name (``"title"``) and the expected JSON type
    (``"type"``). Devices are read through ``command.actor.fps.ieb`` inside a
    single ``async with`` block.

    Parameters
    ----------
    command
        Object exposing ``actor.fps.ieb`` and ``actor.model.schema``.
    category
        Key of the schema property whose ``"items"`` are to be read.

    Returns
    -------
    list
        One value per schema item, in schema order. Relay readings declared
        as ``"boolean"`` become `True` when the reading is ``"closed"``;
        ``"integer"`` items are cast with `int`; ``"number"`` items are
        rounded to the number of decimals derived from ``"multipleOf"``
        (3 when absent). Any other reading is passed through unchanged.

    """

    fps_ieb = command.actor.fps.ieb
    model_schema = command.actor.model.schema
    item_specs = model_schema["properties"][category]["items"]

    readings = []

    async with fps_ieb:
        for spec in item_specs:
            title = spec["title"]
            kind = spec["type"]

            device = fps_ieb.get_device(title)
            # Only the first element of the read result is used.
            reading = (await device.read(connect=False))[0]

            if kind == "boolean" and device.__type__ == "relay":
                reading = reading == "closed"
            elif kind == "integer":
                reading = int(reading)
            elif kind == "number":
                # Number of decimals implied by "multipleOf" (e.g. 0.01 -> 2),
                # truncated towards zero; 3 decimals when unspecified.
                digits = (
                    int(-math.log10(spec["multipleOf"]))
                    if "multipleOf" in spec
                    else 3
                )
                reading = round(reading, digits)

            readings.append(reading)

    return readings
[docs]
class IEB(Drift):
    """Thin wrapper around a :class:`~drift.drift.Drift` class.

    Allows additional features such as disabling the interface.

    """

    # Maximum number of connection attempts made when entering the
    # ``async with`` context before giving up.
    MAX_RETRIES: int = 5

    def __init__(self, *args, **kwargs):
        # Kept ahead of the parent initialiser, as in the original code.
        # NOTE(review): presumably so the attribute exists while the parent
        # initialises -- confirm before reordering.
        self.disabled = False

        super().__init__(*args, **kwargs)

        # Cache filled lazily by get_categories().
        self._categories = None
        # Reset to zero by enable(); not otherwise modified in this class.
        self._n_failures: int = 0

    @classmethod
    def create(cls, path=None):
        """Creates an `.IEB` instance with the default configuration.

        Parameters
        ----------
        path
            Path to the configuration file. When not provided,
            ``config["files"]["ieb_config"]`` is used. Environment variables
            and ``~`` are expanded, and a relative path is resolved against
            the directory that contains this module.

        Returns
        -------
        The object returned by ``cls.from_config`` for the resolved path.

        """

        default_ieb_path = path or config["files"]["ieb_config"]
        default_ieb_path = os.path.expanduser(os.path.expandvars(default_ieb_path))

        if not os.path.isabs(default_ieb_path):
            default_ieb_path = os.path.join(
                os.path.dirname(__file__),
                default_ieb_path,
            )

        return cls.from_config(default_ieb_path)

    def get_categories(self):
        """Returns the set of device categories.

        The categories are gathered from the ``category`` attribute of every
        device in every module, with `None` removed. The result is computed
        on the first call and cached for subsequent calls.

        """

        if self._categories is None:
            categories = {
                device.category
                for module in self.modules
                for device in self.modules[module].devices.values()
            }
            categories.discard(None)
            self._categories = categories

        return self._categories

    async def __aenter__(self):
        """Opens the connection, retrying up to `MAX_RETRIES` times.

        Raises
        ------
        DriftError
            If the instance is disabled, or if every connection attempt
            failed with a ``DriftError``.

        """

        if self.disabled:
            raise DriftError("IEB is disabled.")

        n_retries = 0
        while True:
            try:
                await Drift.__aenter__(self)
                break
            except DriftError as err:
                n_retries += 1
                # Honour the class-level limit rather than a literal so that
                # overriding MAX_RETRIES actually takes effect.
                if n_retries >= self.MAX_RETRIES:
                    raise DriftError("Failed connecting to the IEB.") from err

    async def __aexit__(self, *args):
        """Delegates closing the context to the parent class."""

        await Drift.__aexit__(self, *args)

    def enable(self):
        """Re-enables the IEB instance and resets the failure counter."""

        self._n_failures = 0
        self.disabled = False

    async def get_status(self) -> Dict[str, Any]:
        """Returns the status of the IEB components.

        Every category from `get_categories` is read with ``read_category``.
        For each device, the first element of its reading is reported; relay
        devices are reported as booleans instead (`False` when the reading is
        ``"open"``, `True` otherwise).

        Returns
        -------
        dict
            Mapping of device name to its value.

        """

        status = {}

        for category in self.get_categories():
            data = await self.read_category(category)
            for device in data:
                value = data[device][0]
                if self.get_device(device).__type__ == "relay":
                    value = value != "open"
                status[device] = value

        return status
[docs]
class FVC_IEB(IEB):
    """Connects to the FVC IEB."""

    @classmethod
    def create(cls, path=None):
        """Creates an `.FVC_IEB` instance with the default configuration.

        Parameters
        ----------
        path
            Path to the configuration file. When not provided,
            ``config["fvc"]["config"]`` is used.

        Returns
        -------
        The result of the parent ``create`` classmethod for the chosen path.

        """

        # An explicit path takes precedence over the configured default,
        # matching the behaviour of the parent ``create``.
        default_ieb_path = path or config["fvc"]["config"]

        return super().create(default_ieb_path)
class Chiller(IEB):
    """Connects to the chiller Modbus PLC."""

    @classmethod
    def create(cls, path=None):
        """Creates a `.Chiller` instance with the default configuration.

        Parameters
        ----------
        path
            Path to the configuration file. When not provided, the
            ``"config"`` entry of ``config["chiller"]`` is used.

        Returns
        -------
        The result of the parent ``create`` classmethod, or `None` (after
        issuing a `.JaegerUserWarning`) when no configuration file is
        available.

        """

        # An explicit path takes precedence over the configured default,
        # matching the behaviour of the parent ``create``.
        config_file = path or config["chiller"].get("config", None)

        if config_file is None:
            warnings.warn("Chiller configuration missing.", JaegerUserWarning)
            return None

        return super().create(config_file)