Previous topic

Usage

Next topic

Examples

This Page

API

Connection

class icotronic.can.Connection

Basic class to initialize CAN communication

To actually connect to the CAN bus you need to use the async context manager, provided by this class. If you want to manage the connection yourself, please just use __aenter__ and __aexit__ manually.

Examples

Create a new connection (without connecting to the CAN bus)

>>> connection = Connection()
async __aenter__()

Connect to the STU

Return type

STU

Returns

An object that can be used to communicate with the STU

Raises

CANInitError – if the CAN initialization fails

Examples

Import required library code

>>> from asyncio import run

Use a context manager to handle the cleanup process automatically

>>> async def connect_can_context():
...     async with Connection() as stu:
...         pass
>>> run(connect_can_context())

Create and shutdown the connection explicitly

>>> async def connect_can_manual():
...     connection = Connection()
...     connected = await connection.__aenter__()
...     await connection.__aexit__(None, None, None)
>>> run(connect_can_manual())

Nodes

class icotronic.can.STU(spu)

Communicate and control a connected STU

Parameters

spu (SPU) – The SPU object that created this STU instance

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Create an STU object

>>> async def create_stu():
...     async with Connection() as stu:
...         pass # call some coroutines of `stu` object
>>> run(create_stu())
async activate_bluetooth()

Activate Bluetooth on the STU

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Activate Bluetooth on the STU

>>> async def activate():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
>>> run(activate())
connect_sensor_node(identifier, sensor_node_class=<class 'icotronic.can.node.sensor.SensorNode'>)

Connect to a sensor node (e.g. SHA, SMH or STH)

Parameters
  • identifier (int | str | EUI) –

    The

    • MAC address (EUI),

    • name (str), or

    • node number (int)

    of the sensor node we want to connect to

  • sensor_node_class (type[SensorNode]) – Sensor node subclass that should be returned by context manager

Return type

AsyncSensorNodeManager

Returns

A context manager that returns a sensor node object for the connected node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Connect to the sensor node with node number 0

>>> async def connect_sensor_node():
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(0):
...             connected = await stu.is_connected()
...         after = await stu.is_connected()
...         return (connected, after)
>>> run(connect_sensor_node())
(True, False)
async connect_with_mac_address(mac_address)

Connect to a Bluetooth sensor node using its MAC address

Parameters

mac_address (EUI) – The MAC address of the sensor node

Return type

None

Examples

Import required library code

>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection

Connect to a sensor node via its MAC address

>>> async def get_bluetooth_mac():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # Wait for Bluetooth activation to take place
...         await sleep(2)
...         return await stu.get_mac_address(0)
>>> mac_address = run(get_bluetooth_mac())
>>> mac_address != EUI(0)
True
>>> async def connect(mac_address):
...     async with Connection() as stu:
...         await stu.deactivate_bluetooth()
...         # We assume that at least one STH is available
...         connected = before = await stu.is_connected()
...         await stu.activate_bluetooth()
...         while not connected:
...             await stu.connect_with_mac_address(mac_address)
...             await sleep(0.1)
...             connected = await stu.is_connected()
...         await stu.deactivate_bluetooth()
...         after = await stu.is_connected()
...         # Return status of Bluetooth node connect response
...         return before, connected, after
>>> run(connect(mac_address))
(False, True, False)
async connect_with_number(sensor_node_number=0)

Connect to a Bluetooth node using a node number

Parameters

sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of available nodes - 1)

Return type

bool

Returns

  • True, if

    1. in search mode,

    2. at least single node was found,

    3. no legacy mode,

    4. and scanning mode active

  • False, otherwise

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Connect to node “0”

>>> async def connect_bluetooth_sensor_node_number():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # We assume that at least one STH is available
...         connected = before = await stu.is_connected()
...         while not connected:
...             connected = await stu.connect_with_number(0)
...         await stu.deactivate_bluetooth()
...         after = await stu.is_connected()
...         # Return status of Bluetooth node connect response
...         return before, connected, after
>>> run(connect_bluetooth_sensor_node_number())
(False, True, False)
async deactivate_bluetooth()

Deactivate Bluetooth on the STU

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Deactivate Bluetooth on STU 1

>>> async def deactivate_bluetooth():
...     async with Connection() as stu:
...         await stu.deactivate_bluetooth()
>>> run(deactivate_bluetooth())
async get_available_nodes()

Retrieve the number of available sensor nodes

Return type

int

Returns

The number of available sensor nodes

Examples

Import required library code

>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection

Get the number of available Bluetooth nodes at STU 1

