RTC Toolkit  0.1.0-alpha
computation.hpp
Go to the documentation of this file.
1 #ifndef RTCTK_EXAMPLEDATATASK_COMPUTATION_HPP
2 #define RTCTK_EXAMPLEDATATASK_COMPUTATION_HPP
3 
8 
9 #include <string>
10 #include <vector>
11 #include <numeric>
12 #include <chrono>
13 // #include <cblas.h>
14 
15 using namespace rtctk::componentFramework;
18 
19 namespace rtctk::exampleDataTask {
20 
21 template<class TopicType>
23 {
24  public:
26  : m_rtr(rtr)
27  , m_oldb(oldb)
28  , m_name(name)
29  , m_dp_slopes("/common/static/wfs_1/slopes")
30  , m_dp_modes("/" + m_name + "/" + "static/computation/modes")
31  , m_dp_iterations("/" + m_name + "/" + "static/common/samples_to_read")
32  , m_dp_s2m("/" + m_name + "/" + "dynamic/computation/slopes2modes")
33  , m_dp_avg_slopes("/common/dynamic/wfs_1/avg_slopes")
34  , m_dp_avg_modes("/common/dynamic/wfs_1/avg_modes")
35  , m_dp_stats_iteration("/" + m_name + "/" + "statistics/computation/iteration")
36  , m_dp_stats_time("/" + m_name + "/" + "statistics/computation/time")
37  , m_dp_stats_last_id("/" + m_name + "/" + "statistics/computation/last_sample_id")
38  {
40  m_sample_id = 0;
41  m_callback_counter = 0;
42  m_iteration = 0;
43 
44  // create some oldb points
45  m_oldb.CreateDataPoint(m_dp_stats_iteration, "RtcInt32");
46  m_oldb.CreateDataPoint(m_dp_stats_time, "RtcDouble");
47  m_oldb.CreateDataPoint(m_dp_stats_last_id, "RtcInt32");
48  m_oldb.SetDataPoint<int32_t>(m_dp_stats_iteration, 0);
49  m_oldb.SetDataPoint<double>(m_dp_stats_time, 0.0);
50  m_oldb.SetDataPoint<int32_t>(m_dp_stats_last_id, 0);
51  // load static config for the first and only time after this onlt DynamicLoad can be called.
52  StaticLoad();
53  }
54 
55  void StaticLoad()
56  {
57  // read anything that is unlikely to change.
58  m_n_slopes = m_rtr.GetDataPoint<int32_t>(m_dp_slopes);
59  m_n_modes = m_rtr.GetDataPoint<int32_t>(m_dp_modes);
60  m_n_iterations = m_rtr.GetDataPoint<int32_t>(m_dp_iterations);
61 
62  // set vectors and matrix to correct size.
63  m_buffer_matrix.resize(m_n_iterations, m_n_slopes);
64  m_avg_slopes.resize(m_n_slopes);
65  m_s2m_matrix.resize(m_n_modes, m_n_slopes);
66  m_avg_modes.resize(m_n_modes);
67 
68  // fill with zeros where needed.
69  std::fill (m_avg_slopes.begin(), m_avg_slopes.end(), 0.0);
70  std::fill (m_avg_modes.begin(), m_avg_modes.end(), 0.0);
71 
72  // some simple debug output
73  LOG4CPLUS_DEBUG(GetLogger(), "m_n_slopes: " << m_n_slopes);
74  LOG4CPLUS_DEBUG(GetLogger(), "m_n_modes: " << m_n_modes);
75  LOG4CPLUS_DEBUG(GetLogger(), "m_n_iterations: " << m_n_iterations);
76 
77  // cause the DynamicLoad
78  DynamicLoad();
79  }
80 
81  void DynamicLoad()
82  {
83  // read things that will change
84  m_s2m_matrix = m_rtr.GetDataPoint<MatrixBuffer<float>>(m_dp_s2m);
85  if(m_n_modes != (int) m_s2m_matrix.get_nrows() || m_n_slopes != (int) m_s2m_matrix.get_ncols())
86  {
87  LOG4CPLUS_INFO(GetLogger(), "Computation::DynamicLoad() - s2m wrong shape ");
88  LOG4CPLUS_INFO(GetLogger(), "Computation::DynamicLoad() - Expected: " << m_n_modes << " x " << m_n_slopes);
89  LOG4CPLUS_INFO(GetLogger(), "Computation::DynamicLoad() - recieved: " << m_s2m_matrix.get_nrows() << " x " << m_s2m_matrix.get_ncols());
90  throw std::runtime_error("s2m wrong shape");
91  }
92  assert((int)m_s2m_matrix.get_nrows() == m_n_modes);
93  assert((int)m_s2m_matrix.get_ncols() == m_n_slopes);
94  }
95 
96  void OnDataAvailable(TopicType const& sample) // TODO: should this be nothrow?
97  {
98  m_sample_id = sample.sample_id;
99  // copy vector into matrix position
100  std::copy(sample.wfs.slopes.begin(), sample.wfs.slopes.end(), m_buffer_matrix.begin() + m_callback_counter*m_n_slopes);
101  m_callback_counter++;
102  }
103 
104  void Reset()
105  {
106  // This is invoked going to idle to clean up incase of stopped during buffering of data.
107  m_iteration = 0;
108  ResetComputation();
109  }
110 
111  void Compute()
112  {
114  m_computation_running = true;
115  LOG4CPLUS_INFO(GetLogger(), "Computation::Compute() - samples: " << m_callback_counter);
116  m_iteration++;
117  // perform computation on the data buffers
118  auto time_start = std::chrono::system_clock::now();
119  // average matrix to avg_array this is overly simplistic and not optimised
120  for(int i = 0 ; i < m_n_iterations; i++)
121  {
122  for(int j = 0; j < m_n_slopes; j++)
123  {
124  m_avg_slopes[j] += m_buffer_matrix(i,j);
125  }
126  }
127 
128  for(int j = 0; j < m_n_slopes; j++)
129  {
130  m_avg_slopes[j] = m_avg_slopes[j]/static_cast<float>(m_n_iterations);
131  }
132 
133  // multiply avg_array with S2M matrix to project the average slopes onto modes.
134  //TODO: openblas not available on Centos 7 and no point in making it a dependancy.
135  // for this example I have added a very basic matrix vector multiplication of my own
136  // It is single threaded and not optimised. It is here only for demonstration purposes.
137  // cblas_sgemv(CblasRowMajor, CblasNoTrans, m_n_modes, m_n_slopes, 1.0, m_s2m_matrix.data(), m_n_slopes, m_avg_slopes.data(), 1, 0.0, m_avg_modes.data(), 1);
138  my_sgemv(m_s2m_matrix.data(), m_avg_slopes.data(), m_avg_modes.data(), m_n_modes, m_n_slopes);
139  auto time_stop = std::chrono::system_clock::now();
140  std::chrono::duration<double> elapsed = time_stop - time_start;
141 
142  // write output back to runtime repo
143  m_rtr.SetDataPoint<std::vector<float>>(m_dp_avg_slopes, m_avg_slopes);
144  m_rtr.SetDataPoint<std::vector<float>>(m_dp_avg_modes, m_avg_modes);
145 
146  // write statistics:
147  m_oldb.SetDataPoint<int32_t>(m_dp_stats_iteration, m_iteration);
148  m_oldb.SetDataPoint<double>(m_dp_stats_time, elapsed.count());
149  m_oldb.SetDataPoint<int32_t>(m_dp_stats_last_id, m_sample_id);
150 
151  ResetComputation(); // clean up computation and reset some values
152  m_computation_running = false;
153  }
154 
155  bool isComputing(){return m_computation_running;}
156 
157  protected:
158 
160  {
161  // called at end of computation to or duing idle to put
162  m_callback_counter = 0;
163  }
164 
165 
166  // single threaded sgemv for row major matrix
167  void my_sgemv(const float * mat, const float * in_vec, float * out_vec, uint32_t rows, uint32_t cols)
168  {
169  for (uint32_t i = 0; i < rows; ++i)
170  {
171  out_vec[i] = my_dot_product(mat + i*cols, in_vec, cols);
172  }
173  }
174  // single threaded dot product
175  float my_dot_product(const float *x, const float *y, uint32_t n)
176  {
177  double res = 0.0;
178  for (uint32_t i = 0; i < n; ++i)
179  {
180  res += x[i] * y[i];
181  }
182  return res;
183  }
184 
187  std::string m_name;
188 
189  // inputs
194 
195  //outputs to write
198 
199  // write statistics:
203 
204  unsigned int m_sample_id;
205  unsigned int m_iteration;
206  std::atomic<unsigned int> m_callback_counter;
207  std::atomic<bool> m_computation_running;
208 
212 
213  std::vector<float> m_avg_slopes;
214  std::vector<float> m_avg_modes;
215 
218 
219 };
220 
221 } // namespace
222 
223 #endif
rtctk::exampleDataTask::Computation::m_oldb
rtctk::componentFramework::OldbApiIf & m_oldb
Definition: computation.hpp:186
rtctk::exampleDataTask::Computation
Definition: computation.hpp:23
rtctk::exampleDataTask::Computation::Reset
void Reset()
Definition: computation.hpp:104
rtctk::exampleDataTask::Computation::m_dp_stats_iteration
DataPointPath m_dp_stats_iteration
Definition: computation.hpp:200
rtctk::exampleDataTask::Computation::m_dp_slopes
DataPointPath m_dp_slopes
Definition: computation.hpp:190
rtctk::componentFramework
Definition: rtcComponent.hpp:17
rtctk::exampleDataTask::Computation::m_avg_modes
std::vector< float > m_avg_modes
Definition: computation.hpp:214
rtctk::exampleDataTask::Computation::m_rtr
rtctk::componentFramework::RuntimeRepoApiIf & m_rtr
Definition: computation.hpp:185
rtctk::exampleDataTask::Computation::ResetComputation
void ResetComputation()
Definition: computation.hpp:159
oldbApiIf.hpp
Header file for OldbApiIf, which defines the API for OldbAdapters.
matrixBuffer.hpp
Declaration of the MatrixBuffer template class used in APIs.
rtctk::exampleDataTask::Computation::m_sample_id
unsigned int m_sample_id
Definition: computation.hpp:204
rtctk::exampleDataTask::Computation::m_buffer_matrix
MatrixBuffer< float > m_buffer_matrix
Definition: computation.hpp:216
rtctk::exampleDataTask::Computation::isComputing
bool isComputing()
Definition: computation.hpp:155
rtctk::exampleDataTask::Computation::Compute
void Compute()
Definition: computation.hpp:111
rtctk::exampleDataTask::Computation::my_sgemv
void my_sgemv(const float *mat, const float *in_vec, float *out_vec, uint32_t rows, uint32_t cols)
Definition: computation.hpp:167
rtctk::exampleDataTask::Computation::m_name
std::string m_name
Definition: computation.hpp:187
rtctk::exampleDataTask::Computation::m_n_modes
int m_n_modes
Definition: computation.hpp:210
rtctk::exampleDataTask::Computation::m_dp_stats_last_id
DataPointPath m_dp_stats_last_id
Definition: computation.hpp:202
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
rtctk::exampleDataTask::Computation::m_dp_avg_slopes
DataPointPath m_dp_avg_slopes
Definition: computation.hpp:196
rtctkExampleDataTaskGenFitsData.mat
mat
Definition: rtctkExampleDataTaskGenFitsData.py:14
rtctk::componentFramework::OldbApiIf
Definition: oldbApiIf.hpp:18
rtctk::exampleDataTask
Definition: businessLogic.hpp:22
rtctk::exampleDataTask::Computation::m_dp_avg_modes
DataPointPath m_dp_avg_modes
Definition: computation.hpp:197
rtctk::exampleDataTask::Computation::m_computation_running
std::atomic< bool > m_computation_running
Definition: computation.hpp:207
rtctk::componentFramework::RuntimeRepoApiIf
Definition: runtimeRepoApiIf.hpp:59
rtctk::exampleDataTask::Computation::m_avg_slopes
std::vector< float > m_avg_slopes
Definition: computation.hpp:213
rtctk::exampleDataTask::Computation::m_dp_stats_time
DataPointPath m_dp_stats_time
Definition: computation.hpp:201
rtctk::exampleDataTask::Computation::Computation
Computation(rtctk::componentFramework::RuntimeRepoApiIf &rtr, rtctk::componentFramework::OldbApiIf &oldb, std::string const &name)
Definition: computation.hpp:25
mudpi::uint32_t
unsigned int uint32_t
Definition: mudpi.h:16
rtctk::exampleDataTask::Computation::m_iteration
unsigned int m_iteration
Definition: computation.hpp:205
runtimeRepoApiIf.hpp
Header file for RuntimeRepoApiIf, which defines the API for RuntimeRepoAdapters.
logger.hpp
Logging Support Library based on log4cplus.
rtctk::exampleDataTask::Computation::DynamicLoad
void DynamicLoad()
Definition: computation.hpp:81
mudpi::int32_t
int int32_t
Definition: mudpi.h:17
rtctk::exampleDataTask::Computation::m_n_iterations
int m_n_iterations
Definition: computation.hpp:211
rtctk::exampleDataTask::Computation::m_dp_modes
DataPointPath m_dp_modes
Definition: computation.hpp:191
rtctk::componentFramework::DataPointPath
Definition: dataPointPath.hpp:30
rtctk::exampleDataTask::Computation::m_dp_s2m
DataPointPath m_dp_s2m
Definition: computation.hpp:193
rtctk::exampleDataTask::Computation::m_callback_counter
std::atomic< unsigned int > m_callback_counter
Definition: computation.hpp:206
rtctk::exampleDataTask::Computation::my_dot_product
float my_dot_product(const float *x, const float *y, uint32_t n)
Definition: computation.hpp:175
rtctk::exampleDataTask::Computation::OnDataAvailable
void OnDataAvailable(TopicType const &sample)
Definition: computation.hpp:96
rtctk::componentFramework::MatrixBuffer
Definition: matrixBuffer.hpp:19
rtctk::exampleDataTask::Computation::StaticLoad
void StaticLoad()
Definition: computation.hpp:55
rtctk::exampleDataTask::Computation::m_s2m_matrix
MatrixBuffer< float > m_s2m_matrix
Definition: computation.hpp:217
rtctk::exampleDataTask::Computation::m_dp_iterations
DataPointPath m_dp_iterations
Definition: computation.hpp:192
rtctk::exampleDataTask::Computation::m_n_slopes
int m_n_slopes
Definition: computation.hpp:209