Basic class to initialize CAN communication
To actually connect to the CAN bus you need to use the async context
manager, provided by this class. If you want to manage the connection
yourself, please just use __aenter__ and __aexit__ manually.
Examples
Create a new connection (without connecting to the CAN bus)
>>> connection = Connection()
Connect to the STU
An object that can be used to communicate with the STU
CANInitError – if the CAN initialization fails
Examples
Import required library code
>>> from asyncio import run
Use a context manager to handle the cleanup process automatically
>>> async def connect_can_context():
... async with Connection() as stu:
... pass
>>> run(connect_can_context())
Create and shutdown the connection explicitly
>>> async def connect_can_manual():
... connection = Connection()
... connected = await connection.__aenter__()
... await connection.__aexit__(None, None, None)
>>> run(connect_can_manual())
Communicate and control a connected STU
spu (SPU) – The SPU object that created this STU instance
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Create an STU object
>>> async def create_stu():
... async with Connection() as stu:
... pass # call some coroutines of `stu` object
>>> run(create_stu())
Activate Bluetooth on the STU
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Activate Bluetooth on the STU
>>> async def activate():
... async with Connection() as stu:
... await stu.activate_bluetooth()
>>> run(activate())
Connect to a sensor node (e.g. SHA, SMH or STH)
identifier (int | str | EUI) –
The
MAC address (EUI),
name (str), or
node number (int)
of the sensor node we want to connect to
sensor_node_class (type[SensorNode]) – Sensor node subclass that should be returned by context manager
AsyncSensorNodeManager
A context manager that returns a sensor node object for the connected node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Connect to the sensor node with node number 0
>>> async def connect_sensor_node():
... async with Connection() as stu:
... async with stu.connect_sensor_node(0):
... connected = await stu.is_connected()
... after = await stu.is_connected()
... return (connected, after)
>>> run(connect_sensor_node())
(True, False)
Connect to a Bluetooth sensor node using its MAC address
mac_address (EUI) – The MAC address of the sensor node
None
Examples
Import required library code
>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection
Connect to a sensor node via its MAC address
>>> async def get_bluetooth_mac():
... async with Connection() as stu:
... await stu.activate_bluetooth()
... # Wait for Bluetooth activation to take place
... await sleep(2)
... return await stu.get_mac_address(0)
>>> mac_address = run(get_bluetooth_mac())
>>> mac_address != EUI(0)
True
>>> async def connect(mac_address):
... async with Connection() as stu:
... await stu.deactivate_bluetooth()
... # We assume that at least one STH is available
... connected = before = await stu.is_connected()
... await stu.activate_bluetooth()
... while not connected:
... await stu.connect_with_mac_address(mac_address)
... await sleep(0.1)
... connected = await stu.is_connected()
... await stu.deactivate_bluetooth()
... after = await stu.is_connected()
... # Return status of Bluetooth node connect response
... return before, connected, after
>>> run(connect(mac_address))
(False, True, False)
Connect to a Bluetooth node using a node number
sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of
available nodes - 1)
bool
True, if
in search mode,
at least single node was found,
no legacy mode,
and scanning mode active
False, otherwise
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Connect to node “0”
>>> async def connect_bluetooth_sensor_node_number():
... async with Connection() as stu:
... await stu.activate_bluetooth()
... # We assume that at least one STH is available
... connected = before = await stu.is_connected()
... while not connected:
... connected = await stu.connect_with_number(0)
... await stu.deactivate_bluetooth()
... after = await stu.is_connected()
... # Return status of Bluetooth node connect response
... return before, connected, after
>>> run(connect_bluetooth_sensor_node_number())
(False, True, False)
Deactivate Bluetooth on the STU
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Deactivate Bluetooth on STU 1
>>> async def deactivate_bluetooth():
... async with Connection() as stu:
... await stu.deactivate_bluetooth()
>>> run(deactivate_bluetooth())
Retrieve the number of available sensor nodes
int
The number of available sensor nodes
Examples
Import required library code
>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection
Get the number of available Bluetooth nodes at STU 1
>>> async def get_number_bluetooth_nodes():
... async with Connection() as stu:
... await stu.activate_bluetooth()
...
... # We assume at least one STH is available
... number_sths = 0
... while number_sths <= 0:
... number_sths = await stu.get_available_nodes()
... await sleep(0.1)
...
... return number_sths
>>> run(get_number_bluetooth_nodes()) >= 0
1
Retrieve the MAC address of the STU or a sensor node
Note
Bluetooth needs to be activated before calling this coroutine, otherwise an incorrect MAC address will be returned (for sensor nodes).
sensor_node_number (int) – The node number of the Bluetooth node (0 up to the number of
available nodes - 1) or 0x00
(SENSOR_NODE_NUMBER_SELF_ADDRESSING) to retrieve the MAC
address of the STU itself
EUI
The MAC address of the specified sensor node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Retrieve the MAC address of STH 1
>>> async def get_bluetooth_mac():
... async with Connection() as stu:
... await stu.activate_bluetooth()
... return await stu.get_mac_address(0)
>>> mac_address = run(get_bluetooth_mac())
>>> isinstance(mac_address, EUI)
True
>>> mac_address != EUI(0)
True
Retrieve the name of a sensor node
sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of
available nodes - 1)
str
The (Bluetooth broadcast) name of the node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Get Bluetooth advertisement name of node “0” from STU 1
>>> async def get_bluetooth_node_name():
... async with Connection() as stu:
... await stu.activate_bluetooth()
... # We assume that at least one STH is available
... return await stu.get_name(0)
>>> name = run(get_bluetooth_node_name())
>>> isinstance(name, str)
True
>>> 0 <= len(name) <= 8
True
Retrieve the RSSI (Received Signal Strength Indication) of an STH
sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of
available nodes)
The RSSI of the node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Retrieve the RSSI of a disconnected STH
>>> async def get_bluetooth_rssi():
... async with Connection() as stu:
... await stu.activate_bluetooth()
... # We assume that at least one STH is available
... # Get the RSSI of node “0”
... return await stu.get_rssi(0)
>>> rssi = run(get_bluetooth_rssi())
>>> -80 < rssi < 0
True
Retrieve a list of available sensor nodes
list[SensorNodeInfo]
A list of available nodes including node number, name, MAC address and RSSI for each node
Examples
Import required library code
>>> from asyncio import run, sleep
>>> from netaddr import EUI
>>> from icotronic.can.connection import Connection
Retrieve the list of Bluetooth nodes at STU 1
>>> async def get_sensor_nodes():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... nodes = []
... while not nodes:
... nodes = await stu.get_sensor_nodes()
... await sleep(0.1)
...
... return nodes
>>> nodes = run(get_sensor_nodes())
>>> len(nodes) >= 1
True
>>> node = nodes[0]
>>> node.sensor_node_number
0
>>> isinstance(node.name, str)
True
>>> 0 <= len(node.name) <= 8
True
>>> -80 < node.rssi < 0
True
>>> isinstance(node.mac_address, EUI)
True
Check if the STU is connected to a Bluetooth node
Returns:
True, if a Bluetooth node is connected to the node
False, otherwise
Examples
>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection
Check connection of node “0” to STU
>>> async def check_bluetooth_connection():
... async with Connection() as stu:
... await stu.activate_bluetooth()
... await sleep(0.1)
... connected_start = await stu.is_connected()
...
... # We assume that at least one STH is available
... await stu.connect_with_number(0)
... # Wait for node connection
... connected_between = False
... while not connected_between:
... connected_between = await stu.is_connected()
... await sleep(0.1)
... await stu.connect_with_number(0)
...
... # Deactivate Bluetooth connection
... await stu.deactivate_bluetooth()
... # Wait until node is disconnected
... await sleep(0.1)
... connected_after = await stu.is_connected()
...
... return (connected_start, connected_between,
... connected_after)
>>> run(check_bluetooth_connection())
(False, True, False)
bool
Communicate and control a connected sensor node (SHA, STH, SMH)
spu (SPU) – The SPU object used to connect to this sensor node
eeprom (type[SensorNodeEEPROM]) – The EEPROM class of the node
Read the current ADC configuration
The ADC configuration of the sensor node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read ADC sensor config of sensor node with node id 0
>>> async def read_adc_config():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_adc_configuration()
>>> run(read_adc_config())
Get, Prescaler: 2, Acquisition Time: 8, Oversampling Rate: 64,
Reference Voltage: 3.3 V
Read the reduced lowest energy mode (mode 2) time values
See also:
Times
A tuple containing the advertisement time in the lowest energy mode in milliseconds and the time until the node will switch from the reduced energy mode (mode 1) to the lowest energy mode (mode 2) – if there is no activity – in milliseconds
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Retrieve the reduced energy time values of a sensor node
>>> async def read_energy_mode_lowest():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_energy_mode_lowest()
>>> times = run(read_energy_mode_lowest())
>>> round(times.advertisement)
2500
>>> times.sleep
259200000
Read the reduced energy mode (mode 1) sensor node time values
See also:
Times
A tuple containing the advertisement time in the reduced energy mode in milliseconds and the time until the node will switch from the disconnected state to the low energy mode (mode 1) – if there is no activity – in milliseconds
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Retrieve the reduced energy time values of a sensor node
>>> async def read_energy_mode_reduced():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_energy_mode_reduced()
>>> times = run(read_energy_mode_reduced())
>>> round(times.advertisement)
1250
>>> times.sleep
300000
Retrieve the MAC address of the sensor node
EUI
The MAC address of the specified sensor node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Retrieve the MAC address of STH 1
>>> async def get_bluetooth_mac():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_mac_address()
>>> mac_address = run(get_bluetooth_mac())
>>> isinstance(mac_address, EUI)
True
>>> mac_address != EUI(0)
True
Retrieve the name of the sensor node
str
The (Bluetooth broadcast) name of the node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Get Bluetooth advertisement name of node “0”
>>> async def get_sensor_node_name():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_name()
>>> name = run(get_sensor_node_name())
>>> isinstance(name, str)
True
>>> 0 <= len(name) <= 8
True
Retrieve the RSSI (Received Signal Strength Indication) of the node
The RSSI of the node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Get RSSI of node “0”
>>> async def get_sensor_node_rssi():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_rssi()
>>> rssi = run(get_sensor_node_rssi())
>>> -70 < rssi < 0
True
Read the current sensor configuration
UnsupportedFeatureException – in case the sensor node replies with an error message
SensorConfiguration
The sensor number for the different axes
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Reading sensor config from node without sensor config support fails
>>> async def read_sensor_config():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_sensor_configuration()
>>> config = run(
... read_sensor_config())
Traceback (most recent call last):
...
UnsupportedFeatureException: Reading sensor configuration is not
supported
Read a single set of raw ADC values from the sensor node
channels – Specifies which of the three measurement channels should be enabled or disabled
The latest three ADC values measured by the sensor node
Examples
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read a single value from all three measurement channels
>>> async def read_sensor_values():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return (await
... sensor_node.get_streaming_data_single())
>>> data = run(read_sensor_values())
>>> len(data.values)
3
>>> all([0 <= value <= 0xffff for value in data.values])
True
Read the current supply voltage
float
The supply voltage of the sensor node
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read the supply voltage of the sensor node with node number 0
>>> async def get_supply_voltage():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... return await sensor_node.get_supply_voltage()
>>> supply_voltage = run(get_supply_voltage())
>>> 3 <= supply_voltage <= 4.2
True
Open measurement data stream
channels (StreamingConfiguration) – Specifies which measurement channels should be enabled
timeout (float) – The amount of seconds between two consecutive messages, before
a TimeoutError will be raised
DataStreamContextManager
A context manager object for managing stream data
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read data of first and third channel
>>> async def read_streaming_data():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... channels = StreamingConfiguration(first=True,
... third=True)
... async with sensor_node.open_data_stream(
... channels) as stream:
... first = []
... third = []
... messages = 0
... async for data, _ in stream:
... first.append(data.values[0])
... third.append(data.values[1])
... messages += 1
... if messages >= 3:
... break
... return first, third
>>> first, third = run(read_streaming_data())
>>> len(first)
3
>>> len(third)
3
Change the ADC configuration of a connected sensor node
reference_voltage (float) – The ADC reference voltage in Volt
(1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)
prescaler (int) – The ADC prescaler value (1 – 127)
acquisition_time (int) – The ADC acquisition time in number of cycles
(1, 2, 3, 4, 8, 16, 32, … , 256)
oversampling_rate (int) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read and write ADC sensor config
>>> async def write_read_adc_config():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... await sensor_node.set_adc_configuration(
... 3.3, 8, 8, 64)
... modified_config1 = (await
... sensor_node.get_adc_configuration())
...
... adc_config = ADCConfiguration(
... reference_voltage=5.0,
... prescaler=16,
... acquisition_time=8,
... oversampling_rate=128)
... await sensor_node.set_adc_configuration(
... **adc_config)
... modified_config2 = (await
... sensor_node.get_adc_configuration())
...
... # Write back default config values
... await sensor_node.set_adc_configuration(
... 3.3, 2, 8, 64)
... return modified_config1, modified_config2
>>> config1, config2 = run(write_read_adc_config())
>>> config1
Get, Prescaler: 8, Acquisition Time: 8, Oversampling Rate: 64,
Reference Voltage: 3.3 V
>>> config2
Get, Prescaler: 16, Acquisition Time: 8, Oversampling Rate: 128,
Reference Voltage: 5.0 V
Writes the time values for the lowest energy mode (mode 2)
See also:
times (Times | None) –
The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the lowest energy mode (mode 2) from the reduced energy mode (mode 1) – if there is no activity – in milliseconds.
If you do not specify these values then the default values from the configuration will be used
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read and write the reduced energy time values of a sensor node
>>> async def read_write_energy_mode_lowest(sleep, advertisement):
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... await sensor_node.set_energy_mode_lowest(
... Times(sleep=sleep,
... advertisement=advertisement))
... times = await sensor_node.get_energy_mode_lowest()
...
... # Overwrite changed values with default config
... # values
... await sensor_node.set_energy_mode_lowest()
...
... return times
>>> times = run(read_write_energy_mode_lowest(200_000, 2000))
>>> times.sleep
200000
>>> round(times.advertisement)
2000
Writes the time values for the reduced energy mode (mode 1)
See also:
times (Times | None) –
The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the low energy mode (mode 1) from the disconnected state – if there is no activity – in milliseconds.
If you do not specify these values then the default values from the configuration will be used
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read and write the reduced energy time values of a sensor node
>>> async def read_write_energy_mode_reduced(sleep, advertisement):
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... await sensor_node.set_energy_mode_reduced(
... Times(sleep=sleep,
... advertisement=advertisement))
... times = await sensor_node.get_energy_mode_reduced()
...
... # Overwrite changed values with default config
... # values
... await sensor_node.set_energy_mode_reduced()
...
... return times
>>> times = run(read_write_energy_mode_reduced(200_000, 2000))
>>> times.sleep
200000
>>> round(times.advertisement)
2000
Set the name of a sensor node
name (str) – The new name for the node
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Change the name of a sensor node
>>> async def test_naming(name):
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... # and that this node currently does not have the name
... # specified in the variable `name`.
... async with stu.connect_sensor_node(0) as sensor_node:
... before = await sensor_node.get_name()
... await sensor_node.set_name(name)
... updated = await sensor_node.get_name()
... await sensor_node.set_name(before)
... after = await sensor_node.get_name()
... return before, updated, after
>>> before, updated, after = run(test_naming("Hello"))
>>> before != "Hello"
True
>>> updated
'Hello'
>>> before == after
True
Change the sensor numbers for the different measurement channels
If you use the sensor number 0 for one of the different measurement channels, then the sensor (number) for that channel will stay the same.
sensors (SensorConfiguration) – The sensor numbers of the different measurement channels
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Setting sensor config from node without sensor config support fails
>>> async def set_sensor_config():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0) as sensor_node:
... await sensor_node.set_sensor_configuration(
... SensorConfiguration(
... first=0, second=0, third=0))
>>> config = run(
... set_sensor_config())
Traceback (most recent call last):
...
UnsupportedFeatureException: Writing sensor configuration is not
supported
Start streaming data
channels (StreamingConfiguration) – Specifies which of the three measurement channels should be
enabled or disabled
None
The CAN identifier that this coroutine returns can be used to filter CAN messages that contain the expected streaming data
Stop streaming data
retries (int) – The number of times the message is sent again, if no response
was sent back in a certain amount of time
ignore_errors – Specifies, if this coroutine should ignore, if there were any problems while stopping the stream.
None
Communicate and control a connected SHA or STH
spu (SPU) – The SPU object used to communicate with the node
Activate self test of STH accelerometer
dimension (str) – The dimension (x, y or z) for which the self test
should be activated.
None
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Activate and deactivate acceleration self-test
>>> async def test_self_test():
... async with Connection() as stu:
... # We assume that at least one sensor node is
... # available
... async with stu.connect_sensor_node(0, STH) as sth:
... await sth.activate_acceleration_self_test()
... await sth.deactivate_acceleration_self_test()
>>> run(test_self_test())
Deactivate self test of STH accelerometer
dimension (str) – The dimension (x, y or z) for which the self test
should be deactivated.
None
Retrieve function to convert raw sensor data into g
Callable[[float], float]
A function that converts 16 bit raw values from the STH into multiples of earth’s gravitation (g)
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
>>> from icotronic.can.node.sth import STH
Convert a raw ADC value into multiples of g
>>> async def read_sensor_values():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0, STH) as sth:
... convert_to_g = (await
... sth.get_acceleration_conversion_function())
... data = await sth.get_streaming_data_single()
... before = list(data.values)
... data.apply(convert_to_g)
... return before, data.values
>>> before, after = run(read_sensor_values())
>>> all([0 <= value <= 2**16 for value in before])
True
>>> all([-100 <= value <= 100 for value in after])
True
Retrieve the maximum acceleration sensor range in multiples of g₀
For a ±100 g₀ sensor this method returns 200 (100 + abs(-100)).
For a ±50 g₀ sensor this method returns 100 (50 + abs(-50)).
For this to work correctly the EEPROM value of the x-axis acceleration offset in the EEPROM has to be set.
int
Range of current acceleration sensor in multiples of earth’s gravitation
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
>>> from icotronic.can.node.sth import STH
Write and read the acceleration offset of STH 1
>>> async def read_sensor_range():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0, STH) as sth:
... return (await
... sth.get_acceleration_sensor_range_in_g())
>>> sensor_range = run(read_sensor_range())
>>> 0 < sensor_range <= 200
True
Retrieve the current acceleration voltage in Volt
dimension (str) – The dimension (x=1, y=2, z=3) for which the acceleration
voltage should be measured
reference_voltage (float) – The reference voltage for the ADC in Volt
float
The voltage of the acceleration sensor in Volt
Examples
Import required library code
>>> from asyncio import run
>>> from icotronic.can.connection import Connection
Read the acceleration voltage of STH 1
>>> async def read_acceleration_voltage():
... async with Connection() as stu:
... # We assume that at least one sensor node is available
... async with stu.connect_sensor_node(0, STH) as sth:
... before = await sth.get_acceleration_voltage()
... await sth.activate_acceleration_self_test()
... between = await sth.get_acceleration_voltage()
... await sth.deactivate_acceleration_self_test()
... after = await sth.get_acceleration_voltage()
... return (before, between, after)
>>> before, between, after = run(read_acceleration_voltage())
>>> before < between and after < between
True
Streaming configuration
first (bool) – Specifies if the first channel is enabled or not
second (bool) – Specifies if the second channel is enabled or not
third (bool) – Specifies if the third channel is enabled or not
ValueError – if none of the channels is active
Examples
Create some example streaming configurations
>>> config = StreamingConfiguration()
>>> config = StreamingConfiguration(
... first=False, second=True, third=True)
Creating streaming configurations without active channels will fail
>>> config = StreamingConfiguration(first=False)
Traceback (most recent call last):
...
ValueError: At least one channel needs to be active
>>> config = StreamingConfiguration(
... first=False, second=False, third=False)
Traceback (most recent call last):
...
ValueError: At least one channel needs to be active
Get the activated axes returned by this streaming configuration
list[str]
A list containing all activated axes in alphabetical order
Examples
Get the activated axes for example streaming configurations
>>> StreamingConfiguration(
... first=False, second=True, third=True).axes()
['y', 'z']
>>> StreamingConfiguration(
... first=True, second=True, third=False).axes()
['x', 'y']
Returns the streaming data length
This will be either:
2 (when 2 channels are active), or
3 (when 1 or 3 channels are active)
For more information, please take a look here.
int
The length of the streaming data resulting from this channel configuration
Examples
Get the data length of example streaming configurations
>>> StreamingConfiguration().data_length()
3
>>> StreamingConfiguration(
... first=False, second=True, third=False).data_length()
3
>>> StreamingConfiguration(
... first=True, second=True, third=True).data_length()
3
>>> StreamingConfiguration(
... first=False, second=True, third=True).data_length()
2
Get the number of activated channels
int
The number of enabled channels
Examples
Get the number of enabled channels for example streaming configs
>>> StreamingConfiguration(first=True).enabled_channels()
1
>>> StreamingConfiguration(first=False, second=True, third=False
... ).enabled_channels()
1
>>> StreamingConfiguration(first=True, second=True, third=True
... ).enabled_channels()
3
Check the activation state of the first channel
Returns:
True, if the first channel is enabled or False otherwise
Examples
Check channel one activation status for example configs
>>> StreamingConfiguration(first=True, second=False,
... third=False).first
True
>>> StreamingConfiguration(first=False, second=False,
... third=True).first
False
Check the activation state of the second channel
True, if the second channel is enabled or False otherwise
Examples
Check channel two activation status for example configs
>>> StreamingConfiguration(
... first=True, second=False, third=False).second
False
>>> StreamingConfiguration(
... first=False, second=True, third=True).second
True
Check the activation state of the third channel
Returns:
True, if the third channel is enabled or False otherwise
Examples
Check channel three activation status for example configs
>>> StreamingConfiguration(
... first=True, second=False, third=False).third
False
>>> StreamingConfiguration(
... first=False, second=False, third=True).third
True
Support for storing data of a streaming message
counter (int) – The message counter value
timestamp (float) – The message timestamp
values (Sequence[float]) – The streaming values
Examples
Create new streaming data
>>> StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
[1, 2, 3]@1 #21
Streaming data must store either two or three values
>>> StreamingData(values=[1], counter=21, timestamp=1)
Traceback (most recent call last):
...
ValueError: Incorrect number of streaming values: 1 (instead of 2 or 3)
>>> StreamingData(values=[1, 2, 3, 4], counter=21, timestamp=1
... )
Traceback (most recent call last):
...
ValueError: Incorrect number of ... values: 4 (instead of 2 or 3)
Apply a certain function to the streaming data
Note
This function changes the stored values in the streaming data and
(as convenience feature) also returns the modified streaming data
itself. This is useful if you want to use the modified streaming
as parameter in a function call, i.e. you can use something like
function(stream_data.apply()).
function (Callable[[float], float]) – The function that should be applied to the streaming data
The modified streaming data
Examples
Add the constant 10 to some example streaming data
>>> data = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data.apply(lambda value: value + 10)
[11, 12, 13]@1 #21
>>> data.values
[11, 12, 13]
Context manager class for storing measurement data in HDF5 format
filepath (Path | str) – The filepath of the HDF5 file in which this object should store
measurement data
channels (StreamingConfiguration | None) – All channels for which data should be collected or None, if the
axes data should be taken from an existing valid file at
filepath.
Examples
Create new file
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
... channels=StreamingConfiguration(first=True)
... ) as storage:
... pass
Opening an existing file but still providing channel info should fail
>>> with Storage(filepath,
... channels=StreamingConfiguration(first=True)
... ) as storage:
... pass
Traceback (most recent call last):
...
ValueError: File “...” exist but channels parameter is not None
>>> filepath.unlink()
Close the HDF file
None
Open and initialize the HDF file for writing
Store HDF acceleration data
file_handle (File) – The HDF file that should store the data
channels (StreamingConfiguration | None) – All channels for which data should be collected or None, if the
axes data should be taken from an existing valid file at
filepath.
Examples
Create new data
>>> filepath = Path("test.hdf5")
>>> streaming_data = StreamingData(values=[1, 2, 3], counter=1,
... timestamp=4306978.449)
>>> with Storage(filepath, StreamingConfiguration(first=True)) as data:
... data.add_streaming_data(streaming_data)
Read from existing file
>>> with Storage(filepath) as data:
... print(data.dataloss_stats())
(1, 0)
>>> filepath.unlink()
Add streaming data to the storage object
streaming_data (StreamingData) – The streaming data that should be added to the storage
None
Examples
Import required library code
>>> from icotronic.can.streaming import StreamingConfigBits
Store streaming data for single channel
>>> channel3 = StreamingConfiguration(first=False, second=False,
... third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21,
... timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22,
... timestamp=2)
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath, channel3) as storage:
... storage.add_streaming_data(data1)
... storage.add_streaming_data(data2)
... # Normally the class takes care about when to store back
... # data to the disk itself. We do a manual flush here to
... # check the number of stored items.
... storage.acceleration.flush()
... print(storage.acceleration.nrows)
6
>>> filepath.unlink()
Store streaming data for three channels
>>> all = StreamingConfiguration(first=True, second=True,
... third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21,
... timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22,
... timestamp=2)
>>> with Storage(filepath, all) as storage:
... storage.add_streaming_data(data1)
... storage.add_streaming_data(data2)
... storage.acceleration.flush()
... print(storage.acceleration.nrows)
2
>>> filepath.unlink()
Determine (minimum) data loss
float
Amount of lost messages divided by all messages (lost and retrieved)
Examples
Import required library code
>>> from math import isclose
Calculate data loss for an example storage file
>>> def calculate_dataloss():
... filepath = Path("test.hdf5")
... with Storage(filepath, StreamingConfiguration(
... first=True)) as storage:
... for counter in range(256):
... storage.add_streaming_data(
... StreamingData(values=[1, 2, 3],
... counter=counter,
... timestamp=counter/10))
... for counter in range(128, 256):
... storage.add_streaming_data(
... StreamingData(values=[4, 5, 6],
... counter=counter,
... timestamp=(255 + counter)/10))
...
... dataloss = storage.dataloss()
... filepath.unlink()
... return dataloss
>>> isclose(0.25, calculate_dataloss(), rel_tol=0.005)
True
Determine number of lost and received messages
tuple[int, int]
Tuple containing the number of received and the number of lost messages
Examples
Get data loss statistics for an example file
>>> def calculate_dataloss_stats():
... filepath = Path("test.hdf5")
... with Storage(filepath,StreamingConfiguration(
... first=True)) as storage:
... for counter in range(256):
... storage.add_streaming_data(
... StreamingData(values=[1, 2, 3],
... counter=counter,
... timestamp=counter/10))
... for counter in range(128, 256):
... storage.add_streaming_data(
... StreamingData(values=[4, 5, 6],
... counter=counter,
... timestamp=(255 + counter)/10))
...
... stats = storage.dataloss_stats()
... filepath.unlink()
... return stats
>>> retrieved, lost = calculate_dataloss_stats()
>>> retrieved
384
>>> lost
128
Calculate sampling frequency of measurement data
float
Sampling frequency (of a single data channel) in Hz
Examples
Import required library code
>>> from math import isclose
Calculate sampling frequency for an example file
>>> def calculate_sampling_frequency():
... filepath = Path("test.hdf5")
... with Storage(filepath, StreamingConfiguration(
... first=True, second=True, third=True)) as storage:
... storage.add_streaming_data(
... StreamingData(values=[1, 2, 3],
... counter=1,
... timestamp=0))
... storage.add_streaming_data(
... StreamingData(values=[1, 2, 3],
... counter=2,
... timestamp=1))
...
... sampling_frequency = storage.sampling_frequency()
... filepath.unlink()
... return sampling_frequency
>>> calculate_sampling_frequency()
2.0
Add acceleration metadata
name (str) – The name of the meta attribute
value (str) – The value of the meta attribute
None
Examples
Store some example acceleration data
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
... StreamingConfiguration(third=True)) as storage:
... storage.store_acceleration_meta("something", "some value")
... print(storage.acceleration.attrs["something"])
some value
>>> filepath.unlink()
Store the sample rate of the ADC
adc_configuration (ADCConfiguration) – The current ADC configuration of the sensor node
None
Examples
Write and read sample rate metadata in example HDF file
>>> filepath = Path("test.hdf5")
>>> adc_configuration = ADCConfiguration(
... set=True,
... prescaler=2,
... acquisition_time=16,
... oversampling_rate=256)
>>> with Storage(filepath,
... StreamingConfiguration(first=True)) as storage:
... storage.write_sample_rate(adc_configuration)
... sample_rate = storage.acceleration.attrs["Sample_Rate"]
... print(sample_rate)
1724.14 Hz (Prescaler: 2, Acquisition Time: 16,
Oversampling Rate: 256)
>>> filepath.unlink()
Add metadata about sensor range
This method assumes that sensor have a symmetric measurement range (e.g. a sensor with a range of 200 g measures from - 100 g up to + 100 g).
sensor_range_in_g (float) – The measurement range of the sensor in multiples of g
None
Examples
Add sensor range metadata to example file
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
... StreamingConfiguration(third=True)) as storage:
... storage.write_sensor_range(200)
... print(storage.acceleration.attrs["Sensor_Range"])
± 100 g₀
>>> filepath.unlink()
Support for reading and writing analog digital converter configuration
*data (bytearray | list[int]) – A list containing the (first five) bytes of the ADC
configuration
set (bool | None) – Specifies if we want to set or retrieve (get) the ADC
configuration
prescaler (int | None) – The ADC prescaler value (1 – 127)
acquisition_time (int | None) – The acquisition time in number of cycles
(1, 2, 3, 4, 8, 16, 32, … , 256)
oversampling_rate (int | None) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)
reference_voltage (float | None) – The ADC reference voltage in Volt
(1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)
Examples
Create simple ADC configuration from scratch
>>> ADCConfiguration(prescaler=2,
... acquisition_time=4,
... oversampling_rate=64)
Get, Prescaler: 2, Acquisition Time: 4, Oversampling Rate: 64,
Reference Voltage: 3.3 V
Get the acquisition time
The acquisition time
Examples
Get initialised acquisition time
>>> config = ADCConfiguration(acquisition_time=2)
>>> config.acquisition_time
2
Get acquisition time set via property
>>> config.acquisition_time = 128
>>> config.acquisition_time
128
Trying to set an unsupported acquisition time will fail
>>> config.acquisition_time = 5
Traceback (most recent call last):
...
ValueError: Acquisition time of “5” out of range, please use ...
Get the oversampling rate
The oversampling rate
Examples
Get initialised oversampling rate
>>> config = ADCConfiguration(oversampling_rate=128)
>>> config.oversampling_rate
128
Get oversampling rate set via property
>>> config.oversampling_rate = 512
>>> config.oversampling_rate
512
Trying to set an unsupported oversampling rate will fail
>>> config.oversampling_rate = 3
Traceback (most recent call last):
...
ValueError: Oversampling rate of “3” out of range, please use ...
Get the prescaler value
The prescaler value
Examples
Get initialised prescaler
>>> config = ADCConfiguration(prescaler=127)
>>> config.prescaler
127
Get prescaler set via property
>>> config.prescaler = 20
>>> config.prescaler
20
Trying to set an unsupported prescaler will fail
>>> config.prescaler = 128
Traceback (most recent call last):
...
ValueError: Prescaler value of “128” out of range, please use ...
Get the reference voltage
The reference voltage in Volt
Examples
Set and get different reference voltage values
>>> config = ADCConfiguration(reference_voltage=3.3)
>>> config.reference_voltage
3.3
>>> config.reference_voltage = 6.6
>>> config.reference_voltage
6.6
>>> config = ADCConfiguration(reference_voltage=6.6)
>>> config.reference_voltage
6.6
>>> config.reference_voltage = 1.8
>>> config.reference_voltage
1.8
>>> config = ADCConfiguration(reference_voltage=1.8)
>>> config.reference_voltage
1.8
Trying to set a unsupported reference voltage will fail
>>> config.reference_voltage = 0
Traceback (most recent call last):
...
ValueError: Reference voltage of “0” V out of range, please use ...
Calculate the sampling rate for the current ADC configuration
float
The calculated sample rate
Examples
Get sampling rates based on ADC attribute values
>>> round(ADCConfiguration(prescaler=2, acquisition_time=8,
... oversampling_rate=64).sample_rate())
9524
>>> round(ADCConfiguration(prescaler=8, acquisition_time=8,
... oversampling_rate=64).sample_rate())
3175
>>> round(ADCConfiguration(reference_voltage=5.0,
... prescaler=16,
... acquisition_time=8,
... oversampling_rate=128).sample_rate())
840