ddt  1.0.0
ddtMemoryAccessor.hpp
Go to the documentation of this file.
1 
20 #ifndef DDTMEMORYACCESSOR_H_
21 #define DDTMEMORYACCESSOR_H_
22 
23 #include <boost/circular_buffer.hpp>
24 #include <boost/interprocess/containers/string.hpp>
25 #include <boost/interprocess/containers/vector.hpp>
26 #include <boost/interprocess/managed_shared_memory.hpp>
27 #include <boost/signals2/signal.hpp>
28 #include <fstream>
29 #include <future>
30 #include <iostream>
31 
32 #include "ddt/ddtConstants.hpp"
33 #include "ddt/ddtCrc32.hpp"
34 #include "ddt/ddtLogger.hpp"
35 
36 namespace ip = boost::interprocess;
37 
41 typedef ip::managed_shared_memory::segment_manager segment_manager_t;
42 
46 typedef ip::allocator<void, segment_manager_t> void_allocator;
47 
51 typedef ip::allocator<uint8_t, segment_manager_t> uint8_allocator;
52 
56 typedef ip::vector<uint8_t, uint8_allocator> uint8_vector;
57 
61 typedef ip::allocator<uint16_t, segment_manager_t> uint16_allocator;
62 
66 typedef ip::vector<uint16_t, uint16_allocator> uint16_vector;
67 
71 typedef ip::allocator<char, segment_manager_t> char_allocator;
72 
76 typedef ip::basic_string<char, std::char_traits<char>, char_allocator>
78 
82 typedef boost::signals2::signal<void()> SignalT;
83 
84 namespace ddt {
85 
93  int32_t topic_id = 0;
94 
99 
104 
108  int32_t sample_id;
109 
114 
122  DataSampleShared(const int32_t id, const int md_length,
123  const int vector_length, const void_allocator &void_alloc)
124  : meta_data_length(md_length),
125  meta_data(md_length, uint8_t(), void_alloc),
126  sample_id(id),
127  data(vector_length, uint8_t(), void_alloc) {}
128 };
129 
140 
144  uint32_t checksum;
145 
149  int32_t sample_length;
150 
154  int64_t writer_index = -1;
155 
159  uint64_t timestamp = 0;
160 
165 
173  DataPacketShared(const char *const ds_id, const int32_t check,
174  const int vector_length, const void_allocator &void_alloc)
175  : data_stream_identifier(ds_id, void_alloc),
176  checksum(check),
177  sample_length(vector_length),
178  sample(0, META_DATA_LENGTH, vector_length, void_alloc) {}
179 };
180 
184 struct DataSample {
188  int32_t topic_id = 0;
189 
194 
198  std::vector<uint8_t> meta_data;
199 
203  int32_t sample_id;
204 
208  std::vector<uint8_t> data;
209 
216  DataSample(const int32_t id, const int md_length, const int vector_length)
217  : meta_data_length(md_length),
218  meta_data(md_length),
219  sample_id(id),
220  data(vector_length) {}
221 };
222 
226 struct DataPacket {
231 
235  uint32_t checksum;
236 
240  int32_t sample_length;
241 
245  int64_t writer_index = -1;
246 
250  uint64_t timestamp = 0;
251 
256 
263  DataPacket(const char *const ds_id, const int32_t check,
264  const int vector_length)
265  : data_stream_identifier(ds_id),
266  checksum(check),
267  sample_length(vector_length),
268  sample(0, META_DATA_LENGTH, vector_length) {}
269 };
270 
275  public:
280 
289  explicit DdtMemoryAccessor(const std::string &shm_id,
290  const std::string &data_stream_identifier,
291  DdtLogger *logger, const uint64_t time_window = 0,
292  const int32_t reading_interval = 10);
293 
297  virtual ~DdtMemoryAccessor();
298 
304  const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared);
305 
311  const uint32_t ComputeChecksum(DataSample *const data_sample);
312 
317  int32_t OpenSharedMemory();
318 
322  void CloseSharedMemory();
323 
335  void WriteData(const int32_t writer_index, const int32_t topic_id,
336  const int32_t sample_id, const uint8_t *datavec,
337  const int32_t datavec_size, const uint8_t *metadata_vec,
338  const int32_t metadatavec_size, const uint64_t timestamp);
339 
343  void StartReading();
344 
348  void StopReading();
349 
355  void SetSizeConstraints(const int32_t max_sample_size, const int32_t space);
356 
360  void Reattach();
361 
367  void NewData();
368 
378  void get_data_packet(std::string *stream_identifier, uint32_t *checksum,
379  int32_t *sample_length, int64_t *writer_idx,
380  uint64_t *timestamp, DataSample **sample);
381 
386  bool get_data_available();
387 
393 
399 
404  void Reset();
405 
410  void set_pub_unreg(const bool STATE);
411 
417  void set_compute_checksum(const bool compute_crc);
418 
423  bool get_compute_checksum() const;
424 
429  bool get_is_initialized() const;
430 
431  private:
432  SignalT data_available_signal;
433 
437  void ReadData();
438 
442  void SetDefaults();
443 
452  void Init(const std::string &mem_id, const std::string &stream_id,
453  DdtLogger *ddt_logger, const uint64_t time_win,
454  const int32_t interval);
455 
459  void PrintData();
460 
465  int32_t CreateNewShm();
466 
471  int32_t SearchCircBuffer();
472 
477  int32_t SearchWriterIndex();
478 
479  ip::managed_shared_memory *managed_shm;
480 
485  typedef ip::allocator<DataPacketShared,
486  ip::managed_shared_memory::segment_manager>
487  cb_alloc;
488 
492  typedef boost::circular_buffer<DataPacketShared, cb_alloc> cb;
493 
494  std::string shm_id;
495  std::string data_stream_identifier;
496  uint64_t time_window; // in [ms]
497  int32_t reading_interval; // in [ms]
498  cb *circ_buffer;
499  std::atomic<int64_t> *writer_index;
500  int32_t local_index;
501  int64_t reader_index;
502 
503  int32_t number_of_unread_elements;
504  int32_t circ_buf_capacity;
505  int32_t number_of_lost_packages;
506 
507  std::mutex circ_buffer_mutex;
508  std::mutex packets_mutex;
509 
510  bool is_initialized;
511 
512  std::promise<void> exit_signal;
513  std::future<void> future_object;
514 
515  std::atomic<bool> reading_active;
516  std::atomic<bool> pub_unreg;
517  std::atomic<bool> compute_checksum;
518 
519  int32_t max_data_sample_size;
520  int additional_space;
521 
522  std::list<DataPacketShared *> packets;
523 
524  DdtLogger *logger;
525 };
526 
527 } // namespace ddt
528 
529 #endif /* DDTMEMORYACCESSOR_H_ */
530 
Definition: ddtLogger.hpp:51
Definition: ddtMemoryAccessor.hpp:274
virtual ~DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:37
void set_compute_checksum(const bool compute_crc)
Definition: ddtMemoryAccessor.cpp:519
void StopReading()
Definition: ddtMemoryAccessor.cpp:326
bool get_is_initialized() const
Definition: ddtMemoryAccessor.cpp:536
bool get_data_available()
Definition: ddtMemoryAccessor.cpp:403
void set_pub_unreg(const bool STATE)
Definition: ddtMemoryAccessor.cpp:517
void CloseSharedMemory()
Definition: ddtMemoryAccessor.cpp:82
void StartReading()
Definition: ddtMemoryAccessor.cpp:311
int32_t OpenSharedMemory()
Definition: ddtMemoryAccessor.cpp:126
void NewData()
Definition: ddtMemoryAccessor.cpp:354
int32_t get_number_of_unread_elements()
Definition: ddtMemoryAccessor.cpp:413
void WriteData(const int32_t writer_index, const int32_t topic_id, const int32_t sample_id, const uint8_t *datavec, const int32_t datavec_size, const uint8_t *metadata_vec, const int32_t metadatavec_size, const uint64_t timestamp)
Definition: ddtMemoryAccessor.cpp:192
void Reattach()
Definition: ddtMemoryAccessor.cpp:346
void Reset()
Definition: ddtMemoryAccessor.cpp:504
void get_data_packet(std::string *stream_identifier, uint32_t *checksum, int32_t *sample_length, int64_t *writer_idx, uint64_t *timestamp, DataSample **sample)
Definition: ddtMemoryAccessor.cpp:418
DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:26
SignalT * DataAvailableSignal()
Definition: ddtMemoryAccessor.cpp:500
bool get_compute_checksum() const
Definition: ddtMemoryAccessor.cpp:532
void SetSizeConstraints(const int32_t max_sample_size, const int32_t space)
Definition: ddtMemoryAccessor.cpp:340
const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared)
Definition: ddtMemoryAccessor.cpp:89
Contains common used constants. This file shall contain constants that can be used by all application...
Class to wrap the usage of log4cplus as logging utility. This file provides a wrapper class for the u...
ip::allocator< char, segment_manager_t > char_allocator
Definition: ddtMemoryAccessor.hpp:71
ip::vector< uint8_t, uint8_allocator > uint8_vector
Definition: ddtMemoryAccessor.hpp:56
ip::allocator< uint16_t, segment_manager_t > uint16_allocator
Definition: ddtMemoryAccessor.hpp:61
ip::allocator< uint8_t, segment_manager_t > uint8_allocator
Definition: ddtMemoryAccessor.hpp:51
ip::vector< uint16_t, uint16_allocator > uint16_vector
Definition: ddtMemoryAccessor.hpp:66
ip::allocator< void, segment_manager_t > void_allocator
Definition: ddtMemoryAccessor.hpp:46
ip::basic_string< char, std::char_traits< char >, char_allocator > char_string
Definition: ddtMemoryAccessor.hpp:77
boost::signals2::signal< void()> SignalT
Definition: ddtMemoryAccessor.hpp:82
ip::managed_shared_memory::segment_manager segment_manager_t
Definition: ddtMemoryAccessor.hpp:41
Definition: ddtClient.hpp:39
const int META_DATA_LENGTH
Definition: ddtConstants.hpp:68
Definition: ddtMemoryAccessor.hpp:135
DataPacketShared(const char *const ds_id, const int32_t check, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:173
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:149
DataSampleShared sample
Definition: ddtMemoryAccessor.hpp:164
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:144
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:159
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:154
char_string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:139
Definition: ddtMemoryAccessor.hpp:226
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:240
DataPacket(const char *const ds_id, const int32_t check, const int vector_length)
Definition: ddtMemoryAccessor.hpp:263
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:235
DataSample sample
Definition: ddtMemoryAccessor.hpp:255
std::string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:230
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:245
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:250
Definition: ddtMemoryAccessor.hpp:89
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:93
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:108
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:98
uint8_vector meta_data
Definition: ddtMemoryAccessor.hpp:103
uint8_vector data
Definition: ddtMemoryAccessor.hpp:113
DataSampleShared(const int32_t id, const int md_length, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:122
Definition: ddtMemoryAccessor.hpp:184
std::vector< uint8_t > meta_data
Definition: ddtMemoryAccessor.hpp:198
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:188
DataSample(const int32_t id, const int md_length, const int vector_length)
Definition: ddtMemoryAccessor.hpp:216
std::vector< uint8_t > data
Definition: ddtMemoryAccessor.hpp:208
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:203
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:193