DECODE_CAN_MESSAGE
Python Code
from flojoy import flojoy, Stateful, DataFrame
import pandas as pd
import logging
from cantools.database import namedsignalvalue
@flojoy(deps={"python-can": "4.3.1", "cantools": "39.4.2"})
def DECODE_CAN_MESSAGE(
dbc: Stateful,
messages: Stateful,
ignore_undefined_id_error: bool = False,
) -> DataFrame:
"""DECODE_CAN a CAN message.
DECODE_CAN a CAN message using the provided databse.
Parameters
----------
dbc : Stateful
The database to use for decoding the message.
messages : Stateful
The message to DECODE_CAN. Must be a can.Message object.
ignore_undefined_id_error : bool
If True, ignore undefined id error. Default is False.
Returns
-------
DataFrame : DataFrame
Return dataframe containing the decoded message.
"""
db = dbc.obj
messages = messages.obj
decoded_messages = []
for message in messages:
try:
decoded_message = db.decode_message(message.arbitration_id, message.data)
# SG_ (signal) is not JSON encodable, so we need to convert it to its representation
decoded_message = {
key: value.name
if isinstance(value, namedsignalvalue.NamedSignalValue)
else value
for key, value in decoded_message.items()
}
decoded_message["timestamp"] = message.timestamp
decoded_messages.append(decoded_message)
except Exception as err:
logging.error(f"Error decoding message: {err}")
if ignore_undefined_id_error:
continue
raise Exception(f"Error decoding message: {err}")
return DataFrame(df=pd.DataFrame(decoded_messages))
Example App
This application demonstrates how to use multiple CAN blocks to connect to a PEAK-USB device and read messages from it. The PEAK-USB device is a USB-to-CAN interface that enables you to connect a computer to a CAN network. This device is used in this case to capture the messages of a particular sensor by filtering them directly at the kernel level, thus reducing the load on the CPU, and save those messages to a log file locally.
Once the app is done, the generated logs are exported to an S3 bucket to keep a record of the sensor’s data for further analysis.
To replicate this application, you must connect the PEAK-USB to your computer and install the required driver (refer to the PEAK_CONNECT
blocks for more information on the necessary driver for your platform). Then, simply specify the PEAK-USB channel in the required blocks, and this Flojoy application will log the messages received by the device!
Detecting channels
A valuable block is the PEAK_DETECT_AVAILABLE_DEVICE
. This block allows you to display the devices using the PCAN interface that are connected to your computer.