Telemetry Subscriber

Overview

The purpose of the Telemetry Subscriber is to receive samples published by one or more Telemetry Republishers for various DDS topics, correlate these samples, and then write the correlated samples into a shared memory queue for subsequent consumption by Data Tasks on the same node. Samples from the various DDS topics are correlated based on their sample ID, i.e. DDS samples with the same sample ID are matched together. The payloads of the matched samples are extracted and combined into a single user-defined data structure using a user-supplied function, called the blender function. Each entry in the shared memory queue therefore contains data sampled during the same loop cycle.
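
The matching by sample ID can be pictured with the following sketch. It is not the correlator shipped with the RTC Toolkit; the names Sample, Correlator and Push are hypothetical, topics are identified by a plain index, and no timeout or clean-up of incomplete groups is modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for one received DDS sample.
struct Sample {
    std::uint64_t sample_id;
    std::vector<std::uint8_t> payload;
};

// Groups samples by sample ID and releases a group once every topic has contributed.
class Correlator {
public:
    explicit Correlator(std::size_t num_topics) : m_num_topics(num_topics) {
        assert(num_topics > 0);
    }

    // Returns the matched samples, ordered by topic index, once all topics have
    // delivered a sample with this ID. Returns std::nullopt while still incomplete.
    std::optional<std::vector<Sample>> Push(std::size_t topic_index, Sample sample) {
        if (topic_index >= m_num_topics) {
            return std::nullopt;  // unknown topic: ignored in this sketch
        }
        const std::uint64_t id = sample.sample_id;
        auto& slots = m_pending[id];
        slots.resize(m_num_topics);
        slots[topic_index] = std::move(sample);
        for (const auto& slot : slots) {
            if (!slot.has_value()) {
                return std::nullopt;  // still waiting for another topic
            }
        }
        std::vector<Sample> matched;
        matched.reserve(m_num_topics);
        for (auto& slot : slots) {
            matched.push_back(std::move(*slot));
        }
        m_pending.erase(id);
        return matched;
    }

private:
    std::size_t m_num_topics;
    std::map<std::uint64_t, std::vector<std::optional<Sample>>> m_pending;
};
```

In this sketch a group is handed over only when it is complete, which mirrors the statement that each shared memory entry contains data from the same loop cycle.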

Since the topic data structure for the shared memory queue must be provided by the RTC Toolkit user, it is not possible to have a fully working pre-compiled and linked executable for the Telemetry Subscriber. The RTC Toolkit actually provides a template version of the Telemetry Subscriber in a library. A working Telemetry Subscriber must be instantiated using the user provided topic structure, and compiled and linked against the librtctkTelemetrySubscriber.so library.

The idea is to provide an almost ready application that is an SRTC component and delivers all the boilerplate code for reading from DDS and correlating samples. The user then only needs to concentrate on defining the shared memory topic structure and a blender function that constructs the topic from correlated DDS samples.

The following sections describe how a Telemetry Subscriber is instantiated into an actual working application, how the application can be configured, and how it is expected to behave.

Prerequisites

The Telemetry Subscriber library has the following external dependencies:

  • RTI Connext DDS - Provides the communication services to receive data from a Telemetry Republisher.

  • ipcq - Provides the shared memory queue into which correlated DDS samples are written for the downstream Data Tasks.

  • NUMA++ - This utility library is used to optionally control the NUMA node where shared memory is allocated, as well as thread scheduling and pinning policies. This allows runtime performance optimisations.

If the RTC Toolkit has been successfully built with the Waf build system, then the above dependencies will already have been fulfilled.

It is assumed that the RTC Toolkit is installed into the INTROOT directory and that the environment variable INTROOT is defined (see section Installation). If this is not the case, any references to INTROOT in the following sections need to be adjusted appropriately.

Although a Telemetry Subscriber is able to start with default settings for RTI Connext DDS, to properly use any Telemetry Subscriber instance together with a Telemetry Republisher, one should use the dedicated RTI Connext XML configuration file supplied by the RTC Toolkit. This involves pointing RTI Connext to the correct XML file by exporting the NDDS_QOS_PROFILES environment variable with a file URI to the file provided by the RTC Toolkit, as follows:

export NDDS_QOS_PROFILES=file://$INTROOT/resource/config/rtctk/RTCTK_DEFAULT_QOS_PROFILES.xml

In addition, the Telemetry Subscriber configuration must be prepared so that it uses the correct profile that is defined in the XML file. These configuration settings for dds_qos_library and dds_qos_profile are indicated in the Configuration section below.

Customisation

The only customisation currently available for a Telemetry Subscriber instance is to specify the shared memory topic type and to provide a blender function that constructs the topic from correlated input DDS samples. Supplying both is in fact mandatory: a working Telemetry Subscriber can only be instantiated with a user-supplied topic and blender. It is also the user’s responsibility to ensure that the topic is the one expected by the downstream Data Tasks, and that the blender function correctly assembles the topic in shared memory from the individual DDS samples.

Shared Memory Queue Topic

A user has almost full freedom to define the topic structure, which is primarily driven by the input data format requirements of the Data Tasks. It is nevertheless suggested to include at least the sample ID, so that this information is propagated downstream.

A simple example for a topic structure could be as follows:

struct MyTopic {
    uint64_t sample_id;
    float vector_data[1024];
};

Note

The topic structure must be a contiguous, flat structure in memory. It cannot contain pointers to other memory blocks, since such pointers will not be valid in the Data Tasks and will most likely cause segmentation violations or data corruption. Any complex structure with pointers to other memory regions has to be serialised into a flat topic structure.

Ideally, the topic structure should be declared in a header file that is shared with the corresponding Data Task that will process the topic’s data.
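
As a sketch of how the flatness requirement could be guarded at compile time, the shared header might carry a pair of static_assert checks next to the structure. These checks are a suggestion rather than something the RTC Toolkit requires, and NotFlat is a hypothetical counter-example. Note that the type traits reject members such as std::vector but cannot detect raw pointer members, which still have to be avoided by inspection.

```cpp
#include <cstdint>
#include <type_traits>
#include <vector>

struct MyTopic {
    std::uint64_t sample_id;
    float vector_data[1024];
};

// Compile-time guards: a topic that is byte-wise copyable and has a standard
// layout can be placed in shared memory and read back by another process.
static_assert(std::is_trivially_copyable_v<MyTopic>,
              "MyTopic must be copyable byte by byte");
static_assert(std::is_standard_layout_v<MyTopic>,
              "MyTopic must have a standard memory layout");

// Hypothetical counter-example: std::vector keeps its elements on the heap of
// the writing process, so this structure is not flat and must not be used.
struct NotFlat {
    std::uint64_t sample_id;
    std::vector<float> vector_data;
};
```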

The Blender Function

A function used to construct the user defined topic in shared memory must be provided with the following signature:

// Signature as a normal function:
std::error_code Blender(const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                        MyTopic& shm_sample) noexcept {
    // ... user code goes here ...
}

// Signature as a lambda function:
auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                  MyTopic& shm_sample) noexcept -> std::error_code {
    // ... user code goes here ...
};

Two alternative declarations are shown above, the first using the typical function declaration syntax and the second showing the blender function declared as a lambda. Choosing one style over the other is at the user’s discretion.

The first input argument must be the list of correlated DDS samples. Refer to the API reference documentation for details about the structure of CorrelatedDataSamplesRef.

The second argument is an output and must be the topic structure previously declared by the user. It is a reference to the region in shared memory that the data should be written to.

The return type of the blender function must be an error code. When the user topic structure has been successfully constructed it must return a zero or empty error code, indicating success, e.g. return {};. In the case that the topic could not be constructed an appropriate error code should be returned. The suggested error code to use at the moment is std::errc::bad_message, to indicate there is something wrong with the format of the DDS samples.

Note

The blender function must not throw any exceptions. A thrown exception will cause the processing thread to terminate and send the Telemetry Subscriber component into the error state. All error conditions must be indicated by returning an appropriate error code. If any code is executed that could throw an exception, the exception must be caught in a try..catch block and converted to an error code instead.
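
The conversion of exceptions into error codes could look similar to the following sketch. ReadChecked and CopyFirstValue are hypothetical helpers that stand in for user code inside a blender body; they are not part of the RTC Toolkit API.

```cpp
#include <cstddef>
#include <system_error>
#include <vector>

// Hypothetical helper that may throw: std::vector::at throws std::out_of_range
// when the index is outside the vector.
float ReadChecked(const std::vector<float>& values, std::size_t index) {
    return values.at(index);
}

// Pattern for a noexcept function body: catch everything and report the
// failure through the returned error code.
std::error_code CopyFirstValue(const std::vector<float>& input, float& output) noexcept {
    try {
        output = ReadChecked(input, 0);
        return {};  // success
    } catch (...) {
        return std::make_error_code(std::errc::bad_message);
    }
}
```

The catch-all handler keeps the function honest about its noexcept declaration regardless of which exception type the called code throws.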

The code for the body of the blender function must be provided by the user, since it will depend on the input DDS sample topics and the user provided shared memory topic structure. Typically this only involves copying the data from the DDS sample buffers to the correct location within the shared memory. In the most trivial case this can simply be a call to std::memcpy.
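
A minimal sketch of such a copy-only blender is given here. Because the layout of CorrelatedDataSamplesRef is documented only in the API reference, the sketch uses the hypothetical stand-in types FakeSample and FakeCorrelatedSamples; their member names data, size and sample_id are assumptions made for illustration, and real code must use the actual Toolkit types instead.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <system_error>
#include <vector>

struct MyTopic {
    std::uint64_t sample_id;
    float vector_data[1024];
};

// Hypothetical stand-ins; the real rtctk::telSub::CorrelatedDataSamplesRef differs.
struct FakeSample {
    const std::uint8_t* data;
    std::size_t size;
};
struct FakeCorrelatedSamples {
    std::uint64_t sample_id;
    std::vector<FakeSample> samples;
};

std::error_code Blend(const FakeCorrelatedSamples& dds_samples,
                      MyTopic& shm_sample) noexcept {
    // Basic sanity check only: exactly one input sample of the expected size.
    if (dds_samples.samples.size() != 1 ||
        dds_samples.samples[0].size != sizeof(shm_sample.vector_data)) {
        return std::make_error_code(std::errc::bad_message);
    }
    // Propagate the sample ID and copy the payload straight into shared memory.
    shm_sample.sample_id = dds_samples.sample_id;
    std::memcpy(shm_sample.vector_data, dds_samples.samples[0].data,
                sizeof(shm_sample.vector_data));
    return {};
}
```

The size check is deliberately the only validation performed, so that the time spent in the function stays close to the cost of the copy itself.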

Note

Since the blender function is executed in the DDS reading thread, which is time critical, only a minimal amount of work should be performed in it. Only basic sanity checks should be applied to the input data, and only the minimal code needed to copy the data into the shared memory topic should be provided. Any complex computation should be done in a Data Task.

Instantiation

To actually instantiate a Telemetry Subscriber with a specific user-defined topic, one needs to prepare a new application. We assume that the Waf/wtools build system is being used. Therefore, the minimal wscript build configuration file to instantiate an application called myTelSub would be similar to the following:

from wtools.module import declare_cprogram

declare_cprogram(
    target="myTelSub",
    use="reusableComponents.telSub.library"
)

As can be seen, it only needs to depend on reusableComponents.telSub.library. If the user defined shared memory topic is defined in a separate module, then this additional module would also have to be added to the use argument.

The entry-point declaration for this example myTelSub application, i.e. the contents of the main.cpp file, should look similar to the following:

#include <rtctk/telSub/main.hpp>
#include "myTopic.hpp"

void RtcComponentMain(rtctk::componentFramework::Args const& args) {
    auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                      MyTopic& shm_sample) noexcept -> std::error_code {
        // ... user code for the blender goes here ...
        return {};
    };
    rtctk::telSub::Main<MyTopic>(args, std::move(blender));
}

Since any Telemetry Subscriber is an SRTC component, we must declare the entry point with RtcComponentMain, rather than with just int main(int argc, const char* argv[]). We also rely on the template function rtctk::telSub::Main to perform the setup specific to a Telemetry Subscriber and enter the processing loop.

Refer to the Customise a Telemetry Subscriber tutorial to see a complete working example of a Telemetry Subscriber instantiation.

Configuration

All Telemetry Subscriber components will accept the configuration parameters described in this section. These are all read from the runtime repository during initialisation when the Init command is received.

Note

Currently the runtime repository is implemented as a simple YAML file based repository. In the future, this will be replaced with a proper distributed system. For the time being, we show examples of the YAML file contents as currently implemented.

The configuration parameters are divided into groups as indicated in the sections below. If a parameter’s configuration path is indicated with bold font in the tables below it is a mandatory parameter. Otherwise it should be treated as an optional parameter.

DDS Parameters

| Configuration Path | Type | Description |
|---|---|---|
| static/dds_domain_id | RtcInt32 | RTI DDS domain identifier to subscribe to. |
| static/dds_qos_library | RtcString | The name of the profile library to use from the RTI QoS configuration XML file. This should normally be set to RtcTk_Default_Library. If an empty string is given then the default library for RTI QoS parameters is used. |
| static/dds_qos_profile | RtcString | The name of the profile to use from the RTI QoS configuration XML file. This should normally be set to RtcTk_Default_Profile. If an empty string is given then the default RTI QoS parameters are used. |
| static/dds_topics | RtcVectorString | This is a list of strings indicating the names of the DDS topics to subscribe to. |

Note

The order of the topic names declared in dds_topics matters. The Telemetry Subscriber will present the correlated DDS samples to the blender function in the same order as is encoded in dds_topics. Specifically, the dds_samples argument to the blender function will be filled such that, dds_samples.samples[0] will correspond to data from the first DDS topic in the dds_topics list, dds_samples.samples[1] will correspond to data from the second DDS topic in the dds_topics list, and so on.

The following is an example of the above configuration parameters in a YAML file:

static:
    dds_domain_id:
        type: RtcInt32
        value: 123
    dds_qos_library:
        type: RtcString
        value: RtcTk_Default_Library
    dds_qos_profile:
        type: RtcString
        value: RtcTk_Default_Profile
    dds_topics:
        type: RtcVectorString
        value:
            - SlopesTopic
            - IntensitiesTopic
            - CommandsTopic
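
Since the blender addresses its inputs by position, it can help to name the positions once and to keep them in step with dds_topics. The following sketch assumes the three topics of the YAML example above; TopicIndex and OrderMatches are hypothetical names and not part of the RTC Toolkit.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Index constants mirroring the order of static/dds_topics in the example above.
// Inside the blender, dds_samples.samples[kSlopes] would then refer to SlopesTopic.
enum TopicIndex : std::size_t {
    kSlopes = 0,
    kIntensities = 1,
    kCommands = 2,
    kNumTopics = 3
};

// Hypothetical start-up check that the configured list still matches the
// order the blender was written for.
bool OrderMatches(const std::vector<std::string>& dds_topics) {
    return dds_topics.size() == kNumTopics &&
           dds_topics[kSlopes] == "SlopesTopic" &&
           dds_topics[kIntensities] == "IntensitiesTopic" &&
           dds_topics[kCommands] == "CommandsTopic";
}
```

Reordering the entries in the configuration without updating the constants would silently feed the wrong payload into each part of the topic, which is why a check of this kind may be worthwhile.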

Shared Memory Queue Parameters

| Configuration Path | Type | Description |
|---|---|---|
| static/shm_topic_name | RtcString | The name of the shared memory queue, i.e. this corresponds to the file name /dev/shm/ipcq-<topic-name>. |
| static/shm_capacity | RtcInt64 | Maximum number of samples that can be stored in the shared memory queue. |
| static/shm_memory_policy_mode | RtcString | This is the memory allocation policy to apply to the shared memory. It can be one of the values indicated in Table 1. |
| static/shm_memory_policy_nodes | RtcString | Indicates a mask of NUMA nodes to which the memory policy is applied. See the numa_parse_nodestring function in the numa(3) manpage for details about the correct format of this string. |

Table 1 Memory policy mode options, see the set_mempolicy(2) manpage for a detailed description of the options.

| Value | System API Equivalent |
|---|---|
| Default | MPOL_DEFAULT |
| Bind | MPOL_BIND |
| Interleave | MPOL_INTERLEAVE |
| Preferred | MPOL_PREFERRED |

The following is an example of the above configuration parameters in a YAML file:

static:
    shm_topic_name:
        type: RtcString
        value: mytopic
    shm_capacity:
        type: RtcInt64
        value: 1024
    shm_memory_policy_mode:
        type: RtcString
        value: Bind
    shm_memory_policy_nodes:
        type: RtcString
        value: "1,2"

Operational Logic Parameters

| Configuration Path | Type | Description |
|---|---|---|
| static/close_detach_delay | RtcInt32 | Indicates the delay in milliseconds between receiving the signal to stop writing to the shared memory queue and closing, i.e. destroying, the shared memory writer. This provides an opportunity for the shared memory queue readers to detach gracefully. |
| static/correlator_poll_timeout | RtcInt32 | The time in milliseconds to wait for receiving and correlating DDS input samples, before considering the operation to have timed out. If a timeout occurs, the error will be indicated, but the Telemetry Subscriber component will not enter the error state. |

The following is an example of the above configuration parameters in a YAML file:

static:
    close_detach_delay:
        type: RtcInt32
        value: 1000
    correlator_poll_timeout:
        type: RtcInt32
        value: 200

Thread specific scheduling and memory policies are also configurable for the processing thread and the monitoring thread. The parameters are indicated in the following table, where one should replace the <thread> tag in the configuration path with one of processing_thread_policies or monitoring_thread_policies.

| Configuration Path | Type | Description |
|---|---|---|
| static/<thread>/cpu_affinity | RtcString | Sets the CPU affinity for the thread. This is a mask of CPUs on which the thread is allowed to be scheduled. See the numa_parse_cpustring function in the numa(3) manpage for details about the correct format of this string. |
| static/<thread>/scheduler_policy | RtcString | Indicates the thread scheduling policy to apply. Valid values are indicated in Table 2. |
| static/<thread>/scheduler_priority | RtcInt32 | Indicates the priority of the thread as a number in the range [1..99]. If this parameter is set, then scheduler_policy must be set to one of Fifo or Rr. |
| static/<thread>/memory_policy_mode | RtcString | This is the memory allocation policy to apply to the thread and must be provided together with memory_policy_nodes. Valid values are indicated in Table 1. This parameter is only applicable if scheduler_policy is set to Fifo or Rr. |
| static/<thread>/memory_policy_nodes | RtcString | Indicates a mask of NUMA nodes to which the memory policy is applied. See the numa_parse_nodestring function in the numa(3) manpage for details about the correct format of this string. This parameter must be provided together with memory_policy_mode, but is only applicable if scheduler_policy is set to Fifo or Rr. |

Table 2 Scheduling policy options, see the sched_setscheduler(2) manpage for a detailed description of the options.

| Value | System API Equivalent |
|---|---|
| Other | SCHED_OTHER |
| Batch | SCHED_BATCH |
| Fifo | SCHED_FIFO |
| Rr | SCHED_RR |
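
The correspondence in Table 2 can be written down as a small lookup, sketched here for Linux with glibc. ToSchedPolicy and IsRealTime are hypothetical helper names, and the assumption that the configuration strings are matched case-sensitively is made for this sketch only. How the Telemetry Subscriber performs the translation internally is not described in this document.

```cpp
#include <sched.h>

#include <optional>
#include <string_view>

// Maps the configuration strings of Table 2 to the Linux scheduling constants.
// Returns std::nullopt for any string that is not listed in the table.
std::optional<int> ToSchedPolicy(std::string_view name) {
    if (name == "Other") return SCHED_OTHER;
    if (name == "Batch") return SCHED_BATCH;
    if (name == "Fifo") return SCHED_FIFO;
    if (name == "Rr") return SCHED_RR;
    return std::nullopt;
}

// Only the real-time policies accept a priority in the range [1..99].
bool IsRealTime(int policy) {
    return policy == SCHED_FIFO || policy == SCHED_RR;
}
```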

The following is an example of the configuration parameters in a YAML file that are applicable to the processing thread (for the monitoring thread example, one would simply replace processing_thread_policies with monitoring_thread_policies):

static:
    processing_thread_policies:
        cpu_affinity:
            type: RtcString
            value: "1-4"
        scheduler_policy:
            type: RtcString
            value: Rr
        scheduler_priority:
            type: RtcInt32
            value: 10
        memory_policy_mode:
            type: RtcString
            value: Bind
        memory_policy_nodes:
            type: RtcString
            value: "1-4"
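
The dependencies between the thread policy parameters can be checked before a configuration is deployed. The following sketch encodes only the rules stated in the table above; ThreadPolicies and IsConsistent are hypothetical names, and the sketch makes no claim about which checks the Telemetry Subscriber itself performs.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical in-memory view of the static/<thread>/... parameters.
struct ThreadPolicies {
    std::optional<std::string> scheduler_policy;
    std::optional<std::int32_t> scheduler_priority;
    std::optional<std::string> memory_policy_mode;
    std::optional<std::string> memory_policy_nodes;
};

bool IsConsistent(const ThreadPolicies& p) {
    const bool real_time =
        p.scheduler_policy == "Fifo" || p.scheduler_policy == "Rr";
    // A priority requires Fifo or Rr and must lie in the range [1..99].
    if (p.scheduler_priority.has_value()) {
        const std::int32_t priority = *p.scheduler_priority;
        if (!real_time || priority < 1 || priority > 99) {
            return false;
        }
    }
    // memory_policy_mode and memory_policy_nodes must be given together.
    if (p.memory_policy_mode.has_value() != p.memory_policy_nodes.has_value()) {
        return false;
    }
    return true;
}
```

The values of the YAML example above (Rr, priority 10, Bind on nodes "1-4") pass this check.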

Commands

All Telemetry Subscriber instances can be steered by sending commands to them via their MAL interface. One can either rely on the RTC Supervisor to send the appropriate commands or use the rtctkClient command line application to send individual commands manually.

The set of commands that is currently available for a Telemetry Subscriber is indicated in the following table, together with the expected behaviour:

| Command | Behaviour |
|---|---|
| Init | Constructs the necessary RTI DDS objects to read DDS input samples; constructs the shared memory writer; and spawns the low level processing and monitoring threads. |
| Stop | This will stop the initialisation procedure started by the Init command. |
| Reset | Resets the state of the component to the state before the Init command was received. This means that all RTI DDS objects are destroyed, shared memory is released, and the processing and monitoring threads are also destroyed. |
| Enable | Enables reading of the DDS samples. Any read errors and timeouts are ignored. |
| Disable | Disables reading of the DDS samples. Any read errors and timeouts are ignored. |
| Run | Turns on writing to the shared memory queue. Any errors and timeouts are marked as errors. However, the component will not go into the error state. |
| Idle | Turns off writing to the shared memory queue. Any errors and timeouts are again ignored. |

Errors

The main errors that can currently occur during component initialisation when it receives the Init command are due to missing mandatory configuration parameters. The following is an example of the log message for such an error when dds_topics is missing:

[13:52:21:730][ERROR][tel_sub_1] Failed to load configuration from the Runtime Repository: Path '/tel_sub_1/static/dds_topics' does not exist.
[13:52:21:730][ERROR][tel_sub_1] Activity.Initialising: failed, exception: Path '/tel_sub_1/static/dds_topics' does not exist.

For cases where the wrong data type was used in a type field in the YAML file, the following example error message would be seen in the logs:

[00:02:49:597][ERROR][tel_sub_1] Activity.Initialising: failed, exception: Wrong type used to read data point.

If the YAML file format is not correct, e.g. a value field has the wrong format, the following example error message would be seen in the logs:

[00:03:54:061][ERROR][tel_sub_1] Activity.Initialising: failed, exception: File '/home/eltdev/test_install/run/exampleTelSub/runtime_repo/tel_sub_1.yaml' has an invalid data format.

While the Telemetry Subscriber is running it is possible that timeout errors occur. This may indicate that the DDS publisher is no longer working, or the timeout threshold is too low for the sample rate and needs to be adjusted. In such cases, the following error messages will be seen in the logs:

[13:49:59:480][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 110: Connection timed out. Total number of errors = 1]

For any other failures during reading of the DDS samples or when writing to the shared memory queue, messages about “Detected errors in operational logic” will be logged with the error code received from the sub-system, similar to the following:

[13:48:44:448][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 74: Bad message. Total number of errors = 1]
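
The numbers in these messages are plain POSIX error numbers: on Linux, 110 is ETIMEDOUT (std::errc::timed_out) and 74 is EBADMSG (std::errc::bad_message), the latter being the code suggested for the blender function. The sketch below shows how a last-error and counter summary of this shape could be produced from std::error_code values. ErrorTally is a hypothetical type written for illustration and is not the Toolkit's logging code.

```cpp
#include <cstddef>
#include <string>
#include <system_error>

// Hypothetical tally of non-fatal errors, remembering the most recent one.
struct ErrorTally {
    std::error_code last;
    std::size_t count = 0;

    // Successful results (a zero error code) are not counted.
    void Record(std::error_code ec) {
        if (ec) {
            last = ec;
            ++count;
        }
    }

    // Builds a text shaped like the log lines above; the message part comes
    // from the platform's description of the error number.
    std::string Summary() const {
        return "Detected errors in operational logic. [Last error code = " +
               std::to_string(last.value()) + ": " + last.message() +
               ". Total number of errors = " + std::to_string(count) + "]";
    }
};
```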

Limitations and Known Issues

The current alpha implementation of the Telemetry Subscriber has a rather crude sample correlator that only deals with the most common cases. It is possible that it will fail to work correctly under heavy load. A robust correlator that deals with edge cases is planned for the next RTC Toolkit version.

Very limited monitoring is currently available for the Telemetry Subscriber. It does not publish any runtime statistics to the OLDB.