ddt  1.0.0
ddtDataSubscriber.hpp
Go to the documentation of this file.
1 
21 #ifndef DDTDATASUBSCRIBER_HPP_
22 #define DDTDATASUBSCRIBER_HPP_
23 
24 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
25 
26 #include <boost/bind.hpp>
27 #include <boost/uuid/uuid.hpp>
28 #include <boost/uuid/uuid_generators.hpp>
29 #include <boost/uuid/uuid_io.hpp>
31 
33 
34 namespace ddt {
35 
42  public:
48 
// Constructor taking a log4cplus logger by const reference. `explicit`
// prevents implicit conversion from log4cplus::Logger.
// NOTE(review): the cross-reference index at the bottom of this page lists
// `DdtDataSubscriber(DdtLogger *logger)` (ddtDataSubscriber.cpp:26), which
// does not match this signature -- confirm which one is current.
54  explicit DdtDataSubscriber(log4cplus::Logger const &log4cplus_logger);
55 
59  ~DdtDataSubscriber() override;
60 
// Overrides a base-class virtual (base declaration not shown in this listing).
// `interval` defaults to 10; its unit is not visible here -- TODO confirm.
// The three strings are taken by value, so each call copies them; the
// top-level `const` only affects the definition, not the signature.
// NOTE(review): default arguments on virtual functions are chosen from the
// static type of the call expression, so a call through a base pointer uses
// the base declaration's default -- confirm both declarations agree.
// The meaning of the int return value is not visible here.
61  int RegisterSubscriber(const std::string uri, const std::string dsi,
62  const std::string remote_uri,
63  const int32_t interval = 10) override;
64 
65  int UnregisterSubscriber() override;
66 
67  DataSample *ReadData() override;
68 
73 
78 
84 
90  boost::signals2::connection connect(const SignalT::slot_type &event_listener);
91 
92  protected:
96  void LoadDefaults();
97 
101  void ReadIni();
102 
107 
// Protected, non-static const member: every instance carries its own copy,
// fixed at 10000.
// Presumably the fallback for `max_age_data_sample` (declared at line 106,
// elided from this listing); unit is not visible here -- TODO confirm.
111  const int32_t MAX_AGE_DATA_SAMPLE_DEFAULT = 10000;
112 
113  private:
117  void Init();
118 
122  void PrintConfigValues();
123 
127  bool CheckPath();
128 
132  void InitializeNotificationSubscriber(
133  const std::string data_stream_identifier,
134  const int32_t notification_port);
135 
140  void Subscribe();
141 
146  void Reregister();
147 
154  void NotificationEvent(
155  const mal::ps::DataEvent<datatransfer::NotificationSample> &event);
156 
160  int CheckPublisher();
161 
165  int CreateAccessor();
166 
170  void ReopenShm();
171 
172  void LogSubscriberParameter();
173 
174  void PrintErrorMessage(const int result);
175 
// Raw pointers with no in-class initializer; where they are set and whether
// this class owns (and must delete) the pointees is not visible in this
// header -- NOTE(review): confirm in ddtDataSubscriber.cpp.
176  DdtStatisticsClient *statistics_client;
177  DdtMemoryAccessor *memory_accessor;
178  std::string shm_id;
179  std::string data_stream_identifier;
180  std::string subscriber_uuid;
181  std::string broker_uri;
182  std::string remote_broker_uri;
183  int32_t reading_interval;
184  std::atomic<bool> event_active;
185 
// Uniquely-owned subscriber for NotificationSample events.
// The explicit std::default_delete argument is unique_ptr's default deleter,
// so this is the same type as
// std::unique_ptr<mal::ps::Subscriber<datatransfer::NotificationSample>>.
186  std::unique_ptr<mal::ps::Subscriber<datatransfer::NotificationSample>,
187  std::default_delete<
188  mal::ps::Subscriber<datatransfer::NotificationSample> > >
189  notification_subscriber;
190  std::shared_ptr<datatransfer::NotificationSample> ddt_key_notification;
191  std::shared_ptr<datatransfer::NotificationSample> ddt_notification;
192  mal::ps::DataEventFilter<datatransfer::NotificationSample> filter;
193 
194  std::promise<void> exit_signal;
195  std::future<void> future_object;
196 
// Private, non-static const members (one copy per instance).
// What is retried up to NUM_RETRIES times is not visible in this header.
// MAX_AGE_DATA_SAMPLE_MIN is presumably the lower bound for the configured
// max_age_data_sample, in the same unit as MAX_AGE_DATA_SAMPLE_DEFAULT --
// TODO confirm against the .cpp.
197  const int32_t NUM_RETRIES = 10;
198  const int32_t MAX_AGE_DATA_SAMPLE_MIN = 2000;
199 };
200 
201 } // namespace ddt
202 
203 #endif /* DDTDATASUBSCRIBER_HPP_ */
204 
Definition: ddtDataSubscriber.hpp:41
DataSample * ReadData() override
Definition: ddtDataSubscriber.cpp:423
void ReadIni()
Definition: ddtDataSubscriber.cpp:63
void StartNotificationSubscription()
Definition: ddtDataSubscriber.cpp:484
void StopNotificationSubscription()
Definition: ddtDataSubscriber.cpp:494
boost::signals2::connection connect(const SignalT::slot_type &event_listener)
Definition: ddtDataSubscriber.cpp:360
const int32_t MAX_AGE_DATA_SAMPLE_DEFAULT
Definition: ddtDataSubscriber.hpp:111
~DdtDataSubscriber() override
Definition: ddtDataSubscriber.cpp:42
DdtStatistics get_statistics()
Definition: ddtDataSubscriber.cpp:403
int RegisterSubscriber(const std::string uri, const std::string dsi, const std::string remote_uri, const int32_t interval=10) override
Definition: ddtDataSubscriber.cpp:145
int32_t max_age_data_sample
Definition: ddtDataSubscriber.hpp:106
int UnregisterSubscriber() override
Definition: ddtDataSubscriber.cpp:365
void LoadDefaults()
Definition: ddtDataSubscriber.cpp:57
DdtDataSubscriber(DdtLogger *logger)
Definition: ddtDataSubscriber.cpp:26
Definition: ddtDataTransferLib.hpp:42
DdtLogger * logger
Definition: ddtDataTransferLib.hpp:234
Definition: ddtLogger.hpp:51
Definition: ddtMemoryAccessor.hpp:274
Definition: ddtStatisticsClient.hpp:35
Base class for DdtDataPublishers and DdtDataSubscribers.
Class for providing statistics. This class provides an API to query the statistics from a broker.
Definition: ddtClient.hpp:39
Definition: ddtMemoryAccessor.hpp:184
Definition: ddtStatistics.hpp:27