22#ifndef DDTDATACONSUMER_HPP_
23#define DDTDATACONSUMER_HPP_
28namespace mal = ::elt::mal;
29namespace datatransfer = ::elt::ddt::datatransfer;
34typedef boost::signals2::signal<void(datatransfer::NotificationType,
64 const int32_t latency,
const int32_t deadline,
78 const std::string& subscription_uri,
const int32_t latency,
79 const int32_t deadline,
DdtLogger* ddt_logger);
282 void CreateSubscriber(
const std::string& subscription_uri,
283 const int32_t latency,
const int32_t deadline);
297 void CreateNotifier(
const int32_t latency,
const int32_t deadline);
303 void ReceiveDataEvent(
304 const mal::ps::DataEvent<datatransfer::DataPacket>& event);
307 mal::ps::Subscriber<datatransfer::DataPacket>,
308 std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
310 mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
311 std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
312 std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
314 std::string remote_broker_uri;
315 std::string originating_broker;
316 std::mutex subscriber_mutex;
317 std::mutex statistics_mutex;
323 std::map<std::string, SubscriberType> subscriber_map;
325 std::promise<void> exit_signal;
326 std::future<void> future_object;
332 std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
333 std::default_delete<mal::ps::InstancePublisher<
334 datatransfer::NotificationSample> > >
336 std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
Definition: ddtDataConsumer.hpp:52
void Notify(const NotificationType type) override
Definition: ddtDataConsumer.cpp:94
uint64_t total_latency
Definition: ddtDataConsumer.hpp:271
void AddUuid(std::string uuid, SubscriberType type)
Definition: ddtDataConsumer.cpp:229
void set_originating_broker(const std::string orig_broker)
Definition: ddtDataConsumer.cpp:329
signal_n notification_signal
Definition: ddtDataConsumer.hpp:218
int32_t number_of_samples
Definition: ddtDataConsumer.hpp:241
void set_notification_port(const int32_t noti_port)
Definition: ddtDataConsumer.cpp:321
void set_memory_accessor(DdtMemoryAccessor *mem_accessor)
Definition: ddtDataConsumer.cpp:317
int32_t get_number_of_remote_subscribers()
Definition: ddtDataConsumer.cpp:251
void ResetStatistics()
Definition: ddtDataConsumer.cpp:298
~DdtDataConsumer() override
void StartSubscription()
Definition: ddtDataConsumer.cpp:333
std::string get_publishing_uri() const
Definition: ddtDataConsumer.cpp:305
int32_t notification_port
Definition: ddtDataConsumer.hpp:246
std::string data_stream_identifier
Definition: ddtDataConsumer.hpp:236
std::map< std::string, SubscriberType > get_subscribers()
Definition: ddtDataConsumer.cpp:271
int32_t get_number_of_subscribers()
Definition: ddtDataConsumer.cpp:246
uint64_t total_samples
Definition: ddtDataConsumer.hpp:261
void set_publishing_uri(const std::string pub_uri)
Definition: ddtDataConsumer.cpp:325
int32_t get_notification_port() const
Definition: ddtDataConsumer.cpp:280
void RemoveUuid(const std::string uuid)
Definition: ddtDataConsumer.cpp:235
SubscriberType
Definition: ddtDataConsumer.hpp:99
@ REMOTE
Definition: ddtDataConsumer.hpp:107
@ LOCAL
Definition: ddtDataConsumer.hpp:103
void set_remote_broker_uri(const std::string &remote_uri)
Definition: ddtDataConsumer.cpp:309
DdtStatistics get_statistics()
Definition: ddtDataConsumer.cpp:284
uint64_t total_bytes
Definition: ddtDataConsumer.hpp:266
std::string get_remote_broker_uri() const
Definition: ddtDataConsumer.cpp:276
DdtMemoryAccessor * memory_accessor
Definition: ddtDataConsumer.hpp:231
std::string publishing_uri
Definition: ddtDataConsumer.hpp:251
void StopSubscription()
Definition: ddtDataConsumer.cpp:342
void Init(const std::string &ds_id, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:46
std::chrono::system_clock::time_point last_received
Definition: ddtDataConsumer.hpp:256
void set_number_of_samples(const int32_t num_samples)
Definition: ddtDataConsumer.cpp:313
Definition: ddtLogger.hpp:51
Definition: ddtMemoryAccessor.hpp:274
Definition: ddtProducerConsumerBase.hpp:43
NotificationType
Definition: ddtProducerConsumerBase.hpp:59
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &)> signal_n
Definition: ddtDataConsumer.hpp:36
signal_n::slot_type slot_n
Definition: ddtDataConsumer.hpp:41
Base class for DdtDataProducer and DdtDataConsumer. This class serves as a base class for DdtDataProd...
Statistics for the monitoring API. This struct contains the raw values for the monitoring API.
Definition: ddtClient.hpp:39
Definition: ddtStatistics.hpp:27