>>> async def get_number_bluetooth_nodes():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...
...         # We assume at least one STH is available
...         number_sths = 0
...         while number_sths <= 0:
...             number_sths = await stu.get_available_nodes()
...             await sleep(0.1)
...
...         return number_sths
>>> run(get_number_bluetooth_nodes()) >= 0
1
async get_mac_address(sensor_node_number=255)

Retrieve the MAC address of the STU or a sensor node

Note

Bluetooth needs to be activated before calling this coroutine, otherwise an incorrect MAC address will be returned (for sensor nodes).

Parameters

sensor_node_number (int) – The node number of the Bluetooth node (0 up to the number of available nodes - 1) or 0x00 (SENSOR_NODE_NUMBER_SELF_ADDRESSING) to retrieve the MAC address of the STU itself

Return type

EUI

Returns

The MAC address of the specified sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the MAC address of STH 1

>>> async def get_bluetooth_mac():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         return await stu.get_mac_address(0)
>>> mac_address = run(get_bluetooth_mac())
>>> isinstance(mac_address, EUI)
True
>>> mac_address != EUI(0)
True
async get_name(sensor_node_number)

Retrieve the name of a sensor node

Parameters

sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of available nodes - 1)

Return type

str

Returns

The (Bluetooth broadcast) name of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Get Bluetooth advertisement name of node “0” from STU 1

>>> async def get_bluetooth_node_name():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # We assume that at least one STH is available
...         return await stu.get_name(0)
>>> name = run(get_bluetooth_node_name())
>>> isinstance(name, str)
True
>>> 0 <= len(name) <= 8
True
async get_rssi(sensor_node_number)

Retrieve the RSSI (Received Signal Strength Indication) of an STH

Parameters

sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of available nodes)

Returns

The RSSI of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the RSSI of a disconnected STH

>>> async def get_bluetooth_rssi():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # We assume that at least one STH is available
...         # Get the RSSI of node “0”
...         return await stu.get_rssi(0)
>>> rssi = run(get_bluetooth_rssi())
>>> -80 < rssi < 0
True
async get_sensor_nodes()

Retrieve a list of available sensor nodes

Return type

list[SensorNodeInfo]

Returns

A list of available nodes including node number, name, MAC address and RSSI for each node

Examples

Import required library code

>>> from asyncio import run, sleep
>>> from netaddr import EUI
>>> from icotronic.can.connection import Connection

Retrieve the list of Bluetooth nodes at STU 1

>>> async def get_sensor_nodes():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         nodes = []
...         while not nodes:
...             nodes = await stu.get_sensor_nodes()
...             await sleep(0.1)
...
...         return nodes
>>> nodes = run(get_sensor_nodes())
>>> len(nodes) >= 1
True
>>> node = nodes[0]
>>> node.sensor_node_number
0
>>> isinstance(node.name, str)
True
>>> 0 <= len(node.name) <= 8
True
>>> -80 < node.rssi < 0
True
>>> isinstance(node.mac_address, EUI)
True
async is_connected()

Check if the STU is connected to a Bluetooth node

Returns:

  • True, if a Bluetooth node is connected to the node

  • False, otherwise

Examples

>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection

Check connection of node “0” to STU

>>> async def check_bluetooth_connection():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         await sleep(0.1)
...         connected_start = await stu.is_connected()
...
...         # We assume that at least one STH is available
...         await stu.connect_with_number(0)
...         # Wait for node connection
...         connected_between = False
...         while not connected_between:
...             connected_between = await stu.is_connected()
...             await sleep(0.1)
...             await stu.connect_with_number(0)
...
...         # Deactivate Bluetooth connection
...         await stu.deactivate_bluetooth()
...         # Wait until node is disconnected
...         await sleep(0.1)
...         connected_after = await stu.is_connected()
...
...         return (connected_start, connected_between,
...                 connected_after)
>>> run(check_bluetooth_connection())
(False, True, False)
Return type

bool

class icotronic.can.SensorNode(spu, eeprom=<class 'icotronic.can.node.eeprom.sensor.SensorNodeEEPROM'>)

Communicate and control a connected sensor node (SHA, STH, SMH)

Parameters
  • spu (SPU) – The SPU object used to connect to this sensor node

  • eeprom (type[SensorNodeEEPROM]) – The EEPROM class of the node

async get_adc_configuration()

Read the current ADC configuration

Return type

ADCConfiguration

Returns

The ADC configuration of the sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read ADC sensor config of sensor node with node id 0

>>> async def read_adc_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_adc_configuration()
>>> run(read_adc_config())
Get, Prescaler: 2, Acquisition Time: 8, Oversampling Rate: 64,
Reference Voltage: 3.3 V
async get_energy_mode_lowest()

