Creating a Custom Data Task

This example shows how the exampleDataTask is built and configured, giving users a starting point for creating their own Data Tasks.

Data Tasks are not final components; they must be customised by the user. In the current data task implementation the RTC Tk provides a state machine and a command interface. The data task provides an interface to the shared memory queue via the ReaderThread. The rest is up to the user to define.

The idea is to let users provide a custom Business Logic class that defines the behaviour for the different stages of the component’s life-cycle (i.e. it implements the activity methods). Users also provide the data task specific algorithm; the current proposal is to confine the interface to this via a Computation Class.

Provided Example

This customisation of the data task will be discussed in reference to the provided data task example. The main aim of this example is not to give a fully fledged Data Task that can be used as is, but to show how the pieces fit together.

The example shows how to configure the BusinessLogic, interface with the ReaderThread to process the telemetry data, and perform a very simple calculation. The Computation class provides a callback to copy the wavefront sensor slope vector to a local MatrixBuffer (see Runtime Configuration Repository for more information regarding the MatrixBuffer). The MatrixBuffer of slope vectors is averaged to calculate an average slope vector. This average slope vector is then projected to an average modes vector, using a matrix-vector multiplication against a slopes2modes matrix.

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask

Modules

The provided example is composed of the following waf modules:

  • app - The data task application including default configuration

  • gpu - Provides a GPU data task example (optional)

  • python - Provides python scripts that support testing

  • scripts - Helper scripts for deployment and control

  • shmPub - Implements an example shmPublisher. (See Shared Memory Publisher)

Dependencies

The provided example code depends on the following modules:

  • Component Framework - for RTC Component functionality with Services

  • Client Application - to steer the application

Running the Example Data Task

The exampleDataTask can be run in isolation or as part of an SRTC system. The only requirement for the data task to be brought to Operational is that a shared memory queue writer has been created. This can either be part of the Telemetry Subscriber or a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational.

To make the example as simple as possible a script rtctkExampleDataTask.sh is provided to help bring the data task and supporting infrastructure online.

rtctkExampleDataTask.sh

After installing the RTC Tk run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTask.sh deploy
rtctkExampleDataTask.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTask.sh send Init
rtctkExampleDataTask.sh send Enable
rtctkExampleDataTask.sh send Run
rtctkExampleDataTask.sh send Idle
rtctkExampleDataTask.sh send Disable
rtctkExampleDataTask.sh send Reset

# Gracefully terminate the applications and clean-up
rtctkExampleDataTask.sh stop
rtctkExampleDataTask.sh undeploy

During the deploy action a python script is invoked to produce the required FITS input files; see section Generating Input Data.

Generating Input Data

As we do not want to supply large FITS files within our git repository, the decision was made to provide a python script that generates the required data. The script used by the exampleDataTask is found in the following location:

_examples/exampleDataTask/python/genFitsData

It provides a script that is called with three arguments as follows:

rtctkExampleDataTaskGenFitsData <num_slopes> <num_modes> <filepath>

This will create an identity slopes2modes matrix that is required by the data task. By default this is invoked by the script rtctkExampleDataTask.sh during the deploy action.

Development Guide

This section explains how to create a simple Data Task from scratch, using the exampleDataTask as the reference. See Data Task for more information about the specifics of the Data Task component.

Business Logic Class

The BusinessLogic class provides the point at which to customise the states and state transitions.

For more information see RTC Component.

Computation Class

As stated before, this is where most of the customisation effort is expected. In the exampleDataTask a very simple calculation is provided to show how this can be done. The computation selected is a projection from slopes to modes, performed via a matrix-vector multiplication.

The following interface is provided to be invoked by the BusinessLogic.

void StaticLoad()
void DynamicLoad()
void OnDataAvailable(TopicType const& sample)
void Reset()
void Compute()
bool isComputing(){return m_computation_running;}

At the current time no specific interface is enforced between the BusinessLogic and the Computation class. In the future this may change and functionality may be added.
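To make the following descriptions easier to follow, a possible layout of such a class is sketched below. This is only an illustration, not the exact exampleDataTask code: the member names are assumptions and a plain std::vector<float> stands in for the toolkit's MatrixBuffer.

#include <cstdint>
#include <vector>

struct TopicType;  // the telemetry topic type defined by the example (see Shared Memory Queue Topic)

// Possible Computation class layout; member names are assumptions and
// std::vector<float> stands in for the toolkit's MatrixBuffer.
class Computation {
public:
    void StaticLoad();                              // read static config once (from the constructor)
    void DynamicLoad();                             // read dynamic config during Updating
    void OnDataAvailable(TopicType const& sample);  // ReaderThread callback, copies slopes
    void Reset();                                   // zero buffers before Running
    void Compute();                                 // average slopes and project to modes
    bool isComputing() { return m_computation_running; }

private:
    int m_n_slopes = 0;                  // slopes per sample
    int m_n_modes = 0;                   // modes to project onto
    int m_n_samples = 0;                 // samples to accumulate before computing
    int m_callback_counter = 0;          // row index into the slope buffer
    std::uint64_t m_sample_id = 0;       // id of the last sample received
    bool m_computation_running = false;
    std::vector<float> m_buffer_matrix;  // m_n_samples x m_n_slopes slope buffer
    std::vector<float> m_s2m;            // m_n_modes x m_n_slopes slopes2modes matrix
    std::vector<float> m_avg_slopes;     // averaged slope vector
    std::vector<float> m_avg_modes;      // projected mode vector
};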

void StaticLoad()

Loads static configuration data from runtime_repo. This is only called once, as part of the constructor.

void DynamicLoad()

Loads dynamic configuration data from runtime_repo. This is called during Updating.
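As an illustration of how these two load methods might be structured, a hedged sketch is shown below. The runtime repository accessor (m_runtime_repo, GetDataPoint<T>()) and the data point paths are hypothetical placeholders, not the RTC Tk API.

// Hedged sketch only: m_runtime_repo, GetDataPoint<T>() and the data point paths
// are hypothetical placeholders for the runtime repository interface available
// to the component; they are not the RTC Tk API.
void Computation::StaticLoad()
{
    // Static values are read once, from the constructor.
    m_n_slopes  = m_runtime_repo.GetDataPoint<int>("wfs_1/slopes");
    m_n_modes   = m_runtime_repo.GetDataPoint<int>("computation/modes");
    m_n_samples = m_runtime_repo.GetDataPoint<int>("common/samples_to_read");
    m_buffer_matrix.resize(static_cast<std::size_t>(m_n_samples) * m_n_slopes);
}

void Computation::DynamicLoad()
{
    // Dynamic values may change between runs and are re-read during Updating.
    m_s2m = m_runtime_repo.GetDataPoint<std::vector<float>>("computation/slopes2modes");
}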

void OnDataAvailable()

Callback used by the ReaderThread to process each shared memory data point. This copies data to a buffer local to the Computation class.

void Reset()

Resets the buffers back to zero. When called, the buffer indexing is reset to zero and any required buffers are zeroed out. This is called when the data task receives the Run command and invokes the GoingRunning state transition. This puts the computation data into a state where it is safe to start receiving data.
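A minimal sketch of what Reset() might look like, using the hypothetical members from the class sketch above (std::fill requires <algorithm>):

void Computation::Reset()
{
    m_callback_counter = 0;  // the next sample goes back into the first buffer row
    std::fill(m_buffer_matrix.begin(), m_buffer_matrix.end(), 0.0f);
    std::fill(m_avg_slopes.begin(), m_avg_slopes.end(), 0.0f);
    std::fill(m_avg_modes.begin(), m_avg_modes.end(), 0.0f);
}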

void Compute()

The Compute function is where the bulk of the data task algorithm resides. It invokes the Data Task algorithm and runs on the BusinessLogic Running thread.
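In the example the algorithm is an element-wise average of the buffered slope vectors followed by a matrix-vector multiplication with the slopes2modes matrix. A simplified, single-threaded sketch is shown below; it uses the hypothetical members from the class sketch above and plain loops instead of the example's MatrixBuffer/BLAS-style code.

// Simplified sketch of the slopes-to-modes computation (not the exact example code).
void Computation::Compute()
{
    m_computation_running = true;

    // Average the buffered slope vectors element by element.
    for (int s = 0; s < m_n_slopes; ++s) {
        float sum = 0.0f;
        for (int i = 0; i < m_n_samples; ++i) {
            sum += m_buffer_matrix[static_cast<std::size_t>(i) * m_n_slopes + s];
        }
        m_avg_slopes[s] = sum / static_cast<float>(m_n_samples);
    }

    // Project the average slope vector onto modes: avg_modes = s2m * avg_slopes.
    for (int m = 0; m < m_n_modes; ++m) {
        float acc = 0.0f;
        for (int s = 0; s < m_n_slopes; ++s) {
            acc += m_s2m[static_cast<std::size_t>(m) * m_n_slopes + s] * m_avg_slopes[s];
        }
        m_avg_modes[m] = acc;
    }

    m_computation_running = false;
}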

bool isComputing()

A simple interface for the BusinessLogic to call to check if the computation is running. This is designed to be called only during the Update command to check whether the data task is in a state where it can be updated. This means that the Update command cannot be executed while Compute() is running.

OpenBLAS

Originally the matrix-vector multiplication was performed using openBLAS. This has since been modified to use a single-threaded implementation with no dependencies on third-party tools, because openBLAS is not provided by the ELT development environment on CentOS 7 and we did not want to force any new outside dependencies for an example component. The original API can still be seen in the code, so it would be trivial to revert on CentOS 8.

GPU

See GPU support for information about GPU support.

ReaderThread Class

Template arguments

The ReaderThread is mainly configurable via configuration. The only compile-time configuration is the template arguments, which are the definition of the telemetry data topic (see Shared Memory Queue Topic) and the reader type used. The default used in this example is using ReaderType = ipcq::Reader<TopicType>;. See the roadrunner documentation for more details.
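For illustration, a ReaderThread for this topic might be declared as follows. The exact template signature is an assumption based on the description above (topic type plus reader type), so treat this only as a sketch.

// Sketch only: the ReaderThread template signature shown here is an assumption.
using ReaderType = ipcq::Reader<TopicType>;           // default reader from the roadrunner ipcq library
ReaderThread<TopicType, ReaderType> m_reader_thread;  // hypothetical member of the data task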

Note

The ReaderThread and the writer it is attached to (assuming the writer is part of the Telemetry Subscriber, or the shmPub) must use the same conditional policy. If using the default reader/writer this is handled by default. See the roadrunner documentation for more details.

using ReaderType = ipcq::Reader<TopicType>;
using WriterType = ipcq::Writer<TopicType>;

Callbacks

Aside from the configuration of the ReaderThread via a series of setters (see ReaderThread configuration), two callbacks may be provided by the user. The first is required and processes the data in the shared memory queue. The second callback is optional and gives the user the option to initialise things on the ReaderThread that may be beneficial during the data copy.

The callback to process data is registered with the function:

void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback)

In the example this is called during Initialising with the following:

RegisterOnDataCallback( [this] (const TopicType& sample) {m_computation->OnDataAvailable(sample);});

This registers the callback from the Computation class shown below, which copies the wavefront sensor slope vector into a MatrixBuffer local to the Computation class.

void OnDataAvailable(TopicType const& sample)
{
   // Remember the id of the most recently received sample.
   m_sample_id = sample.sample_id;
   // Copy the slope vector into the next free row of the local MatrixBuffer.
   std::copy(sample.wfs.slopes.begin(), sample.wfs.slopes.end(), m_buffer_matrix.begin() + m_callback_counter*m_n_slopes);
   // Advance the row index for the next sample.
   m_callback_counter++;
}

An additional optional callback can be provided that is invoked once after the ReaderThread is created. This callback is registered at the same point in Initialising with the function void RegisterInitThreadCallback(std::function<void()> callback).

The aim of this callback is to give the user a point where they can initialise code that may be beneficial during the data copy. This optional callback is not used in the exampleDataTask, though it is used in the GPU example (see GPU support). In the GPU example the callback allows GPU specific initialisation, such as setting which GPU is to be used by this thread. This could be done in each data processing callback, but it only needs to be done once per thread, so it is done in this callback.
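For illustration only, in a GPU variant such a callback might be registered along the following lines; m_gpu_id is a hypothetical member holding the device index (cf. the static/computation/gpu data point) and <cuda_runtime.h> is required for cudaSetDevice.

// Sketch of a per-thread initialisation callback in a GPU variant;
// m_gpu_id is a hypothetical member holding the configured device index.
RegisterInitThreadCallback([this]() {
    cudaSetDevice(m_gpu_id);  // bind this ReaderThread to the configured GPU once
});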

Multiple readers

While not explicitly shown, it should be possible to configure a data task that uses multiple ReaderThreads to read telemetry data from multiple loops. One ReaderThread would be required per loop from which data is to be read. This was one of the driving forces for implementing the callback-based API for interacting with the shared memory queue.

This should be fairly simple to do in the BusinessLogic; the Running state would become more complex as it would have to check that every ReaderThread has finished reading before launching the calculation. Separate callbacks from the Computation class would also be required, as sketched below.
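A purely hypothetical sketch of the callback registration for two loops is shown below; the reader members and the per-loop OnData methods do not exist in the example and are only named here for illustration.

// Hypothetical sketch: one ReaderThread per telemetry loop, each registering its
// own callback into the Computation class.
m_reader_thread_loop1.RegisterOnDataCallback(
    [this](const TopicType& sample) { m_computation->OnLoop1DataAvailable(sample); });
m_reader_thread_loop2.RegisterOnDataCallback(
    [this](const TopicType& sample) { m_computation->OnLoop2DataAvailable(sample); });
// The Running state would then wait until every reader has finished before calling Compute().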

Configuration

ReaderThread

The configuration of the ReaderThread has been discussed previously; see ReaderThread configuration for details.

Computation class

In the Computation class it would be possible to use the same setter approach used by the ReaderThread. However, since the Computation class is user specified, we decided to read directly from the runtime repo. In our example this is done from the StaticLoad() and DynamicLoad() functions. The data task also has a series of output configuration data points that might be used by other components.

These are listed below.

Input:

  cfg attribute  Type            Description
  -------------  --------------  ------------------------------------------
  slopes         RtcInt32        Number of slopes for the wavefront sensor
  modes          RtcInt32        Number of modes to project onto
  iteration      RtcInt32        Number of frames to process
  s2m            RtcFloatMatrix  Slopes to modes projection matrix

Output:

  cfg attribute  Type            Description
  -------------  --------------  -------------------------------------------------
  avg_slopes     RtcFloatVector  Average slope vector (length = number of slopes)
  avg_modes      RtcFloatVector  Average modes vector (length = number of modes)

As with the ReaderThread configuration (see ReaderThread configuration), the data points are spread between the two yaml files common.yaml and data_task_1.yaml. In the exampleDataTask these yaml files look as follows:

common.yaml

static:
    wfs_1:
        subapertures:
            type: RtcInt32
            value: 4616
        slopes:
            type: RtcInt32
            value: 9232
dynamic:
    wfs_1:
        avg_slopes:
            type: RtcVectorFloat
            value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.wfs_1.avg_slopes.fits
        avg_modes:
            type: RtcVectorFloat
            value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.wfs_1.avg_modes.fits

and data_task_1.yaml

static:
    common:
        samples_to_read:
            type: RtcInt32
            value: 5
        gpu:
            type: RtcInt32
            value: 0
    computation:
        modes:
            type: RtcInt32
            value: 50
dynamic:
    computation:
        slopes2modes:
            type: RtcMatrixFloat
            value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.computation.slopes2modes.fits

The data point static/computation/gpu is provided but is only used in the GPU example, not here. This was done because both the example here and the GPU example perform the same calculation and work on the same configuration data. See GPU support for more information.

OLDB

  cfg attribute   Type      Description
  --------------  --------  ---------------------------------------------------
  iteration       RtcInt32  Number of cycles of the calculation
  time            RtcFloat  How long the last calculation took
  last_sample_id  RtcInt32  The final sample id used in the previous calculation

This creates the following statistics in the OLDB to monitor the data task:

statistics:
   computation:
      iteration:
         type: RtcInt32
         value: 0
      time:
         type: RtcFloat
         value: 0
      last_sample_id:
         type: RtcInt32
         value: 0

GPU Support

See GPU support for information about GPU support.

Python Support

See Python support for more information about python interfaces.