ddt  0.1
ddtDataConsumer.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtDataConsumer.hpp
8 // @brief Data Consumer.
9 //
10 // This class provides the functionality to subscribe to a data stream, to
11 // write the received data into shared memory and to notify DdtDataSubscribers
12 // that new data is available. There will be one data consumer object per
13 // data stream identifier and thus several DdtDataSubscribers may share one
14 // consumer object.
15 //
16 // @author Matthias Grimm, CGI
17 // @since 2020/01/16
18 //
19 
20 #ifndef DDTDATACONSUMER_HPP_
21 #define DDTDATACONSUMER_HPP_
22 
24 #include "ddt/ddtStatistics.hpp"
25 
26 namespace mal = ::elt::mal;
27 namespace datatransfer = ::elt::ddt::datatransfer;
28 
32 typedef boost::signals2::signal<void(datatransfer::NotificationType,
33  const std::string&)>
35 
39 typedef signal_n::slot_type slot_n;
40 
41 namespace ddt {
42 
51  public:
55  DdtDataConsumer(const std::string& data_stream_identifier,
56  const int32_t latency, const int32_t deadline,
57  DdtLogger* ddt_logger);
58 
62  DdtDataConsumer(const std::string& data_stream_identifier,
63  const std::string& subscription_uri, const int32_t latency,
64  const int32_t deadline, DdtLogger* ddt_logger);
65 
69  virtual ~DdtDataConsumer();
70 
74  void StartSubscription();
75 
79  void StopSubscription();
80 
92  REMOTE
93  };
94 
98  void AddUuid(std::string uuid, SubscriberType type);
99 
103  void RemoveUuid(std::string uuid);
104 
108  void Notify(const NotificationType type) override;
109 
113  int32_t get_number_of_subscribers();
114 
120 
125  std::map<std::string, SubscriberType> get_subscribers();
126 
130  const std::string get_remote_broker_uri();
131 
135  const int32_t get_notification_port();
136 
141 
145  void ResetStatistics();
146 
150  const std::string get_publishing_uri();
151 
155  void set_remote_broker_uri(const std::string& remote_uri);
156 
160  void set_number_of_samples(const int32_t num_samples);
161 
165  void set_memory_accessor(DdtMemoryAccessor* memory_accessor);
166 
170  void set_notification_port(const int32_t noti_port);
171 
175  void set_publishing_uri(const std::string pub_uri);
176 
181 
182  private:
186  void Init(const std::string& ds_id, DdtLogger* ddt_logger);
187 
191  void CreateSubscriber(const std::string& subscription_uri,
192  const int32_t latency, const int32_t deadline);
193 
197  void Subscribe();
198 
202  void CreateNotifier(const int32_t latency, const int32_t deadline);
203 
208  void ReceiveDataEvent(
209  const mal::ps::DataEvent<datatransfer::DataPacket>& event);
210 
211  std::unique_ptr<
212  mal::ps::Subscriber<datatransfer::DataPacket>,
213  std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
214  data_subscriber;
215  mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
216  std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
217  std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
218 
219  DdtMemoryAccessor* memory_accessor;
220 
221  std::string data_stream_identifier;
222  std::string remote_broker_uri;
223  int32_t number_of_samples;
224  int32_t notification_port;
225 
226  std::mutex subscriber_mutex;
227  std::mutex statistics_mutex;
228 
229  std::string publishing_uri;
230 
237  std::chrono::system_clock::time_point
238  last_received;
239  uint64_t total_samples =
240  0;
241  uint64_t total_bytes =
242  0;
243  uint64_t total_latency = 0;
249  std::map<std::string, SubscriberType> subscriber_map;
250 
251  std::promise<void> exit_signal;
252  std::future<void> future_object;
253 
258  std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
259  std::default_delete<mal::ps::InstancePublisher<
260  datatransfer::NotificationSample> > >
261  notifier;
262  std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
263 };
264 
265 } // namespace ddt
266 
267 #endif /* DDTDATACONSUMER_HPP_ */
ddt::DdtDataConsumer::get_publishing_uri
const std::string get_publishing_uri()
Definition: ddtDataConsumer.cpp:276
ddt::DdtStatistics
Definition: ddtStatistics.hpp:21
ddt::DdtDataConsumer::get_number_of_subscribers
int32_t get_number_of_subscribers()
Definition: ddtDataConsumer.cpp:226
ddt::DdtDataConsumer
Definition: ddtDataConsumer.hpp:50
ddt::DdtDataConsumer::set_remote_broker_uri
void set_remote_broker_uri(const std::string &remote_uri)
Definition: ddtDataConsumer.cpp:280
ddt::DdtDataConsumer::~DdtDataConsumer
virtual ~DdtDataConsumer()
ddt::DdtDataConsumer::LOCAL
@ LOCAL
Definition: ddtDataConsumer.hpp:88
slot_n
signal_n::slot_type slot_n
Definition: ddtDataConsumer.hpp:39
ddt::DdtDataConsumer::DdtDataConsumer
DdtDataConsumer(const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:24
ddt::DdtLogger
Definition: ddtLogger.hpp:71
ddt::DdtMemoryAccessor
Definition: ddtMemoryAccessor.hpp:258
ddt
Definition: ddtClient.hpp:36
ddt::DdtDataConsumer::SubscriberType
SubscriberType
Definition: ddtDataConsumer.hpp:84
ddt::DdtDataConsumer::set_memory_accessor
void set_memory_accessor(DdtMemoryAccessor *memory_accessor)
Definition: ddtDataConsumer.cpp:288
ddt::DdtDataConsumer::REMOTE
@ REMOTE
Definition: ddtDataConsumer.hpp:92
ddt::DdtDataConsumer::StartSubscription
void StartSubscription()
Definition: ddtDataConsumer.cpp:300
ddt::DdtDataConsumer::set_notification_port
void set_notification_port(const int32_t noti_port)
Definition: ddtDataConsumer.cpp:292
ddt::DdtDataConsumer::set_publishing_uri
void set_publishing_uri(const std::string pub_uri)
Definition: ddtDataConsumer.cpp:296
ddt::DdtDataConsumer::get_remote_broker_uri
const std::string get_remote_broker_uri()
Definition: ddtDataConsumer.cpp:250
ddtProducerConsumerBase.hpp
ddt::DdtProducerConsumerBase::NotificationType
NotificationType
Definition: ddtProducerConsumerBase.hpp:58
ddt::DdtDataConsumer::ResetStatistics
void ResetStatistics()
Definition: ddtDataConsumer.cpp:270
ddt::DdtDataConsumer::get_notification_port
const int32_t get_notification_port()
Definition: ddtDataConsumer.cpp:254
ddt::DdtDataConsumer::notification_signal
signal_n notification_signal
Definition: ddtDataConsumer.hpp:180
ddt::DdtDataConsumer::StopSubscription
void StopSubscription()
Definition: ddtDataConsumer.cpp:309
ddt::DdtDataConsumer::AddUuid
void AddUuid(std::string uuid, SubscriberType type)
Definition: ddtDataConsumer.cpp:212
ddt::DdtDataConsumer::set_number_of_samples
void set_number_of_samples(const int32_t num_samples)
Definition: ddtDataConsumer.cpp:284
signal_n
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &)> signal_n
Definition: ddtDataConsumer.hpp:34
ddt::DdtProducerConsumerBase
Definition: ddtProducerConsumerBase.hpp:39
ddt::DdtDataConsumer::get_statistics
DdtStatistics get_statistics()
Definition: ddtDataConsumer.cpp:258
ddt::DdtDataConsumer::RemoveUuid
void RemoveUuid(std::string uuid)
Definition: ddtDataConsumer.cpp:217
ddt::DdtDataConsumer::get_subscribers
std::map< std::string, SubscriberType > get_subscribers()
Definition: ddtDataConsumer.cpp:245
ddtStatistics.hpp
ddt::DdtDataConsumer::get_number_of_remote_subscribers
int32_t get_number_of_remote_subscribers()
Definition: ddtDataConsumer.cpp:231
ddt::DdtDataConsumer::Notify
void Notify(const NotificationType type) override
Definition: ddtDataConsumer.cpp:90