Read the reduced lowest energy mode (mode 2) time values

See also:

Return type

Times

Returns

A tuple containing the advertisement time in the lowest energy mode in milliseconds and the time until the node will switch from the reduced energy mode (mode 1) to the lowest energy mode (mode 2) – if there is no activity – in milliseconds

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the reduced energy time values of a sensor node

>>> async def read_energy_mode_lowest():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_energy_mode_lowest()
>>> times = run(read_energy_mode_lowest())
>>> round(times.advertisement)
2500
>>> times.sleep
259200000
async get_energy_mode_reduced()

Read the reduced energy mode (mode 1) sensor node time values

See also:

Return type

Times

Returns

A tuple containing the advertisement time in the reduced energy mode in milliseconds and the time until the node will switch from the disconnected state to the low energy mode (mode 1) – if there is no activity – in milliseconds

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the reduced energy time values of a sensor node

>>> async def read_energy_mode_reduced():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_energy_mode_reduced()
>>> times = run(read_energy_mode_reduced())
>>> round(times.advertisement)
1250
>>> times.sleep
300000
async get_mac_address()

Retrieve the MAC address of the sensor node

Return type

EUI

Returns

The MAC address of the specified sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the MAC address of STH 1

>>> async def get_bluetooth_mac():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_mac_address()
>>> mac_address = run(get_bluetooth_mac())
>>> isinstance(mac_address, EUI)
True
>>> mac_address != EUI(0)
True
async get_name()

Retrieve the name of the sensor node

Return type

str

Returns

The (Bluetooth broadcast) name of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Get Bluetooth advertisement name of node “0”

>>> async def get_sensor_node_name():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_name()
>>> name = run(get_sensor_node_name())
>>> isinstance(name, str)
True
>>> 0 <= len(name) <= 8
True
async get_rssi()

Retrieve the RSSI (Received Signal Strength Indication) of the node

Returns

The RSSI of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Get RSSI of node “0”

>>> async def get_sensor_node_rssi():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_rssi()
>>> rssi = run(get_sensor_node_rssi())
>>> -70 < rssi < 0
True
async get_sensor_configuration()

Read the current sensor configuration

Raises

UnsupportedFeatureException – in case the sensor node replies with an error message

Return type

SensorConfiguration

Returns

The sensor number for the different axes

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Reading sensor config from node without sensor config support fails

>>> async def read_sensor_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_sensor_configuration()
>>> config = run(
...     read_sensor_config())
Traceback (most recent call last):
   ...
UnsupportedFeatureException: Reading sensor configuration is not
supported
async get_streaming_data_single(channels=Channel 1 enabled, Channel 2 enabled, Channel 3 enabled)

Read a single set of raw ADC values from the sensor node

Parameters

channels – Specifies which of the three measurement channels should be enabled or disabled

Return type

StreamingData

Returns

The latest three ADC values measured by the sensor node

Examples

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read a single value from all three measurement channels

>>> async def read_sensor_values():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return (await
...                 sensor_node.get_streaming_data_single())
>>> data = run(read_sensor_values())
>>> len(data.values)
3
>>> all([0 <= value <= 0xffff for value in data.values])
True
async get_supply_voltage()

Read the current supply voltage

Return type

float

Returns

The supply voltage of the sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read the supply voltage of the sensor node with node number 0

>>> async def get_supply_voltage():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_supply_voltage()
>>> supply_voltage = run(get_supply_voltage())
>>> 3 <= supply_voltage <= 4.2
True
open_data_stream(channels, timeout=5)

Open measurement data stream

Parameters
  • channels (StreamingConfiguration) – Specifies which measurement channels should be enabled

  • timeout (float) – The amount of seconds between two consecutive messages, before a TimeoutError will be raised

Return type

DataStreamContextManager

Returns

A context manager object for managing stream data

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read data of first and third channel

>>> async def read_streaming_data():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             channels = StreamingConfiguration(first=True,
...                                               third=True)
...             async with sensor_node.open_data_stream(
...               channels) as stream:
...                 first = []
...                 third = []
...                 messages = 0
...                 async for data, _ in stream:
...                     first.append(data.values[0])
...                     third.append(data.values[1])
...                     messages += 1
...                     if messages >= 3:
...                         break
...                 return first, third
>>> first, third = run(read_streaming_data())
>>> len(first)
3
>>> len(third)
3
async set_adc_configuration(reference_voltage=3.3, prescaler=2, acquisition_time=8, oversampling_rate=64)

Change the ADC configuration of a connected sensor node

