ddt 1.2.1
Loading...
Searching...
No Matches
ddtDataConsumer.hpp
Go to the documentation of this file.
1
22#ifndef DDTDATACONSUMER_HPP_
23#define DDTDATACONSUMER_HPP_
24
26#include "ddt/ddtStatistics.hpp"
27
28namespace mal = ::elt::mal;
29namespace datatransfer = ::elt::ddt::datatransfer;
30
34typedef boost::signals2::signal<void(datatransfer::NotificationType,
35 const std::string&)>
37
41typedef signal_n::slot_type slot_n;
42
43namespace ddt {
44
53 public:
63 DdtDataConsumer(const std::string& data_stream_identifier,
64 const int32_t latency, const int32_t deadline,
65 DdtLogger* ddt_logger);
66
77 DdtDataConsumer(const std::string& data_stream_identifier,
78 const std::string& subscription_uri, const int32_t latency,
79 const int32_t deadline, DdtLogger* ddt_logger);
80
84 ~DdtDataConsumer() override;
85
89 void StartSubscription();
90
94 void StopSubscription();
95
109
115 void AddUuid(std::string uuid, SubscriberType type);
116
121 void RemoveUuid(const std::string uuid);
122
127 void Notify(const NotificationType type) override;
128
134
141
147 std::map<std::string, SubscriberType> get_subscribers();
148
153 std::string get_remote_broker_uri() const;
154
159 int32_t get_notification_port() const;
160
166
170 void ResetStatistics();
171
176 std::string get_publishing_uri() const;
177
182 void set_remote_broker_uri(const std::string& remote_uri);
183
188 void set_number_of_samples(const int32_t num_samples);
189
195 void set_memory_accessor(DdtMemoryAccessor* mem_accessor);
196
201 void set_notification_port(const int32_t noti_port);
202
207 void set_publishing_uri(const std::string pub_uri);
208
213 void set_originating_broker(const std::string orig_broker);
214
219
220 protected:
226 void Init(const std::string& ds_id, DdtLogger* ddt_logger);
227
232
237
242
247
251 std::string publishing_uri;
252
256 std::chrono::system_clock::time_point last_received;
257
261 uint64_t total_samples = 0;
262
266 uint64_t total_bytes = 0;
267
271 uint64_t total_latency = 0;
272
273 private:
282 void CreateSubscriber(const std::string& subscription_uri,
283 const int32_t latency, const int32_t deadline);
284
288 void Subscribe();
289
297 void CreateNotifier(const int32_t latency, const int32_t deadline);
298
303 void ReceiveDataEvent(
304 const mal::ps::DataEvent<datatransfer::DataPacket>& event);
305
306 std::unique_ptr<
307 mal::ps::Subscriber<datatransfer::DataPacket>,
308 std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
309 data_subscriber;
310 mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
311 std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
312 std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
313
314 std::string remote_broker_uri;
315 std::string originating_broker;
316 std::mutex subscriber_mutex;
317 std::mutex statistics_mutex;
318
323 std::map<std::string, SubscriberType> subscriber_map;
324
325 std::promise<void> exit_signal;
326 std::future<void> future_object;
327
332 std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
333 std::default_delete<mal::ps::InstancePublisher<
334 datatransfer::NotificationSample> > >
335 notifier;
336 std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
337};
338
339} // namespace ddt
340
341#endif /* DDTDATACONSUMER_HPP_ */
342
Definition ddtDataConsumer.hpp:52
void Notify(const NotificationType type) override
Definition ddtDataConsumer.cpp:94
uint64_t total_latency
Definition ddtDataConsumer.hpp:271
void AddUuid(std::string uuid, SubscriberType type)
Definition ddtDataConsumer.cpp:229
void set_originating_broker(const std::string orig_broker)
Definition ddtDataConsumer.cpp:329
signal_n notification_signal
Definition ddtDataConsumer.hpp:218
int32_t number_of_samples
Definition ddtDataConsumer.hpp:241
void set_notification_port(const int32_t noti_port)
Definition ddtDataConsumer.cpp:321
void set_memory_accessor(DdtMemoryAccessor *mem_accessor)
Definition ddtDataConsumer.cpp:317
int32_t get_number_of_remote_subscribers()
Definition ddtDataConsumer.cpp:251
DdtDataConsumer(const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline, DdtLogger *ddt_logger)
Definition ddtDataConsumer.cpp:26
void ResetStatistics()
Definition ddtDataConsumer.cpp:298
~DdtDataConsumer() override
void StartSubscription()
Definition ddtDataConsumer.cpp:333
std::string get_publishing_uri() const
Definition ddtDataConsumer.cpp:305
int32_t notification_port
Definition ddtDataConsumer.hpp:246
std::string data_stream_identifier
Definition ddtDataConsumer.hpp:236
std::map< std::string, SubscriberType > get_subscribers()
Definition ddtDataConsumer.cpp:271
int32_t get_number_of_subscribers()
Definition ddtDataConsumer.cpp:246
uint64_t total_samples
Definition ddtDataConsumer.hpp:261
void set_publishing_uri(const std::string pub_uri)
Definition ddtDataConsumer.cpp:325
int32_t get_notification_port() const
Definition ddtDataConsumer.cpp:280
void RemoveUuid(const std::string uuid)
Definition ddtDataConsumer.cpp:235
SubscriberType
Definition ddtDataConsumer.hpp:99
@ REMOTE
Definition ddtDataConsumer.hpp:107
@ LOCAL
Definition ddtDataConsumer.hpp:103
void set_remote_broker_uri(const std::string &remote_uri)
Definition ddtDataConsumer.cpp:309
DdtStatistics get_statistics()
Definition ddtDataConsumer.cpp:284
uint64_t total_bytes
Definition ddtDataConsumer.hpp:266
std::string get_remote_broker_uri() const
Definition ddtDataConsumer.cpp:276
DdtMemoryAccessor * memory_accessor
Definition ddtDataConsumer.hpp:231
std::string publishing_uri
Definition ddtDataConsumer.hpp:251
void StopSubscription()
Definition ddtDataConsumer.cpp:342
void Init(const std::string &ds_id, DdtLogger *ddt_logger)
Definition ddtDataConsumer.cpp:46
std::chrono::system_clock::time_point last_received
Definition ddtDataConsumer.hpp:256
void set_number_of_samples(const int32_t num_samples)
Definition ddtDataConsumer.cpp:313
Definition ddtLogger.hpp:51
Definition ddtMemoryAccessor.hpp:274
Definition ddtProducerConsumerBase.hpp:43
NotificationType
Definition ddtProducerConsumerBase.hpp:59
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &) signal_n)
Definition ddtDataConsumer.hpp:36
signal_n::slot_type slot_n
Definition ddtDataConsumer.hpp:41
Base class for DdtDataProducer and DdtDataConsumer. This class serves as a base class for DdtDataProd...
Statistics for the monitoring API. This struct contains the raw values for the monitoring API.
Definition ddtClient.hpp:39
Definition ddtStatistics.hpp:27