Source code for jaeger.commands.bootloader

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-09-09
# @Filename: bootloader.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import asyncio
import contextlib
import os
import pathlib
import time
import warnings
import zlib

import numpy

from jaeger import commands, log
from jaeger.exceptions import JaegerError, JaegerUserWarning
from jaeger.maskbits import BootloaderStatus, ResponseCode
from jaeger.utils import int_to_bytes


__ALL__ = ['load_firmware', 'StartFirmwareUpgrade', 'GetFirmwareVersion',
           'SendFirmwareData']


def _process_replies(cmds):
    """Checks if any of the replies is invalid."""

    for cmd in cmds:

        cmd_acc = ResponseCode.COMMAND_ACCEPTED

        if (cmd is False or len(cmd.replies) == 0 or
                cmd_acc not in cmd.replies[0].response_code):
            log.error(f'positioner {cmd.positioner_id} did not replied or '
                      'command was not accepted. Cancelling firmware upgrade.')

            if len(cmd.replies) > 0:
                log.error(f'positioner {cmd.positioner_id}: response code to '
                          f'{cmd.command_id.name!r} is '
                          f'{cmd.replies[0].response_code}')

            return False

    return True


[docs]async def load_firmware(fps, firmware_file, positioners=None, force=False, show_progressbar=False, progress_callback=None): """Convenience function to run through the steps of loading a new firmware. This function is a coroutine and not intendend for direct use. Use the ``jaeger`` CLI instead. Parameters ---------- fps : `~jaeger.fps.FPS` `~jaeger.fps.FPS` instance to which the commands will be sent. firmware_file : str Binary file containing the firmware to load. positioners : `list` or `None` A list of positioner ids whose firmware to update, or `None` to update all the positioners in ``fps``. force : bool Forces the firmware load to continue even if some positioners are not responding or are not in bootloader mode. show_progressbar : bool Whether to show a progress bar. progress_callback : bool A function to call as data gets transferred to the positioners. The callback is called with ``(current_chunk, n_chuck)`` where ``current_chunk`` is the number of the data chunk being sent and ``n_chunk`` is the total number of chunks in the data package. """ if show_progressbar: try: import progressbar except ImportError: warnings.warn('progressbar2 is not installed. ' 'Cannot show a progress bar.', JaegerUserWarning) show_progressbar = False start_time = time.time() firmware_file = pathlib.Path(firmware_file) assert firmware_file.exists(), 'cannot find firmware file' log.info(f'firmware file {firmware_file!s} found.') # Open firmware data as binary. firmware_data = open(firmware_file, 'rb') crc32 = zlib.crc32(firmware_data.read()) filesize = os.path.getsize(firmware_file) # Check to make sure all positioners are in bootloader mode. valid_positioners = [] n_bad = 0 for positioner_id in fps.positioners: if positioners is not None and positioner_id not in positioners: continue positioner = fps.positioners[positioner_id] if (not positioner.is_bootloader() or BootloaderStatus.BOOTLOADER_INIT not in positioner.status or BootloaderStatus.UNKNOWN in positioner.status): n_bad += 1 continue valid_positioners.append(positioner) if len(valid_positioners) == 0: raise JaegerError('no positioners found in bootloader mode or ' 'with valid status.') return if n_bad > 0: msg = (f'{n_bad} positioners not in bootloader mode or state is invalid.') if force: warnings.warn(msg + ' Proceeding becasuse force=True.', JaegerUserWarning) else: raise JaegerError(msg) log.info('stopping pollers') await fps.pollers.stop() log.info(f'upgrading firmware on {len(valid_positioners)} positioners.') start_firmware_payload = int_to_bytes(filesize) + int_to_bytes(crc32) log.info(f'CRC32: {crc32}') log.info(f'File size: {filesize} bytes') cmds = [fps.send_command(commands.CommandID.START_FIRMWARE_UPGRADE, positioner_id=positioner.positioner_id, data=start_firmware_payload) for positioner in valid_positioners] await asyncio.gather(*cmds) if any(cmd.status.failed or cmd.status.timed_out for cmd in cmds): log.error('firmware upgrade failed.') return False # Restore pointer to start of file firmware_data.seek(0) log.info('starting data send.') chunk_size = 8 n_chunks = int(numpy.ceil(filesize / chunk_size)) with contextlib.ExitStack() as stack: if show_progressbar: bar = stack.enter_context(progressbar.ProgressBar(max_value=n_chunks)) ii = 0 while True: chunk = firmware_data.read(chunk_size) packetdata = bytearray(chunk) # packetdata.reverse() # IMPORTANT! no longer needed for P1 if len(packetdata) == 0: break cmds = [fps.send_command(commands.CommandID.SEND_FIRMWARE_DATA, positioner_id=positioner.positioner_id, data=packetdata, timeout=15) for positioner in valid_positioners] await asyncio.gather(*cmds) if any(cmd.status.failed or cmd.status.timed_out for cmd in cmds): log.error('firmware upgrade failed.') return False ii += 1 if show_progressbar: bar.update(ii) if progress_callback: progress_callback(ii, n_chunks) log.info('firmware upgrade complete.') total_time = time.time() - start_time log.info(f'upgrading firmware took {total_time:.2f}') return True
[docs]class GetFirmwareVersion(commands.Command): command_id = commands.CommandID.GET_FIRMWARE_VERSION broadcastable = True safe = True
[docs] def get_firmware(self, positioner_id=None): """Returns the firmware version string. Parameters ---------- positioner_id : int The positioner for which to return the version. This parameter is ignored unless the command is a broadcast. If `None` and the command is a broadcast, returns a list with the firmware version of all the positioners, in the order of `GetFirmwareVersion.replies`. Returns ------- firmware : `str` or `list` A string or list of string with the firmware version(s), with the format ``'XX.YY.ZZ'`` where ``YY='80'`` if the positioner is in bootloader mode. Raises ------ ValueError If no positioner with ``positioner_id`` has replied. """ def format_version(reply): return '.'.join(format(byt, '02d') for byt in reply.data[0:3][::-1]) # If not a broadcast, use the positioner_id of the command if self.positioner_id != 0: positioner_id = self.positioner_id if len(self.replies) == 0: raise ValueError('no positioners have replied to this command.') if positioner_id is None: return [format_version(reply) for reply in self.replies] else: reply = self.get_reply_for_positioner(positioner_id) if reply: return format_version(reply) else: return None
[docs] @staticmethod def encode(firmware): """Returns the bytearray encoding the firmware version.""" chunks = firmware.split('.')[::-1] data = b'' for chunk in chunks: data += int_to_bytes(int(chunk), 'u1') return data
[docs]class StartFirmwareUpgrade(commands.Command): command_id = commands.CommandID.START_FIRMWARE_UPGRADE broadcastable = False safe = True
[docs]class SendFirmwareData(commands.Command): command_id = commands.CommandID.SEND_FIRMWARE_DATA broadcastable = False safe = True