Parameters
  • reference_voltage (float) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)

  • prescaler (int) – The ADC prescaler value (1 – 127)

  • acquisition_time (int) – The ADC acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256)

  • oversampling_rate (int) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read and write ADC sensor config

>>> async def write_read_adc_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_adc_configuration(
...                 3.3, 8, 8, 64)
...             modified_config1 = (await
...                 sensor_node.get_adc_configuration())
...
...             adc_config = ADCConfiguration(
...                 reference_voltage=5.0,
...                 prescaler=16,
...                 acquisition_time=8,
...                 oversampling_rate=128)
...             await sensor_node.set_adc_configuration(
...                 **adc_config)
...             modified_config2 = (await
...                 sensor_node.get_adc_configuration())
...
...             # Write back default config values
...             await sensor_node.set_adc_configuration(
...                 3.3, 2, 8, 64)
...             return modified_config1, modified_config2
>>> config1, config2 = run(write_read_adc_config())
>>> config1
Get, Prescaler: 8, Acquisition Time: 8, Oversampling Rate: 64,
Reference Voltage: 3.3 V
>>> config2
Get, Prescaler: 16, Acquisition Time: 8, Oversampling Rate: 128,
Reference Voltage: 5.0 V
async set_energy_mode_lowest(times=None)

Writes the time values for the lowest energy mode (mode 2)

See also:

Parameters

times (Times | None) –

The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the lowest energy mode (mode 2) from the reduced energy mode (mode 1) – if there is no activity – in milliseconds.

If you do not specify these values then the default values from the configuration will be used

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read and write the reduced energy time values of a sensor node

>>> async def read_write_energy_mode_lowest(sleep, advertisement):
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_energy_mode_lowest(
...                 Times(sleep=sleep,
...                       advertisement=advertisement))
...             times = await sensor_node.get_energy_mode_lowest()
...
...             # Overwrite changed values with default config
...             # values
...             await sensor_node.set_energy_mode_lowest()
...
...             return times
>>> times = run(read_write_energy_mode_lowest(200_000, 2000))
>>> times.sleep
200000
>>> round(times.advertisement)
2000
async set_energy_mode_reduced(times=None)

Writes the time values for the reduced energy mode (mode 1)

See also:

Parameters

times (Times | None) –

The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the low energy mode (mode 1) from the disconnected state – if there is no activity – in milliseconds.

If you do not specify these values then the default values from the configuration will be used

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read and write the reduced energy time values of a sensor node

>>> async def read_write_energy_mode_reduced(sleep, advertisement):
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_energy_mode_reduced(
...                 Times(sleep=sleep,
...                       advertisement=advertisement))
...             times = await sensor_node.get_energy_mode_reduced()
...
...             # Overwrite changed values with default config
...             # values
...             await sensor_node.set_energy_mode_reduced()
...
...             return times
>>> times = run(read_write_energy_mode_reduced(200_000, 2000))
>>> times.sleep
200000
>>> round(times.advertisement)
2000
async set_name(name)

Set the name of a sensor node

Parameters

name (str) – The new name for the node

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Change the name of a sensor node

>>> async def test_naming(name):
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         # and that this node currently does not have the name
...         # specified in the variable `name`.
...         async with stu.connect_sensor_node(0) as sensor_node:
...             before = await sensor_node.get_name()
...             await sensor_node.set_name(name)
...             updated = await sensor_node.get_name()
...             await sensor_node.set_name(before)
...             after = await sensor_node.get_name()
...             return before, updated, after
>>> before, updated, after = run(test_naming("Hello"))
>>> before != "Hello"
True
>>> updated
'Hello'
>>> before == after
True
async set_sensor_configuration(sensors)

Change the sensor numbers for the different measurement channels

If you use the sensor number 0 for one of the different measurement channels, then the sensor (number) for that channel will stay the same.

Parameters

sensors (SensorConfiguration) – The sensor numbers of the different measurement channels

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Setting sensor config from node without sensor config support fails

>>> async def set_sensor_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_sensor_configuration(
...                 SensorConfiguration(
...                     first=0, second=0, third=0))
>>> config = run(
...     set_sensor_config())
Traceback (most recent call last):
   ...
UnsupportedFeatureException: Writing sensor configuration is not
supported
async start_streaming_data(channels)

Start streaming data

Parameters

channels (StreamingConfiguration) – Specifies which of the three measurement channels should be enabled or disabled

Return type

None

The CAN identifier that this coroutine returns can be used to filter CAN messages that contain the expected streaming data

async stop_streaming_data(retries=10, ignore_errors=False)

Stop streaming data

