ddt  0.1
ddtClient.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtClient.hpp
8 // @brief Client class for the connection to remote brokers.
9 //
10 // This class creates MAL clients to connect to remote brokers. It provides a
11 // connection listener which observes the connection state and re-registers
12 // subscribers if a broker has been restarted. It also provides the
13 // heartbeat functionality.
14 //
15 // @author Matthias Grimm, CGI
16 // @since 2020/11/18
17 //
18 
19 #ifndef DDTCLIENT_HPP_
20 #define DDTCLIENT_HPP_
21 
22 #include <Ddtdatatransfericd.hpp>
23 #include <future>
24 #include <mal/Cii.hpp>
25 #include <mal/rr/qos/ReplyTime.hpp>
26 #include <mal/utility/LoadMal.hpp>
27 #include <map>
28 #include <thread>
29 
30 #include "ddt/ddtConstants.hpp"
31 #include "ddt/ddtLogger.hpp"
32 
33 namespace mal = ::elt::mal;
34 namespace datatransfer = ::elt::ddt::datatransfer;
35 
36 namespace ddt {
37 
// Client for the connection to a remote broker. As described in the file
// header, it creates the MAL client, observes the connection state through a
// connection listener, re-registers subscribers after a broker restart and
// provides the heartbeat functionality.
//
// NOTE(review): the members below use std::atomic, std::mutex,
// std::unique_ptr and std::string, but <atomic>, <mutex>, <memory> and
// <string> are not included directly by this header; it appears to rely on
// transitive includes.
44 class DdtClient {
45  public:
    // Constructs a client from the remote broker address, the reply time, the
    // heartbeat interval and timeout, the local broker address and a logger.
    // NOTE(review): units of the timing values and ownership/lifetime of
    // ddt_logger are not visible in this header — confirm in ddtClient.cpp.
49  DdtClient(const std::string remote_broker, const int32_t repl_time,
50  const int32_t hb_interval, const int32_t hb_timeout,
51  const std::string broker, DdtLogger* ddt_logger);
52 
    // Destructor. Declared virtual, so the class may be deleted through a
    // base/derived pointer.
56  virtual ~DdtClient();
57 
    // Adds a subscriber UUID together with a data stream identifier (dsi).
    // Presumably the pair is stored in subscriber_map — confirm in
    // ddtClient.cpp.
61  void AddUuid(std::string uuid, std::string dsi);
62 
    // Unregisters the subscriber identified by uuid.
    // NOTE(review): whether this also contacts the remote broker is not
    // visible here.
67  void UnregisterSubscriber(std::string uuid);
68 
    // Returns an emptiness flag; presumably true when no subscribers are
    // registered in subscriber_map — verify against the implementation.
73  bool CheckIfEmpty();
74 
    // Returns whether a publisher exists for the given data stream
    // identifier (as suggested by the name; presumably queried from the
    // remote broker — TODO confirm).
79  bool CheckPublisherExists(const std::string& data_stream_identifier);
80 
    // Registers a remote subscriber for the given data stream with the
    // requested latency and deadline.
    // NOTE(review): meaning of the int32_t return value and the units of
    // latency/deadline are not visible in this header.
85  int32_t RegisterRemoteSubscriber(const std::string& subscriber_uuid,
86  const std::string& data_stream_identifier,
87  const int32_t latency,
88  const int32_t deadline);
89 
    // Returns the publishing URI for the given data stream identifier.
94  std::string GetPublishingUri(const std::string& data_stream_identifier);
95 
     // Returns the maximum data sample size for the given data stream
     // identifier (unit not visible here — presumably bytes, TODO confirm).
100  int32_t get_max_data_sample_size(const std::string& data_stream_identifier);
101 
     // Returns the number of samples configured for the given data stream
     // identifier.
106  int32_t get_number_of_samples(const std::string& data_stream_identifier);
107 
     // Returns the compute-checksum flag for the given data stream identifier.
111  bool get_compute_checksum(const std::string& data_stream_identifier);
112 
113  private:
     // Initialisation helper taking the same arguments as the constructor.
     // Presumably called by the constructor to store them — confirm in
     // ddtClient.cpp.
117  void Init(const std::string remote_broker, const int32_t repl_time,
118  const int32_t hb_interval, const int32_t hb_timeout,
119  const std::string broker, DdtLogger* ddt_logger);
120 
     // Initialises the MAL client. NOTE(review): the meaning of the int32_t
     // return value is not visible in this header.
124  int32_t InitMalClient();
125 
     // Starts the heartbeat.
129  void StartHeartbeat();
130 
     // Stops the heartbeat.
134  void StopHeartbeat();
135 
     // Heartbeat worker routine; presumably the body executed by the
     // heartbeat thread — TODO confirm.
139  void HeartbeatThread();
140 
     // Re-registers subscribers; per the file header this is done when a
     // broker was restarted.
145  void Reregister();
146 
     // Owning pointer to the synchronous DataBrokerRegistration MAL client.
150  std::unique_ptr<
151  datatransfer::DataBrokerRegistrationSync,
152  std::default_delete<datatransfer::DataBrokerRegistrationSync> >
153  client;
154 
     // Registration handle of the connection listener.
158  elt::mal::rr::ListenerRegistration connection_listener;
159 
     // Atomic connection-state flag. NOTE(review): no in-class initialiser;
     // it must be set before first use (presumably in Init()).
163  std::atomic<bool> connected_to_broker;
164 
     // String-to-string map of subscribers; presumably subscriber UUID ->
     // data stream identifier, matching AddUuid(uuid, dsi) — TODO confirm.
169  std::map<std::string, std::string>
170  subscriber_map;
     // Promise/future pair; presumably used to signal the heartbeat thread to
     // exit — confirm in ddtClient.cpp.
172  std::promise<void> exit_signal;
173  std::future<void> future_object;
     // Atomic heartbeat-state flag (no in-class initialiser).
174  std::atomic<bool> heartbeat_active;
     // Broker addresses and timing settings; presumably the stored
     // constructor arguments (remote_broker, broker, repl_time, hb_interval,
     // hb_timeout) — verify against Init().
175  std::string remote_broker_uri;
176  std::string broker_uri;
177  int32_t reply_time;
178  int32_t heartbeat_interval;
179  int32_t heartbeat_timeout;
     // Mutex; presumably guards subscriber_map — TODO confirm.
180  std::mutex subscriber_mutex;
     // Logger passed to the constructor as a raw pointer (ownership not
     // visible here).
181  DdtLogger* logger;
182 
     // Per-instance constants. NOTE(review): their exact use and units
     // (retry count, latency, deadline) are defined by ddtClient.cpp and are
     // not visible in this header.
183  const int32_t NUM_RETRIES = 10;
184  const int32_t LATENCY = 10000;
185  const int32_t DEADLINE = 10;
186 };
187 
188 } // namespace ddt
189 
190 #endif /* DDTCLIENT_HPP_ */
ddt::DdtClient::DdtClient
DdtClient(const std::string remote_broker, const int32_t repl_time, const int32_t hb_interval, const int32_t hb_timeout, const std::string broker, DdtLogger *ddt_logger)
Definition: ddtClient.cpp:23
ddt::DdtClient::get_number_of_samples
int32_t get_number_of_samples(const std::string &data_stream_identifier)
Definition: ddtClient.cpp:267
ddt::DdtClient::get_max_data_sample_size
int32_t get_max_data_sample_size(const std::string &data_stream_identifier)
Definition: ddtClient.cpp:262
ddt::DdtLogger
Definition: ddtLogger.hpp:71
ddtConstants.hpp
ddt
Definition: ddtClient.hpp:36
ddt::DdtClient::GetPublishingUri
std::string GetPublishingUri(const std::string &data_stream_identifier)
Definition: ddtClient.cpp:257
ddt::DdtClient::~DdtClient
virtual ~DdtClient()
Definition: ddtClient.cpp:34
ddt::DdtClient::UnregisterSubscriber
void UnregisterSubscriber(std::string uuid)
Definition: ddtClient.cpp:161
ddt::DdtClient::CheckIfEmpty
bool CheckIfEmpty()
Definition: ddtClient.cpp:238
ddt::DdtClient::CheckPublisherExists
bool CheckPublisherExists(const std::string &data_stream_identifier)
Definition: ddtClient.cpp:243
ddt::DdtClient::get_compute_checksum
bool get_compute_checksum(const std::string &data_stream_identifier)
Definition: ddtClient.cpp:272
ddtLogger.hpp
ddt::DdtClient
Definition: ddtClient.hpp:44
ddt::DdtClient::AddUuid
void AddUuid(std::string uuid, std::string dsi)
Definition: ddtClient.cpp:109
ddt::DdtClient::RegisterRemoteSubscriber
int32_t RegisterRemoteSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline)
Definition: ddtClient.cpp:248