ddt  0.1
ddtMemoryAccessor.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtMemoryAccessor.hpp
8 // @brief Accessor for a shared memory.
9 //
10 // This class provides access to previously created shared memory segments,
11 // in particular reading from and writing to them.
12 //
13 // @author Matthias Grimm, CGI
14 // @since 2020/01/16
15 //
16 
17 #ifndef DDTMEMORYACCESSOR_H_
18 #define DDTMEMORYACCESSOR_H_
19 
20 #include <boost/circular_buffer.hpp>
21 #include <boost/interprocess/containers/string.hpp>
22 #include <boost/interprocess/containers/vector.hpp>
23 #include <boost/interprocess/managed_shared_memory.hpp>
24 #include <boost/signals2/signal.hpp>
25 #include <fstream>
26 #include <future>
27 #include <iostream>
28 
29 #include "ddt/ddtConstants.hpp"
30 #include "ddt/ddtCrc32.hpp"
31 #include "ddt/ddtLogger.hpp"
32 
33 namespace ip = boost::interprocess;
34 
38 typedef ip::managed_shared_memory::segment_manager segment_manager_t;
39 
43 typedef ip::allocator<void, segment_manager_t> void_allocator;
44 
48 typedef ip::allocator<uint8_t, segment_manager_t> uint8_allocator;
49 
53 typedef ip::vector<uint8_t, uint8_allocator> uint8_vector;
54 
58 typedef ip::allocator<uint16_t, segment_manager_t> uint16_allocator;
59 
63 typedef ip::vector<uint16_t, uint16_allocator> uint16_vector;
64 
68 typedef ip::allocator<char, segment_manager_t> char_allocator;
69 
73 typedef ip::basic_string<char, std::char_traits<char>, char_allocator>
75 
79 typedef boost::signals2::signal<void()> signal_t;
80 
81 namespace ddt {
82 
90  int32_t topic_id;
91 
96 
101 
105  int32_t sample_id;
106 
111 
115  DataSampleShared(int32_t id, int md_length, int vector_length,
116  const void_allocator &void_alloc)
117  : topic_id(0),
118  meta_data_length(md_length),
119  meta_data(md_length, uint8_t(), void_alloc),
120  sample_id(id),
121  data(vector_length, uint8_t(), void_alloc) {}
122 };
123 
134 
138  uint32_t checksum;
139 
143  int32_t sample_length;
144 
148  int64_t writer_index = -1;
149 
153  uint64_t timestamp = 0;
154 
159 
163  DataPacketShared(const char *ds_id, int32_t check, int vector_length,
164  const void_allocator &void_alloc)
165  : data_stream_identifier(ds_id, void_alloc),
166  checksum(check),
167  sample_length(vector_length),
168  sample(0, 1024, vector_length, void_alloc) {}
169 };
170 
/**
 * Data sample held in ordinary process memory (std::vector buffers).
 */
struct DataSample {
  /// Topic id of the sample; the constructor initialises it to 0.
  int32_t topic_id;

  /// Length of the meta data, as passed to the constructor.
  int32_t meta_data_length;

  /// Meta data bytes; sized to meta_data_length and zero-filled on
  /// construction.
  std::vector<uint8_t> meta_data;

  /// Id of the sample, as passed to the constructor.
  int32_t sample_id;

  /// Payload bytes; sized and zero-filled on construction.
  std::vector<uint8_t> data;

