Go to the documentation of this file.
20 #ifndef DDTDATACONSUMER_HPP_
21 #define DDTDATACONSUMER_HPP_
26 namespace mal = ::elt::mal;
27 namespace datatransfer = ::elt::ddt::datatransfer;
32 typedef boost::signals2::signal<void(datatransfer::NotificationType,
56 const int32_t latency,
const int32_t deadline,
63 const std::string& subscription_uri,
const int32_t latency,
64 const int32_t deadline,
DdtLogger* ddt_logger);
186 void Init(
const std::string& ds_id,
DdtLogger* ddt_logger);
191 void CreateSubscriber(
const std::string& subscription_uri,
192 const int32_t latency,
const int32_t deadline);
202 void CreateNotifier(
const int32_t latency,
const int32_t deadline);
208 void ReceiveDataEvent(
209 const mal::ps::DataEvent<datatransfer::DataPacket>& event);
212 mal::ps::Subscriber<datatransfer::DataPacket>,
213 std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
215 mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
216 std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
217 std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
221 std::string data_stream_identifier;
222 std::string remote_broker_uri;
223 int32_t number_of_samples;
224 int32_t notification_port;
226 std::mutex subscriber_mutex;
227 std::mutex statistics_mutex;
229 std::string publishing_uri;
237 std::chrono::system_clock::time_point
239 uint64_t total_samples =
241 uint64_t total_bytes =
243 uint64_t total_latency = 0;
249 std::map<std::string, SubscriberType> subscriber_map;
251 std::promise<void> exit_signal;
252 std::future<void> future_object;
258 std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
259 std::default_delete<mal::ps::InstancePublisher<
260 datatransfer::NotificationSample> > >
262 std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
const std::string get_publishing_uri()
Definition: ddtDataConsumer.cpp:276
Definition: ddtStatistics.hpp:21
int32_t get_number_of_subscribers()
Definition: ddtDataConsumer.cpp:226
Definition: ddtDataConsumer.hpp:50
void set_remote_broker_uri(const std::string &remote_uri)
Definition: ddtDataConsumer.cpp:280
virtual ~DdtDataConsumer()
@ LOCAL
Definition: ddtDataConsumer.hpp:88
signal_n::slot_type slot_n
Definition: ddtDataConsumer.hpp:39
DdtDataConsumer(const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:24
Definition: ddtLogger.hpp:71
Definition: ddtMemoryAccessor.hpp:258
Definition: ddtClient.hpp:36
SubscriberType
Definition: ddtDataConsumer.hpp:84
void set_memory_accessor(DdtMemoryAccessor *memory_accessor)
Definition: ddtDataConsumer.cpp:288
@ REMOTE
Definition: ddtDataConsumer.hpp:92
void StartSubscription()
Definition: ddtDataConsumer.cpp:300
void set_notification_port(const int32_t noti_port)
Definition: ddtDataConsumer.cpp:292
void set_publishing_uri(const std::string pub_uri)
Definition: ddtDataConsumer.cpp:296
const std::string get_remote_broker_uri()
Definition: ddtDataConsumer.cpp:250
NotificationType
Definition: ddtProducerConsumerBase.hpp:58
void ResetStatistics()
Definition: ddtDataConsumer.cpp:270
const int32_t get_notification_port()
Definition: ddtDataConsumer.cpp:254
signal_n notification_signal
Definition: ddtDataConsumer.hpp:180
void StopSubscription()
Definition: ddtDataConsumer.cpp:309
void AddUuid(std::string uuid, SubscriberType type)
Definition: ddtDataConsumer.cpp:212
void set_number_of_samples(const int32_t num_samples)
Definition: ddtDataConsumer.cpp:284
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &)> signal_n
Definition: ddtDataConsumer.hpp:34
Definition: ddtProducerConsumerBase.hpp:39
DdtStatistics get_statistics()
Definition: ddtDataConsumer.cpp:258
void RemoveUuid(std::string uuid)
Definition: ddtDataConsumer.cpp:217
std::map< std::string, SubscriberType > get_subscribers()
Definition: ddtDataConsumer.cpp:245
int32_t get_number_of_remote_subscribers()
Definition: ddtDataConsumer.cpp:231
void Notify(const NotificationType type) override
Definition: ddtDataConsumer.cpp:90