Parameters
  • retries (int) – The number of times the message is sent again, if no response was sent back in a certain amount of time

  • ignore_errors – Specifies, if this coroutine should ignore, if there were any problems while stopping the stream.

Return type

None

class icotronic.can.STH(spu)

Communicate and control a connected SHA or STH

Parameters

spu (SPU) – The SPU object used to communicate with the node

async activate_acceleration_self_test(dimension='x')

Activate self test of STH accelerometer

Parameters

dimension (str) – The dimension (x, y or z) for which the self test should be activated.

Return type

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Activate and deactivate acceleration self-test

>>> async def test_self_test():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is
...         # available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             await sth.activate_acceleration_self_test()
...             await sth.deactivate_acceleration_self_test()
>>> run(test_self_test())
async deactivate_acceleration_self_test(dimension='x')

Deactivate self test of STH accelerometer

Parameters

dimension (str) – The dimension (x, y or z) for which the self test should be deactivated.

Return type

None

async get_acceleration_conversion_function()

Retrieve function to convert raw sensor data into g

Return type

Callable[[float], float]

Returns

A function that converts 16 bit raw values from the STH into multiples of earth’s gravitation (g)

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection
>>> from icotronic.can.node.sth import STH

Convert a raw ADC value into multiples of g

>>> async def read_sensor_values():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             convert_to_g = (await
...                 sth.get_acceleration_conversion_function())
...             data = await sth.get_streaming_data_single()
...             before = list(data.values)
...             data.apply(convert_to_g)
...             return before, data.values
>>> before, after = run(read_sensor_values())
>>> all([0 <= value <= 2**16 for value in before])
True
>>> all([-100 <= value <= 100 for value in after])
True
async get_acceleration_sensor_range_in_g()

Retrieve the maximum acceleration sensor range in multiples of g₀

  • For a ±100 g₀ sensor this method returns 200 (100 + abs(-100)).

  • For a ±50 g₀ sensor this method returns 100 (50 + abs(-50)).

For this to work correctly the EEPROM value of the x-axis acceleration offset in the EEPROM has to be set.

Return type

int

Returns

Range of current acceleration sensor in multiples of earth’s gravitation

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection
>>> from icotronic.can.node.sth import STH

Write and read the acceleration offset of STH 1

>>> async def read_sensor_range():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             return (await
...                     sth.get_acceleration_sensor_range_in_g())
>>> sensor_range = run(read_sensor_range())
>>> 0 < sensor_range <= 200
True
async get_acceleration_voltage(dimension='x', reference_voltage=3.3)

Retrieve the current acceleration voltage in Volt

Parameters
  • dimension (str) – The dimension (x=1, y=2, z=3) for which the acceleration voltage should be measured

  • reference_voltage (float) – The reference voltage for the ADC in Volt

Return type

float

Returns

The voltage of the acceleration sensor in Volt

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read the acceleration voltage of STH 1

>>> async def read_acceleration_voltage():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             before = await sth.get_acceleration_voltage()
...             await sth.activate_acceleration_self_test()
...             between = await sth.get_acceleration_voltage()
...             await sth.deactivate_acceleration_self_test()
...             after = await sth.get_acceleration_voltage()
...         return (before, between, after)
>>> before, between, after = run(read_acceleration_voltage())
>>> before < between and after < between
True

Streaming

class icotronic.can.StreamingConfiguration(first=True, second=False, third=False)

Streaming configuration

Parameters
  • first (bool) – Specifies if the first channel is enabled or not

  • second (bool) – Specifies if the second channel is enabled or not

  • third (bool) – Specifies if the third channel is enabled or not

Raises

ValueError – if none of the channels is active

Examples

Create some example streaming configurations

>>> config = StreamingConfiguration()
>>> config = StreamingConfiguration(
...          first=False, second=True, third=True)

Creating streaming configurations without active channels will fail

>>> config = StreamingConfiguration(first=False)
Traceback (most recent call last):
   ...
ValueError: At least one channel needs to be active
>>> config = StreamingConfiguration(
...     first=False, second=False, third=False)
Traceback (most recent call last):
   ...
ValueError: At least one channel needs to be active
axes()

Get the activated axes returned by this streaming configuration

Return type

list[str]

Returns

A list containing all activated axes in alphabetical order

Examples

Get the activated axes for example streaming configurations

>>> StreamingConfiguration(
...     first=False, second=True, third=True).axes()
['y', 'z']
>>> StreamingConfiguration(
...     first=True, second=True, third=False).axes()
['x', 'y']
data_length()

Returns the streaming data length

This will be either:

  • 2 (when 2 channels are active), or

  • 3 (when 1 or 3 channels are active)

For more information, please take a look here.

Return type

int

Returns

