ddt  0.1
ddtConnectionManager.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtConnectionManager.hpp
8 // @brief Connection Manager.
9 //
10 // This class manages the connection handling between Data Brokers and
11 // Publisher / Subscriber applications.
12 //
13 // @author Matthias Grimm, CGI
14 // @since 2020/01/16
15 //
16 
17 #ifndef DDTCONNECTIONMANAGER_HPP_
18 #define DDTCONNECTIONMANAGER_HPP_
19 
20 #include <boost/bind.hpp>
21 #include <boost/property_tree/ini_parser.hpp>
22 #include <boost/property_tree/ptree.hpp>
23 #include <boost/signals2/signal.hpp>
24 #include <boost/bind/bind.hpp>
25 #include <chrono>
26 #include <memory>
27 
28 #include <mal/rr/ServerAmi.hpp>
29 #include <mal/rr/ServerContextProvider.hpp>
30 #include <mal/rr/qos/ReplyTime.hpp>
31 
32 #include "ddt/ddtClient.hpp"
33 #include "ddt/ddtDataConsumer.hpp"
34 #include "ddt/ddtDataProducer.hpp"
35 #include "ddt/ddtLogger.hpp"
36 #include "ddt/ddtMemoryManager.hpp"
37 
38 namespace mal = ::elt::mal;
39 namespace datatransfer = ::elt::ddt::datatransfer;
40 
41 namespace ddt {
42 
48  : public virtual datatransfer::DataBrokerRegistration {
49  public:
53  explicit DdtConnectionManager(DdtLogger* ddt_logger,
54  const std::string uri_string);
55 
59  explicit DdtConnectionManager(DdtMemoryManager* mmgr, DdtLogger* ddt_logger,
60  const std::string uri_string);
61 
65  virtual ~DdtConnectionManager();
66 
73  int32_t RegisterPublisher(const std::string& data_stream_identifier,
74  int32_t latency, int32_t deadline,
75  int32_t max_data_sample_size,
76  int32_t number_of_samples, bool compute_checksum,
77  const std::string& publishing_uri) override;
78 
83  int32_t UnregisterPublisher(
84  const std::string& data_stream_identifier) override;
85 
89  int32_t UnregisterPublishers();
90 
99  void PublishData(const std::string& data_stream_identifier) override;
100 
113  int32_t RegisterSubscriber(const std::string& subscriber_uuid,
114  const std::string& data_stream_identifier,
115  const std::string& remote_broker_uri,
116  int32_t latency, int32_t deadline) override;
117 
123  int32_t RegisterRemoteSubscriber(const std::string& remote_broker,
124  const std::string& subscriber_uuid,
125  const std::string& data_stream_identifier,
126  const int32_t latency,
127  const int32_t deadline) override;
128 
137  int32_t UnregisterSubscriber(const std::string& data_stream_identifier,
138  const std::string& subscriber_uuid) override;
139 
143  int32_t UnregisterSubscribers();
144 
148  int32_t get_max_data_sample_size(
149  const std::string& data_stream_identifier) override;
150 
154  int32_t get_number_of_samples(
155  const std::string& data_stream_identifier) override;
156 
160  std::string get_publishing_uri(
161  const std::string& data_stream_identifier) override;
162 
166  int32_t get_notification_port(
167  const std::string& data_stream_identifier) override;
168 
172  std::vector<std::string> get_statistics(
173  const std::string& data_stream_identifier) override;
174 
178  int32_t get_heartbeat_interval() override;
179 
183  int32_t get_heartbeat_timeout() override;
184 
188  bool get_compute_checksum(const std::string& data_stream_identifier) override;
189 
193  std::string get_shm_id(const std::string& data_stream_identifier) override;
194 
198  void UpdateHeartbeat(const std::string& identifier) override;
199 
207  const std::string& data_stream_identifier) override;
208 
212  bool CheckPublisherExists(const std::string& data_stream_identifier) override;
213 
218  const std::string& remote_broker_uri,
219  const std::string& data_stream_identifier) override;
220 
227  void UpdateStatistics(const std::string& data_stream_identifier,
228  const int32_t datavec_size,
229  const uint64_t source_timestamp) override;
230 
236  int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override;
237 
242  std::vector<std::string> GetRegisteredStreams() override;
243 
248  std::vector<std::string> GetConnectedBrokers() override;
249 
250  protected:
254  void LoadDefaults();
255 
261  const std::string GetConfigPath();
262 
266  void ReadIni();
267 
268  const int32_t SHM_TIMEOUT_DEFAULT = 10;
269  const int32_t WAITING_TIME_DEFAULT = 1000;
270  const int32_t REPLY_TIME_DEFAULT = 6;
271  const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1;
272  const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10;
273 
274  int32_t shm_timeout; // configurable timeout after which a shared memory is
275  // deleted in [s]
276  int32_t waiting_time; // configurable waiting time for MAL publishers to
277  // establish communication in [ms]
278  int32_t reply_time; // configurable reply time for MAL clients in [s]
279  int32_t heartbeat_interval; // configurable interval for the heartbeat in [s]
280  int32_t heartbeat_timeout; // configurable timeout for the heartbeat in [s]
281 
282  private:
286  void Init(DdtMemoryManager* mmgr, DdtLogger* ddt_logger,
287  const std::string uri_string);
288 
292  void PrintConfigValues();
293 
297  void StartHeartbeat();
298 
302  void StopHeartbeat();
303 
309  void HeartbeatThread();
310 
314  int32_t RegisterLocalSubscriber(const std::string subscriber_uuid,
315  const std::string data_stream_identifier,
316  int32_t latency, int32_t deadline);
317 
321  int32_t RegisterLocalSubscriberRemote(
322  const std::string subscriber_uuid,
323  const std::string data_stream_identifier, std::string remote_broker_uri,
324  int32_t latency, int32_t deadline);
325 
330  void ProcessNotificationEvent(datatransfer::NotificationType type,
331  const std::string& data_stream_identifier);
332 
336  std::string CreateSubscriptionUri(std::string publishing_uri,
337  std::string remote_broker_uri);
338 
348  void FreeShmThread(const std::string& data_stream_identifier,
349  const int shm_timeout);
350 
357  bool CheckPublisherReregistration(const std::string& data_stream_identifier);
358 
366  bool CheckStreamIdInUse(const std::string& data_stream_identifier);
367 
377  bool CheckSharedMemoryRecreation(const std::string& data_stream_identifier,
378  const int32_t max_data_sample_size,
379  const int32_t number_of_samples);
380 
387  void CreateStatistics(const std::string& data_stream_identifier,
388  const int32_t number_of_samples);
389 
394  void ResetStatistics(const std::string& data_stream_identifier);
395 
396  std::map<std::string, DdtDataProducer*>
397  producer_map;
398  std::map<std::string, DdtDataConsumer*>
399  consumer_map;
400  std::map<std::string,
401  std::chrono::time_point<std::chrono::high_resolution_clock>>
402  client_map;
403  std::map<std::string, DdtStatistics>
404  statistics_map;
405  std::map<std::string, std::string>
406  connected_brokers_map;
408  DdtMemoryManager* memory_manager;
409 
410  std::map<std::string, DdtClient*>
411  ddt_clients;
413  std::atomic<bool> stop_threads;
414  std::atomic<int> thread_counter;
415 
416  std::mutex producer_mutex;
417  std::mutex consumer_mutex;
418  std::mutex client_mutex;
419  std::mutex statistics_mutex;
420  std::mutex ddt_clients_mutex;
421  std::mutex registered_publishers_mutex;
422  std::mutex connected_brokers_mutex;
423 
427  DdtLogger* logger;
428 
429  std::promise<void> exit_signal;
430  std::future<void> future_object;
431  std::atomic<bool> heartbeat_active;
432  std::set<std::string> registered_publishers;
433  std::string broker_uri;
434 
435  boost::signals2::connection connection;
436 
437  const int32_t SHM_TIMEOUT_MIN = 2;
438  const int32_t WAITING_TIME_MIN = 1000;
439  const int32_t REPLY_TIME_MIN = 2;
440  const int32_t HEARTBEAT_INTERVAL_MIN = 0;
441  const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
442  const int32_t NUM_RETRIES = 30;
443 };
444 
445 } // namespace ddt
446 
447 #endif /* DDTCONNECTIONMANAGER_HPP_ */
ddt::DdtConnectionManager::RegisterPublisher
int32_t RegisterPublisher(const std::string &data_stream_identifier, int32_t latency, int32_t deadline, int32_t max_data_sample_size, int32_t number_of_samples, bool compute_checksum, const std::string &publishing_uri) override
Definition: ddtConnectionManager.cpp:295
ddt::DdtConnectionManager::UpdateStatistics
void UpdateStatistics(const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
Definition: ddtConnectionManager.cpp:1374
ddt::DdtConnectionManager::GetMaxPossibleBufferSize
int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override
Definition: ddtConnectionManager.cpp:1436
ddt::DdtConnectionManager::get_max_data_sample_size
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1213
ddt::DdtConnectionManager::UnregisterPublisher
int32_t UnregisterPublisher(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:440
ddt::DdtConnectionManager::CheckPublisherExists
bool CheckPublisherExists(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1161
ddt::DdtConnectionManager::shm_timeout
int32_t shm_timeout
Definition: ddtConnectionManager.hpp:274
ddt::DdtLogger
Definition: ddtLogger.hpp:71
ddt::DdtConnectionManager::UnregisterSubscriber
int32_t UnregisterSubscriber(const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
Definition: ddtConnectionManager.cpp:1023
ddt::DdtConnectionManager::UnregisterSubscribers
int32_t UnregisterSubscribers()
Definition: ddtConnectionManager.cpp:1104
ddt
Definition: ddtClient.hpp:36
ddt::DdtConnectionManager::CheckPubRegistrationPermitted
bool CheckPubRegistrationPermitted(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1126
ddt::DdtConnectionManager::reply_time
int32_t reply_time
Definition: ddtConnectionManager.hpp:278
ddt::DdtConnectionManager::ReadIni
void ReadIni()
Definition: ddtConnectionManager.cpp:78
ddt::DdtConnectionManager::UpdateHeartbeat
void UpdateHeartbeat(const std::string &identifier) override
Definition: ddtConnectionManager.cpp:286
ddt::DdtConnectionManager::get_number_of_samples
int32_t get_number_of_samples(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1218
ddt::DdtMemoryManager
Definition: ddtMemoryManager.hpp:49
ddt::DdtConnectionManager::RegisterRemoteSubscriber
int32_t RegisterRemoteSubscriber(const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:970
ddt::DdtConnectionManager::get_publishing_uri
std::string get_publishing_uri(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1223
ddt::DdtConnectionManager::heartbeat_timeout
int32_t heartbeat_timeout
Definition: ddtConnectionManager.hpp:280
ddt::DdtConnectionManager::get_shm_id
std::string get_shm_id(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1312
ddt::DdtConnectionManager::get_notification_port
int32_t get_notification_port(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1234
ddt::DdtConnectionManager
Definition: ddtConnectionManager.hpp:48
ddt::DdtConnectionManager::DdtConnectionManager
DdtConnectionManager(DdtLogger *ddt_logger, const std::string uri_string)
Definition: ddtConnectionManager.cpp:21
ddt::DdtConnectionManager::HEARTBEAT_INTERVAL_DEFAULT
const int32_t HEARTBEAT_INTERVAL_DEFAULT
Definition: ddtConnectionManager.hpp:271
ddt::DdtConnectionManager::PublishData
void PublishData(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:561
ddt::DdtConnectionManager::~DdtConnectionManager
virtual ~DdtConnectionManager()
Definition: ddtConnectionManager.cpp:37
ddt::DdtConnectionManager::HEARTBEAT_TIMEOUT_DEFAULT
const int32_t HEARTBEAT_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:272
ddt::DdtConnectionManager::SHM_TIMEOUT_DEFAULT
const int32_t SHM_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:268
ddt::DdtConnectionManager::UnregisterPublishers
int32_t UnregisterPublishers()
Definition: ddtConnectionManager.cpp:542
ddt::DdtConnectionManager::RegisterSubscriber
int32_t RegisterSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, int32_t latency, int32_t deadline) override
Definition: ddtConnectionManager.cpp:643
ddt::DdtConnectionManager::get_statistics
std::vector< std::string > get_statistics(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1245
ddtMemoryManager.hpp
ddtDataConsumer.hpp
ddt::DdtConnectionManager::GetConfigPath
const std::string GetConfigPath()
Definition: ddtConnectionManager.cpp:64
ddtDataProducer.hpp
ddtLogger.hpp
ddt::DdtConnectionManager::REPLY_TIME_DEFAULT
const int32_t REPLY_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:270
ddt::DdtConnectionManager::GetRegisteredStreams
std::vector< std::string > GetRegisteredStreams() override
Definition: ddtConnectionManager.cpp:1441
ddt::DdtConnectionManager::GetConnectedBrokers
std::vector< std::string > GetConnectedBrokers() override
Definition: ddtConnectionManager.cpp:1466
ddt::DdtConnectionManager::WAITING_TIME_DEFAULT
const int32_t WAITING_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:269
ddtClient.hpp
ddt::DdtConnectionManager::heartbeat_interval
int32_t heartbeat_interval
Definition: ddtConnectionManager.hpp:279
ddt::DdtConnectionManager::CheckRemotePublisherExists
bool CheckRemotePublisherExists(const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1174
ddt::DdtConnectionManager::get_heartbeat_timeout
int32_t get_heartbeat_timeout() override
Definition: ddtConnectionManager.cpp:1301
ddt::DdtConnectionManager::get_compute_checksum
bool get_compute_checksum(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1305
ddt::DdtConnectionManager::LoadDefaults
void LoadDefaults()
Definition: ddtConnectionManager.cpp:56
ddt::DdtConnectionManager::waiting_time
int32_t waiting_time
Definition: ddtConnectionManager.hpp:276
ddt::DdtConnectionManager::get_heartbeat_interval
int32_t get_heartbeat_interval() override
Definition: ddtConnectionManager.cpp:1297