Go to the documentation of this file. 1 #ifndef RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
2 #define RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
19 #include <ipcq/reader.hpp>
20 #include <ipcq/adapter.hpp>
21 #include <boost/io/ios_state.hpp>
33 int Run(
int argc,
char *argv[]);
105 return m_sample_counter;
124 bool ParseArguments(
int argc,
char *argv[]);
125 void WriteBufferToFile(
const void* buffer,
size_t size);
126 bool TerminateProcess();
128 std::string m_queue_name;
129 std::string m_filename;
130 int64_t m_max_samples;
131 int64_t m_skip_samples;
132 bool m_print_samples;
134 int64_t m_sample_counter;
144 template<
typename Topic,
145 class ConditionPolicy = ipcq::BoostConditionPolicy,
146 class ShmTraits = ipcq::detail::BoostInterprocessTraits>
165 boost::io::ios_flags_saver saved_state(std::cout);
167 auto buffer =
reinterpret_cast<const uint8_t*
>(&sample);
168 size_t max_bytes_to_print = 64;
170 max_bytes_to_print = std::numeric_limits<size_t>::max();
172 bool last_was_endl =
false;
173 for (
size_t n = 0; n <
sizeof(Topic) and n < max_bytes_to_print; ++n) {
174 std::cout <<
"0x" << std::setfill(
'0') << std::setw(2) << std::right << std::noshowbase
175 << std::hex << (
unsigned int)(buffer[n]);
176 if ((n+1) % 16 == 0) {
177 std::cout << std::endl;
178 last_was_endl =
true;
181 last_was_endl =
false;
184 if (not last_was_endl) {
185 std::cout << std::endl;
187 if (
sizeof(Topic) > max_bytes_to_print) {
188 std::cout <<
"... (data continues) ..." << std::endl;
194 using Reader = ipcq::BasicReader<Topic, ConditionPolicy, ShmTraits>;
206 void Initialise()
override {
208 m_reader = std::make_unique<Reader>(
GetQueueName().c_str());
209 }
catch (
const std::exception&
error) {
210 std::string msg =
"Failed to create the shared memory reader for queue '"
212 throw std::runtime_error(msg);
219 void Finalise()
override {
221 m_reader.reset(
nullptr);
222 }
catch (
const std::exception&
error) {
223 std::string msg =
"Failed to destroy the shared memory reader for queue '"
225 throw std::runtime_error(msg);
237 bool ReadSample()
override {
238 if (not m_samples.empty()) {
239 m_samples.pop_front();
241 if (not m_samples.empty()) {
244 using namespace std::chrono_literals;
245 auto count = m_reader->NumAvailable();
246 auto [
error, num_elements] = m_reader->Read(ipcq::BackInserter(m_samples), count, 100ms);
256 if (!m_reader->Reset()) {
259 std::cerr <<
"Note: SHM reader state reset.\n";
263 std::string msg =
"Failed to read from shared memory: " +
error.message();
264 throw std::runtime_error(msg);
267 return num_elements > 0;
274 assert(not m_samples.empty());
281 const void* GetSampleData()
const override {
282 assert(not m_samples.empty());
283 return reinterpret_cast<const void*
>(&m_samples.front());
289 size_t GetSampleSize()
const override {
290 return sizeof(Topic);
293 std::deque<Topic> m_samples;
294 std::unique_ptr<Reader> m_reader;
299 #endif // RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
std::error_code make_error_code(MudpiProcessingError e)
Definition: mudpiProcessingError.hpp:103
void error(const char *msg)
Definition: main.cpp:29