The length of the streaming data resulting from this channel configuration

Examples

Get the data length of example streaming configurations

>>> StreamingConfiguration().data_length()
3
>>> StreamingConfiguration(
...     first=False, second=True, third=False).data_length()
3
>>> StreamingConfiguration(
...     first=True, second=True, third=True).data_length()
3
>>> StreamingConfiguration(
...     first=False, second=True, third=True).data_length()
2
enabled_channels()

Get the number of activated channels

Return type

int

Returns

The number of enabled channels

Examples

Get the number of enabled channels for example streaming configs

>>> StreamingConfiguration(first=True).enabled_channels()
1
>>> StreamingConfiguration(first=False, second=True, third=False
...                       ).enabled_channels()
1
>>> StreamingConfiguration(first=True, second=True, third=True
...                       ).enabled_channels()
3
property first: bool

Check the activation state of the first channel

Returns:

True, if the first channel is enabled or False otherwise

Examples

Check channel one activation status for example configs

>>> StreamingConfiguration(first=True, second=False,
...                        third=False).first
True
>>> StreamingConfiguration(first=False, second=False,
...                        third=True).first
False
property second: bool

Check the activation state of the second channel

Returns

True, if the second channel is enabled or False otherwise

Examples

Check channel two activation status for example configs

>>> StreamingConfiguration(
...     first=True, second=False, third=False).second
False
>>> StreamingConfiguration(
...     first=False, second=True, third=True).second
True
property third: bool

Check the activation state of the third channel

Returns:

True, if the third channel is enabled or False otherwise

Examples

Check channel three activation status for example configs

>>> StreamingConfiguration(
...     first=True, second=False, third=False).third
False
>>> StreamingConfiguration(
...     first=False, second=False, third=True).third
True
class icotronic.can.StreamingData(counter, timestamp, values)

Support for storing data of a streaming message

Parameters
  • counter (int) – The message counter value

  • timestamp (float) – The message timestamp

  • values (Sequence[float]) – The streaming values

Examples

Create new streaming data

>>> StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
[1, 2, 3]@1 #21

Streaming data must store either two or three values

>>> StreamingData(values=[1], counter=21, timestamp=1)
Traceback (most recent call last):
...
ValueError: Incorrect number of streaming values: 1 (instead of 2 or 3)
>>> StreamingData(values=[1, 2, 3, 4], counter=21, timestamp=1
...              )
Traceback (most recent call last):
...
ValueError: Incorrect number of ... values: 4 (instead of 2 or 3)
apply(function)

Apply a certain function to the streaming data

Note

This function changes the stored values in the streaming data and (as convenience feature) also returns the modified streaming data itself. This is useful if you want to use the modified streaming as parameter in a function call, i.e. you can use something like function(stream_data.apply()).

Parameters

function (Callable[[float], float]) – The function that should be applied to the streaming data

Return type

StreamingData

Returns

The modified streaming data

Examples

Add the constant 10 to some example streaming data

>>> data = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data.apply(lambda value: value + 10)
[11, 12, 13]@1 #21
>>> data.values
[11, 12, 13]

Storage

class icotronic.measurement.Storage(filepath, channels=None)

Context manager class for storing measurement data in HDF5 format

Parameters
  • filepath (Path | str) – The filepath of the HDF5 file in which this object should store measurement data

  • channels (StreamingConfiguration | None) – All channels for which data should be collected or None, if the axes data should be taken from an existing valid file at filepath.

Examples

Create new file

>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
...              channels=StreamingConfiguration(first=True)
... ) as storage:
...     pass

Opening an existing file but still providing channel info should fail

>>> with Storage(filepath,
...             channels=StreamingConfiguration(first=True)
... ) as storage:
...     pass
Traceback (most recent call last):
    ...
ValueError: File “...” exist but channels parameter is not None
>>> filepath.unlink()
close()

Close the HDF file

Return type

None

open()

Open and initialize the HDF file for writing

Return type

StorageData

class icotronic.measurement.StorageData(file_handle, channels=None)

Store HDF acceleration data

Parameters
  • file_handle (File) – The HDF file that should store the data

  • channels (StreamingConfiguration | None) – All channels for which data should be collected or None, if the axes data should be taken from an existing valid file at filepath.

Examples

Create new data

>>> filepath = Path("test.hdf5")
>>> streaming_data = StreamingData(values=[1, 2, 3], counter=1,
...                                timestamp=4306978.449)
>>> with Storage(filepath, StreamingConfiguration(first=True)) as data:
...     data.add_streaming_data(streaming_data)

Read from existing file

