Source code for jaeger.commands.bootloader

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-09-09
# @Filename: bootloader.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import contextlib
import os
import pathlib
import time
import warnings
import zlib

from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

import numpy

from jaeger import can_log, config, log
from jaeger.commands import Command, CommandID
from jaeger.exceptions import JaegerError, JaegerUserWarning
from jaeger.maskbits import BootloaderStatus
from jaeger.utils import int_to_bytes


if TYPE_CHECKING:
    from jaeger import FPS


__all__ = [
    "load_firmware",
    "StartFirmwareUpgrade",
    "GetFirmwareVersion",
    "SendFirmwareData",
]


[docs] async def load_firmware( fps: FPS, firmware_file: str | pathlib.Path, positioners: Optional[List[int]] = None, messages_per_positioner: Optional[int] = None, force: bool = False, show_progressbar: bool = False, progress_callback: Optional[Callable[[int, int], Any]] = None, stop_logging: bool = True, ): """Convenience function to run through the steps of loading a new firmware. This function is a coroutine and not intendend for direct use. Use the ``jaeger`` CLI instead. Parameters ---------- fps `~jaeger.fps.FPS` instance to which the commands will be sent. firmware_file Binary file containing the firmware to load. positioners A list of positioner ids whose firmware to update, or `None` to update all the positioners in ``fps``. messages_per_positioner How many messages to send to each positioner at once. This can improve the performance but also overflow the CAN bus buffer. With the default value of `None`, reverts to the configuration value ``positioner.firmware_messages_per_positioner``. force Forces the firmware load to continue even if some positioners are not responding or are not in bootloader mode. show_progressbar Whether to show a progress bar. progress_callback A function to call as data gets transferred to the positioners. The callback is called with ``(current_chunk, n_chuck)`` where ``current_chunk`` is the number of the data chunk being sent and ``n_chunk`` is the total number of chunks in the data package. stop_logging Disable logging to file for the CAN logger to improve performance. """ if show_progressbar: try: import progressbar except ImportError: warnings.warn( "progressbar2 is not installed. Cannot show a progress bar.", JaegerUserWarning, ) progressbar = None show_progressbar = False else: progressbar = None start_time = time.time() firmware_file = pathlib.Path(firmware_file) assert firmware_file.exists(), "cannot find firmware file" log.info(f"firmware file {firmware_file!s} found.") # Open firmware data as binary. firmware_data = open(firmware_file, "rb") crc32 = zlib.crc32(firmware_data.read()) filesize = os.path.getsize(firmware_file) # Check to make sure all positioners are in bootloader mode. valid_positioners = [] n_bad = 0 for positioner_id in fps.positioners: if positioners is not None and positioner_id not in positioners: continue positioner = fps.positioners[positioner_id] if ( not positioner.is_bootloader() or BootloaderStatus.BOOTLOADER_INIT not in positioner.status or BootloaderStatus.UNKNOWN in positioner.status ): n_bad += 1 continue valid_positioners.append(positioner) if len(valid_positioners) == 0: raise JaegerError( "no positioners found in bootloader mode or with valid status." ) if n_bad > 0: msg = f"{n_bad} positioners not in bootloader mode or state is invalid." if force: warnings.warn(msg + " Proceeding becasuse force=True.", JaegerUserWarning) else: raise JaegerError(msg) log.info("stopping pollers") await fps.pollers.stop() log.info(f"upgrading firmware on {len(valid_positioners)} positioners.") start_firmware_payload = int_to_bytes(filesize) + int_to_bytes(crc32) log.info(f"CRC32: {crc32}") log.info(f"File size: {filesize} bytes") pids = [pos.positioner_id for pos in valid_positioners] cmd = await fps.send_command( CommandID.START_FIRMWARE_UPGRADE, positioner_ids=pids, data=[start_firmware_payload], ) if cmd.status.failed or cmd.status.timed_out: log.error("firmware upgrade failed.") return False # Restore pointer to start of file firmware_data.seek(0) log.info("starting data send.") if stop_logging and can_log.fh: fh_handler = can_log.handlers.pop(can_log.handlers.index(can_log.fh)) else: fh_handler = None chunk_size = 8 n_chunks = int(numpy.ceil(filesize / chunk_size)) with contextlib.ExitStack() as stack: if show_progressbar and progressbar: bar = stack.enter_context(progressbar.ProgressBar(max_value=n_chunks)) else: bar = None messages_default = config["positioner"]["firmware_messages_per_positioner"] messages_per_positioner = messages_per_positioner or messages_default assert isinstance(messages_per_positioner, int) ii = 0 while True: cmds = [] stop = False for __ in range(messages_per_positioner): chunk = firmware_data.read(chunk_size) packetdata = bytearray(chunk) # packetdata.reverse() # IMPORTANT! no longer needed for P1 if len(packetdata) == 0: stop = True break cmds.append( fps.send_command( CommandID.SEND_FIRMWARE_DATA, positioner_ids=pids, data=[packetdata], timeout=15, ) ) await asyncio.gather(*cmds) if any(cmd.status.failed or cmd.status.timed_out for cmd in cmds): log.error("firmware upgrade failed.") if fh_handler: can_log.addHandler(fh_handler) return False ii += messages_per_positioner if show_progressbar and bar: if ii < n_chunks: bar.update(ii) if progress_callback: progress_callback(ii, n_chunks) if stop: break log.info("firmware upgrade complete.") if fh_handler: can_log.addHandler(fh_handler) total_time = time.time() - start_time log.info(f"upgrading firmware took {total_time:.2f} s.") return True
[docs] class GetFirmwareVersion(Command): command_id = CommandID.GET_FIRMWARE_VERSION broadcastable = True safe = True bootloader = True
[docs] def get_replies(self) -> Dict[int, Any]: return self.get_firmware()
[docs] def get_firmware(self, positioner_id=None) -> Dict[int, str]: """Returns the firmware version string. Parameters ---------- positioner_id : int The positioner for which to return the version. If `None` returns a dictionary with the firmware version of all the positioners that replied. Returns ------- firmware A string or dictionary of string with the firmware version(s), with the format ``'XX.YY.ZZ'`` where ``YY='80'`` if the positioner is in bootloader mode. Raises ------ ValueError If no positioner with ``positioner_id`` has replied. """ def format_version(reply): return ".".join(format(byt, "02d") for byt in reply.data[0:3][::-1]) firmwares = {} for reply in self.replies: version = format_version(reply) firmwares[reply.positioner_id] = version return firmwares
[docs] @staticmethod def encode(firmware): """Returns the bytearray encoding the firmware version.""" chunks = firmware.split(".")[::-1] data = b"" for chunk in chunks: data += int_to_bytes(int(chunk), "u1") return data
[docs] class StartFirmwareUpgrade(Command): command_id = CommandID.START_FIRMWARE_UPGRADE broadcastable = False safe = True bootloader = True
[docs] class SendFirmwareData(Command): command_id = CommandID.SEND_FIRMWARE_DATA broadcastable = False safe = True bootloader = True