ddt  0.1
ddtDataSubscriber.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtDataSubscriber.hpp
8 // @brief Data Subscriber.
9 //
10 // This class provides the functionality for subscriber applications to register
11 // / unregister at a local broker and to receive data for a specified data
12 // stream.
13 //
14 // @author Matthias Grimm, CGI
15 // @since 2020/01/16
16 //
17 
18 #ifndef DDTDATASUBSCRIBER_HPP_
19 #define DDTDATASUBSCRIBER_HPP_
20 
21 #include <boost/bind.hpp>
22 #include <boost/uuid/uuid.hpp>
23 #include <boost/uuid/uuid_generators.hpp>
24 #include <boost/uuid/uuid_io.hpp>
27 
28 namespace ddt {
29 
36  public:
41 
// Destructor. Declared virtual so an instance can be destroyed correctly
// through a pointer to a base class.
45  virtual ~DdtDataSubscriber();
46 
// Registers this subscriber for a data stream. Overrides the declaration of
// the base class.
//
// @param broker_uri URI of the broker to register at (presumably the local
//        broker mentioned in the file header - confirm in the .cpp).
// @param data_stream_identifier Identifier of the data stream to subscribe to.
// @param remote_broker_uri URI of a remote broker. NOTE(review): how it is
//        used is not visible in this header.
// @param reading_interval Defaults to 10; the unit is not stated here -
//        TODO confirm against the implementation.
// @return An int status value; the meaning of individual values is not
//         visible in this header.
47  int RegisterSubscriber(std::string broker_uri,
48  std::string data_stream_identifier,
49  std::string remote_broker_uri,
50  int32_t reading_interval = 10) override;
51 
// Unregisters this subscriber. Overrides the declaration of the base class.
//
// @return An int status value; the meaning of individual values is not
//         visible in this header.
52  int UnregisterSubscriber() override;
53 
// Reads data and hands it back as a pointer to a DataSample. Overrides the
// declaration of the base class.
//
// @return Pointer to a DataSample. NOTE(review): ownership of the pointee and
//         whether nullptr can be returned are not visible in this header -
//         confirm before dereferencing or deleting.
54  DataSample *ReadData() override;
55 
60 
65 
70 
// Connects a listener slot to a signal of type signal_t.
//
// @param event_listener Slot (callable matching signal_t's signature) to
//        attach.
// @return The boost::signals2::connection for the new slot; it can be used
//         by the caller to disconnect the listener again.
// NOTE(review): which event triggers the signal is not visible in this
// header - presumably the arrival of new data; confirm in the .cpp.
75  boost::signals2::connection connect(
76  const signal_t::slot_type &event_listener);
77 
78  protected:
// Protected helper taking no arguments and returning nothing.
// Presumably applies default configuration values - confirm in the .cpp.
82  void LoadDefaults();
83 
// Protected helper taking no arguments and returning nothing.
// Presumably reads configuration from an ini file - the file location and
// the keys read are not visible in this header; confirm in the .cpp.
87  void ReadIni();
88 
// Default (10000) for the maximum age of a data sample. The unit is not
// stated in this header - TODO confirm.
// Declared as a non-static const data member, so every instance carries its
// own copy of the value.
92  const int32_t MAX_AGE_DATA_SAMPLE_DEFAULT = 10000;
93 
94  private:
// Private initialisation helper; takes no arguments and returns nothing.
// NOTE(review): what exactly is initialised is not visible in this header.
98  void Init();
99 
// Private helper; presumably outputs the current configuration values
// (destination - log or stdout - not visible here; confirm in the .cpp).
103  void PrintConfigValues();
104 
// Private helper that sets up the notification subscriber.
//
// @param data_stream_identifier Identifier of the data stream concerned.
// @param notification_port Port number used for notifications.
// NOTE(review): presumably this creates the notification_subscriber member -
// confirm in the .cpp.
108  void InitializeNotificationSubscriber(std::string data_stream_identifier,
109  int32_t notification_port);
110 
// Private helper; takes no arguments and returns nothing.
// Presumably performs the actual subscription for notifications - confirm in
// the .cpp.
115  void Subscribe();
116 
// Private helper; takes no arguments and returns nothing.
// Presumably repeats the registration after a lost connection - the
// trigger and retry policy are not visible here; confirm in the .cpp.
121  void Reregister();
122 
// Handler for a MAL publish/subscribe data event carrying a
// datatransfer::NotificationSample.
//
// @param event The data event, passed by const reference.
// NOTE(review): presumably invoked for each incoming notification; where it
// is wired up, and on which thread it runs, is not visible in this header.
129  void NotificationEvent(
130  const mal::ps::DataEvent<datatransfer::NotificationSample> &event);
131 
// Private check related to the publisher; takes no arguments.
//
// @return An int result; the meaning of individual values is not visible in
//         this header - confirm in the .cpp.
135  int CheckPublisher();
136 
// --- Private state ---------------------------------------------------------
// Raw pointers to collaborating objects. NOTE(review): ownership is not
// visible in this header - confirm in the .cpp whether this class creates
// and deletes them.
137  DdtStatisticsClient *statistics_client;
138  DdtMemoryAccessor *memory_accessor;
// Presumably the identifier of the shared-memory segment in use - confirm.
139  std::string shm_id;
// Member counterparts of the RegisterSubscriber() parameters of the same
// name (presumably stored there - confirm in the .cpp).
140  std::string data_stream_identifier;
// Stored as a string; presumably generated via the boost::uuid headers
// included at the top of the file - confirm.
141  std::string subscriber_uuid;
142  std::string remote_broker_uri;
// Atomic flag, so it may be read and written from several threads without a
// data race. What it signals is not visible here.
143  std::atomic<bool> event_active;
144 
// Owning pointer to the MAL subscriber for NotificationSample events. The
// explicitly spelled std::default_delete is the same deleter unique_ptr uses
// by default.
145  std::unique_ptr<mal::ps::Subscriber<datatransfer::NotificationSample>,
146  std::default_delete<
147  mal::ps::Subscriber<datatransfer::NotificationSample> > >
148  notification_subscriber;
// Shared notification sample objects; their individual roles are not visible
// in this header.
149  std::shared_ptr<datatransfer::NotificationSample> ddt_key_notification;
150  std::shared_ptr<datatransfer::NotificationSample> ddt_notification;
// Event filter for NotificationSample data events.
151  mal::ps::DataEventFilter<datatransfer::NotificationSample> filter;
152 
// Promise/future pair. Presumably future_object is obtained from exit_signal
// and used to tell a background task to stop - confirm in the .cpp.
153  std::promise<void> exit_signal;
154  std::future<void> future_object;
155 
// Per-instance constants (non-static const members).
// NUM_RETRIES: presumably the retry limit used when (re)connecting - confirm.
156  const int32_t NUM_RETRIES = 10;
// Lower bound (2000) for the maximum data-sample age; unit not stated here.
157  const int32_t MAX_AGE_DATA_SAMPLE_MIN = 2000;
158 };
159 
160 } // namespace ddt
161 
162 #endif /* DDTDATASUBSCRIBER_HPP_ */
ddt::DdtStatistics
Definition: ddtStatistics.hpp:21
ddt::DdtDataSubscriber::DdtDataSubscriber
DdtDataSubscriber(DdtLogger *logger)
Definition: ddtDataSubscriber.cpp:22
ddt::DdtDataSubscriber::UnregisterSubscriber
int UnregisterSubscriber() override
Definition: ddtDataSubscriber.cpp:338
ddt::DdtLogger
Definition: ddtLogger.hpp:71
ddt::DdtMemoryAccessor
Definition: ddtMemoryAccessor.hpp:258
ddt
Definition: ddtClient.hpp:36
ddtStatisticsClient.hpp
ddt::DdtDataSubscriber::max_age_data_sample
int32_t max_age_data_sample
Definition: ddtDataSubscriber.hpp:89
ddt::DdtDataSubscriber::StartNotificationSubscription
void StartNotificationSubscription()
Definition: ddtDataSubscriber.cpp:454
ddt::DdtDataSubscriber::MAX_AGE_DATA_SAMPLE_DEFAULT
const int32_t MAX_AGE_DATA_SAMPLE_DEFAULT
Definition: ddtDataSubscriber.hpp:92
ddt::DataSample
Definition: ddtMemoryAccessor.hpp:174
ddt::DdtDataSubscriber::LoadDefaults
void LoadDefaults()
Definition: ddtDataSubscriber.cpp:48
ddt::DdtDataSubscriber::RegisterSubscriber
int RegisterSubscriber(std::string broker_uri, std::string data_stream_identifier, std::string remote_broker_uri, int32_t reading_interval=10) override
Definition: ddtDataSubscriber.cpp:105
ddt::DdtDataSubscriber::ReadData
DataSample * ReadData() override
Definition: ddtDataSubscriber.cpp:392
ddtDataTransferLib.hpp
ddt::DdtDataSubscriber::get_statistics
DdtStatistics get_statistics()
Definition: ddtDataSubscriber.cpp:374
ddt::DdtDataSubscriber::StopNotificationSubscription
void StopNotificationSubscription()
Definition: ddtDataSubscriber.cpp:464
ddt::DdtDataSubscriber::connect
boost::signals2::connection connect(const signal_t::slot_type &event_listener)
Definition: ddtDataSubscriber.cpp:333
ddt::DdtDataTransferLib::logger
DdtLogger * logger
Definition: ddtDataTransferLib.hpp:172
ddt::DdtStatisticsClient
Definition: ddtStatisticsClient.hpp:32
ddt::DdtDataSubscriber::~DdtDataSubscriber
virtual ~DdtDataSubscriber()
Definition: ddtDataSubscriber.cpp:34
ddt::DdtDataTransferLib
Definition: ddtDataTransferLib.hpp:39
ddt::DdtDataSubscriber
Definition: ddtDataSubscriber.hpp:35
ddt::DdtDataSubscriber::ReadIni
void ReadIni()
Definition: ddtDataSubscriber.cpp:53