>>> with Storage(filepath) as data:
...     print(data.dataloss_stats())
(1, 0)
>>> filepath.unlink()
add_streaming_data(streaming_data)

Add streaming data to the storage object

Parameters

streaming_data (StreamingData) – The streaming data that should be added to the storage

Return type

None

Examples

Import required library code

>>> from icotronic.can.streaming import StreamingConfigBits

Store streaming data for single channel

>>> channel3 = StreamingConfiguration(first=False, second=False,
...                                   third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21,
...                       timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22,
...                       timestamp=2)
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath, channel3) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     # Normally the class takes care about when to store back
...     # data to the disk itself. We do a manual flush here to
...     # check the number of stored items.
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
6
>>> filepath.unlink()

Store streaming data for three channels

>>> all = StreamingConfiguration(first=True, second=True,
...                              third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21,
...                       timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22,
...                       timestamp=2)
>>> with Storage(filepath, all) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
2
>>> filepath.unlink()
dataloss()

Determine (minimum) data loss

Return type

float

Returns

Amount of lost messages divided by all messages (lost and retrieved)

Examples

Import required library code

>>> from math import isclose

Calculate data loss for an example storage file

>>> def calculate_dataloss():
...     filepath = Path("test.hdf5")
...     with Storage(filepath, StreamingConfiguration(
...                  first=True)) as storage:
...         for counter in range(256):
...             storage.add_streaming_data(
...                 StreamingData(values=[1, 2, 3],
...                               counter=counter,
...                               timestamp=counter/10))
...         for counter in range(128, 256):
...             storage.add_streaming_data(
...                 StreamingData(values=[4, 5, 6],
...                               counter=counter,
...                               timestamp=(255 + counter)/10))
...
...         dataloss = storage.dataloss()
...     filepath.unlink()
...     return dataloss
>>> isclose(0.25, calculate_dataloss(), rel_tol=0.005)
True
dataloss_stats()

Determine number of lost and received messages

Return type

tuple[int, int]

Returns

Tuple containing the number of received and the number of lost messages

Examples

Get data loss statistics for an example file

>>> def calculate_dataloss_stats():
...     filepath = Path("test.hdf5")
...     with Storage(filepath,StreamingConfiguration(
...                  first=True)) as storage:
...         for counter in range(256):
...             storage.add_streaming_data(
...                 StreamingData(values=[1, 2, 3],
...                               counter=counter,
...                               timestamp=counter/10))
...         for counter in range(128, 256):
...             storage.add_streaming_data(
...                 StreamingData(values=[4, 5, 6],
...                               counter=counter,
...                               timestamp=(255 + counter)/10))
...
...         stats = storage.dataloss_stats()
...     filepath.unlink()
...     return stats
>>> retrieved, lost = calculate_dataloss_stats()
>>> retrieved
384
>>> lost
128
sampling_frequency()

Calculate sampling frequency of measurement data

Return type

float

Returns

Sampling frequency (of a single data channel) in Hz

Examples

Import required library code

>>> from math import isclose

Calculate sampling frequency for an example file

>>> def calculate_sampling_frequency():
...     filepath = Path("test.hdf5")
...     with Storage(filepath, StreamingConfiguration(
...             first=True, second=True, third=True)) as storage:
...         storage.add_streaming_data(
...             StreamingData(values=[1, 2, 3],
...                           counter=1,
...                           timestamp=0))
...         storage.add_streaming_data(
...             StreamingData(values=[1, 2, 3],
...                           counter=2,
...                           timestamp=1))
...
...         sampling_frequency = storage.sampling_frequency()
...     filepath.unlink()
...     return sampling_frequency
>>> calculate_sampling_frequency()
2.0
store_acceleration_meta(name, value)

Add acceleration metadata

Parameters
  • name (str) – The name of the meta attribute

  • value (str) – The value of the meta attribute

Return type

None

Examples

Store some example acceleration data

>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
...              StreamingConfiguration(third=True)) as storage:
...     storage.store_acceleration_meta("something", "some value")
...     print(storage.acceleration.attrs["something"])
some value
>>> filepath.unlink()
write_sample_rate(adc_configuration)

Store the sample rate of the ADC

Parameters

adc_configuration (ADCConfiguration) – The current ADC configuration of the sensor node

Return type

None

Examples

Write and read sample rate metadata in example HDF file

>>> filepath = Path("test.hdf5")
>>> adc_configuration = ADCConfiguration(
...     set=True,
...     prescaler=2,
...     acquisition_time=16,
...     oversampling_rate=256)
>>> with Storage(filepath,
...              StreamingConfiguration(first=True)) as storage:
...     storage.write_sample_rate(adc_configuration)
...     sample_rate = storage.acceleration.attrs["Sample_Rate"]
...     print(sample_rate)
1724.14 Hz (Prescaler: 2, Acquisition Time: 16,
Oversampling Rate: 256)
>>> filepath.unlink()
write_sensor_range(sensor_range_in_g)

