#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-08-27
# @Filename: can.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import asyncio
import binascii
import collections
import pprint
import re
import socket
import time
import can
import can.interfaces.slcan
import can.interfaces.socketcan
import can.interfaces.virtual
import jaeger
import jaeger.interfaces.cannet
from jaeger import can_log, config, log, start_file_loggers
from jaeger.commands import CommandID, StopTrajectory
from jaeger.maskbits import CommandStatus
from jaeger.utils import Poller
__ALL__ = ['JaegerCAN', 'CANnetInterface', 'JaegerReaderCallback', 'INTERFACES']
LOG_HEADER = '({cmd.command_id.name}, {cmd.positioner_id}, {cmd.command_uid}):'
#: Accepted CAN interfaces and whether they are multibus.
INTERFACES = {
'slcan': {
'class': can.interfaces.slcan.slcanBus,
'multibus': False
},
'socketcan': {
'class': can.interfaces.socketcan.SocketcanBus,
'multibus': False
},
'virtual': {
'class': can.interfaces.virtual.VirtualBus,
'multibus': False
},
'cannet': {
'class': jaeger.interfaces.cannet.CANNetBus,
'multibus': True
}
}
[docs]class JaegerReaderCallback(can.Listener):
"""A message reader that triggers a callback on message received.
Parameters
----------
callback : function
The function to run when a new message is received.
loop : event loop or `None`
If an asyncio event loop, the callback will be called with
``call_soon``, otherwise it will be called immediately.
"""
def __init__(self, callback, loop=None):
self.callback = callback
self.loop = loop
[docs] def on_message_received(self, msg):
"""Calls the callback with the received message."""
if self.loop:
self.loop.call_soon(self.callback, msg)
else:
self.callback(msg)
[docs]class JaegerCAN(object):
"""A CAN interface with a command queue and reply handling.
Provides support for multi-channel CAN networks, with each channel being
able to host more than one bus. In general, a new instance of `.JaegerCAN`
is create via the `~.JaegerCAN.from_profile` classmethod.
Parameters
----------
interface_name : str
One of `~jaeger.can.INTERFACES`. Defines the
`python-can <https://python-can.readthedocs.io/en/stable/>`_ interface
to use.
channels : list
A list of channels to be used to instantiate the interfaces.
loop
The event loop to use.
fps : .FPS
The focal plane system.
args,kwargs
Arguments and keyword arguments to pass to the interfaces when
initialising it (e.g., port, baudrate, etc).
Attributes
----------
command_queue : asyncio.Queue
Queue of messages to be sent to the bus. The messages are sent as
soon as the bus has finished processing any commands with the same
``command_id`` and ``positioner_id``.
listener : JaegerReaderCallback
A `.JaegerReaderCallback` instance that runs a callback when
a new message is received from the bus.
multibus : bool
Whether the interfaces are multibus.
notifier : can.Notifier
A `can.Notifier` instance that processes messages from the list
of buses, asynchronously.
"""
def __init__(self, interface_name, channels, *args, loop=None, fps=None, **kwargs):
# Start can file logger
start_file_loggers(start_log=False, start_can=True)
self.loop = loop or asyncio.get_event_loop()
assert interface_name in INTERFACES, f'invalid interface {interface_name}.'
self.interface_name = interface_name
InterfaceClass = INTERFACES[interface_name]['class']
self.multibus = INTERFACES[interface_name]['multibus']
if not isinstance(channels, (list, tuple)):
channels = [channels]
self.fps = fps
#: list: A list of `python-can`_ interfaces, one for each of the ``channels``.
self.interfaces = []
for channel in channels:
log.info(f'creating interface {interface_name}, '
f'channel={channel!r}, args={args}, kwargs={kwargs}.')
try:
self.interfaces.append(InterfaceClass(channel, *args, **kwargs))
except ConnectionResetError:
log.error(f'connection to {interface_name}:{channel} failed. '
'Possibly another instance is connected to the device.')
except (socket.timeout, ConnectionRefusedError, OSError):
log.error(f'connection to {interface_name}:{channel} failed. '
'The device is not responding.')
except Exception as ee:
raise ee.__class__(
f'connection to {interface_name}:{channel} failed: {ee}.')
if len(self.interfaces) == 0:
raise jaeger.JaegerError('cannot connect to any interface.')
self._start_notifier()
#: list: Currently running commands.
self.running_commands = []
self.command_queue = asyncio.Queue()
self._command_queue_task = self.loop.create_task(self._process_queue())
def _start_notifier(self):
"""Starts the listener and notifiers."""
self.listener = JaegerReaderCallback(self._process_reply, loop=self.loop)
self.notifier = can.notifier.Notifier(self.interfaces, [self.listener], loop=self.loop)
log.debug('started JaegerReaderCallback listener and notifiers')
def _process_reply(self, msg):
"""Processes replies from the bus."""
positioner_id, command_id, reply_uid, __ = \
jaeger.utils.parse_identifier(msg.arbitration_id)
if command_id == CommandID.COLLISION_DETECTED:
log.error(f'a collision was detected in positioner {positioner_id}. '
'Sending STOP_TRAJECTORIES and locking the FPS.')
# Manually send the stop trajectory to be sure it has
# priority over other messages. No need to do it if the FPS
# has been locked, which means that we have already stopped
# trajectories.
if self.fps and self.fps.locked:
return
stop_trajectory_command = StopTrajectory(positioner_id=0)
self.send_to_interface(stop_trajectory_command.get_messages()[0])
# Now lock the FPS. No need to abort trajectories because we just did.
if self.fps:
self.loop.create_task(self.fps.lock(stop_trajectories=False))
return
if command_id == 0:
can_log.warning('invalid command with command_id=0, '
f'arbitration_id={msg.arbitration_id} received. '
'Ignoring it.')
return
command_id_flag = CommandID(command_id)
# Remove done running command. Leave the failed and cancelled ones in
# the list for 60 seconds to be able to catch delayed replies. We
# also sort them so that the most recent commands are found first.
# This is important for timed out broadcast still in the list while
# another instance of the same command is running. We want replies to
# be sent to the running command first.
self.running_commands = sorted(
[rcmd for rcmd in self.running_commands
if not rcmd.status == rcmd.status.DONE and (time.time() - rcmd.start_time) < 60],
key=lambda cmd: cmd.start_time, reverse=True)
found = False
for r_cmd in self.running_commands:
if r_cmd.command_id == command_id:
if ((reply_uid == 0 and r_cmd.positioner_id == 0) or
(reply_uid in r_cmd.message_uids and
positioner_id == r_cmd.positioner_id)):
found = True
break
if found:
can_log.debug(f'({command_id_flag.name}, '
f'{positioner_id}, {r_cmd.command_uid}): '
f'queuing reply UID={reply_uid} '
f'to command {r_cmd.command_uid}.')
r_cmd.reply_queue.put_nowait(msg)
else:
can_log.debug(f'({command_id_flag.name}, {positioner_id}): '
f'cannot find running command for reply UID={reply_uid}.')
[docs] def send_to_interface(self, message, interfaces=None, bus=None):
"""Sends the message to the appropriate interface and bus."""
log_header = (f'({message.command.command_id.name}, '
f'{message.command.positioner_id} '
f'{message.command.command_uid!s}): ')
if len(self.interfaces) == 1 and not self.multibus:
data_hex = binascii.hexlify(message.data).decode()
can_log.debug(log_header + 'sending message with '
f'arbitration_id={message.arbitration_id}, '
f'UID={message.uid}, '
f'and data={data_hex!r} to interface.')
self.interfaces[0].send(message)
return
# If not interface, send the message to all interfaces.
if interfaces is None:
interfaces = self.interfaces
elif isinstance(interfaces, can.BusABC):
interfaces = [interfaces]
for iface in interfaces:
iface_idx = self.interfaces.index(iface)
data_hex = binascii.hexlify(message.data).decode()
can_log.debug(log_header + 'sending message with '
f'arbitration_id={message.arbitration_id}, '
f'UID={message.uid}, '
f'and data={data_hex!r} to '
f'interface {iface_idx}, '
f'bus={0 if not bus else bus!r}.')
if bus:
iface.send(message, bus=bus)
else:
iface.send(message)
async def _process_queue(self):
"""Processes messages in the command queue."""
while True:
cmd = await self.command_queue.get()
log_header = LOG_HEADER.format(cmd=cmd)
if cmd.status != CommandStatus.READY:
if cmd.status != CommandStatus.CANCELLED:
can_log.error(f'{log_header} command is not ready '
f'(status={cmd.status.name!r})')
cmd.cancel()
continue
can_log.debug(log_header + ' sending messages to CAN bus.')
cmd.status = CommandStatus.RUNNING
try:
self._send_messages(cmd)
self.running_commands.append(cmd)
except jaeger.JaegerError as ee:
can_log.error(f'found error while getting messages: {ee}')
continue
def _send_messages(self, cmd):
"""Sends messages to the interface.
This method exists separate from _process_queue so that it can be used
to send command messages to the interface synchronously.
"""
log_header = LOG_HEADER.format(cmd=cmd)
messages = cmd.get_messages()
for message in messages:
# Get the interface and bus to which to send the message
interfaces = getattr(cmd, '_interface', None)
bus = getattr(cmd, '_bus', None)
if cmd.status.failed:
can_log.debug(log_header + ' not sending more messages ' +
'since this command has failed.')
self.running_commands.remove(cmd)
break
self.send_to_interface(message, interfaces=interfaces, bus=bus)
[docs] @classmethod
def from_profile(cls, profile=None, **kwargs):
"""Creates a new bus interface from a configuration profile.
Parameters
----------
profile : `str` or `None`
The name of the profile that defines the bus interface, or `None`
to use the default configuration.
"""
assert 'profiles' in config, \
'configuration file does not have an interfaces section.'
if profile is None:
assert 'default' in config['profiles'], \
'default interface not set in configuration.'
profile = config['profiles']['default']
if profile not in config['profiles']:
raise ValueError(f'invalid interface profile {profile}')
config_data = config['profiles'][profile].copy()
interface = config_data.pop('interface')
if interface not in INTERFACES:
raise ValueError(f'invalid interface {interface}')
if 'channel' in config_data:
channels = [config_data.pop('channel')]
elif 'channels' in config_data:
channels = config_data.pop('channels')
assert isinstance(channels, (list, tuple)), 'channels must be a list'
else:
raise KeyError('channel or channels key not found.')
if interface == 'cannet':
return CANnetInterface(interface, channels, **kwargs, **config_data)
return cls(interface, channels, **kwargs, **config_data)
[docs] @staticmethod
def print_profiles():
"""Prints interface profiles and returns a list of profile names."""
pprint.pprint(config['interfaces'])
return config['interfaces'].keys()
CANNET_ERRORS = {
0: 'Unknown error <error_code>',
1: 'CAN <port_num> baud rate not found',
2: 'CAN <port_num> stop failed',
3: 'CAN <port_num> start failed',
4: 'CAN <port_num> extended filter is full',
5: 'CAN <port_num> standard open filter set twice',
6: 'CAN <port_num> standard filter is full',
7: 'CAN <port_num> invalid identifier or mask for filter add',
8: 'CAN <port_num> baud rate detection is busy',
9: 'CAN <port_num> invalid parameter type',
10: 'CAN <port_num> invalid CAN state',
11: 'CAN <port_num> invalid parameter mode',
12: 'CAN <port_num> invalid port number',
13: 'CAN <port_num> init auto baud failed',
14: 'CAN <port_num> filter parameter is missing',
15: 'CAN <port_num> bus off parameter is missing',
16: 'CAN <port_num> parameter is missing',
17: 'DEV parameter is missing',
18: 'CAN <port_num> invalid parameter brp',
19: 'CAN <port_num> invalid parameter sjw',
20: 'CAN <port_num> invalid parameter tSeg1',
21: 'CAN <port_num> invalid parameter tSeg2',
22: 'CAN <port_num> init custom failed',
23: 'CAN <port_num> init failed',
24: 'CAN <port_num> reset failed',
25: 'CAN <port_num> filter parameter is missing',
27: 'CYC parameter is missing',
28: 'CYC message <msg_num> stop failed',
29: 'CYC message <msg_num> init failed',
30: 'CYC message <msg_num> invalid parameter port',
31: 'CYC message <msg_num> invalid parameter msg_num',
32: 'CYC message <msg_num> invalid parameter time',
33: 'CYC message <msg_num> invalid parameter data'
}
[docs]class CANnetInterface(JaegerCAN):
r"""An interface class specifically for the CAN\@net 200/420 device.
This class bahaves as `.JaegerCAN` but allows communication with the
device itself and tracks its status.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._device_status = collections.defaultdict(dict)
# Get ID and version of interfaces. No need to ask for this in each
# status poll.
for interface in self.interfaces:
interface.write('DEV IDENTIFY')
interface.write('DEV VERSION')
# We use call_soon later to be sure the event loop is running when we start
# the poller. This prevents problems when using the library in IPython.
self.device_status_poller = Poller('cannet_device',
self._get_device_status,
delay=5,
loop=self.loop)
self.loop.call_soon(self.device_status_poller.start)
def _process_reply(self, msg):
"""Processes a message checking first if it comes from the device."""
if msg.arbitration_id == 0:
return self.handle_device_message(msg)
super()._process_reply(msg)
@property
def device_status(self):
"""Returns a dictionary with the status of the device."""
if not self.device_status_poller.running:
raise ValueError('the device status poller is not running.')
return self._device_status
[docs] def handle_device_message(self, msg):
"""Handles a reply from the device (i.e., not from the CAN network)."""
device_status = self._device_status
interface_id = self.interfaces.index(msg.interface)
message = msg.data.decode()
can_log.debug(f'received message {message!r} from interface ID {interface_id}.')
if message.lower() == 'r ok':
return
dev_identify = re.match(r'^R (?P<device>CAN@net \w+ \d+)$', message)
dev_version = re.match(r'^R V(?P<version>(\d+\.*)+)$', message)
dev_error = re.match(r'^R ERR (?P<error_code>\d{1,2}) (?P<error_descr>\.+)$', message)
dev_event = re.match(r'^E (?P<bus>\d+) (?P<event>.+)$', message)
can_status = re.match(r'^R CAN (?P<bus>\d+) (?P<status>[-|\w]{5}) '
r'(?P<buffer>\d+)$', message)
if dev_identify:
device = dev_identify.group('device')
device_status[interface_id]['name'] = device
elif dev_version:
version = dev_version.group('version')
device_status[interface_id]['version'] = version
elif dev_error:
if 'errors' not in device_status[interface_id]:
device_status[interface_id]['errors'] = []
device_status[interface_id]['errors'].insert(
0, {'error_code': dev_error.group('error_code'),
'error_description': dev_error.group('error_descr'),
'timestamp': str(msg.timestamp)})
elif dev_event:
bus, event = dev_event.groups()
bus = int(bus)
if 'events' not in device_status[interface_id]:
device_status[interface_id]['events'] = collections.defaultdict(list)
device_status[interface_id]['events'][bus].insert(
0, {'event': event, 'timestamp': str(msg.timestamp)})
elif can_status:
bus, status, buffer = can_status.groups()
bus = int(bus)
buffer = int(buffer)
# Unpack the status characters. If they are different than '-', they are True.
status_bool = list(map(lambda x: x != '-', status))
bus_off, error_warning, data_overrun, transmit_pending, init_state = status_bool
device_status[interface_id][bus] = {'status': status,
'buffer': buffer,
'bus_off': bus_off,
'error_warning_level': error_warning,
'data_overrun_detected': data_overrun,
'transmit_pending': transmit_pending,
'init_state': init_state,
'timestamp': str(msg.timestamp)}
else:
can_log.debug(f'message {message!r} cannot be parsed.')
def _get_device_status(self):
"""Sends commands to the devices to get status updates."""
for interface in self.interfaces:
for bus in interface.buses:
interface.write(f'CAN {bus} STATUS')