Go to the documentation of this file.
17 #ifndef DDTMEMORYACCESSOR_H_
18 #define DDTMEMORYACCESSOR_H_
20 #include <boost/circular_buffer.hpp>
21 #include <boost/interprocess/containers/string.hpp>
22 #include <boost/interprocess/containers/vector.hpp>
23 #include <boost/interprocess/managed_shared_memory.hpp>
24 #include <boost/signals2/signal.hpp>
33 namespace ip = boost::interprocess;
73 typedef ip::basic_string<char, std::char_traits<char>,
char_allocator>
74 char_string;
79 typedef boost::signals2::signal<void()>
signal_t;
119 meta_data(md_length, uint8_t(), void_alloc),
121 data(vector_length, uint8_t(), void_alloc) {}
168 sample(0, 1024, vector_length, void_alloc) {}
208 data(vector_length) {}
248 DataPacket(
const char *ds_id, int32_t check,
int vector_length)
252 sample(0, 1024, vector_length) {}
269 const std::string &data_stream_identifier,
270 DdtLogger *logger,
const uint64_t time_window = 0,
271 const int32_t reading_interval = 10);
301 void WriteData(
const int32_t writer_index,
const int32_t topic_id,
302 const int32_t sample_id,
const void *datavec,
303 const int32_t datavec_size,
const void *metadata_vec,
304 const int32_t metadatavec_size,
const uint64_t timestamp);
331 void get_data_packet(std::string *data_stream_identifier, uint32_t *checksum,
332 int32_t *sample_length, int64_t *writer_index,
uint64_t *timestamp, DataSample **sample);
397 void Init(
const std::string &mem_id,
const std::string &stream_id,
398 DdtLogger *ddt_logger,
const uint64_t time_win,
399 const int32_t interval);
401 ip::managed_shared_memory *managed_shm;
408 ip::managed_shared_memory::segment_manager>
414 typedef boost::circular_buffer<DataPacketShared, cb_alloc> cb;
417 std::string data_stream_identifier;
418 uint64_t time_window;
419 int32_t reading_interval;
421 int64_t *writer_index;
423 int64_t reader_index;
425 int32_t number_of_unread_elements;
426 int32_t circ_buf_capacity;
427 int32_t number_of_lost_packages;
429 std::mutex circ_buffer_mutex;
430 std::mutex packets_mutex;
434 std::promise<void> exit_signal;
435 std::future<void> future_object;
437 std::atomic<bool> reading_active;
438 std::atomic<bool> pub_unreg;
439 std::atomic<bool> compute_checksum;
441 std::list<DataPacketShared *> packets;
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:148
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:178
boost::signals2::signal< void()> signal_t
Definition: ddtMemoryAccessor.hpp:79
DataSample sample
Definition: ddtMemoryAccessor.hpp:243
void NewData()
Definition: ddtMemoryAccessor.cpp:291
Definition: ddtMemoryAccessor.hpp:129
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:183
bool get_compute_checksum()
Definition: ddtMemoryAccessor.cpp:467
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:228
void Reattach()
Definition: ddtMemoryAccessor.cpp:283
Definition: ddtMemoryAccessor.hpp:86
Definition: ddtLogger.hpp:71
Definition: ddtMemoryAccessor.hpp:258
int32_t OpenSharedMemory()
Definition: ddtMemoryAccessor.cpp:113
void StopReading()
Definition: ddtMemoryAccessor.cpp:269
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:193
Definition: ddtClient.hpp:36
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:143
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:95
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:238
void WriteData(const int32_t writer_index, const int32_t topic_id, const int32_t sample_id, const void *datavec, const int32_t datavec_size, const void *metadata_vec, const int32_t metadatavec_size, const uint64_t timestamp)
Definition: ddtMemoryAccessor.cpp:153
ip::allocator< char, segment_manager_t > char_allocator
Definition: ddtMemoryAccessor.hpp:68
DataSampleShared(int32_t id, int md_length, int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:115
virtual ~DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:32
bool get_data_available()
Definition: ddtMemoryAccessor.cpp:338
void CloseSharedMemory()
Definition: ddtMemoryAccessor.cpp:70
void StartReading()
Definition: ddtMemoryAccessor.cpp:254
std::vector< uint8_t > meta_data
Definition: ddtMemoryAccessor.hpp:188
ip::allocator< void, segment_manager_t > void_allocator
Definition: ddtMemoryAccessor.hpp:43
std::vector< uint8_t > data
Definition: ddtMemoryAccessor.hpp:198
char_string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:133
bool get_is_initialized()
Definition: ddtMemoryAccessor.cpp:469
Definition: ddtMemoryAccessor.hpp:174
void get_data_packet(std::string *data_stream_identifier, uint32_t *checksum, int32_t *sample_length, int64_t *writer_index, uint64_t *timestamp, DataSample **sample)
Definition: ddtMemoryAccessor.cpp:352
std::string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:218
Definition: ddtMemoryAccessor.hpp:214
ip::basic_string< char, std::char_traits< char >, char_allocator > char_string
Definition: ddtMemoryAccessor.hpp:74
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:105
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:233
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:90
void Reset()
Definition: ddtMemoryAccessor.cpp:441
uint8_vector meta_data
Definition: ddtMemoryAccessor.hpp:100
int32_t get_number_of_unread_elements()
Definition: ddtMemoryAccessor.cpp:347
ip::allocator< uint16_t, segment_manager_t > uint16_allocator
Definition: ddtMemoryAccessor.hpp:58
ip::vector< uint16_t, uint16_allocator > uint16_vector
Definition: ddtMemoryAccessor.hpp:63
ip::allocator< uint8_t, segment_manager_t > uint8_allocator
Definition: ddtMemoryAccessor.hpp:48
DataPacket(const char *ds_id, int32_t check, int vector_length)
Definition: ddtMemoryAccessor.hpp:248
uint8_vector data
Definition: ddtMemoryAccessor.hpp:110
void set_pub_unreg(const bool STATE)
Definition: ddtMemoryAccessor.cpp:452
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:223
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:153
void set_compute_checksum(bool compute_checksum)
Definition: ddtMemoryAccessor.cpp:454
signal_t * DataAvailableSignal()
Definition: ddtMemoryAccessor.cpp:437
const uint32_t ComputeChecksum(const DataSampleShared *data_sample)
Definition: ddtMemoryAccessor.cpp:77
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:138
DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:21
ip::managed_shared_memory::segment_manager segment_manager_t
Definition: ddtMemoryAccessor.hpp:38
DataSample(int32_t id, int md_length, int vector_length)
Definition: ddtMemoryAccessor.hpp:203
DataPacketShared(const char *ds_id, int32_t check, int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:163
ip::vector< uint8_t, uint8_allocator > uint8_vector
Definition: ddtMemoryAccessor.hpp:53
DataSampleShared sample
Definition: ddtMemoryAccessor.hpp:158