RTC Toolkit  0.1.0-alpha
shmSubscriber.hpp
Go to the documentation of this file.
1 #ifndef RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
2 #define RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
3 
11 #include <memory>
12 #include <deque>
13 #include <string>
14 #include <chrono>
15 #include <iostream>
16 #include <iomanip>
17 #include <limits>
18 #include <cassert>
19 #include <ipcq/reader.hpp>
20 #include <ipcq/adapter.hpp>
21 #include <boost/io/ios_state.hpp>
22 
23 namespace rtctk::standaloneTools {
24 
29 public:
30 
// Defaulted constructor: performs no explicit set-up of its own.
31  ShmSubscriberBase() = default;
// Virtual destructor, so concrete subscribers can be destroyed through a base class pointer.
32  virtual ~ShmSubscriberBase() = default;
// Executes the shared memory subscriber program.
// NOTE(review): argc/argv are presumably the command line arguments forwarded from main() and
// the return value presumably the process exit code -- the implementation lives in
// shmSubscriber.cpp and is not visible in this header, so confirm there.
33  int Run(int argc, char *argv[]);
34 
35 protected:
36 
// Should perform any needed initialisation steps for the program.
44  virtual void Initialise() = 0;
45 
// Must cleanup any objects created in Initialise.
51  virtual void Finalise() = 0;
52 
// Should read a sample into internal buffers from the shared memory.
// The ShmSubscriber implementation in this file returns true when a sample is available for
// PrintSample()/GetSampleData() and false when nothing could be read.
66  virtual bool ReadSample() = 0;
67 
// Should print the contents of the read sample to console in a human readable format.
74  virtual void PrintSample() = 0;
75 
// Should return a pointer to the raw bytes of the current sample. The ShmSubscriber
// implementation in this file returns the address of the sample at the front of its buffer.
79  virtual const void* GetSampleData() const = 0;
80 
// Should return the size in bytes of the data pointed to by GetSampleData(). The ShmSubscriber
// implementation in this file returns sizeof(Topic).
84  virtual size_t GetSampleSize() const = 0;
85 
89  inline const std::string& GetQueueName() const {
90  return m_queue_name;
91  }
92 
97  inline const std::string& GetFilename() const {
98  return m_filename;
99  }
100 
104  inline const int64_t GetSampleNumber() const {
105  return m_sample_counter;
106  }
107 
112  inline const int64_t PrintWihtLongFormat() const {
113  return m_print_long;
114  }
115 
116 private:
117 
118  // Do not allow copying or moving of this object.
119  ShmSubscriberBase(const ShmSubscriberBase& rhs) = delete;
120  ShmSubscriberBase& operator=(const ShmSubscriberBase& rhs) = delete;
121  ShmSubscriberBase(ShmSubscriberBase&& rhs) = default;
122  ShmSubscriberBase& operator=(ShmSubscriberBase&& rhs) = default;
123 
// Internal helpers; they are implemented outside of this header.
// NOTE(review): presumably ParseArguments() fills the members below from the command line,
// WriteBufferToFile() stores a sample in the file named by m_filename and TerminateProcess()
// reports whether the main loop should stop -- confirm against shmSubscriber.cpp.
bool ParseArguments(int argc, char *argv[]);
void WriteBufferToFile(const void* buffer, size_t size);
bool TerminateProcess();

// Name of the shared memory queue, exposed through GetQueueName().
std::string m_queue_name;
// File name exposed through GetFilename().
std::string m_filename;
// The scalar members get explicit defaults: the constructor is defaulted, so without these
// they would hold indeterminate values until something assigns them, and the protected
// accessors (GetSampleNumber(), PrintWihtLongFormat()) could read uninitialised memory.
int64_t m_max_samples = 0;
int64_t m_skip_samples = 0;
bool m_print_samples = false;
// Flag returned by PrintWihtLongFormat().
bool m_print_long = false;
// Counter returned by GetSampleNumber().
int64_t m_sample_counter = 0;
135 };
136 
// Implements basic features for a simple shared memory subscriber program.
//
// Topic is the type of the samples read from the shared memory queue; ConditionPolicy and
// ShmTraits are passed on unchanged to ipcq::BasicReader (see the Reader alias further down).
// NOTE(review): the class-head line of ShmSubscriber (line 147 of the original header) is
// missing from this listing; only the template parameter list and the start of the class body
// are present.
144 template<typename Topic,
145  class ConditionPolicy = ipcq::BoostConditionPolicy,
146  class ShmTraits = ipcq::detail::BoostInterprocessTraits>
148 public:
// Defaulted constructor; the reader itself is only created later in Initialise().
149  ShmSubscriber() = default;
// Defaulted virtual destructor.
150  virtual ~ShmSubscriber() = default;
151 
152 protected:
153 
164  virtual void PrintSample(const Topic& sample) {
165  boost::io::ios_flags_saver saved_state(std::cout);
166  std::cout << "Sample " << GetSampleNumber() << ":" << std::endl;
167  auto buffer = reinterpret_cast<const uint8_t*>(&sample);
168  size_t max_bytes_to_print = 64;
169  if (PrintWihtLongFormat()) {
170  max_bytes_to_print = std::numeric_limits<size_t>::max();
171  }
172  bool last_was_endl = false;
173  for (size_t n = 0; n < sizeof(Topic) and n < max_bytes_to_print; ++n) {
174  std::cout << "0x" << std::setfill('0') << std::setw(2) << std::right << std::noshowbase
175  << std::hex << (unsigned int)(buffer[n]);
176  if ((n+1) % 16 == 0) {
177  std::cout << std::endl;
178  last_was_endl = true;
179  } else {
180  std::cout << " ";
181  last_was_endl = false;
182  }
183  }
184  if (not last_was_endl) {
185  std::cout << std::endl;
186  }
187  if (sizeof(Topic) > max_bytes_to_print) {
188  std::cout << "... (data continues) ..." << std::endl;
189  }
190  }
191 
192 private:
193 
194  using Reader = ipcq::BasicReader<Topic, ConditionPolicy, ShmTraits>;
195 
196  // Do not allow copying or moving of this object.
197  ShmSubscriber(const ShmSubscriber& rhs) = delete;
198  ShmSubscriber& operator=(const ShmSubscriber& rhs) = delete;
199  ShmSubscriber(ShmSubscriber&& rhs) = default;
200  ShmSubscriber& operator=(ShmSubscriber&& rhs) = default;
201 
202 
206  void Initialise() override {
207  try {
208  m_reader = std::make_unique<Reader>(GetQueueName().c_str());
209  } catch (const std::exception& error) {
210  std::string msg = "Failed to create the shared memory reader for queue '"
211  + GetQueueName() + "': " + error.what();
212  throw std::runtime_error(msg);
213  }
214  }
215 
219  void Finalise() override {
220  try {
221  m_reader.reset(nullptr);
222  } catch (const std::exception& error) {
223  std::string msg = "Failed to destroy the shared memory reader for queue '"
224  + GetQueueName() + "': " + error.what();
225  throw std::runtime_error(msg);
226  }
227  m_samples.clear();
228  }
229 
237  bool ReadSample() override {
238  if (not m_samples.empty()) {
239  m_samples.pop_front();
240  }
241  if (not m_samples.empty()) {
242  return true;
243  }
244  using namespace std::chrono_literals;
245  auto count = m_reader->NumAvailable();
246  auto [error, num_elements] = m_reader->Read(ipcq::BackInserter(m_samples), count, 100ms);
247  if (error) {
248  if (error == ipcq::make_error_code(ipcq::Error::Timeout)) {
249  return false;
250  } else if (error == ipcq::make_error_code(ipcq::Error::InconsistentState)) {
251  // Use case for this tool is not conserned about missed samples. So when we get
252  // InconsistentState because of e.g. late joining we simply reset.
253  // Reset may fail if queue is Closed or if it is empty (nothing to reset to). We
254  // ignore that as well. Eventually there will be data in the queue and Reset() will
255  // succeed, or queue will be closed and Reset won't be attempted again.
256  if (!m_reader->Reset()) {
257  // Only log in the successful case as attempts to Reset will possibly otherwise
258  // flood the console with attempts if theres no data in the queue.
259  std::cerr << "Note: SHM reader state reset.\n";
260  }
261  return false;
262  } else {
263  std::string msg = "Failed to read from shared memory: " + error.message();
264  throw std::runtime_error(msg);
265  }
266  }
267  return num_elements > 0;
268  }
269 
273  void PrintSample() override {
274  assert(not m_samples.empty());
275  PrintSample(m_samples.front());
276  }
277 
281  const void* GetSampleData() const override {
282  assert(not m_samples.empty());
283  return reinterpret_cast<const void*>(&m_samples.front());
284  }
285 
289  size_t GetSampleSize() const override {
290  return sizeof(Topic);
291  }
292 
// Samples read from the queue but not yet consumed. The front element is the current sample
// used by PrintSample() and GetSampleData(); ReadSample() pops it before serving the next one.
293  std::deque<Topic> m_samples;
// Shared memory reader; created in Initialise() and released in Finalise().
294  std::unique_ptr<Reader> m_reader;
295 };
296 
297 } // namespace rtctk::standaloneTools
298 
299 #endif // RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
rtctk::standaloneTools::ShmSubscriberBase::Initialise
virtual void Initialise()=0
Should perform any needed initialisation steps for the program.
rtctk::standaloneTools::ShmSubscriberBase::GetQueueName
const std::string & GetQueueName() const
Definition: shmSubscriber.hpp:89
rtctk::standaloneTools::ShmSubscriber::ShmSubscriber
ShmSubscriber()=default
rtctk::standaloneTools::ShmSubscriber::~ShmSubscriber
virtual ~ShmSubscriber()=default
rtctk::standaloneTools::ShmSubscriberBase::Run
int Run(int argc, char *argv[])
Executes the shared memory subscriber program.
Definition: shmSubscriber.cpp:43
rtctk::standaloneTools::ShmSubscriberBase::GetSampleData
virtual const void * GetSampleData() const =0
rtctk::standaloneTools::ShmSubscriberBase::GetSampleSize
virtual size_t GetSampleSize() const =0
rtctk::standaloneTools::ShmSubscriberBase::ShmSubscriberBase
ShmSubscriberBase()=default
rtctk::standaloneTools::ShmSubscriber
Implements basic features for a simple shared memory subscriber program.
Definition: shmSubscriber.hpp:147
rtctk::standaloneTools::ShmSubscriberBase::GetSampleNumber
const int64_t GetSampleNumber() const
Definition: shmSubscriber.hpp:104
rtctk::standaloneTools::ShmSubscriberBase::GetFilename
const std::string & GetFilename() const
Definition: shmSubscriber.hpp:97
rtctk::standaloneTools::ShmSubscriberBase::~ShmSubscriberBase
virtual ~ShmSubscriberBase()=default
rtctk::standaloneTools::ShmSubscriberBase::PrintWihtLongFormat
const int64_t PrintWihtLongFormat() const
Definition: shmSubscriber.hpp:112
rtctk::standaloneTools::ShmSubscriber::PrintSample
virtual void PrintSample(const Topic &sample)
Prints a hex dump of the sample.
Definition: shmSubscriber.hpp:164
rtctk::telRepub::make_error_code
std::error_code make_error_code(MudpiProcessingError e)
Definition: mudpiProcessingError.hpp:103
rtctk::standaloneTools
Definition: shmPub.hpp:34
rtctk::standaloneTools::ShmSubscriberBase::ReadSample
virtual bool ReadSample()=0
Should read a sample into internal buffers from the shared memory.
rtctk::standaloneTools::ShmSubscriberBase::Finalise
virtual void Finalise()=0
Must cleanup any objects created in Initialise.
error
void error(const char *msg)
Definition: main.cpp:29
rtctk::standaloneTools::ShmSubscriberBase::PrintSample
virtual void PrintSample()=0
Should print the contents of the read sample to console in a human readable format.
rtctk::standaloneTools::ShmSubscriberBase
Definition: shmSubscriber.hpp:28