ddt  1.0.0
ddtDataConsumer.hpp
Go to the documentation of this file.
1 
22 #ifndef DDTDATACONSUMER_HPP_
23 #define DDTDATACONSUMER_HPP_
24 
26 #include "ddt/ddtStatistics.hpp"
27 
28 namespace mal = ::elt::mal;
29 namespace datatransfer = ::elt::ddt::datatransfer;
30 
34 typedef boost::signals2::signal<void(datatransfer::NotificationType,
35  const std::string&)>
36  signal_n;
37 
41 typedef signal_n::slot_type slot_n;
42 
43 namespace ddt {
44 
52 class DdtDataConsumer : public DdtProducerConsumerBase {
53  public:
63  DdtDataConsumer(const std::string& data_stream_identifier,
64  const int32_t latency, const int32_t deadline,
65  DdtLogger* ddt_logger);
66 
77  DdtDataConsumer(const std::string& data_stream_identifier,
78  const std::string& subscription_uri, const int32_t latency,
79  const int32_t deadline, DdtLogger* ddt_logger);
80 
84  ~DdtDataConsumer() override;
85 
89  void StartSubscription();
90 
94  void StopSubscription();
95 
99  enum SubscriberType {
103  LOCAL,
107  REMOTE
108  };
109 
115  void AddUuid(std::string uuid, SubscriberType type);
116 
121  void RemoveUuid(const std::string uuid);
122 
127  void Notify(const NotificationType type) override;
128 
133  int32_t get_number_of_subscribers();
134 
140  int32_t get_number_of_remote_subscribers();
141 
147  std::map<std::string, SubscriberType> get_subscribers();
148 
153  std::string get_remote_broker_uri() const;
154 
159  int32_t get_notification_port() const;
160 
165  DdtStatistics get_statistics();
166 
170  void ResetStatistics();
171 
176  std::string get_publishing_uri() const;
177 
182  void set_remote_broker_uri(const std::string& remote_uri);
183 
188  void set_number_of_samples(const int32_t num_samples);
189 
195  void set_memory_accessor(DdtMemoryAccessor* mem_accessor);
196 
201  void set_notification_port(const int32_t noti_port);
202 
207  void set_publishing_uri(const std::string pub_uri);
208 
213  void set_originating_broker(const std::string orig_broker);
214 
218  signal_n notification_signal;
219 
220  protected:
226  void Init(const std::string& ds_id, DdtLogger* ddt_logger);
227 
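231  DdtMemoryAccessor* memory_accessor;
232 
236  std::string data_stream_identifier;
237 
241  int32_t number_of_samples;
242 
246  int32_t notification_port;
247 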
251  std::string publishing_uri;
252 
256  std::chrono::system_clock::time_point last_received;
257 
261  uint64_t total_samples = 0;
262 
266  uint64_t total_bytes = 0;
267 
271  uint64_t total_latency = 0;
272 
273  private:
282  void CreateSubscriber(const std::string& subscription_uri,
283  const int32_t latency, const int32_t deadline);
284 
288  void Subscribe();
289 
297  void CreateNotifier(const int32_t latency, const int32_t deadline);
298 
303  void ReceiveDataEvent(
304  const mal::ps::DataEvent<datatransfer::DataPacket>& event);
305 
306  std::unique_ptr<
307  mal::ps::Subscriber<datatransfer::DataPacket>,
308  std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
309  data_subscriber;
310  mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
311  std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
312  std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
313 
314  std::string remote_broker_uri;
315  std::string originating_broker;
316  std::mutex subscriber_mutex;
317  std::mutex statistics_mutex;
318 
323  std::map<std::string, SubscriberType> subscriber_map;
324 
325  std::promise<void> exit_signal;
326  std::future<void> future_object;
327 
332  std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
333  std::default_delete<mal::ps::InstancePublisher<
334  datatransfer::NotificationSample> > >
335  notifier;
336  std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
337 };
338 
339 } // namespace ddt
340 
341 #endif /* DDTDATACONSUMER_HPP_ */
342 
ddt::DdtDataConsumer
Definition: ddtDataConsumer.hpp:52
void Notify(const NotificationType type) override
Definition: ddtDataConsumer.cpp:94
uint64_t total_latency
Definition: ddtDataConsumer.hpp:271
void AddUuid(std::string uuid, SubscriberType type)
Definition: ddtDataConsumer.cpp:229
void set_originating_broker(const std::string orig_broker)
Definition: ddtDataConsumer.cpp:329
signal_n notification_signal
Definition: ddtDataConsumer.hpp:218
int32_t number_of_samples
Definition: ddtDataConsumer.hpp:241
void set_notification_port(const int32_t noti_port)
Definition: ddtDataConsumer.cpp:321
void set_memory_accessor(DdtMemoryAccessor *mem_accessor)
Definition: ddtDataConsumer.cpp:317
int32_t get_number_of_remote_subscribers()
Definition: ddtDataConsumer.cpp:251
DdtDataConsumer(const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:26
void ResetStatistics()
Definition: ddtDataConsumer.cpp:298
~DdtDataConsumer() override
void StartSubscription()
Definition: ddtDataConsumer.cpp:333
std::string get_publishing_uri() const
Definition: ddtDataConsumer.cpp:305
int32_t notification_port
Definition: ddtDataConsumer.hpp:246
std::string data_stream_identifier
Definition: ddtDataConsumer.hpp:236
std::map< std::string, SubscriberType > get_subscribers()
Definition: ddtDataConsumer.cpp:271
int32_t get_number_of_subscribers()
Definition: ddtDataConsumer.cpp:246
uint64_t total_samples
Definition: ddtDataConsumer.hpp:261
void set_publishing_uri(const std::string pub_uri)
Definition: ddtDataConsumer.cpp:325
int32_t get_notification_port() const
Definition: ddtDataConsumer.cpp:280
void RemoveUuid(const std::string uuid)
Definition: ddtDataConsumer.cpp:235
SubscriberType
Definition: ddtDataConsumer.hpp:99
@ REMOTE
Definition: ddtDataConsumer.hpp:107
@ LOCAL
Definition: ddtDataConsumer.hpp:103
void set_remote_broker_uri(const std::string &remote_uri)
Definition: ddtDataConsumer.cpp:309
DdtStatistics get_statistics()
Definition: ddtDataConsumer.cpp:284
uint64_t total_bytes
Definition: ddtDataConsumer.hpp:266
std::string get_remote_broker_uri() const
Definition: ddtDataConsumer.cpp:276
DdtMemoryAccessor * memory_accessor
Definition: ddtDataConsumer.hpp:231
std::string publishing_uri
Definition: ddtDataConsumer.hpp:251
void StopSubscription()
Definition: ddtDataConsumer.cpp:342
void Init(const std::string &ds_id, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:46
std::chrono::system_clock::time_point last_received
Definition: ddtDataConsumer.hpp:256
void set_number_of_samples(const int32_t num_samples)
Definition: ddtDataConsumer.cpp:313
Definition: ddtLogger.hpp:51
Definition: ddtMemoryAccessor.hpp:274
Definition: ddtProducerConsumerBase.hpp:43
NotificationType
Definition: ddtProducerConsumerBase.hpp:59
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &)> signal_n
Definition: ddtDataConsumer.hpp:36
signal_n::slot_type slot_n
Definition: ddtDataConsumer.hpp:41
Base class for DdtDataProducer and DdtDataConsumer. This class serves as a base class for DdtDataProd...
Statistics for the monitoring API. This struct contains the raw values for the monitoring API.
Definition: ddtClient.hpp:39
Definition: ddtStatistics.hpp:27