RTC Toolkit  0.1.0-alpha
readerThread.hpp
Go to the documentation of this file.
1 
9 #ifndef RTCTK_DATATASK_READERTHREAD_HPP
10 #define RTCTK_DATATASK_READERTHREAD_HPP
11 
18 
19 #include <numapp/numapolicies.hpp>
20 #include <numapp/thread.hpp>
21 
22 #include <thread>
23 #include <chrono>
24 #include <ctime>
25 #include <cmath>
26 #include <map>
27 #include <functional>
28 #include <stdexcept>
29 
30 
31 namespace rtctk::dataTask {
35 // TODO: this one is more high level, different file
36 class RequestTimedOut : public std::runtime_error
37 {
38  public:
39  RequestTimedOut(const std::string& req_name)
40  : std::runtime_error("Request '" + req_name + "' timed out!") {}
41 };
42 
43 class AsynchronousError : public std::runtime_error
44 {
45  public:
47  : std::runtime_error("An asynchronous error occured!") {}
48 
49  AsynchronousError(const std::string& text)
50  : std::runtime_error("An asynchronous error occured: '" + text + "'!") {}
51 };
52 
53 
54 
67 template<typename TopicType, typename ReaderType>
68 class ReaderThread {
69  public:
71  : m_thread_name("thread_name", "readerThread")
72  , m_queue_name("queue_name")
73  , m_thread_affinity("thread_affinity")
74  , m_samples_to_read("samples_to_read")
75  , m_samples_to_skip("samples_to_skip")
76  , m_loop_frequency("loop_frequency")
77  , m_error_margin("error_margin")
78  , m_state(State::Off)
79  , m_callback_on_data(NULL)
80  , m_callback_init_thread(NULL)
81  {
82  }
83 
85  {
86  if(m_thread.joinable()) {
87  Join();
88  }
89  }
90 
102  void Spawn()
103  {
105  using namespace std::chrono_literals;
106 
107  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Spawn()");
108 
109  // check if callback to OnDataAvailable has been registered
110  assert(m_callback_on_data != NULL);
111 
112  m_queue_name.CheckSet();
113  m_thread_name.CheckSet();
114  m_samples_to_read.CheckSet();
115  m_samples_to_skip.CheckSet();
116  m_loop_frequency.CheckSet();
117 
118  m_queue_name.Lock();
119  m_thread_name.Lock();
120  m_thread_affinity.Lock();
121  m_loop_frequency.Lock();
122 
123  if(!m_error_margin.IsSet())
124  {
125  m_error_margin.Set(1.2);
126  }
127  m_error_margin.Lock();
128 
129  auto policies = numapp::NumaPolicies();
130 
131  if(m_thread_affinity.IsSet()) {
132  auto cpu_mask = numapp::Cpumask(std::to_string(m_thread_affinity.Get()).c_str());
133  policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
134  }
135 
136  auto f = SendRequestAsync(Command::Idle);
137 
138  m_thread = numapp::MakeThread(
139  m_thread_name.Get(),
140  policies,
141  &ReaderThread::Work,
142  this);
143 
144  auto status = f.wait_for(1200ms);
145  if (status != std::future_status::ready) {
146  throw RequestTimedOut(m_command_text.at(Command::Idle));
147  }
148  }
149 
159  void Join()
160  {
162  using namespace std::chrono_literals;
163 
164  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Join()");
165 
166  auto f = SendRequestAsync(Command::Exit);
167 
168  auto status = f.wait_for(1200ms);
169  m_thread.join();
170 
171  m_queue_name.Unlock();
172  m_thread_name.Unlock();
173  m_thread_affinity.Unlock();
174  m_loop_frequency.Unlock();
175 
176  if (status != std::future_status::ready) {
177  throw RequestTimedOut(m_command_text.at(Command::Exit));
178  }
179  }
180 
184  void Run()
185  {
187  using namespace std::chrono_literals;
188 
189  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Run()");
190 
191  SendRequestSync(Command::Run);
192  }
193 
194 
198  void Idle()
199  {
201  using namespace std::chrono_literals;
202 
203  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Idle()");
204 
205  SendRequestSync(Command::Idle);
206  }
207 
215  {
216  using namespace std::chrono_literals;
218  std::error_code const ok{};
219  auto timeout = 1000*detail::CalcDuration(m_samples_to_read.Get(), \
220  m_loop_frequency.Get(), \
221  3.0);
222 
223  auto ret = m_comp_allowed.Pend(timeout);
224  if(ret != ok) {
225  throw AsynchronousError(ret.message());
226  }
227  }
228 
234  {
235  m_comp_done.Post();
236  }
237 
238 
243  void SetQueueName(std::string name)
244  {
245  // TODO is there a max lenght of the name?
246  m_queue_name.Set(name);
247  }
248 
253  void SetThreadName(std::string name)
254  {
255  // TODO check for max 16 characters length
256  m_thread_name.Set(name);
257  }
258 
263  void SetCpuAffinity(int affinity)
264  {
265  m_thread_affinity.Set(affinity);
266  }
267 
272  void SetSamplesToRead(size_t value)
273  {
274  m_samples_to_read.Set(value);
275  }
276 
281  void SetSamplesToSkip(size_t value)
282  {
283  m_samples_to_skip.Set(value);
284  }
285 
291  void SetLoopFrequency(size_t value)
292  {
293  m_loop_frequency.Set(value);
294  }
295 
301  void SetErrorMargin(float value)
302  {
303  m_error_margin.Set(value);
304  }
305 
306  // call back registration
307  // TODO: add checks
314  void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback)
315  {
316  m_callback_on_data = callback;
317  }
318 
319 
327  void RegisterInitThreadCallback(std::function<void()> callback)
328  {
329  m_callback_init_thread = callback;
330  }
331 
332  private:
333 
334  enum class State : unsigned {
335  Error,
336  Off,
337  Starting,
338  Terminating,
339  Idle,
340  Reading,
341  Skipping,
342  Waiting,
343  Dropping,
344  }; //< known states of the readerThread
345 
346 
347  const std::map<State, std::string> m_state_text = {
348  {State::Error, "Error"},
349  {State::Off, "Off"},
350  {State::Starting, "Starting"},
351  {State::Terminating, "Terminating"},
352  {State::Idle, "Idle"},
353  {State::Reading, "Reading"},
354  {State::Skipping, "Skipping"},
355  {State::Waiting, "Waiting"},
356  {State::Dropping, "Dropping"},
357  }; //< States names
358 
359 
360  enum class Command: unsigned {
361  None,
362  Run,
363  Idle,
364  Exit,
365  }; //< expected commands
366 
367 
368  const std::map<Command, std::string> m_command_text = {
369  {Command::None, "-"},
370  {Command::Run, "Off"},
371  {Command::Idle, "Starting"},
372  {Command::Exit, "Terminating"},
373  }; //< expected commands names
374 
381  std::future<void> SendRequestAsync(Command cmd) {
382  Request<Command> req(cmd);
383  auto f = req.GetReplyFuture();
384  m_request_q.Post(std::move(req));
385  return f;
386  }
387 
394  void SendRequestSync(Command cmd) {
395  using namespace std::chrono_literals;
396 
397  auto f = SendRequestAsync(cmd);
398  auto status = f.wait_for(1200ms);
399  if (status != std::future_status::ready) {
400  throw RequestTimedOut(m_command_text.at(cmd));
401  }
402  }
403 
404  Request<Command> GetRequest() {
405  return m_request_q.TryPend().value_or(Request(Command::None));
406  }
407 
412  void Work()
413  {
415  using namespace rtctk::dataTask::detail;
416  using namespace std::chrono;
417  using namespace std::chrono_literals;
418 
419  auto req = GetRequest();
420 
421  m_state = State::Off;
422  State next_state = State::Starting;
423  State prev_state = State::Off;
424 
425  auto time_start = system_clock::now();
426  milliseconds time_elapsed {0};
427  auto duration_skip = CalcDuration(m_samples_to_skip.Get(), m_loop_frequency.Get());
428  auto timeout_wait {duration_skip / 2};
429  auto timeout_drop {duration_skip / 2};
430 
431  size_t to_read = 0;
432  size_t to_skip = 0;
433 
434  std::error_code const ok{};
435  std::error_code ret = ok;
436 
437  auto reader = ReaderType::MakeReader(m_queue_name.Get().c_str(), 30s);
438 
439  while(1) {
440 
441  prev_state = m_state;
442  if(m_state != next_state) {
443  m_state = next_state;
444  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) << "'");
445  }
446 
447  switch(m_state)
448  {
449  case State::Starting:
450  {
451  try {
452  if(m_callback_init_thread)
453  {
454  m_callback_init_thread();
455  }
456  next_state = State::Idle;
457  }catch(...) {
458  ret = std::make_error_code(std::errc::timed_out); // TODO: other code
459  next_state = State::Error;
460  }
461  break;
462  }
463 
464  case State::Idle:
465  {
466  if(m_state != prev_state) {
467  req.SetReply();
468  }
469 
470  req = GetRequest();
471 
472  if(req.GetPayload() == Command::Exit) {
473  next_state = State::Terminating;
474  break;
475  }else if(req.GetPayload() == Command::Run) {
476  m_comp_done.Clear();
477  ret = Reset(reader);
478  next_state = ret==ok ? State::Reading : State::Error;
479  break;
480  }
481 
482  std::this_thread::sleep_for(10ms);
483  next_state = m_state;
484  break;
485  }
486 
487  case State::Reading:
488  {
489  if(m_state != prev_state) {
490  if(prev_state == State::Idle) {
491  req.SetReply();
492  }
493  to_read = m_samples_to_read.Get();
494  to_skip = m_samples_to_skip.Get();
495  }
496 
497  req = GetRequest();
498  if(req.GetPayload() == Command::Exit) {
499  next_state = State::Terminating;
500  break;
501  }else if (req.GetPayload() == Command::Idle){
502  next_state = State::Idle;
503  break;
504  }
505 
506  size_t to_read_now = std::min(to_read, m_loop_frequency.Get());
507  auto time_read_start = system_clock::now();
508  ret = Read(reader, \
509  m_callback_on_data, \
510  to_read_now, \
511  m_loop_frequency.Get(), \
512  m_error_margin.Get());
513  auto time_read_end = system_clock::now();
514  CalculateEstimatedFrequency(to_read_now, time_read_end - time_read_start);
515  to_read -= to_read_now;
516 
517  if(ret != ok) {
518  next_state = State::Error;
519  }else if(to_read == 0) {
520  next_state = State::Skipping;
521  }else{
522  next_state = m_state;
523  }
524  break;
525  }
526 
527  case State::Skipping:
528  {
529  if(m_state != prev_state) {
530  m_comp_allowed.Post(ok);
531  }
532 
533  req = GetRequest();
534  if(req.GetPayload() == Command::Exit) {
535  next_state = State::Terminating;
536  break;
537  }else if (req.GetPayload() == Command::Idle){
538  next_state = State::Idle;
539  break;
540  }
541 
542  size_t to_skip_now = std::min(to_skip, m_loop_frequency.Get());
543  auto time_skip_start = system_clock::now();
544  ret = Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
545  auto time_skip_end = system_clock::now();
546  CalculateEstimatedFrequency(to_skip_now, time_skip_end - time_skip_start);
547 
548  to_skip -= to_skip_now;
549 
550  if(ret != ok) {
551  next_state = State::Error;
552  }else if(to_skip == 0) {
553  if(m_comp_done.TryWait()) {
554  next_state = State::Reading;
555  }else {
556  next_state = State::Waiting;
557  }
558  }else{
559  next_state = m_state;
560  }
561  break;
562  }
563 
564  case State::Waiting:
565  {
566  if(m_state != prev_state) {
567  time_start = system_clock::now();
568  }
569 
570  req = GetRequest();
571  if(req.GetPayload() == Command::Exit) {
572  next_state = State::Terminating;
573  break;
574  }else if (req.GetPayload() == Command::Idle){
575  next_state = State::Idle;
576  break;
577  }
578 
579  if(m_comp_done.TryWait()) {
580  next_state = State::Reading;
581  break;
582  }
583 
584  std::this_thread::sleep_for(1ms);
585 
586  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
587  if((time_elapsed > timeout_wait) or (NumFree(reader) == 0)) { // TODO: revisit
588  next_state = State::Dropping;
589  }else {
590  next_state = m_state;
591  }
592  break;
593  }
594 
595  case State::Dropping:
596  {
597  if(m_state != prev_state) {
598  time_start = system_clock::now();
599  }
600 
601  req = GetRequest();
602  if(req.GetPayload() == Command::Exit) {
603  next_state = State::Terminating;
604  break;
605  }else if (req.GetPayload() == Command::Idle){
606  next_state = State::Idle;
607  break;
608  }
609 
610  if(m_comp_done.TryWait()) {
611  ret = Reset(reader);
612  next_state = ret==ok ? State::Reading : State::Error;
613  break;
614  }
615 
616  std::this_thread::sleep_for(1ms);
617 
618  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
619  if(time_elapsed > timeout_drop) {
620  ret = std::make_error_code(std::errc::timed_out);
621  next_state = State::Error;
622  }else {
623  next_state = m_state;
624  }
625  break;
626  }
627 
628  case State::Error:
629  {
630  if(m_state != prev_state) {
631  m_comp_allowed.Post(ret);
632  LOG4CPLUS_ERROR(GetLogger(), "ReaderThread::Work() - " << ret.message());
633  }
634 
635  req = GetRequest();
636  if(req.GetPayload() == Command::Exit) {
637  next_state = State::Terminating;
638  }else if(req.GetPayload() == Command::Idle) {
639  next_state = State::Idle;
640  }else{
641  std::this_thread::sleep_for(100ms);
642  next_state = m_state;
643  }
644  break;
645  }
646 
647  case State::Terminating:
648  {
649  next_state = State::Off;
650  break;
651  }
652 
653  case State::Off:
654  {
655  req.SetReply();
656  return;
657  }
658  }
659  }
660  }
661 
668  template <class Rep, class Period>
669  void CalculateEstimatedFrequency(size_t count,
670  std::chrono::duration<Rep, Period> time)
671  {
673  using namespace std::chrono;
674  if(time.count() != 0)
675  {
676  m_estimated_loop_frequency = static_cast<float>(count) / \
677  duration_cast<seconds>(time).count();
678 
679  if(!std::isinf(m_estimated_loop_frequency))
680  {
681  //check if outside of error margin
682  if(m_estimated_loop_frequency < \
683  static_cast<float>(m_loop_frequency.Get())/m_error_margin.Get() || \
684  m_estimated_loop_frequency > \
685  static_cast<float>(m_loop_frequency.Get())*m_error_margin.Get())
686  {
687  LOG4CPLUS_WARN(GetLogger(), "ReaderThread: Measured SHM frequency: " <<\
688  m_estimated_loop_frequency << " expected: " << m_loop_frequency.Get());
689  }
690  }
691  }
692  }
693 
694  std::thread m_thread; //< the readerThead member
695 
696  Parameter<std::string> m_thread_name; //< parameters thread name
697  Parameter<std::string> m_queue_name; //< parameter queue name
698  Parameter<size_t> m_thread_affinity; //< parameter cpu affinity
699  Parameter<size_t> m_samples_to_read; //< parameter samples to be read
700  Parameter<size_t> m_samples_to_skip; //< parameter samples to skip
701  Parameter<size_t> m_loop_frequency; //< parameter loop frequency
702  Parameter<float> m_error_margin; //< parameter timeout tollerance
703 
704  float m_estimated_loop_frequency;
705 
706  MessageQueue<Request<Command>> m_request_q;
707  MessageQueue<std::error_code> m_comp_allowed;
708 
709  Semaphore m_comp_done;
710 
711  State m_state;
712 
713  std::function<void(const TopicType& sample)> m_callback_on_data;
714  std::function<void()> m_callback_init_thread;
715 };
716 
717 } // namespace
718 
719 #endif
rtctk::dataTask::detail::CalcDuration
std::chrono::milliseconds CalcDuration(size_t count, size_t loop_frequency, float error_margin=1.0)
Definition: readerHelpers.hpp:34
rtctk::dataTask::ReaderThread::SetLoopFrequency
void SetLoopFrequency(size_t value)
Definition: readerThread.hpp:291
rtctk::dataTask::ReaderThread::SetCpuAffinity
void SetCpuAffinity(int affinity)
Definition: readerThread.hpp:263
rtctk::dataTask::ReaderThread::SetErrorMargin
void SetErrorMargin(float value)
Definition: readerThread.hpp:301
rtctk::dataTask::detail::Reset
std::error_code Reset(ReaderType &reader)
Definition: readerHelpers.hpp:173
rtctk::dataTask::ReaderThread::SetQueueName
void SetQueueName(std::string name)
Definition: readerThread.hpp:243
rtctk::dataTask::RequestTimedOut
Definition: readerThread.hpp:37
rtctk::componentFramework::RepositoryIf::Request
A request object to pass information about datapoints that should be read (written) from (to) the rep...
Definition: repositoryIf.hpp:37
rtctk::dataTask::detail::Read
std::error_code Read(ReaderType &reader, Operation &&op, size_t count, size_t loop_frequency, float error_margin=2.0)
Definition: readerHelpers.hpp:75
rtctk::dataTask::Parameter::Set
void Set(T const &value)
Definition: parameter.hpp:63
messageQueue.hpp
A simple message queue implementation.
rtctk::dataTask::ReaderThread::RegisterInitThreadCallback
void RegisterInitThreadCallback(std::function< void()> callback)
Definition: readerThread.hpp:327
readerHelpers.hpp
Header file needed to.
rtctk::dataTask::ReaderThread::SetSamplesToRead
void SetSamplesToRead(size_t value)
Definition: readerThread.hpp:272
rtctk::dataTask::detail
Definition: readerHelpers.hpp:22
semaphore.hpp
A simple semaphore implementation.
rtctk::dataTask::detail::Skip
std::error_code Skip(ReaderType &reader, size_t count, size_t loop_frequency, float error_margin=2.0)
Definition: readerHelpers.hpp:126
rtctk::dataTask::RequestTimedOut::RequestTimedOut
RequestTimedOut(const std::string &req_name)
Definition: readerThread.hpp:39
rtctk::dataTask::ReaderThread::SetSamplesToSkip
void SetSamplesToSkip(size_t value)
Definition: readerThread.hpp:281
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
rtctk::dataTask::AsynchronousError::AsynchronousError
AsynchronousError(const std::string &text)
Definition: readerThread.hpp:49
rtctk::dataTask::detail::NumFree
size_t NumFree(ReaderType &reader)
Definition: readerHelpers.hpp:190
rtctk::dataTask::ReaderThread::Spawn
void Spawn()
Definition: readerThread.hpp:102
rtctk::dataTask::ReaderThread::Run
void Run()
Definition: readerThread.hpp:184
rtctk::dataTask::ReaderThread::~ReaderThread
~ReaderThread()
Definition: readerThread.hpp:84
rtctk::dataTask::ReaderThread
Definition: readerThread.hpp:68
rtctk::dataTask::ReaderThread::RegisterOnDataCallback
void RegisterOnDataCallback(std::function< void(const TopicType &sample)> callback)
Definition: readerThread.hpp:314
rtctk::dataTask
Definition: messageQueue.hpp:17
parameter.hpp
Header file needed to.
rtctk::dataTask::ReaderThread::SetThreadName
void SetThreadName(std::string name)
Definition: readerThread.hpp:253
rtctk::telRepub::make_error_code
std::error_code make_error_code(MudpiProcessingError e)
Definition: mudpiProcessingError.hpp:103
Request
rtctk::componentFramework::RepositoryIf::Request Request
Definition: testFileRepository.cpp:14
std
Definition: mudpiProcessingError.hpp:109
rtctk::dataTask::Semaphore::Post
void Post()
Definition: semaphore.hpp:27
logger.hpp
Logging Support Library based on log4cplus.
rtctk::dataTask::AsynchronousError
Definition: readerThread.hpp:44
rtctk::dataTask::ReaderThread::Idle
void Idle()
Definition: readerThread.hpp:198
rtctk::dataTask::ReaderThread::Join
void Join()
Definition: readerThread.hpp:159
rtctk::dataTask::ReaderThread::WaitUntilComputationAllowed
void WaitUntilComputationAllowed()
Definition: readerThread.hpp:214
rtctk::dataTask::ReaderThread::SignalComputationDone
void SignalComputationDone()
Definition: readerThread.hpp:233
rtctk::dataTask::ReaderThread::ReaderThread
ReaderThread()
Definition: readerThread.hpp:70
rtctk::dataTask::AsynchronousError::AsynchronousError
AsynchronousError()
Definition: readerThread.hpp:46
request.hpp
Header file needed to.