  /**
   * Creates a sample with zero-filled meta data and data buffers.
   *
   * @param id            Value stored in sample_id.
   * @param md_length     Number of meta data bytes to allocate.
   * @param vector_length Number of payload bytes to allocate.
   *
   * NOTE(review): the lengths are plain int; negative values are not checked
   * here -- confirm that callers never pass them.
   */
  DataSample(int32_t id, int md_length, int vector_length)
      : topic_id(0),
        meta_data_length(md_length),
        meta_data(md_length),
        sample_id(id),
        data(vector_length) {}
};
210 
214 struct DataPacket {
219 
223  uint32_t checksum;
224 
228  int32_t sample_length;
229 
233  int64_t writer_index = -1;
234 
238  uint64_t timestamp = 0;
239 
244 
248  DataPacket(const char *ds_id, int32_t check, int vector_length)
249  : data_stream_identifier(ds_id),
250  checksum(check),
251  sample_length(vector_length),
252  sample(0, 1024, vector_length) {}
253 };
254 
259  public:
264 
268  explicit DdtMemoryAccessor(const std::string &shm_id,
269  const std::string &data_stream_identifier,
270  DdtLogger *logger, const uint64_t time_window = 0,
271  const int32_t reading_interval = 10);
272 
276  virtual ~DdtMemoryAccessor();
277 
281  const uint32_t ComputeChecksum(const DataSampleShared *data_sample);
282 
286  const uint32_t ComputeChecksum(const DataSample *data_sample);
287 
291  int32_t OpenSharedMemory();
292 
296  void CloseSharedMemory();
297 
301  void WriteData(const int32_t writer_index, const int32_t topic_id,
302  const int32_t sample_id, const void *datavec,
303  const int32_t datavec_size, const void *metadata_vec,
304  const int32_t metadatavec_size, const uint64_t timestamp);
305 
309  void StartReading();
310 
314  void StopReading();
315 
319  void Reattach();
320 
326  void NewData();
327 
331  void get_data_packet(std::string *data_stream_identifier, uint32_t *checksum,
332  int32_t *sample_length, int64_t *writer_index,
333  uint64_t *timestamp, DataSample **sample);
334 
338  bool get_data_available();
339 
344 
349 
354  void Reset();
355 
359  void set_pub_unreg(const bool STATE);
360 
364  void set_compute_checksum(bool compute_checksum);
365 
369  bool get_compute_checksum();
370 
374  bool get_is_initialized();
375 
376  private:
377  signal_t data_available_signal;
378 
382  void ReadData();
383 
387  void SetDefaults();
388 
397  void Init(const std::string &mem_id, const std::string &stream_id,
398  DdtLogger *ddt_logger, const uint64_t time_win,
399  const int32_t interval);
400 
401  ip::managed_shared_memory *managed_shm;
402 
407  typedef ip::allocator<DataPacketShared,
408  ip::managed_shared_memory::segment_manager>
409  cb_alloc;
410 
414  typedef boost::circular_buffer<DataPacketShared, cb_alloc> cb;
415 
416  std::string shm_id;
417  std::string data_stream_identifier;
418  uint64_t time_window; // in [ms]
419  int32_t reading_interval; // in [ms]
420  cb *circ_buffer;
421  int64_t *writer_index;
422  int32_t local_index;
423  int64_t reader_index;
424 
425  int32_t number_of_unread_elements;
426  int32_t circ_buf_capacity;
427  int32_t number_of_lost_packages;
428 
429  std::mutex circ_buffer_mutex;
430  std::mutex packets_mutex;
431 
432  bool is_initialized;
433 
434  std::promise<void> exit_signal;
435  std::future<void> future_object;
436 
437  std::atomic<bool> reading_active;
438  std::atomic<bool> pub_unreg;
439  std::atomic<bool> compute_checksum;
440 
441  std::list<DataPacketShared *> packets;
442 
443  DdtLogger *logger;
444 };
445 
446 } // namespace ddt
447 
448 #endif /* DDTMEMORYACCESSOR_H_ */
ddt::DataPacketShared::writer_index
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:148
ddt::DataSample::topic_id
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:178
signal_t
boost::signals2::signal< void()> signal_t
Definition: ddtMemoryAccessor.hpp:79
ddt::DataPacket::sample
DataSample sample
Definition: ddtMemoryAccessor.hpp:243
ddt::DdtMemoryAccessor::NewData
void NewData()
Definition: ddtMemoryAccessor.cpp:291
ddt::DataPacketShared
Definition: ddtMemoryAccessor.hpp:129
ddt::DataSample::meta_data_length
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:183
ddt::DdtMemoryAccessor::get_compute_checksum
bool get_compute_checksum()
Definition: ddtMemoryAccessor.cpp:467
ddt::DataPacket::sample_length
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:228
ddt::DdtMemoryAccessor::Reattach
void Reattach()
Definition: ddtMemoryAccessor.cpp:283
ddt::DataSampleShared
Definition: ddtMemoryAccessor.hpp:86
ddt::DdtLogger
Definition: ddtLogger.hpp:71
ddt::DdtMemoryAccessor
Definition: ddtMemoryAccessor.hpp:258
ddt::DdtMemoryAccessor::OpenSharedMemory
int32_t OpenSharedMemory()
Definition: ddtMemoryAccessor.cpp:113
ddt::DdtMemoryAccessor::StopReading
void StopReading()
Definition: ddtMemoryAccessor.cpp:269
ddt::DataSample::sample_id
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:193
ddtConstants.hpp
ddt
Definition: ddtClient.hpp:36
ddt::DataPacketShared::sample_length
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:143
ddt::DataSampleShared::meta_data_length
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:95
ddt::DataPacket::timestamp
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:238
ddt::DdtMemoryAccessor::WriteData
void WriteData(const int32_t writer_index, const int32_t topic_id, const int32_t sample_id, const void *datavec, const int32_t datavec_size, const void *metadata_vec, const int32_t metadatavec_size, const uint64_t timestamp)
Definition: ddtMemoryAccessor.cpp:153
char_allocator
ip::allocator< char, segment_manager_t > char_allocator
Definition: ddtMemoryAccessor.hpp:68
ddt::DataSampleShared::DataSampleShared
DataSampleShared(int32_t id, int md_length, int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:115
ddt::DdtMemoryAccessor::~DdtMemoryAccessor
virtual ~DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:32
ddt::DdtMemoryAccessor::get_data_available
bool get_data_available()
Definition: ddtMemoryAccessor.cpp:338
ddt::DdtMemoryAccessor::CloseSharedMemory
void CloseSharedMemory()
Definition: ddtMemoryAccessor.cpp:70
ddt::DdtMemoryAccessor::StartReading
void StartReading()
Definition: ddtMemoryAccessor.cpp:254
ddt::DataSample::meta_data
std::vector< uint8_t > meta_data
Definition: ddtMemoryAccessor.hpp:188
void_allocator
ip::allocator< void, segment_manager_t > void_allocator
Definition: ddtMemoryAccessor.hpp:43
ddt::DataSample::data
std::vector< uint8_t > data
Definition: ddtMemoryAccessor.hpp:198
ddt::DataPacketShared::data_stream_identifier
char_string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:133
ddt::DdtMemoryAccessor::get_is_initialized
bool get_is_initialized()
Definition: ddtMemoryAccessor.cpp:469
ddt::DataSample
Definition: ddtMemoryAccessor.hpp:174
ddt::DdtMemoryAccessor::get_data_packet
void get_data_packet(std::string *data_stream_identifier, uint32_t *checksum, int32_t *sample_length, int64_t *writer_index, uint64_t *timestamp, DataSample **sample)
Definition: ddtMemoryAccessor.cpp:352
ddt::DataPacket::data_stream_identifier
std::string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:218
ddt::DataPacket
Definition: ddtMemoryAccessor.hpp:214
char_string
ip::basic_string< char, std::char_traits< char >, char_allocator > char_string
Definition: ddtMemoryAccessor.hpp:74
ddt::DataSampleShared::sample_id
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:105
ddt::DataPacket::writer_index
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:233
ddt::DataSampleShared::topic_id
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:90
ddt::DdtMemoryAccessor::Reset
void Reset()
Definition: ddtMemoryAccessor.cpp:441
ddt::DataSampleShared::meta_data
uint8_vector meta_data
Definition: ddtMemoryAccessor.hpp:100
ddt::DdtMemoryAccessor::get_number_of_unread_elements
int32_t get_number_of_unread_elements()
Definition: ddtMemoryAccessor.cpp:347
uint16_allocator
ip::allocator< uint16_t, segment_manager_t > uint16_allocator
Definition: ddtMemoryAccessor.hpp:58
uint16_vector
ip::vector< uint16_t, uint16_allocator > uint16_vector
Definition: ddtMemoryAccessor.hpp:63
uint8_allocator
ip::allocator< uint8_t, segment_manager_t > uint8_allocator
Definition: ddtMemoryAccessor.hpp:48
ddt::DataPacket::DataPacket
DataPacket(const char *ds_id, int32_t check, int vector_length)
Definition: ddtMemoryAccessor.hpp:248
ddt::DataSampleShared::data
uint8_vector data
Definition: ddtMemoryAccessor.hpp:110
ddtLogger.hpp
ddt::DdtMemoryAccessor::set_pub_unreg
void set_pub_unreg(const bool STATE)
Definition: ddtMemoryAccessor.cpp:452
ddt::DataPacket::checksum
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:223
ddt::DataPacketShared::timestamp
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:153
ddt::DdtMemoryAccessor::set_compute_checksum
void set_compute_checksum(bool compute_checksum)
Definition: ddtMemoryAccessor.cpp:454
ddt::DdtMemoryAccessor::DataAvailableSignal
signal_t * DataAvailableSignal()
Definition: ddtMemoryAccessor.cpp:437
ddt::DdtMemoryAccessor::ComputeChecksum
const uint32_t ComputeChecksum(const DataSampleShared *data_sample)
Definition: ddtMemoryAccessor.cpp:77
ddt::DataPacketShared::checksum
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:138
ddt::DdtMemoryAccessor::DdtMemoryAccessor
DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:21
segment_manager_t
ip::managed_shared_memory::segment_manager segment_manager_t
Definition: ddtMemoryAccessor.hpp:38
ddt::DataSample::DataSample
DataSample(int32_t id, int md_length, int vector_length)
Definition: ddtMemoryAccessor.hpp:203
ddt::DataPacketShared::DataPacketShared
DataPacketShared(const char *ds_id, int32_t check, int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:163
uint8_vector
ip::vector< uint8_t, uint8_allocator > uint8_vector
Definition: ddtMemoryAccessor.hpp:53
ddtCrc32.hpp
ddt::DataPacketShared::sample
DataSampleShared sample
Definition: ddtMemoryAccessor.hpp:158