#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-05-12
# @Filename: ieb.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import math
import os
import warnings

from typing import Any, Dict

from drift import Drift, DriftError

from jaeger import config
from jaeger.exceptions import JaegerUserWarning


__all__ = ["IEB", "FVC_IEB", "_get_category_data"]


async def _get_category_data(command, category) -> list:
    """Returns data about a device category.

    Parameters
    ----------
    command
        Object from which the IEB (``command.actor.fps.ieb``) and the actor
        model schema (``command.actor.model.schema``) are retrieved.
    category
        Key in ``schema["properties"]`` whose ``"items"`` list describes the
        devices to read. Each item must define ``"title"`` (used as the
        device name) and ``"type"``.

    Returns
    -------
    measured
        One value per schema item, in schema order. Relay devices declared
        as ``"boolean"`` are reported as `True` when the relay reads
        ``"closed"``; ``"integer"`` items are cast with `int`; ``"number"``
        items are rounded to the precision implied by ``"multipleOf"``
        (3 decimals when ``"multipleOf"`` is not present).

    """

    ieb = command.actor.fps.ieb
    schema = command.actor.model.schema

    items = schema["properties"][category]["items"]
    measured = []

    # All the reads share the connection opened by the context manager,
    # hence connect=False in each individual read.
    async with ieb:
        for item in items:
            name = item["title"]
            type_ = item["type"]

            device = ieb.get_device(name)
            value = (await device.read(connect=False))[0]

            if type_ == "boolean" and device.__type__ == "relay":
                value = value == "closed"
            elif type_ == "integer":
                value = int(value)
            elif type_ == "number":
                if "multipleOf" in item:
                    # Round up the number of decimals so that a multipleOf
                    # that is not a power of ten (e.g., 0.05) keeps all the
                    # digits it needs. For powers of ten and for values >= 1
                    # this is identical to truncating.
                    precision = math.ceil(-math.log10(item["multipleOf"]))
                else:
                    precision = 3
                value = round(value, precision)

            measured.append(value)

    return measured

class IEB(Drift):
    """Thin wrapper around a :class:`~drift.drift.Drift` class.

    Allows additional features such as disabling the interface.

    """

    # Maximum number of connection attempts made when entering the
    # asynchronous context manager before giving up.
    MAX_RETRIES: int = 5

    def __init__(self, *args, **kwargs):
        # Set before calling the parent initialiser so that the attribute
        # exists regardless of what the parent does during initialisation.
        self.disabled = False

        super().__init__(*args, **kwargs)

        # Cache for get_categories(); populated on first use.
        self._categories = None
        self._n_failures: int = 0

    @classmethod
    def create(cls, path=None):
        """Creates an `.IEB` instance with the default configuration.

        Parameters
        ----------
        path
            Path to the configuration file. If not provided (or empty),
            ``config["files"]["ieb_config"]`` is used. User and environment
            variables are expanded, and a relative path is resolved with
            respect to the directory that contains this module.

        """

        default_ieb_path = path or config["files"]["ieb_config"]
        default_ieb_path = os.path.expanduser(os.path.expandvars(default_ieb_path))

        if not os.path.isabs(default_ieb_path):
            default_ieb_path = os.path.join(
                os.path.dirname(__file__),
                default_ieb_path,
            )

        return cls.from_config(default_ieb_path)

    def get_categories(self):
        """Returns the set of device categories.

        The categories are collected from all the devices in all the modules
        the first time the method is called and cached afterwards. Devices
        whose category is `None` are ignored.

        """

        if self._categories is None:
            self._categories = {
                device.category
                for module in self.modules
                for device in self.modules[module].devices.values()
                if device.category is not None
            }

        return self._categories

    async def __aenter__(self):
        """Connects to the IEB, retrying up to `.MAX_RETRIES` times.

        Raises
        ------
        DriftError
            If the IEB is disabled or if all the connection attempts fail.

        """

        if self.disabled:
            raise DriftError("IEB is disabled.")

        n_retries = 0
        while True:
            try:
                await Drift.__aenter__(self)
                break
            except DriftError as err:
                n_retries += 1
                if n_retries >= self.MAX_RETRIES:
                    raise DriftError("Failed connecting to the IEB.") from err

    async def __aexit__(self, *args):
        """Delegates the exit of the context manager to the parent class."""

        await Drift.__aexit__(self, *args)

    def enable(self):
        """Re-enables the IEB instance and resets the failure counter."""

        self._n_failures = 0
        self.disabled = False

    async def get_status(self) -> Dict[str, Any]:
        """Returns the status of the IEB components.

        Returns
        -------
        status
            A mapping of device name to its value. The value is the first
            element of the data read for the device, except for relays,
            which are reported as `False` when the relay is ``"open"`` and
            `True` otherwise.

        """

        status = {}

        for category in self.get_categories():
            data = await self.read_category(category)
            for device in data:
                if self.get_device(device).__type__ == "relay":
                    value = data[device][0] != "open"
                else:
                    value = data[device][0]
                status[device] = value

        return status

class FVC_IEB(IEB):
    """Connects to the FVC IEB."""

    @classmethod
    def create(cls, path=None):
        """Creates an `.FVC_IEB` instance with the default configuration.

        Parameters
        ----------
        path
            Path to the configuration file. If not provided (or empty),
            ``config["fvc"]["config"]`` is used. The path is then processed
            by the parent ``create`` classmethod.

        """

        # Honour an explicit path, as the parent class does, instead of
        # always overriding it with the configured default.
        default_ieb_path = path or config["fvc"]["config"]

        return super().create(default_ieb_path)

class Chiller(IEB):
    """Connects to the chiller Modbus PLC."""

    @classmethod
    def create(cls, path=None):
        """Creates a `.Chiller` instance with the default configuration.

        Parameters
        ----------
        path
            Path to the configuration file. If not provided (or empty),
            the ``"config"`` entry of ``config["chiller"]`` is used.

        Returns
        -------
        chiller
            The new instance, or `None` (after issuing a
            `.JaegerUserWarning`) if no configuration file is available.

        """

        # Honour an explicit path, as the parent class does; only fall back
        # to the configuration entry when no path has been passed.
        config_file = path or config["chiller"].get("config", None)

        if config_file is None:
            warnings.warn("Chiller configuration missing.", JaegerUserWarning)
            return None

        return super().create(config_file)