RTC Toolkit  0.1.0-alpha
readerHelpers.hpp
Go to the documentation of this file.
1 
9 #ifndef RTCTK_DATATASK_READERHELPERS_HPP
10 #define RTCTK_DATATASK_READERHELPERS_HPP
12 
13 #include <chrono>
14 #include <system_error>
15 #include <cmath>
16 
17 namespace rtctk::dataTask {
22 namespace detail {
23 
24 
/**
 * Computes how long it takes to process @p count loop iterations.
 *
 * The nominal duration is `1000 * count / loop_frequency` milliseconds, i.e.
 * @p loop_frequency is treated as iterations per second. The nominal value is
 * scaled by @p error_margin and rounded up to a whole millisecond.
 *
 * Declared `inline` because this non-template function is defined in a header;
 * without it, including the header from several translation units produces
 * multiple-definition link errors.
 *
 * @param count          Number of loop iterations.
 * @param loop_frequency Iterations per second; must be non-zero (used as divisor).
 * @param error_margin   Multiplicative factor applied to the nominal duration.
 * @return The scaled duration, rounded up to whole milliseconds.
 */
inline std::chrono::milliseconds CalcDuration(size_t count, size_t loop_frequency, float error_margin = 1.0)
{
    using std::chrono::milliseconds;

    // Multiply before dividing (in double) to keep as much precision as possible.
    const double nominal_ms = 1000.0 * count / loop_frequency;

    // Cast to the duration's own representation type rather than int to avoid
    // narrowing for long durations.
    return milliseconds{static_cast<milliseconds::rep>(std::ceil(nominal_ms * error_margin))};
}
39 
/**
 * Computes the per-attempt and overall timeouts used when waiting for samples.
 *
 * Both values are derived from @p loop_frequency (treated as iterations per
 * second): the nominal time in milliseconds is rounded up to a whole
 * millisecond, multiplied by @p error_margin, and the product is truncated to
 * a whole millisecond.
 *
 * Declared `inline` because this non-template function is defined in a header;
 * without it, including the header from several translation units produces
 * multiple-definition link errors.
 *
 * @param count          Number of loop iterations to wait for in total.
 * @param loop_frequency Iterations per second; must be non-zero (used as divisor).
 * @param error_margin   Multiplicative safety factor applied to both timeouts.
 * @return Pair of {timeout for a single iteration, timeout for @p count iterations}.
 */
inline std::pair<std::chrono::milliseconds, std::chrono::milliseconds>
CalcTimeout(size_t count, size_t loop_frequency, float error_margin = 2.0)
{
    using std::chrono::milliseconds;

    // Computed in double, multiplying before dividing, so that large counts do
    // not lose precision and exact ratios are not rounded up spuriously.
    const double single_ms = std::ceil(1000.0 / loop_frequency);
    const double total_ms = std::ceil(1000.0 * count / loop_frequency);

    return {milliseconds{static_cast<milliseconds::rep>(single_ms * error_margin)},
            milliseconds{static_cast<milliseconds::rep>(total_ms * error_margin)}};
}
60 
74 template <typename ReaderType, typename Operation>
75 std::error_code Read(ReaderType& reader, Operation&& op, size_t count, size_t loop_frequency, \
76  float error_margin = 2.0)
77 {
79  using namespace std::chrono;
80 
81  auto [timeout_short, timeout_total] = CalcTimeout(count, loop_frequency, error_margin);
82 
83  size_t read = 0;
84  std::error_code const ok;
85  std::pair<std::error_code, size_t> ret;
86  milliseconds time_elapsed {0};
87  auto time_start = system_clock::now();
88 
89  while (1) {
90  ret = reader.Read(std::forward<Operation>(op), count, timeout_short);
91  if(ret.first != ok) {
92  LOG4CPLUS_ERROR(GetLogger(), "Reading from shm timed out: check queue is being filled");
93  LOG4CPLUS_ERROR(GetLogger(), "Read: " << ret.second << " in " << timeout_short.count() << " ms");
94  LOG4CPLUS_ERROR(GetLogger(), "Expected: " << count);
95  return ret.first;
96  }
97  read += ret.second;
98  if (read == count) {
99  return {};
100  }
101 
102  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
103  if (time_elapsed > timeout_total) {
104  LOG4CPLUS_ERROR(GetLogger(), "Reading from shm timed out: check queue is being filled");
105  LOG4CPLUS_ERROR(GetLogger(), "Read: " << read << " in " << time_elapsed.count() << " ms");
106  LOG4CPLUS_ERROR(GetLogger(), "Expected: " << read);
107  return std::make_error_code(std::errc::timed_out);
108  }
109  }
110 }
111 
112 
113 
125 template <typename ReaderType>
126 std::error_code Skip(ReaderType& reader, size_t count, size_t loop_frequency, \
127  float error_margin = 2.0)
128 {
130  using namespace std::chrono;
131 
132  auto [timeout_short, timeout_total] = CalcTimeout(count, loop_frequency, error_margin);
133 
134  size_t skipped = 0;
135  std::error_code const ok;
136  std::pair<std::error_code, size_t> ret;
137  milliseconds time_elapsed {0};
138  auto time_start = system_clock::now();
139 
140  while (1) {
141  ret = reader.Skip(count, timeout_short);
142  if(ret.first != ok) {
143  LOG4CPLUS_ERROR(GetLogger(), "Reading from shm timed out: check queue is being filled");
144  LOG4CPLUS_ERROR(GetLogger(), "Read: " << ret.second << " in " << timeout_short.count() << " ms");
145  LOG4CPLUS_ERROR(GetLogger(), "Expected: " << count);
146  return ret.first;
147  }
148  skipped += ret.second;
149  if (skipped == count) {
150  return {};
151  }
152 
153  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
154  if (time_elapsed > timeout_total) {
155  LOG4CPLUS_ERROR(GetLogger(), "Reading from shm timed out: check queue is being filled");
156  LOG4CPLUS_ERROR(GetLogger(), "Read: " << skipped << " in " << time_elapsed.count() << " ms");
157  LOG4CPLUS_ERROR(GetLogger(), "Expected: " << skipped);
158  return std::make_error_code(std::errc::timed_out);
159  }
160  }
161 }
162 
163 
/**
 * Resets @p reader unless its reported size is zero.
 *
 * @param reader Object providing `Size()` and `Reset()`.
 * @return A default-constructed (success) error code when `reader.Size()` is
 *         zero (in which case `Reset()` is not called), otherwise whatever
 *         `reader.Reset()` returns.
 */
template <typename ReaderType>
std::error_code Reset(ReaderType& reader)
{
    // Nothing to do when the reader reports a size of zero.
    if (reader.Size() == 0) {
        return {};
    }

    return reader.Reset();
}
180 
/**
 * Returns the difference between the reader's size and its number of
 * available elements.
 *
 * @param reader Object providing `Size()` and `NumAvailable()`.
 * @return `reader.Size() - reader.NumAvailable()`.
 */
template <typename ReaderType>
size_t NumFree(ReaderType& reader)
{
    auto const total = reader.Size();
    auto const available = reader.NumAvailable();
    return total - available;
}
194 
195 } // namespace
196 
197 } // namespace
198 
199 #endif
rtctk::dataTask::detail::CalcDuration
std::chrono::milliseconds CalcDuration(size_t count, size_t loop_frequency, float error_margin=1.0)
Definition: readerHelpers.hpp:34
rtctk::dataTask::detail::Reset
std::error_code Reset(ReaderType &reader)
Definition: readerHelpers.hpp:173
rtctk::dataTask::detail::Read
std::error_code Read(ReaderType &reader, Operation &&op, size_t count, size_t loop_frequency, float error_margin=2.0)
Definition: readerHelpers.hpp:75
rtctk::dataTask::detail::Skip
std::error_code Skip(ReaderType &reader, size_t count, size_t loop_frequency, float error_margin=2.0)
Definition: readerHelpers.hpp:126
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
rtctk::dataTask::detail::NumFree
size_t NumFree(ReaderType &reader)
Definition: readerHelpers.hpp:190
rtctk::dataTask
Definition: messageQueue.hpp:17
rtctk::dataTask::detail::CalcTimeout
std::pair< std::chrono::milliseconds, std::chrono::milliseconds > CalcTimeout(size_t count, size_t loop_frequency, float error_margin=2.0)
Definition: readerHelpers.hpp:50
rtctk::telRepub::make_error_code
std::error_code make_error_code(MudpiProcessingError e)
Definition: mudpiProcessingError.hpp:103
logger.hpp
Logging Support Library based on log4cplus.