ddt 1.1.0
Loading...
Searching...
No Matches
ddtMemoryAccessor.hpp
Go to the documentation of this file.
1
20#ifndef DDTMEMORYACCESSOR_H_
21#define DDTMEMORYACCESSOR_H_
22
23#include <boost/circular_buffer.hpp>
24#include <boost/interprocess/containers/string.hpp>
25#include <boost/interprocess/containers/vector.hpp>
26#include <boost/interprocess/managed_shared_memory.hpp>
27#include <boost/signals2/signal.hpp>
28#include <fstream>
29#include <future>
30#include <iostream>
31
32#include "ddt/ddtConstants.hpp"
33#include "ddt/ddtCrc32.hpp"
34#include "ddt/ddtLogger.hpp"
35
36namespace ip = boost::interprocess;
37
41typedef ip::managed_shared_memory::segment_manager segment_manager_t;
42
46typedef ip::allocator<void, segment_manager_t> void_allocator;
47
51typedef ip::allocator<uint8_t, segment_manager_t> uint8_allocator;
52
56typedef ip::vector<uint8_t, uint8_allocator> uint8_vector;
57
61typedef ip::allocator<uint16_t, segment_manager_t> uint16_allocator;
62
66typedef ip::vector<uint16_t, uint16_allocator> uint16_vector;
67
71typedef ip::allocator<char, segment_manager_t> char_allocator;
72
76typedef ip::basic_string<char, std::char_traits<char>, char_allocator>
78
82typedef boost::signals2::signal<void()> SignalT;
83
84namespace ddt {
85
93 int32_t topic_id = 0;
94
99
104
108 int32_t sample_id;
109
114
122 DataSampleShared(const int32_t id, const int md_length,
123 const int vector_length, const void_allocator &void_alloc)
124 : meta_data_length(md_length),
125 meta_data(md_length, uint8_t(), void_alloc),
126 sample_id(id),
127 data(vector_length, uint8_t(), void_alloc) {}
128};
129
140
144 uint32_t checksum;
145
150
154 int64_t writer_index = -1;
155
159 uint64_t timestamp = 0;
160
165
173 DataPacketShared(const char *const ds_id, const int32_t check,
174 const int vector_length, const void_allocator &void_alloc)
175 : data_stream_identifier(ds_id, void_alloc),
176 checksum(check),
177 sample_length(vector_length),
178 sample(0, META_DATA_LENGTH, vector_length, void_alloc) {}
179};
180
188 int32_t topic_id = 0;
189
194
198 std::vector<uint8_t> meta_data;
199
203 int32_t sample_id;
204
208 std::vector<uint8_t> data;
209
216 DataSample(const int32_t id, const int md_length, const int vector_length)
217 : meta_data_length(md_length),
218 meta_data(md_length),
219 sample_id(id),
220 data(vector_length) {}
221};
222
231
235 uint32_t checksum;
236
241
245 int64_t writer_index = -1;
246
250 uint64_t timestamp = 0;
251
256
263 DataPacket(const char *const ds_id, const int32_t check,
264 const int vector_length)
265 : data_stream_identifier(ds_id),
266 checksum(check),
267 sample_length(vector_length),
268 sample(0, META_DATA_LENGTH, vector_length) {}
269};
270
275 public:
280
289 explicit DdtMemoryAccessor(const std::string &shm_id,
290 const std::string &data_stream_identifier,
291 DdtLogger *logger, const uint64_t time_window = 0,
292 const int32_t reading_interval = 10);
293
297 virtual ~DdtMemoryAccessor();
298
304 const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared);
305
311 const uint32_t ComputeChecksum(DataSample *const data_sample);
312
317 int32_t OpenSharedMemory();
318
322 void CloseSharedMemory();
323
335 void WriteData(const int32_t writer_index, const int32_t topic_id,
336 const int32_t sample_id, const uint8_t *datavec,
337 const int32_t datavec_size, const uint8_t *metadata_vec,
338 const int32_t metadatavec_size, const uint64_t timestamp);
339
343 void StartReading();
344
348 void StopReading();
349
355 void SetSizeConstraints(const int32_t max_sample_size, const int32_t space);
356
360 void Reattach();
361
367 void NewData();
368
378 void get_data_packet(std::string *stream_identifier, uint32_t *checksum,
379 int32_t *sample_length, int64_t *writer_idx,
380 uint64_t *timestamp, DataSample **sample);
381
386 bool get_data_available();
387
393
399
404 void Reset();
405
410 void set_pub_unreg(const bool STATE);
411
417 void set_compute_checksum(const bool compute_crc);
418
423 bool get_compute_checksum() const;
424
429 bool get_is_initialized() const;
430
431 private:
432 SignalT data_available_signal;
433
437 void ReadData();
438
442 void SetDefaults();
443
452 void Init(const std::string &mem_id, const std::string &stream_id,
453 DdtLogger *ddt_logger, const uint64_t time_win,
454 const int32_t interval);
455
459 void PrintData();
460
465 int32_t CreateNewShm();
466
471 int32_t SearchCircBuffer();
472
477 int32_t SearchWriterIndex();
478
479 ip::managed_shared_memory *managed_shm;
480
485 typedef ip::allocator<DataPacketShared,
486 ip::managed_shared_memory::segment_manager>
487 cb_alloc;
488
492 typedef boost::circular_buffer<DataPacketShared, cb_alloc> cb;
493
494 std::string shm_id;
495 std::string data_stream_identifier;
496 uint64_t time_window; // in [ms]
497 int32_t reading_interval; // in [ms]
498 cb *circ_buffer;
499 std::atomic<int64_t> *writer_index;
500 int32_t local_index;
501 int64_t reader_index;
502
503 int32_t number_of_unread_elements;
504 int32_t circ_buf_capacity;
505 int32_t number_of_lost_packages;
506
507 std::mutex circ_buffer_mutex;
508 std::mutex packets_mutex;
509
510 bool is_initialized;
511
512 std::promise<void> exit_signal;
513 std::future<void> future_object;
514
515 std::atomic<bool> reading_active;
516 std::atomic<bool> pub_unreg;
517 std::atomic<bool> compute_checksum;
518
519 int32_t max_data_sample_size;
520 int additional_space;
521
522 std::list<DataPacketShared *> packets;
523
524 DdtLogger *logger;
525};
526
527} // namespace ddt
528
529#endif /* DDTMEMORYACCESSOR_H_ */
530
Definition: ddtLogger.hpp:51
Definition: ddtMemoryAccessor.hpp:274
virtual ~DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:37
void set_compute_checksum(const bool compute_crc)
Definition: ddtMemoryAccessor.cpp:519
void StopReading()
Definition: ddtMemoryAccessor.cpp:326
bool get_is_initialized() const
Definition: ddtMemoryAccessor.cpp:536
bool get_data_available()
Definition: ddtMemoryAccessor.cpp:403
void set_pub_unreg(const bool STATE)
Definition: ddtMemoryAccessor.cpp:517
void CloseSharedMemory()
Definition: ddtMemoryAccessor.cpp:82
void StartReading()
Definition: ddtMemoryAccessor.cpp:311
int32_t OpenSharedMemory()
Definition: ddtMemoryAccessor.cpp:126
void NewData()
Definition: ddtMemoryAccessor.cpp:354
int32_t get_number_of_unread_elements()
Definition: ddtMemoryAccessor.cpp:413
void WriteData(const int32_t writer_index, const int32_t topic_id, const int32_t sample_id, const uint8_t *datavec, const int32_t datavec_size, const uint8_t *metadata_vec, const int32_t metadatavec_size, const uint64_t timestamp)
Definition: ddtMemoryAccessor.cpp:192
void Reattach()
Definition: ddtMemoryAccessor.cpp:346
void Reset()
Definition: ddtMemoryAccessor.cpp:504
void get_data_packet(std::string *stream_identifier, uint32_t *checksum, int32_t *sample_length, int64_t *writer_idx, uint64_t *timestamp, DataSample **sample)
Definition: ddtMemoryAccessor.cpp:418
DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:26
SignalT * DataAvailableSignal()
Definition: ddtMemoryAccessor.cpp:500
bool get_compute_checksum() const
Definition: ddtMemoryAccessor.cpp:532
void SetSizeConstraints(const int32_t max_sample_size, const int32_t space)
Definition: ddtMemoryAccessor.cpp:340
const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared)
Definition: ddtMemoryAccessor.cpp:89
Contains common used constants. This file shall contain constants that can be used by all application...
Class to wrap the usage of log4cplus as logging utility. This file provides a wrapper class for the u...
ip::allocator< char, segment_manager_t > char_allocator
Definition: ddtMemoryAccessor.hpp:71
ip::vector< uint8_t, uint8_allocator > uint8_vector
Definition: ddtMemoryAccessor.hpp:56
ip::allocator< uint16_t, segment_manager_t > uint16_allocator
Definition: ddtMemoryAccessor.hpp:61
ip::allocator< uint8_t, segment_manager_t > uint8_allocator
Definition: ddtMemoryAccessor.hpp:51
ip::vector< uint16_t, uint16_allocator > uint16_vector
Definition: ddtMemoryAccessor.hpp:66
ip::allocator< void, segment_manager_t > void_allocator
Definition: ddtMemoryAccessor.hpp:46
ip::basic_string< char, std::char_traits< char >, char_allocator > char_string
Definition: ddtMemoryAccessor.hpp:77
boost::signals2::signal< void()> SignalT
Definition: ddtMemoryAccessor.hpp:82
ip::managed_shared_memory::segment_manager segment_manager_t
Definition: ddtMemoryAccessor.hpp:41
Definition: ddtClient.hpp:39
const int META_DATA_LENGTH
Definition: ddtConstants.hpp:68
Definition: ddtMemoryAccessor.hpp:135
DataPacketShared(const char *const ds_id, const int32_t check, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:173
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:149
DataSampleShared sample
Definition: ddtMemoryAccessor.hpp:164
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:144
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:159
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:154
char_string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:139
Definition: ddtMemoryAccessor.hpp:226
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:240
DataPacket(const char *const ds_id, const int32_t check, const int vector_length)
Definition: ddtMemoryAccessor.hpp:263
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:235
DataSample sample
Definition: ddtMemoryAccessor.hpp:255
std::string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:230
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:245
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:250
Definition: ddtMemoryAccessor.hpp:89
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:93
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:108
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:98
uint8_vector meta_data
Definition: ddtMemoryAccessor.hpp:103
uint8_vector data
Definition: ddtMemoryAccessor.hpp:113
DataSampleShared(const int32_t id, const int md_length, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:122
Definition: ddtMemoryAccessor.hpp:184
std::vector< uint8_t > meta_data
Definition: ddtMemoryAccessor.hpp:198
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:188
DataSample(const int32_t id, const int md_length, const int vector_length)
Definition: ddtMemoryAccessor.hpp:216
std::vector< uint8_t > data
Definition: ddtMemoryAccessor.hpp:208
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:203
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:193