9 #ifndef RTCTK_DATATASK_READERHELPERS_HPP
10 #define RTCTK_DATATASK_READERHELPERS_HPP
#include <chrono>
#include <cmath>
#include <system_error>
/**
 * Estimates how long it takes to receive a number of samples at a given rate.
 *
 * The estimate is `count / loop_frequency` seconds, multiplied by
 * @p error_margin and rounded up to whole milliseconds.
 *
 * @param count           Number of samples.
 * @param loop_frequency  Samples per second. Must be non-zero: it is used as a
 *                        divisor and the result is converted to an integer.
 * @param error_margin    Multiplicative factor applied to the estimate. The
 *                        default of 1.0 leaves the estimate unscaled.
 * @return Estimated duration in milliseconds.
 */
inline std::chrono::milliseconds
CalcDuration(size_t count, size_t loop_frequency, float error_margin = 1.0)
{
    using namespace std::chrono;

    // Multiply before dividing so that exact results (e.g. 10 samples at
    // 1000 Hz) stay exact and are not pushed over an integer by rounding.
    const double estimate_ms = 1000.0 * count / loop_frequency * error_margin;

    // Convert through milliseconds::rep rather than int to avoid narrowing
    // for very long durations.
    return milliseconds{static_cast<milliseconds::rep>(std::ceil(estimate_ms))};
}
/**
 * Computes the per-sample and overall timeouts for receiving samples at a
 * given rate.
 *
 * For each timeout the nominal duration is rounded up to whole milliseconds
 * first, then multiplied by @p error_margin and truncated to whole
 * milliseconds.
 *
 * @param count           Number of samples the overall timeout must cover.
 * @param loop_frequency  Samples per second. Must be non-zero: it is used as a
 *                        divisor and the result is converted to an integer.
 * @param error_margin    Multiplicative factor applied to both timeouts.
 * @return Pair of {timeout for a single sample, timeout for @p count samples},
 *         both in milliseconds.
 */
inline std::pair<std::chrono::milliseconds, std::chrono::milliseconds>
CalcTimeout(size_t count, size_t loop_frequency, float error_margin = 2.0)
{
    using namespace std::chrono;

    // Work in double precision and multiply before dividing: single-precision
    // `(samples / f) * 1000` cannot represent counts above 2^24 exactly and
    // may round an exact result past an integer before the ceil.
    const auto timeout_for = [&](size_t samples) {
        const double nominal_ms = std::ceil(1000.0 * samples / loop_frequency);
        return milliseconds{static_cast<milliseconds::rep>(nominal_ms * error_margin)};
    };

    return {timeout_for(1), timeout_for(count)};
}
74 template <
typename ReaderType,
typename Operation>
75 std::error_code
Read(ReaderType& reader, Operation&& op,
size_t count,
size_t loop_frequency, \
76 float error_margin = 2.0)
79 using namespace std::chrono;
81 auto [timeout_short, timeout_total] =
CalcTimeout(count, loop_frequency, error_margin);
84 std::error_code
const ok;
85 std::pair<std::error_code, size_t> ret;
86 milliseconds time_elapsed {0};
87 auto time_start = system_clock::now();
90 ret = reader.Read(std::forward<Operation>(op), count, timeout_short);
92 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
93 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << ret.second <<
" in " << timeout_short.count() <<
" ms");
94 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
102 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
103 if (time_elapsed > timeout_total) {
104 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
105 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << read <<
" in " << time_elapsed.count() <<
" ms");
106 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << read);
125 template <
typename ReaderType>
126 std::error_code
Skip(ReaderType& reader,
size_t count,
size_t loop_frequency, \
127 float error_margin = 2.0)
130 using namespace std::chrono;
132 auto [timeout_short, timeout_total] =
CalcTimeout(count, loop_frequency, error_margin);
135 std::error_code
const ok;
136 std::pair<std::error_code, size_t> ret;
137 milliseconds time_elapsed {0};
138 auto time_start = system_clock::now();
141 ret = reader.Skip(count, timeout_short);
142 if(ret.first != ok) {
143 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
144 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << ret.second <<
" in " << timeout_short.count() <<
" ms");
145 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
148 skipped += ret.second;
149 if (skipped == count) {
153 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
154 if (time_elapsed > timeout_total) {
155 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
156 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << skipped <<
" in " << time_elapsed.count() <<
" ms");
157 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << skipped);
/**
 * Resets @p reader if its Size() is non-zero.
 *
 * @param reader  Object providing `Size()` and `Reset()`.
 * @return The result of `reader.Reset()` when `reader.Size() != 0`.
 *
 * NOTE(review): the path taken when Size() is zero is not visible in this
 * listing; presumably a default-constructed (success) error code is returned.
 * Confirm against the full source.
 */
172 template <
typename ReaderType>
173 std::error_code
Reset(ReaderType& reader)
// Only a reader reporting a non-zero Size() is reset.
175 if (reader.Size() != 0) {
176 return reader.Reset();
189 template <
typename ReaderType>
192 return reader.Size() - reader.NumAvailable();