9 #ifndef RTCTK_DATATASK_READERTHREAD_HPP
10 #define RTCTK_DATATASK_READERTHREAD_HPP
19 #include <numapp/numapolicies.hpp>
20 #include <numapp/thread.hpp>
// Constructor initializer (the signature is outside this excerpt): forwards
// the message "Request '<req_name>' timed out!" to std::runtime_error.
40 :
std::runtime_error(
"Request '" + req_name +
"' timed out!") {}
// Constructor initializer (signature outside this excerpt): builds a
// std::runtime_error with a fixed message that carries no further detail.
47 :
std::runtime_error(
"An asynchronous error occured!") {}
// Constructor initializer (signature outside this excerpt): same error, but
// with the supplied `text` embedded in single quotes.
50 :
std::runtime_error(
"An asynchronous error occured: '" + text +
"'!") {}
// Class template header; the class declaration and the constructor signature
// are outside this excerpt.
//   TopicType  - sample type handed to the data callback.
//   ReaderType - type whose static MakeReader() creates the queue reader.
67 template<
typename TopicType,
typename ReaderType>
// Constructor initializer list. Every Parameter is given its textual name;
// thread_name additionally receives "readerThread" (presumably its default
// value - confirm against Parameter's constructor).
71 : m_thread_name(
"thread_name",
"readerThread")
72 , m_queue_name(
"queue_name")
73 , m_thread_affinity(
"thread_affinity")
74 , m_samples_to_read(
"samples_to_read")
75 , m_samples_to_skip(
"samples_to_skip")
76 , m_loop_frequency(
"loop_frequency")
77 , m_error_margin(
"error_margin")
// Both callbacks start out empty; nullptr is used instead of the NULL macro
// so the std::function(nullptr_t) constructor is selected unambiguously.
79 , m_callback_on_data(nullptr)
80 , m_callback_init_thread(nullptr)
// Only act on the thread object while it is still joinable (the body of this
// branch and the enclosing function are outside this excerpt).
86 if(m_thread.joinable()) {
// Spawn() body (fragment): validates the configuration, freezes parameters,
// creates the thread and waits for it to acknowledge an initial Idle request.
105 using namespace std::chrono_literals;
107 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Spawn()");
// The data callback is mandatory.
110 assert(m_callback_on_data != NULL);
// CheckSet() presumably fails when a parameter was never set - confirm
// against Parameter.
112 m_queue_name.CheckSet();
113 m_thread_name.CheckSet();
114 m_samples_to_read.CheckSet();
115 m_samples_to_skip.CheckSet();
116 m_loop_frequency.CheckSet();
// Lock parameters before the thread is created; Join() unlocks them again.
119 m_thread_name.Lock();
120 m_thread_affinity.Lock();
121 m_loop_frequency.Lock();
// error_margin is optional: fall back to 1.2 when the caller did not set it.
123 if(!m_error_margin.IsSet())
125 m_error_margin.Set(1.2);
127 m_error_margin.Lock();
129 auto policies = numapp::NumaPolicies();
// Optional CPU pinning: the affinity value is passed to Cpumask as a decimal
// string.
131 if(m_thread_affinity.IsSet()) {
132 auto cpu_mask = numapp::Cpumask(std::to_string(m_thread_affinity.Get()).c_str());
133 policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
// The Idle request is queued before the thread is created.
136 auto f = SendRequestAsync(Command::Idle);
138 m_thread = numapp::MakeThread(
// Give the new thread 1200ms to answer the Idle request.
144 auto status = f.wait_for(1200ms);
145 if (status != std::future_status::ready) {
// Join() body (fragment): sends Exit, waits up to 1200ms for the reply and
// unlocks the parameters that Spawn() locked.
162 using namespace std::chrono_literals;
164 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Join()");
166 auto f = SendRequestAsync(Command::Exit);
168 auto status = f.wait_for(1200ms);
// Parameters are unlocked before the wait result is examined, so they are
// released whether or not the reply arrived in time.
171 m_queue_name.Unlock();
172 m_thread_name.Unlock();
173 m_thread_affinity.Unlock();
174 m_loop_frequency.Unlock();
176 if (status != std::future_status::ready) {
// Run() body (fragment): logs the call and sends a Run request, blocking in
// SendRequestSync() until the request is answered or times out.
187 using namespace std::chrono_literals;
189 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Run()");
191 SendRequestSync(Command::Run);
// Idle() body (fragment): logs the call and sends an Idle request, blocking in
// SendRequestSync() until the request is answered or times out.
201 using namespace std::chrono_literals;
203 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Idle()");
205 SendRequestSync(Command::Idle);
// Fragment of a function (name not visible) that pends on m_comp_allowed with
// a timeout; the timeout computation involving m_loop_frequency is only
// partly visible. `ok` is a default-constructed std::error_code.
216 using namespace std::chrono_literals;
218 std::error_code
const ok{};
220 m_loop_frequency.Get(), \
223 auto ret = m_comp_allowed.Pend(timeout);
// Bodies of the parameter setters (signatures outside this excerpt). Each one
// forwards its argument to the Set() of the matching Parameter member.
// Queue name.
246 m_queue_name.
Set(name);
// Thread name.
256 m_thread_name.
Set(name);
// Thread affinity.
265 m_thread_affinity.
Set(affinity);
// Number of samples to read.
274 m_samples_to_read.
Set(value);
// Number of samples to skip.
283 m_samples_to_skip.
Set(value);
// Loop frequency.
293 m_loop_frequency.
Set(value);
// Error margin.
303 m_error_margin.
Set(value);
// Stores the per-sample data callback (setter signature outside this excerpt).
316 m_callback_on_data = callback;
// Stores the optional callback that Work() invokes in its Starting state.
329 m_callback_init_thread = callback;
// States of the Work() state machine (enumerators are outside this excerpt).
334 enum class State : unsigned {
// Human-readable name of each State, used by Work() when logging transitions.
// NOTE(review): no entry for State::Off is visible in this excerpt, yet Work()
// looks states up with .at(), which throws for a missing key - confirm the
// entry exists in the full file.
347 const std::map<State, std::string> m_state_text = {
348 {State::Error,
"Error"},
350 {State::Starting,
"Starting"},
351 {State::Terminating,
"Terminating"},
352 {State::Idle,
"Idle"},
353 {State::Reading,
"Reading"},
354 {State::Skipping,
"Skipping"},
355 {State::Waiting,
"Waiting"},
356 {State::Dropping,
"Dropping"},
// Commands that can be posted to the request queue (enumerators are outside
// this excerpt; None, Run, Idle and Exit are used elsewhere in the file).
360 enum class Command: unsigned {
// Human-readable name of each Command. SendRequestSync() embeds this text in
// the RequestTimedOut message, so each entry must name its own command; the
// earlier labels ("Off", "Starting", "Terminating") were state names and made
// timeout errors report the wrong request.
368 const std::map<Command, std::string> m_command_text = {
369 {Command::None,
"-"},
370 {Command::Run,
"Run"},
371 {Command::Idle,
"Idle"},
372 {Command::Exit,
"Exit"},
// Posts a request for `cmd` to the request queue without waiting.
// Returns a future for the reply; it is taken from the request before the
// request is moved into the queue. (The construction of `req` and the return
// statement are outside this excerpt.)
381 std::future<void> SendRequestAsync(Command cmd) {
383 auto f = req.GetReplyFuture();
384 m_request_q.Post(std::move(req));
// Sends `cmd` and blocks for up to 1200ms until the reply is ready.
// Throws RequestTimedOut, carrying the command's text from m_command_text,
// when the reply is not ready in time.
394 void SendRequestSync(Command cmd) {
395 using namespace std::chrono_literals;
397 auto f = SendRequestAsync(cmd);
398 auto status = f.wait_for(1200ms);
399 if (status != std::future_status::ready) {
400 throw RequestTimedOut(m_command_text.at(cmd));
// Non-blocking fetch of the next request: when TryPend() yields no value, a
// Command::None request is returned instead.
405 return m_request_q.TryPend().value_or(
Request(Command::None));
// ReaderThread::Work() state machine (fragment: the enclosing loop, the switch
// statement and several case labels are outside this excerpt). Each pass
// records the previous state, applies and logs a pending transition, and then
// runs the handler of the current state, which chooses next_state.
// NOTE(review): elapsed times below are measured with system_clock, which is
// not guaranteed to be monotonic; steady_clock may be the better fit - confirm.
416 using namespace std::chrono;
417 using namespace std::chrono_literals;
419 auto req = GetRequest();
421 m_state = State::Off;
422 State next_state = State::Starting;
423 State prev_state = State::Off;
425 auto time_start = system_clock::now();
426 milliseconds time_elapsed {0};
// Both timeouts are half of the duration that CalcDuration() derives from
// samples_to_skip and loop_frequency.
427 auto duration_skip =
CalcDuration(m_samples_to_skip.Get(), m_loop_frequency.Get());
428 auto timeout_wait {duration_skip / 2};
429 auto timeout_drop {duration_skip / 2};
// `ok` is a default-constructed error_code; `ret` holds the latest result.
434 std::error_code
const ok{};
435 std::error_code ret = ok;
// Reader on the configured queue; the 30s argument is presumably a timeout -
// confirm against ReaderType::MakeReader.
437 auto reader = ReaderType::MakeReader(m_queue_name.Get().c_str(), 30s);
// Transition bookkeeping: remember the old state, then apply and log a change.
441 prev_state = m_state;
442 if(m_state != next_state) {
443 m_state = next_state;
444 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) <<
"'");
// Starting: call the optional init callback, then continue to Idle. The Error
// assignment below belongs to a branch whose condition is not visible here.
449 case State::Starting:
452 if(m_callback_init_thread)
454 m_callback_init_thread();
456 next_state = State::Idle;
459 next_state = State::Error;
// Presumably the Idle handler (case label not visible): Exit -> Terminating,
// Run -> Reading (or Error when ret != ok), otherwise sleep 10ms and stay.
466 if(m_state != prev_state) {
472 if(req.GetPayload() == Command::Exit) {
473 next_state = State::Terminating;
475 }
else if(req.GetPayload() == Command::Run) {
478 next_state = ret==ok ? State::Reading : State::Error;
482 std::this_thread::sleep_for(10ms);
483 next_state = m_state;
// Presumably the Reading handler (case label not visible). When entered from
// Idle, both sample counters are reloaded from their parameters.
489 if(m_state != prev_state) {
490 if(prev_state == State::Idle) {
493 to_read = m_samples_to_read.Get();
494 to_skip = m_samples_to_skip.Get();
498 if(req.GetPayload() == Command::Exit) {
499 next_state = State::Terminating;
501 }
else if (req.GetPayload() == Command::Idle){
502 next_state = State::Idle;
// Handle at most loop_frequency samples per pass and time the call so the
// achieved rate can be estimated.
506 size_t to_read_now = std::min(to_read, m_loop_frequency.Get());
507 auto time_read_start = system_clock::now();
509 m_callback_on_data, \
511 m_loop_frequency.Get(), \
512 m_error_margin.Get());
513 auto time_read_end = system_clock::now();
514 CalculateEstimatedFrequency(to_read_now, time_read_end - time_read_start);
515 to_read -= to_read_now;
518 next_state = State::Error;
519 }
else if(to_read == 0) {
520 next_state = State::Skipping;
522 next_state = m_state;
// Skipping: on entry post `ok` to m_comp_allowed, then skip samples in chunks
// of at most loop_frequency, timing each chunk.
527 case State::Skipping:
529 if(m_state != prev_state) {
530 m_comp_allowed.Post(ok);
534 if(req.GetPayload() == Command::Exit) {
535 next_state = State::Terminating;
537 }
else if (req.GetPayload() == Command::Idle){
538 next_state = State::Idle;
542 size_t to_skip_now = std::min(to_skip, m_loop_frequency.Get());
543 auto time_skip_start = system_clock::now();
544 ret =
Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
545 auto time_skip_end = system_clock::now();
546 CalculateEstimatedFrequency(to_skip_now, time_skip_end - time_skip_start);
548 to_skip -= to_skip_now;
551 next_state = State::Error;
552 }
else if(to_skip == 0) {
// Everything skipped: back to Reading if m_comp_done was signalled, otherwise
// Waiting.
553 if(m_comp_done.TryWait()) {
554 next_state = State::Reading;
556 next_state = State::Waiting;
559 next_state = m_state;
// Presumably the Waiting handler (case label not visible): restart the timer
// on entry, poll m_comp_done with 1ms sleeps, and move to Dropping once
// timeout_wait has elapsed or NumFree(reader) reports 0.
566 if(m_state != prev_state) {
567 time_start = system_clock::now();
571 if(req.GetPayload() == Command::Exit) {
572 next_state = State::Terminating;
574 }
else if (req.GetPayload() == Command::Idle){
575 next_state = State::Idle;
579 if(m_comp_done.TryWait()) {
580 next_state = State::Reading;
584 std::this_thread::sleep_for(1ms);
586 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
587 if((time_elapsed > timeout_wait) or (
NumFree(reader) == 0)) {
588 next_state = State::Dropping;
590 next_state = m_state;
// Dropping: same polling as above but bounded by timeout_drop; exceeding the
// timeout ends in Error.
595 case State::Dropping:
597 if(m_state != prev_state) {
598 time_start = system_clock::now();
602 if(req.GetPayload() == Command::Exit) {
603 next_state = State::Terminating;
605 }
else if (req.GetPayload() == Command::Idle){
606 next_state = State::Idle;
610 if(m_comp_done.TryWait()) {
612 next_state = ret==ok ? State::Reading : State::Error;
616 std::this_thread::sleep_for(1ms);
618 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
619 if(time_elapsed > timeout_drop) {
621 next_state = State::Error;
623 next_state = m_state;
// Presumably the Error handler (case label not visible): on entry post the
// error code to m_comp_allowed and log its message; the state is left only on
// Exit or Idle, otherwise sleep 100ms and stay.
630 if(m_state != prev_state) {
631 m_comp_allowed.Post(ret);
632 LOG4CPLUS_ERROR(
GetLogger(),
"ReaderThread::Work() - " << ret.message());
636 if(req.GetPayload() == Command::Exit) {
637 next_state = State::Terminating;
638 }
else if(req.GetPayload() == Command::Idle) {
639 next_state = State::Idle;
641 std::this_thread::sleep_for(100ms);
642 next_state = m_state;
// Terminating: hand over to Off.
647 case State::Terminating:
649 next_state = State::Off;
// Estimates the achieved sample rate from `count` samples handled within
// `time`, stores it in m_estimated_loop_frequency, and logs a warning when it
// lies outside [loop_frequency / error_margin, loop_frequency * error_margin].
//   count - number of samples handled during the interval.
//   time  - length of the interval, in any std::chrono duration type.
668 template <
class Rep,
class Period>
669 void CalculateEstimatedFrequency(
size_t count,
670 std::chrono::duration<Rep, Period> time)
673 using namespace std::chrono;
// A zero-length interval cannot yield a rate.
674 if(time.count() != 0)
// Convert to fractional seconds. Casting to whole `seconds` truncated every
// sub-second interval to 0 (division by zero, measurement discarded) and
// rounded longer intervals down, so the margin check almost never saw the
// real rate.
676 m_estimated_loop_frequency =
static_cast<float>(count) / \
677 duration_cast<duration<float>>(time).count();
// Guard kept for extremely short intervals that still overflow the quotient.
679 if(!std::isinf(m_estimated_loop_frequency))
682 if(m_estimated_loop_frequency < \
683 static_cast<float>(m_loop_frequency.Get())/m_error_margin.Get() || \
684 m_estimated_loop_frequency > \
685 static_cast<float>(m_loop_frequency.Get())*m_error_margin.Get())
687 LOG4CPLUS_WARN(
GetLogger(),
"ReaderThread: Measured SHM frequency: " <<\
688 m_estimated_loop_frequency <<
" expected: " << m_loop_frequency.Get());
// Thread object created in Spawn().
694 std::thread m_thread;
// Configuration parameters; each is set through its setter and several are
// locked in Spawn().
696 Parameter<std::string> m_thread_name;
697 Parameter<std::string> m_queue_name;
698 Parameter<size_t> m_thread_affinity;
699 Parameter<size_t> m_samples_to_read;
700 Parameter<size_t> m_samples_to_skip;
701 Parameter<size_t> m_loop_frequency;
// Factor by which the measured frequency may deviate from loop_frequency
// before a warning is logged; Spawn() defaults it to 1.2.
702 Parameter<float> m_error_margin;
// Latest rate computed by CalculateEstimatedFrequency().
704 float m_estimated_loop_frequency;
// Requests posted by SendRequestAsync() and fetched via TryPend().
706 MessageQueue<Request<Command>> m_request_q;
// Error codes posted by Work() (ok when skipping starts, the failure code on
// entering the error handling) and pended on with a timeout elsewhere.
707 MessageQueue<std::error_code> m_comp_allowed;
// Polled by Work() with TryWait() to decide when to resume reading.
709 Semaphore m_comp_done;
// Per-sample data callback; Spawn() asserts that it has been set.
713 std::function<void(
const TopicType& sample)> m_callback_on_data;
// Optional callback invoked by Work() in its Starting state.
714 std::function<void()> m_callback_init_thread;