API
Connection
- class icotronic.can.Connection
Basic class to initialize CAN communication
To actually connect to the CAN bus you need to use the async context manager, provided by this class. If you want to manage the connection yourself, please just use
__aenter__and__aexit__manually.Examples
Create a new connection (without connecting to the CAN bus)
>>> connection = Connection()
- async __aenter__()
Connect to the STU
- Return type:
- Returns:
An object that can be used to communicate with the STU
- Raises:
CANInitError – if the CAN initialization fails
Examples
Import required library code
>>> from asyncio import run
Use a context manager to handle the cleanup process automatically
>>> async def connect_can_context(): ... async with Connection() as stu: ... pass >>> run(connect_can_context())
Create and shutdown the connection explicitly
>>> async def connect_can_manual(): ... connection = Connection() ... connected = await connection.__aenter__() ... await connection.__aexit__(None, None, None) >>> run(connect_can_manual())
Nodes
- class icotronic.can.STU(spu)
Communicate and control a connected STU
- Parameters:
spu (
SPU) – The SPU object that created this STU instance
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Create an STU object
>>> async def create_stu(): ... async with Connection() as stu: ... pass # call some coroutines of `stu` object >>> run(create_stu())
- async activate_bluetooth()
Activate Bluetooth on the STU
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Activate Bluetooth on the STU
>>> async def activate(): ... async with Connection() as stu: ... await stu.activate_bluetooth() >>> run(activate())
- async collect_sensor_nodes(timeout=5)
Collect available sensor nodes
This coroutine collects sensor nodes until either
no new sensor node was found or
until the given timeout, if no sensor node was found.
- Parameters:
timeout – The timeout in seconds until this coroutine returns, if no sensor node was found at all
- Return type:
list[SensorNodeInfo]- Returns:
A list of found sensor nodes
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Collect available sensor nodes
>>> async def collect_sensor_nodes(): ... async with Connection() as stu: ... return await stu.collect_sensor_nodes()
>>> # We assume that at least one sensor node is available >>> nodes = run(collect_sensor_nodes()) >>> len(nodes) >= 1 True
- connect_sensor_node(identifier, sensor_node_class=<class 'icotronic.can.node.sensor.SensorNode'>)
Connect to a sensor node (e.g. SHA, SMH or STH)
- Parameters:
identifier (
int|str|EUI) –The
MAC address (EUI),
name (str), or
node number (int)
of the sensor node we want to connect to
sensor_node_class (
type[SensorNode]) – Sensor node subclass that should be returned by context manager
- Return type:
AsyncSensorNodeManager- Returns:
A context manager that returns a sensor node object for the connected node
- Raises:
ValueError – If you use an invalid name or node number as identifier
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Connect to the sensor node with node number
0>>> async def connect_sensor_node(): ... async with Connection() as stu: ... async with stu.connect_sensor_node(0): ... connected = await stu.is_connected() ... after = await stu.is_connected() ... return (connected, after) >>> run(connect_sensor_node()) (True, False)
- async connect_with_mac_address(mac_address)
Connect to a Bluetooth sensor node using its MAC address
- Parameters:
mac_address (
EUI) – The MAC address of the sensor node- Return type:
None
Examples
Import required library code
>>> from asyncio import run, sleep >>> from icotronic.can.connection import Connection
Connect to a sensor node via its MAC address
>>> async def get_bluetooth_mac(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # Wait for Bluetooth activation to take place ... await sleep(2) ... return await stu.get_mac_address(0) >>> mac_address = run(get_bluetooth_mac()) >>> mac_address != EUI(0) True
>>> async def connect(mac_address): ... async with Connection() as stu: ... await stu.deactivate_bluetooth() ... # We assume that at least one STH is available ... connected = before = await stu.is_connected() ... await stu.activate_bluetooth() ... while not connected: ... await stu.connect_with_mac_address(mac_address) ... await sleep(0.1) ... connected = await stu.is_connected() ... await stu.deactivate_bluetooth() ... after = await stu.is_connected() ... # Return status of Bluetooth node connect response ... return before, connected, after >>> run(connect(mac_address)) (False, True, False)
- async connect_with_number(sensor_node_number=0)
Connect to a Bluetooth node using a node number
- Parameters:
sensor_node_number (
int) – The number of the Bluetooth node (0 up to the number of available nodes - 1)- Return type:
bool- Returns:
True, if
in search mode,
at least single node was found,
no legacy mode,
and scanning mode active
False, otherwise
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Connect to node “0”
>>> async def connect_bluetooth_sensor_node_number(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # We assume that at least one STH is available ... connected = before = await stu.is_connected() ... while not connected: ... connected = await stu.connect_with_number(0) ... await stu.deactivate_bluetooth() ... after = await stu.is_connected() ... # Return status of Bluetooth node connect response ... return before, connected, after >>> run(connect_bluetooth_sensor_node_number()) (False, True, False)
- async deactivate_bluetooth()
Deactivate Bluetooth on the STU
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Deactivate Bluetooth on STU 1
>>> async def deactivate_bluetooth(): ... async with Connection() as stu: ... await stu.deactivate_bluetooth() >>> run(deactivate_bluetooth())
- async get_available_nodes()
Retrieve the number of available sensor nodes
- Return type:
int- Returns:
The number of available sensor nodes
Examples
Import required library code
>>> from asyncio import run, sleep >>> from icotronic.can.connection import Connection
Get the number of available Bluetooth nodes at STU 1
>>> async def get_number_bluetooth_nodes(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... ... # We assume at least one STH is available ... number_sths = 0 ... while number_sths <= 0: ... number_sths = await stu.get_available_nodes() ... await sleep(0.1) ... ... return number_sths >>> run(get_number_bluetooth_nodes()) >= 0 1
- async get_mac_address(sensor_node_number=255)
Retrieve the MAC address of the STU or a sensor node
Note
Bluetooth needs to be activated before calling this coroutine, otherwise an incorrect MAC address will be returned (for sensor nodes).
- Parameters:
sensor_node_number (
int) – The node number of the Bluetooth node (0 up to the number of available nodes - 1) or0x00(SENSOR_NODE_NUMBER_SELF_ADDRESSING) to retrieve the MAC address of the STU itself- Return type:
EUI- Returns:
The MAC address of the specified sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the MAC address of STH 1
>>> async def get_bluetooth_mac(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... return await stu.get_mac_address(0) >>> mac_address = run(get_bluetooth_mac()) >>> isinstance(mac_address, EUI) True >>> mac_address != EUI(0) True
- async get_name(sensor_node_number=255)
Retrieve the name of a sensor node
- Parameters:
sensor_node_number (
int) – The number of the Bluetooth node (0 up to the number of available nodes - 1); Use the special node numberSENSOR_NODE_NUMBER_SELF_ADDRESSINGto retrieve the name of the STU itself.
Note
You are probably only interested in the name of the STU itself (
SENSOR_NODE_NUMBER_SELF_ADDRESSING), if you want to know the advertisement name of the STU in OTA (Over The Air) update mode for flashing a new firmware onto the STU.- Return type:
str- Returns:
The (Bluetooth broadcast) name of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Get Bluetooth advertisement name of node “0” from STU 1
>>> async def get_bluetooth_node_name(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # We assume that at least one STH is available ... return await stu.get_name(0) >>> name = run(get_bluetooth_node_name()) >>> isinstance(name, str) True >>> 0 <= len(name) <= 8 True
- async get_rssi(sensor_node_number)
Retrieve the RSSI (Received Signal Strength Indication) of an STH
- Parameters:
sensor_node_number (
int) – The number of the Bluetooth node (0 up to the number of available nodes)- Returns:
The RSSI of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the RSSI of a disconnected STH
>>> async def get_bluetooth_rssi(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # We assume that at least one STH is available ... # Get the RSSI of node “0” ... return await stu.get_rssi(0) >>> rssi = run(get_bluetooth_rssi()) >>> -80 < rssi < 0 True
- async get_sensor_nodes()
Retrieve a list of available sensor nodes
- Return type:
list[SensorNodeInfo]- Returns:
A list of available nodes including node number, name, MAC address and RSSI for each node
Examples
Import required library code
>>> from asyncio import run, sleep >>> from netaddr import EUI >>> from icotronic.can.connection import Connection
Retrieve the list of Bluetooth nodes at STU 1
>>> async def get_sensor_nodes(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... nodes = [] ... while not nodes: ... nodes = await stu.get_sensor_nodes() ... await sleep(0.1) ... ... return nodes >>> nodes = run(get_sensor_nodes()) >>> len(nodes) >= 1 True >>> node = nodes[0]
>>> node.sensor_node_number 0
>>> isinstance(node.name, str) True >>> 0 <= len(node.name) <= 8 True
>>> -80 < node.rssi < 0 True
>>> isinstance(node.mac_address, EUI) True
- async is_connected()
Check if the STU is connected to a Bluetooth node
Returns:
True, if a Bluetooth node is connected to the node
False, otherwise
Examples
>>> from asyncio import run, sleep >>> from icotronic.can.connection import Connection
Check connection of node “0” to STU
>>> async def check_bluetooth_connection(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... await sleep(0.1) ... connected_start = await stu.is_connected() ... ... # We assume that at least one STH is available ... await stu.connect_with_number(0) ... # Wait for node connection ... connected_between = False ... while not connected_between: ... connected_between = await stu.is_connected() ... await sleep(0.1) ... await stu.connect_with_number(0) ... ... # Deactivate Bluetooth connection ... await stu.deactivate_bluetooth() ... # Wait until node is disconnected ... await sleep(0.1) ... connected_after = await stu.is_connected() ... ... return (connected_start, connected_between, ... connected_after) >>> run(check_bluetooth_connection()) (False, True, False)
- Return type:
bool
- class icotronic.can.SensorNode(spu, eeprom=<class 'icotronic.can.node.eeprom.sensor.SensorNodeEEPROM'>)
Communicate and control a connected sensor node (SHA, STH, SMH)
- Parameters:
spu (
SPU) – The SPU object used to connect to this sensor nodeeeprom (
type[SensorNodeEEPROM]) – The EEPROM class of the node
- async get_adc_configuration()
Read the current ADC configuration
- Return type:
- Returns:
The ADC configuration of the sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read ADC sensor config of sensor node with node id 0
>>> async def read_adc_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_adc_configuration() >>> run(read_adc_config()) Get, Prescaler: 2, Acquisition Time: 8, Oversampling Rate: 64, Reference Voltage: 3.3 V
- async get_energy_mode_lowest()
Read the reduced lowest energy mode (mode 2) time values
See also:
- Return type:
Times- Returns:
A tuple containing the advertisement time in the lowest energy mode in milliseconds and the time until the node will switch from the reduced energy mode (mode 1) to the lowest energy mode (mode 2) – if there is no activity – in milliseconds
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the reduced energy time values of a sensor node
>>> async def read_energy_mode_lowest(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_energy_mode_lowest() >>> times = run(read_energy_mode_lowest()) >>> round(times.advertisement) 2500 >>> times.sleep 259200000
- async get_energy_mode_reduced()
Read the reduced energy mode (mode 1) sensor node time values
See also:
- Return type:
Times- Returns:
A tuple containing the advertisement time in the reduced energy mode in milliseconds and the time until the node will switch from the disconnected state to the low energy mode (mode 1) – if there is no activity – in milliseconds
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the reduced energy time values of a sensor node
>>> async def read_energy_mode_reduced(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_energy_mode_reduced() >>> times = run(read_energy_mode_reduced()) >>> round(times.advertisement) 1250 >>> times.sleep 300000
- async get_mac_address()
Retrieve the MAC address of the sensor node
- Return type:
EUI- Returns:
The MAC address of the specified sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the MAC address of STH 1
>>> async def get_bluetooth_mac(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_mac_address() >>> mac_address = run(get_bluetooth_mac()) >>> isinstance(mac_address, EUI) True >>> mac_address != EUI(0) True
- async get_name()
Retrieve the name of the sensor node
- Return type:
str- Returns:
The (Bluetooth broadcast) name of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Get Bluetooth advertisement name of node “0”
>>> async def get_sensor_node_name(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_name() >>> name = run(get_sensor_node_name()) >>> isinstance(name, str) True >>> 0 <= len(name) <= 8 True
- async get_rssi()
Retrieve the RSSI (Received Signal Strength Indication) of the node
- Return type:
int- Returns:
The RSSI of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Get RSSI of node “0”
>>> async def get_sensor_node_rssi(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_rssi() >>> rssi = run(get_sensor_node_rssi()) >>> -70 < rssi < 0 True
- async get_sensor_configuration()
Read the current sensor configuration
- Raises:
UnsupportedFeatureException – in case the sensor node replies with an error message
- Return type:
- Returns:
The sensor number for the different axes
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Reading sensor config from node without sensor config support fails
>>> async def read_sensor_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_sensor_configuration() >>> config = run( ... read_sensor_config()) Traceback (most recent call last): ... UnsupportedFeatureException: Reading sensor configuration is not supported
- async get_streaming_data_single(channels=Channel 1 enabled, Channel 2 enabled, Channel 3 enabled)
Read a single set of raw ADC values from the sensor node
- Parameters:
channels – Specifies which of the three measurement channels should be enabled or disabled
- Return type:
- Returns:
The latest three ADC values measured by the sensor node
Examples
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read a single value from all three measurement channels
>>> async def read_sensor_values(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return (await ... sensor_node.get_streaming_data_single()) >>> data = run(read_sensor_values()) >>> len(data.values) 3 >>> all([0 <= value <= 0xffff for value in data.values]) True
- async get_supply_voltage()
Read the current supply voltage
- Return type:
float- Returns:
The supply voltage of the sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read the supply voltage of the sensor node with node number 0
>>> async def get_supply_voltage(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_supply_voltage() >>> supply_voltage = run(get_supply_voltage()) >>> 3 <= supply_voltage <= 4.2 True
- open_data_stream(channels, timeout=5)
Open measurement data stream
- Parameters:
channels (
StreamingConfiguration) – Specifies which measurement channels should be enabledtimeout (
float) – The amount of seconds between two consecutive messages, before a TimeoutError will be raised
- Return type:
DataStreamContextManager- Returns:
A context manager object for managing stream data
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read data of first and third channel
>>> async def read_streaming_data(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... channels = StreamingConfiguration(first=True, ... third=True) ... async with sensor_node.open_data_stream( ... channels) as stream: ... first = [] ... third = [] ... messages = 0 ... async for data, _ in stream: ... first.append(data.values[0]) ... third.append(data.values[1]) ... messages += 1 ... if messages >= 3: ... break ... return first, third >>> first, third = run(read_streaming_data()) >>> len(first) 3 >>> len(third) 3
- async set_adc_configuration(reference_voltage=3.3, prescaler=2, acquisition_time=8, oversampling_rate=64)
Change the ADC configuration of a connected sensor node
- Parameters:
reference_voltage (
float) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)prescaler (
int) – The ADC prescaler value (1 – 127)acquisition_time (
int) – The ADC acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256)oversampling_rate (
int) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read and write ADC sensor config
>>> async def write_read_adc_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_adc_configuration( ... 3.3, 8, 8, 64) ... modified_config1 = (await ... sensor_node.get_adc_configuration()) ... ... adc_config = ADCConfiguration( ... reference_voltage=5.0, ... prescaler=16, ... acquisition_time=8, ... oversampling_rate=128) ... await sensor_node.set_adc_configuration( ... **adc_config) ... modified_config2 = (await ... sensor_node.get_adc_configuration()) ... ... # Write back default config values ... await sensor_node.set_adc_configuration( ... 3.3, 2, 8, 64) ... return modified_config1, modified_config2 >>> config1, config2 = run(write_read_adc_config()) >>> config1 Get, Prescaler: 8, Acquisition Time: 8, Oversampling Rate: 64, Reference Voltage: 3.3 V >>> config2 Get, Prescaler: 16, Acquisition Time: 8, Oversampling Rate: 128, Reference Voltage: 5.0 V
- async set_energy_mode_lowest(times=None)
Writes the time values for the lowest energy mode (mode 2)
See also:
- Parameters:
times (
Times|None) –The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the lowest energy mode (mode 2) from the reduced energy mode (mode 1) – if there is no activity – in milliseconds.
If you do not specify these values then the default values from the configuration will be used
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read and write the reduced energy time values of a sensor node
>>> async def read_write_energy_mode_lowest(sleep, advertisement): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_energy_mode_lowest( ... Times(sleep=sleep, ... advertisement=advertisement)) ... times = await sensor_node.get_energy_mode_lowest() ... ... # Overwrite changed values with default config ... # values ... await sensor_node.set_energy_mode_lowest() ... ... return times >>> times = run(read_write_energy_mode_lowest(200_000, 2000)) >>> times.sleep 200000 >>> round(times.advertisement) 2000
- async set_energy_mode_reduced(times=None)
Writes the time values for the reduced energy mode (mode 1)
See also:
- Parameters:
times (
Times|None) –The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the low energy mode (mode 1) from the disconnected state – if there is no activity – in milliseconds.
If you do not specify these values then the default values from the configuration will be used
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read and write the reduced energy time values of a sensor node
>>> async def read_write_energy_mode_reduced(sleep, advertisement): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_energy_mode_reduced( ... Times(sleep=sleep, ... advertisement=advertisement)) ... times = await sensor_node.get_energy_mode_reduced() ... ... # Overwrite changed values with default config ... # values ... await sensor_node.set_energy_mode_reduced() ... ... return times >>> times = run(read_write_energy_mode_reduced(200_000, 2000)) >>> times.sleep 200000 >>> round(times.advertisement) 2000
- async set_name(name)
Set the name of a sensor node
- Parameters:
name (
str) – The new name for the node- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Change the name of a sensor node
>>> async def test_naming(name): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... # and that this node currently does not have the name ... # specified in the variable `name`. ... async with stu.connect_sensor_node(0) as sensor_node: ... before = await sensor_node.get_name() ... await sensor_node.set_name(name) ... updated = await sensor_node.get_name() ... await sensor_node.set_name(before) ... after = await sensor_node.get_name() ... return before, updated, after >>> before, updated, after = run(test_naming("Hello")) >>> before != "Hello" True >>> updated 'Hello' >>> before == after True
- async set_sensor_configuration(sensors)
Change the sensor numbers for the different measurement channels
If you use the sensor number 0 for one of the different measurement channels, then the sensor (number) for that channel will stay the same.
- Parameters:
sensors (
SensorConfiguration) – The sensor numbers of the different measurement channels- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Setting sensor config from node without sensor config support fails
>>> async def set_sensor_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_sensor_configuration( ... SensorConfiguration( ... first=0, second=0, third=0)) >>> config = run( ... set_sensor_config()) Traceback (most recent call last): ... UnsupportedFeatureException: Writing sensor configuration is not supported
- async start_streaming_data(channels)
Start streaming data
- Parameters:
channels (
StreamingConfiguration) – Specifies which of the three measurement channels should be enabled or disabled- Return type:
None
The CAN identifier that this coroutine returns can be used to filter CAN messages that contain the expected streaming data
- async stop_streaming_data(retries=10, ignore_errors=False)
Stop streaming data
- Parameters:
retries (
int) – The number of times the message is sent again, if no response was sent back in a certain amount of timeignore_errors – Specifies, if this coroutine should ignore, if there were any problems while stopping the stream.
- Return type:
None
- class icotronic.can.STH(spu)
Communicate and control a connected SHA or STH
- Parameters:
spu (
SPU) – The SPU object used to communicate with the node
- async activate_acceleration_self_test(dimension='x')
Activate self test of STH accelerometer
- Parameters:
dimension (
str) – The dimension (x,yorz) for which the self test should be activated.- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Activate and deactivate acceleration self-test
>>> async def test_self_test(): ... async with Connection() as stu: ... # We assume that at least one sensor node is ... # available ... async with stu.connect_sensor_node(0, STH) as sth: ... await sth.activate_acceleration_self_test() ... await sth.deactivate_acceleration_self_test() >>> run(test_self_test())
- async deactivate_acceleration_self_test(dimension='x')
Deactivate self test of STH accelerometer
- Parameters:
dimension (
str) – The dimension (x,yorz) for which the self test should be deactivated.- Return type:
None
- async get_acceleration_conversion_function()
Retrieve function to convert raw sensor data into g
- Return type:
Callable[[float],float]- Returns:
A function that converts 16 bit raw values from the STH into multiples of earth’s gravitation (g)
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection >>> from icotronic.can.node.sth import STH
Convert a raw ADC value into multiples of g
>>> async def read_sensor_values(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0, STH) as sth: ... convert_to_g = (await ... sth.get_acceleration_conversion_function()) ... data = await sth.get_streaming_data_single() ... before = list(data.values) ... data.apply(convert_to_g) ... return before, data.values >>> before, after = run(read_sensor_values()) >>> all([0 <= value <= 2**16 for value in before]) True >>> all([-100 <= value <= 100 for value in after]) True
- async get_acceleration_sensor_range_in_g()
Retrieve the maximum acceleration sensor range in multiples of g₀
For a ±100 g₀ sensor this method returns 200 (100 + abs(-100)).
For a ±50 g₀ sensor this method returns 100 (50 + abs(-50)).
For this to work correctly the EEPROM value of the x-axis acceleration offset in the EEPROM has to be set.
- Return type:
int- Returns:
Range of current acceleration sensor in multiples of earth’s gravitation
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection >>> from icotronic.can.node.sth import STH
Write and read the acceleration offset of STH 1
>>> async def read_sensor_range(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0, STH) as sth: ... return (await ... sth.get_acceleration_sensor_range_in_g()) >>> sensor_range = run(read_sensor_range()) >>> 0 < sensor_range <= 200 True
- async get_acceleration_voltage(dimension='x', reference_voltage=3.3)
Retrieve the current acceleration voltage in Volt
- Parameters:
dimension (
str) – The dimension (x=1, y=2, z=3) for which the acceleration voltage should be measuredreference_voltage (
float) – The reference voltage for the ADC in Volt
- Return type:
float- Returns:
The voltage of the acceleration sensor in Volt
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read the acceleration voltage of STH 1
>>> async def read_acceleration_voltage(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0, STH) as sth: ... before = await sth.get_acceleration_voltage() ... await sth.activate_acceleration_self_test() ... between = await sth.get_acceleration_voltage() ... await sth.deactivate_acceleration_self_test() ... after = await sth.get_acceleration_voltage() ... return (before, between, after) >>> before, between, after = run(read_acceleration_voltage()) >>> before < between and after < between True
Streaming
- class icotronic.can.streaming.AsyncStreamBuffer(timeout, max_buffer_size)
Buffer for streaming data
- Parameters:
timeout (
float) – The amount of seconds between two consecutive messages, before aStreamingTimeoutErrorwill be raisedmax_buffer_size (
int) – Maximum amount of buffered messages kept by the stream buffer. If this amount is exceeded, then this listener will raise aStreamingBufferError. A large buffer indicates that the application is not able to keep up with the current rate of retrieved messages and therefore the probability of losing messages is quite high.
- dataloss()
Calculate the overall amount of data loss
- Return type:
float- Returns:
The overall amount of data loss as number between 0 (no data loss) and 1 (all data lost).
- reset_stats()
Reset the message statistics
This method resets the amount of lost an retrieved messages used in the calculation of the method
dataloss. Using this method can be useful, if you want to calculate the amount of data loss since a specific starting point.- Return type:
None
- class icotronic.can.streaming.StreamingConfiguration(first=True, second=False, third=False)
Streaming configuration
- Parameters:
first (
bool) – Specifies if the first channel is enabled or notsecond (
bool) – Specifies if the second channel is enabled or notthird (
bool) – Specifies if the third channel is enabled or not
- Raises:
ValueError – if none of the channels is active
Examples
Create some example streaming configurations
>>> config = StreamingConfiguration() >>> config = StreamingConfiguration( ... first=False, second=True, third=True)
Creating streaming configurations without active channels will fail
>>> config = StreamingConfiguration(first=False) Traceback (most recent call last): ... ValueError: At least one channel needs to be active
>>> config = StreamingConfiguration( ... first=False, second=False, third=False) Traceback (most recent call last): ... ValueError: At least one channel needs to be active
- axes()
Get the activated axes returned by this streaming configuration
- Return type:
list[str]- Returns:
A list containing all activated axes in alphabetical order
Examples
Get the activated axes for example streaming configurations
>>> StreamingConfiguration( ... first=False, second=True, third=True).axes() ['y', 'z'] >>> StreamingConfiguration( ... first=True, second=True, third=False).axes() ['x', 'y']
- data_length()
Returns the streaming data length
This will be either:
2 (when 2 channels are active), or
3 (when 1 or 3 channels are active)
For more information, please take a look here.
- Return type:
int- Returns:
The length of the streaming data resulting from this channel configuration
Examples
Get the data length of example streaming configurations
>>> StreamingConfiguration().data_length() 3
>>> StreamingConfiguration( ... first=False, second=True, third=False).data_length() 3
>>> StreamingConfiguration( ... first=True, second=True, third=True).data_length() 3
>>> StreamingConfiguration( ... first=False, second=True, third=True).data_length() 2
- enabled_channels()
Get the number of activated channels
- Return type:
int- Returns:
The number of enabled channels
Examples
Get the number of enabled channels for example streaming configs
>>> StreamingConfiguration(first=True).enabled_channels() 1
>>> StreamingConfiguration(first=False, second=True, third=False ... ).enabled_channels() 1
>>> StreamingConfiguration(first=True, second=True, third=True ... ).enabled_channels() 3
- property first: bool
Check the activation state of the first channel
Returns:
True, if the first channel is enabled orFalseotherwiseExamples
Check channel one activation status for example configs
>>> StreamingConfiguration(first=True, second=False, ... third=False).first True >>> StreamingConfiguration(first=False, second=False, ... third=True).first False
- property second: bool
Check the activation state of the second channel
- Returns:
True, if the second channel is enabled orFalseotherwise
Examples
Check channel two activation status for example configs
>>> StreamingConfiguration( ... first=True, second=False, third=False).second False >>> StreamingConfiguration( ... first=False, second=True, third=True).second True
- property third: bool
Check the activation state of the third channel
Returns:
True, if the third channel is enabled orFalseotherwiseExamples
Check channel three activation status for example configs
>>> StreamingConfiguration( ... first=True, second=False, third=False).third False >>> StreamingConfiguration( ... first=False, second=False, third=True).third True
- class icotronic.can.streaming.StreamingData(counter, timestamp, values)
Support for storing data of a streaming message
- Parameters:
counter (
int) – The message counter valuetimestamp (
float) – The message timestampvalues (
list[float]) – The streaming values
Examples
Create new streaming data
>>> StreamingData(values=[1, 2, 3], counter=21, timestamp=1) [1, 2, 3]@1 #21
Streaming data must store either two or three values
>>> StreamingData(values=[1], counter=21, timestamp=1) Traceback (most recent call last): ... ValueError: Incorrect number of streaming values: 1 (instead of 2 or 3)
>>> StreamingData(values=[1, 2, 3, 4], counter=21, timestamp=1 ... ) Traceback (most recent call last): ... ValueError: Incorrect number of ... values: 4 (instead of 2 or 3)
- apply(function)
Apply a certain function to the streaming data
Note
This function changes the stored values in the streaming data and (as convenience feature) also returns the modified streaming data itself. This is useful if you want to use the modified streaming as parameter in a function call, i.e. you can use something like
function(stream_data.apply()).- Parameters:
function (
Callable[[float],float]) – The function that should be applied to the streaming data- Return type:
- Returns:
The modified streaming data
Examples
Add the constant 10 to some example streaming data
>>> data = StreamingData(values=[1, 2, 3], counter=21, timestamp=1) >>> data.apply(lambda value: value + 10) [11, 12, 13]@1 #21 >>> data.values [11, 12, 13]
Measurement
Data
- class icotronic.measurement.MeasurementData(configuration)
Measurement data
- Parameters:
configuration (
StreamingConfiguration) – The streaming configuration that was used to collect the measurement data
- append(data)
Append some streaming data to the measurement
- Parameters:
data (
StreamingData) – The streaming data that should be added to the measurement- Return type:
None
Examples
Append some streaming data to a measurement
>>> config = StreamingConfiguration(first=True, second=True, ... third=True) >>> data = MeasurementData(config) >>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
>>> s1 = StreamingData(values=[4, 5, 3], counter=15, ... timestamp=1756197008.776551) >>> data.append(s1) >>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [4, 5, 3]@1756197008.776551 #15
- apply(conversion)
Apply functions to the values stored in the measurement
- Parameters:
conversion (
Conversion) – The conversion functions that will be applied to the measurement- Return type:
- Returns:
The measurement data itself, after the conversion was applied
Examples
Apply functions to some measurement data with one active channel
>>> config = StreamingConfiguration(first=False, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 20, 81], counter=22, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[50, 1, 29], counter=25, ... timestamp=1756125747.528254) >>> data.append(s1) >>> data.append(s2)
>>> plus_two = (lambda value: value + 2) >>> conversion = Conversion(second=plus_two)
>>> data Channel 1 disabled, Channel 2 enabled, Channel 3 disabled [1, 20, 81]@1756125747.528234 #22 [50, 1, 29]@1756125747.528254 #25 >>> data.apply(conversion) Channel 1 disabled, Channel 2 enabled, Channel 3 disabled [3, 22, 83]@1756125747.528234 #22 [52, 3, 31]@1756125747.528254 #25
Apply functions to some measurement data with two active channels
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2)
>>> conversion = Conversion(third=plus_two)
>>> data Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0
>>> data.apply(conversion) Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 4]@1756125747.528234 #255 [3, 6]@1756125747.528237 #0
Apply functions to some measurement data with three active channels
>>> config = StreamingConfiguration(first=True, second=True, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[4, 5, 3], counter=15, ... timestamp=1756197008.776551) >>> s2 = StreamingData(values=[8, 10, 6], counter=16, ... timestamp=1756197008.776559) >>> data.append(s1) >>> data.append(s2)
>>> double = (lambda value: value * 2) >>> conversion = Conversion(first=double, third=plus_two)
>>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [4, 5, 3]@1756197008.776551 #15 [8, 10, 6]@1756197008.776559 #16
>>> data.apply(conversion) Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [8, 5, 5]@1756197008.776551 #15 [16, 10, 8]@1756197008.776559 #16
- dataloss()
Get measurement dataloss based on message counters
- Return type:
float- Returns:
The overall amount of dataloss as number between 0 (no data loss) and 1 (all data lost).
- extend(data)
Extend this measurement data with some other measurement data
- Parameters:
data (
MeasurementData) – The measurement data that should be added to this measurement- Return type:
None
Examples
Extend measurement data with other measurement data
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data1 = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data1.append(s1) >>> data1.append(s2) >>> data1 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0
>>> data2 = MeasurementData(config) >>> s3 = StreamingData(values=[10, 20], counter=1, ... timestamp=1756125747.678912) >>> data2.append(s3) >>> data2 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [10, 20]@1756125747.678912 #1
>>> data1.extend(data2) >>> data1 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0 [10, 20]@1756125747.678912 #1
- first()
Get all data of the first measurement channel
- Return type:
- Returns:
Data values for the first measurement channel
Examples
Get first channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=True, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.second() 2@1756125747.528234 #255 4@1756125747.528237 #0 >>> data.third()
Get first channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=True, second=False, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20 >>> data.second() >>> data.third()
- second()
Get all data of the second measurement channel
- Return type:
- Returns:
Data values for the second measurement channel
Examples
Get second channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=False, second=True, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.third() 2@1756125747.528234 #255 4@1756125747.528237 #0
Get second channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=False, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20 >>> data.third()
- third()
Get all data of the third measurement channel
- Return type:
- Returns:
Data values for the third measurement channel
Examples
Get third channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.second() >>> data.third() 2@1756125747.528234 #255 4@1756125747.528237 #0
Get third channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=False, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() >>> data.third() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20
- class icotronic.measurement.ChannelData
Store measurement data for a single channel
- append(data)
Append a value to the channel data
- Parameters:
data (
DataPoint) – The data point that should be added to the channel data- Return type:
None
Examples
Add some data points to a channel data object
>>> data = ChannelData()
>>> t1 = 1756124450.256398 >>> t2 = t1 + 0.000003 >>> data.append(DataPoint(counter=255, timestamp=t1, value=10)) >>> data.append(DataPoint(counter=1, timestamp=t2, value=20)) >>> data 10@1756124450.256398 #255 20@1756124450.256401 #1
- class icotronic.measurement.Conversion(first=None, second=None, third=None)
Conversion functions for measurement data
- Parameters:
first (
Optional[Callable[[float],float]]) – The conversion function for the first channelsecond (
Optional[Callable[[float],float]]) – The conversion function for the second channelthird (
Optional[Callable[[float],float]]) – The conversion function for the third channel
Storage
- class icotronic.measurement.Storage(filepath, channels=None)
Context manager class for storing measurement data in HDF5 format
- Parameters:
filepath (
Path|str) – The filepath of the HDF5 file in which this object should store measurement datachannels (
StreamingConfiguration|None) – All channels for which data should be collected orNone, if the axes data should be taken from an existing valid file atfilepath.
Examples
Create new file
>>> filepath = Path("test.hdf5") >>> with Storage(filepath, ... channels=StreamingConfiguration(first=True) ... ) as storage: ... pass
Opening an non empty file but still providing channel info should fail
>>> with Storage(filepath, ... channels=StreamingConfiguration(first=True) ... ) as storage: ... pass Traceback (most recent call last): ... ValueError: File “...” exist but channels parameter is not None >>> filepath.unlink()
- close()
Close the HDF file
- Return type:
None
- open()
Open and initialize the HDF file for writing
- Return type:
- class icotronic.measurement.StorageData(file_handle, channels=None)
Store HDF acceleration data
- Parameters:
file_handle (
File) – The HDF file that should store the datachannels (
StreamingConfiguration|None) – All channels for which data should be collected orNone, if the axes data should be taken from an existing valid file atfilepath.
Examples
Create new data
>>> filepath = Path("test.hdf5") >>> streaming_data = StreamingData(values=[1, 2, 3], counter=1, ... timestamp=4306978.449) >>> with Storage(filepath, StreamingConfiguration(first=True)) as data: ... data.add_streaming_data(streaming_data)
Read from existing file
>>> with Storage(filepath) as data: ... print(data.dataloss_stats()) (1, 0)
>>> filepath.unlink()
- add_measurement_data(measurement_data)
Add streaming data to the storage object
- Parameters:
measurement_data (
MeasurementData) – The streaming data that should be added to the storage- Return type:
None
Examples
Import required library code
>>> from tempfile import NamedTemporaryFile >>> from icotronic.measurement import MeasurementData
Store measurement data for two enabled channels
>>> two_channels = StreamingConfiguration(first=True, second=False, ... third=True) >>> s1 = StreamingData(values=[10, -10], counter=1, timestamp=1) >>> s2 = StreamingData(values=[20, -20], counter=2, timestamp=2) >>> s3 = StreamingData(values=[30, -30], counter=2, timestamp=2) >>> data = MeasurementData(two_channels) >>> data.append(s1) >>> data.append(s2) >>> data.append(s3)
>>> with NamedTemporaryFile(suffix=".hdf5", ... delete_on_close=False) as temp: ... with Storage(temp.name, two_channels) as storage: ... storage.add_measurement_data(data) ... # Normally the class takes care about when to store ... # back data to the disk itself. We do a manual flush ... # here to check the number of stored items. ... storage.acceleration.flush() ... print(storage.acceleration.nrows) 3
- add_streaming_data(streaming_data)
Add streaming data to the storage object
- Parameters:
streaming_data (
StreamingData) – The streaming data that should be added to the storage- Return type:
None
Examples
Store streaming data for single channel
>>> channel3 = StreamingConfiguration(first=False, second=False, ... third=True) >>> data1 = StreamingData(values=[1, 2, 3], counter=21, ... timestamp=1) >>> data2 = StreamingData(values=[4, 5, 6], counter=22, ... timestamp=2) >>> filepath = Path("test.hdf5") >>> with Storage(filepath, channel3) as storage: ... storage.add_streaming_data(data1) ... storage.add_streaming_data(data2) ... # Normally the class takes care about when to store back ... # data to the disk itself. We do a manual flush here to ... # check the number of stored items. ... storage.acceleration.flush() ... print(storage.acceleration.nrows) 6 >>> filepath.unlink()
Store streaming data for three channels
>>> all = StreamingConfiguration(first=True, second=True, ... third=True) >>> data1 = StreamingData(values=[1, 2, 3], counter=21, ... timestamp=1) >>> data2 = StreamingData(values=[4, 5, 6], counter=22, ... timestamp=2) >>> with Storage(filepath, all) as storage: ... storage.add_streaming_data(data1) ... storage.add_streaming_data(data2) ... storage.acceleration.flush() ... print(storage.acceleration.nrows) 2 >>> filepath.unlink()
- dataloss()
Determine (minimum) data loss
- Return type:
float- Returns:
Amount of lost messages divided by all messages (lost and retrieved)
Examples
Import required library code
>>> from math import isclose
Calculate data loss for an example storage file
>>> def calculate_dataloss(): ... filepath = Path("test.hdf5") ... with Storage(filepath, StreamingConfiguration( ... first=True)) as storage: ... for counter in range(256): ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=counter, ... timestamp=counter/10)) ... for counter in range(128, 256): ... storage.add_streaming_data( ... StreamingData(values=[4, 5, 6], ... counter=counter, ... timestamp=(255 + counter)/10)) ... ... dataloss = storage.dataloss() ... filepath.unlink() ... return dataloss >>> isclose(0.25, calculate_dataloss(), rel_tol=0.005) True
- dataloss_stats()
Determine number of lost and received messages
- Return type:
tuple[int,int]- Returns:
Tuple containing the number of received and the number of lost messages
Examples
Get data loss statistics for an example file
>>> def calculate_dataloss_stats(): ... filepath = Path("test.hdf5") ... with Storage(filepath,StreamingConfiguration( ... first=True)) as storage: ... for counter in range(256): ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=counter, ... timestamp=counter/10)) ... for counter in range(128, 256): ... storage.add_streaming_data( ... StreamingData(values=[4, 5, 6], ... counter=counter, ... timestamp=(255 + counter)/10)) ... ... stats = storage.dataloss_stats() ... filepath.unlink() ... return stats >>> retrieved, lost = calculate_dataloss_stats() >>> retrieved 384 >>> lost 128
- measurement_time()
Get the measurement time
Note
This method returns the value of the last timestamp. If there was dataloss at the end of the measurement, then the real measurement time might have been longer.
- Return type:
int- Returns:
The measurement time in microseconds
- sampling_frequency()
Calculate sampling frequency of measurement data
- Return type:
float- Returns:
Sampling frequency (of a single data channel) in Hz
Examples
Import required library code
>>> from math import isclose
Calculate sampling frequency for an example file
>>> def calculate_sampling_frequency(): ... filepath = Path("test.hdf5") ... with Storage(filepath, StreamingConfiguration( ... first=True, second=True, third=True)) as storage: ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=1, ... timestamp=0)) ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=2, ... timestamp=1)) ... ... sampling_frequency = storage.sampling_frequency() ... filepath.unlink() ... return sampling_frequency >>> calculate_sampling_frequency() 2.0
- write_sample_rate(adc_configuration)
Store the sample rate of the ADC
- Parameters:
adc_configuration (
ADCConfiguration) – The current ADC configuration of the sensor node- Return type:
None
Examples
Write and read sample rate metadata in example HDF file
>>> filepath = Path("test.hdf5") >>> adc_configuration = ADCConfiguration( ... set=True, ... prescaler=2, ... acquisition_time=16, ... oversampling_rate=256) >>> with Storage(filepath, ... StreamingConfiguration(first=True)) as storage: ... storage.write_sample_rate(adc_configuration) ... sample_rate = storage.acceleration.attrs["Sample_Rate"] ... print(sample_rate) 1724.14 Hz (Prescaler: 2, Acquisition Time: 16, Oversampling Rate: 256) >>> filepath.unlink()
- write_sensor_range(sensor_range_in_g)
Add metadata about sensor range
This method assumes that sensor have a symmetric measurement range (e.g. a sensor with a range of 200 g measures from - 100 g up to + 100 g).
- Parameters:
sensor_range_in_g (
float) – The measurement range of the sensor in multiples of g- Return type:
None
Examples
Add sensor range metadata to example file
>>> filepath = Path("test.hdf5") >>> with Storage(filepath, ... StreamingConfiguration(third=True)) as storage: ... storage.write_sensor_range(200) ... print(storage.acceleration.attrs["Sensor_Range"]) ± 100 g₀ >>> filepath.unlink()
ADC
- class icotronic.can.adc.ADCConfiguration(*data, set=None, prescaler=None, acquisition_time=None, oversampling_rate=None, reference_voltage=None)
Support for reading and writing analog digital converter configuration
- Parameters:
*data (
bytearray|list[int]) – A list containing the (first five) bytes of the ADC configurationset (
bool|None) – Specifies if we want to set or retrieve (get) the ADC configurationprescaler (
int|None) – The ADC prescaler value (1 – 127); default: 2acquisition_time (
int|None) – The acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256); default: 8oversampling_rate (
int|None) – The ADC oversampling rate (1, 2, 4, 8, … , 4096); default: 64reference_voltage (
float|None) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)
Examples
Create simple ADC configuration from scratch
>>> ADCConfiguration(prescaler=2, ... acquisition_time=4, ... oversampling_rate=64) Get, Prescaler: 2, Acquisition Time: 4, Oversampling Rate: 64, Reference Voltage: 3.3 V
- property acquisition_time: int
Get the acquisition time
- Returns:
The acquisition time
Examples
Get initialised acquisition time
>>> config = ADCConfiguration(acquisition_time=2) >>> config.acquisition_time 2
Get acquisition time set via property
>>> config.acquisition_time = 128 >>> config.acquisition_time 128
Trying to set an unsupported acquisition time will fail
>>> config.acquisition_time = 5 Traceback (most recent call last): ... ValueError: Acquisition time of “5” out of range, please use ...
- property oversampling_rate: int
Get the oversampling rate
- Returns:
The oversampling rate
Examples
Get initialised oversampling rate
>>> config = ADCConfiguration(oversampling_rate=128) >>> config.oversampling_rate 128
Get oversampling rate set via property
>>> config.oversampling_rate = 512 >>> config.oversampling_rate 512
Trying to set an unsupported oversampling rate will fail
>>> config.oversampling_rate = 3 Traceback (most recent call last): ... ValueError: Oversampling rate of “3” out of range, please use ...
- property prescaler: int
Get the prescaler value
- Returns:
The prescaler value
Examples
Get initialised prescaler
>>> config = ADCConfiguration(prescaler=127) >>> config.prescaler 127
Get prescaler set via property
>>> config.prescaler = 20 >>> config.prescaler 20
Trying to set an unsupported prescaler will fail
>>> config.prescaler = 128 Traceback (most recent call last): ... ValueError: Prescaler value of “128” out of range, please use ...
- property reference_voltage: float
Get the reference voltage
- Returns:
The reference voltage in Volt
Examples
Set and get different reference voltage values
>>> config = ADCConfiguration(reference_voltage=3.3) >>> config.reference_voltage 3.3 >>> config.reference_voltage = 6.6 >>> config.reference_voltage 6.6
>>> config = ADCConfiguration(reference_voltage=6.6) >>> config.reference_voltage 6.6 >>> config.reference_voltage = 1.8 >>> config.reference_voltage 1.8
>>> config = ADCConfiguration(reference_voltage=1.8) >>> config.reference_voltage 1.8
Trying to set a unsupported reference voltage will fail
>>> config.reference_voltage = 0 Traceback (most recent call last): ... ValueError: Reference voltage of “0” V out of range, please use ...
- sample_rate()
Calculate the sampling rate for the current ADC configuration
- Return type:
float- Returns:
The calculated sample rate
Examples
Get sampling rates based on ADC attribute values
>>> round(ADCConfiguration(prescaler=2, acquisition_time=8, ... oversampling_rate=64).sample_rate()) 9524
>>> round(ADCConfiguration(prescaler=8, acquisition_time=8, ... oversampling_rate=64).sample_rate()) 3175
>>> round(ADCConfiguration(reference_voltage=5.0, ... prescaler=16, ... acquisition_time=8, ... oversampling_rate=128).sample_rate()) 840
Sensor Configuration
- class icotronic.can.SensorConfiguration(first=0, second=0, third=0)
Used to store the configuration of the three sensor channels
- Parameters:
first (
int) – The sensor number for the first measurement channelsecond (
int) – The sensor number for the second measurement channelthird (
int) – The sensor number for the third measurement channel
Examples
Create an example sensor configuration
>>> SensorConfiguration(first=0, second=1, third=2) M1: None, M2: S1, M3: S2
Initializing a sensor configuration with incorrect values will fail
>>> SensorConfiguration(first=256, second=1, third=2) Traceback (most recent call last): ... ValueError: Incorrect value for first channel: “256”
>>> SensorConfiguration(first=0, second=1, third=-1) Traceback (most recent call last): ... ValueError: Incorrect value for third channel: “-1”
- check()
Check that at least one measurement channel is enabled
- Raises:
ValueError – if none of the measurement channels is enabled
Examples
>>> SensorConfiguration(second=1).check() >>> SensorConfiguration().check() Traceback (most recent call last): ... ValueError: At least one measurement channel has to be enabled
- disable_channel(first=False, second=False, third=False)
Disable certain (measurement) channels
- Parameters:
first (
bool) – Specifies if the first measurement channel should be disabled or notsecond (
bool) – Specifies if the second measurement channel should be disabled or notthird (
bool) – Specifies if the third measurement channel should be disabled or not
- Return type:
None
- empty()
Check if the sensor configuration is empty
In an empty sensor configuration all of the channels are disabled.
- Return type:
bool- Returns:
True, if all channels are disabled,Falseotherwise
Examples
Check if some example configurations are empty or not
>>> SensorConfiguration(first=3).empty() False >>> SensorConfiguration().empty() True >>> SensorConfiguration(third=0).empty() True
- property first: int
Get the sensor for the first channel
- Returns:
The sensor number of the first channel
Examples
Get the sensor number for the first channel of a sensor config
>>> SensorConfiguration(first=1, second=3, third=2).first 1
- requires_channel_configuration_support()
Check if the sensor configuration requires channel config support
- Return type:
bool- Returns:
True, if the configuration requires hardware that has support for changing the channel configurationFalse, otherwise
Examples
Check if example sensor configs require channel config support
>>> SensorConfiguration(first=1, second=3, third=2 ... ).requires_channel_configuration_support() True
>>> SensorConfiguration(first=1, second=0, third=1 ... ).requires_channel_configuration_support() True
>>> SensorConfiguration(first=1, second=2, third=3 ... ).requires_channel_configuration_support() False
>>> SensorConfiguration().requires_channel_configuration_support() False
- property second: int
Get the sensor for the second channel
- Returns:
The sensor number of the second channel
Examples
Get the sensor number for the second channel of a sensor config
>>> SensorConfiguration(first=1, second=3, third=2).second 3
- streaming_configuration()
Get a streaming configuration that represents this config
- Return type:
- Returns:
A stream configuration where
every channel that is enabled in the sensor configuration is enabled, and
every channel that is disables in the sensor configuration is disabled.
Examples
Get the streaming configuration for some example channel configs
>>> SensorConfiguration(second=1).streaming_configuration() Channel 1 disabled, Channel 2 enabled, Channel 3 disabled
>>> SensorConfiguration(first=10, third=2 ... ).streaming_configuration() Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
- property third: int
Get the sensor for the third channel
- Returns:
The sensor number of the third channel
Examples
Get the sensor number for the third channel of a sensor config
>>> SensorConfiguration(first=1, second=3, third=2).third 2