Add metadata about sensor range

This method assumes that sensor have a symmetric measurement range (e.g. a sensor with a range of 200 g measures from - 100 g up to + 100 g).

Parameters

sensor_range_in_g (float) – The measurement range of the sensor in multiples of g

Return type

None

Examples

Add sensor range metadata to example file

>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
...              StreamingConfiguration(third=True)) as storage:
...     storage.write_sensor_range(200)
...     print(storage.acceleration.attrs["Sensor_Range"])
± 100 g₀
>>> filepath.unlink()

ADC

class icotronic.can.adc.ADCConfiguration(*data, set=None, prescaler=None, acquisition_time=None, oversampling_rate=None, reference_voltage=None)

Support for reading and writing analog digital converter configuration

Parameters
  • *data (bytearray | list[int]) – A list containing the (first five) bytes of the ADC configuration

  • set (bool | None) – Specifies if we want to set or retrieve (get) the ADC configuration

  • prescaler (int | None) – The ADC prescaler value (1 – 127)

  • acquisition_time (int | None) – The acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256)

  • oversampling_rate (int | None) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)

  • reference_voltage (float | None) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)

Examples

Create simple ADC configuration from scratch

>>> ADCConfiguration(prescaler=2,
...     acquisition_time=4,
...     oversampling_rate=64)
Get, Prescaler: 2, Acquisition Time: 4, Oversampling Rate: 64,
Reference Voltage: 3.3 V
property acquisition_time: int

Get the acquisition time

Returns

The acquisition time

Examples

Get initialised acquisition time

>>> config = ADCConfiguration(acquisition_time=2)
>>> config.acquisition_time
2

Get acquisition time set via property

>>> config.acquisition_time = 128
>>> config.acquisition_time
128

Trying to set an unsupported acquisition time will fail

>>> config.acquisition_time = 5
Traceback (most recent call last):
   ...
ValueError: Acquisition time of “5” out of range, please use ...
property oversampling_rate: int

Get the oversampling rate

Returns

The oversampling rate

Examples

Get initialised oversampling rate

>>> config = ADCConfiguration(oversampling_rate=128)
>>> config.oversampling_rate
128

Get oversampling rate set via property

>>> config.oversampling_rate = 512
>>> config.oversampling_rate
512

Trying to set an unsupported oversampling rate will fail

>>> config.oversampling_rate = 3
Traceback (most recent call last):
   ...
ValueError: Oversampling rate of “3” out of range, please use ...
property prescaler: int

Get the prescaler value

Returns

The prescaler value

Examples

Get initialised prescaler

>>> config = ADCConfiguration(prescaler=127)
>>> config.prescaler
127

Get prescaler set via property

>>> config.prescaler = 20
>>> config.prescaler
20

Trying to set an unsupported prescaler will fail

>>> config.prescaler = 128
Traceback (most recent call last):
   ...
ValueError: Prescaler value of “128” out of range, please use ...
property reference_voltage: float

Get the reference voltage

Returns

The reference voltage in Volt

Examples

Set and get different reference voltage values

>>> config = ADCConfiguration(reference_voltage=3.3)
>>> config.reference_voltage
3.3
>>> config.reference_voltage = 6.6
>>> config.reference_voltage
6.6
>>> config = ADCConfiguration(reference_voltage=6.6)
>>> config.reference_voltage
6.6
>>> config.reference_voltage = 1.8
>>> config.reference_voltage
1.8
>>> config = ADCConfiguration(reference_voltage=1.8)
>>> config.reference_voltage
1.8

Trying to set a unsupported reference voltage will fail

>>> config.reference_voltage = 0
Traceback (most recent call last):
   ...
ValueError: Reference voltage of “0” V out of range, please use ...
sample_rate()

Calculate the sampling rate for the current ADC configuration

Return type

float

Returns

The calculated sample rate

Examples

Get sampling rates based on ADC attribute values

>>> round(ADCConfiguration(prescaler=2, acquisition_time=8,
...                        oversampling_rate=64).sample_rate())
9524
>>> round(ADCConfiguration(prescaler=8, acquisition_time=8,
...                        oversampling_rate=64).sample_rate())
3175
>>> round(ADCConfiguration(reference_voltage=5.0,
...                        prescaler=16,
...                        acquisition_time=8,
...                        oversampling_rate=128).sample_rate())
840