ddt  1.0.0
ddtConnectionManager.hpp
Go to the documentation of this file.
1 
20 #ifndef DDTCONNECTIONMANAGER_HPP_
21 #define DDTCONNECTIONMANAGER_HPP_
22 
23 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
24 
25 #include <boost/bind/bind.hpp>
26 #include <boost/property_tree/ini_parser.hpp>
27 #include <boost/property_tree/ptree.hpp>
28 #include <boost/signals2/signal.hpp>
29 #include <boost/bind/bind.hpp>
30 #include <chrono>
31 #include <mal/rr/ServerAmi.hpp>
32 #include <mal/rr/ServerContextProvider.hpp>
33 #include <mal/rr/qos/ReplyTime.hpp>
34 #include <memory>
35 
36 #include "ddt/ddtClient.hpp"
37 #include "ddt/ddtDataConsumer.hpp"
38 #include "ddt/ddtDataProducer.hpp"
39 #include "ddt/ddtLogger.hpp"
40 #include "ddt/ddtMemoryManager.hpp"
41 
42 namespace mal = ::elt::mal;
43 namespace datatransfer = ::elt::ddt::datatransfer;
44 
45 namespace ddt {
46 
52  : public virtual datatransfer::DataBrokerRegistration {
53  public:
60  explicit DdtConnectionManager(DdtLogger* ddt_logger,
61  const std::string uri_string,
62  const std::string config);
63 
71  explicit DdtConnectionManager(DdtMemoryManager* const mmgr,
72  DdtLogger* ddt_logger,
73  const std::string uri_string,
74  const std::string config);
75 
79  ~DdtConnectionManager() override;
80 
98  int32_t RegisterPublisher(const std::string& data_stream_identifier,
99  const int32_t latency, int32_t deadline,
100  const int32_t max_data_sample_size,
101  const int32_t number_of_samples,
102  const bool compute_checksum,
103  const std::string& publishing_uri) override;
104 
111  int32_t UnregisterPublisher(
112  const std::string& data_stream_identifier) override;
113 
118  int32_t UnregisterPublishers();
119 
129  void PublishData(const std::string& data_stream_identifier) override;
130 
151  int32_t RegisterSubscriber(const std::string& subscriber_uuid,
152  const std::string& data_stream_identifier,
153  const std::string& remote_broker_uri,
154  const int32_t latency,
155  const int32_t deadline) override;
156 
170  int32_t RegisterRemoteSubscriber(const std::string& remote_broker,
171  const std::string& subscriber_uuid,
172  const std::string& data_stream_identifier,
173  const int32_t latency,
174  const int32_t deadline) override;
175 
187  int32_t UnregisterSubscriber(const std::string& data_stream_identifier,
188  const std::string& subscriber_uuid) override;
189 
194  int32_t UnregisterSubscribers();
195 
201  int32_t get_max_data_sample_size(
202  const std::string& data_stream_identifier) override;
203 
209  int32_t get_number_of_samples(
210  const std::string& data_stream_identifier) override;
211 
217  std::string get_publishing_uri(
218  const std::string& data_stream_identifier) override;
219 
225  int32_t get_notification_port(
226  const std::string& data_stream_identifier) override;
227 
233  std::vector<std::string> get_statistics(
234  const std::string& data_stream_identifier) override;
235 
240  int32_t get_heartbeat_interval() override;
241 
246  int32_t get_heartbeat_timeout() override;
247 
253  bool get_compute_checksum(const std::string& data_stream_identifier) override;
254 
260  std::string get_shm_id(const std::string& data_stream_identifier) override;
261 
266  std::string get_broker_uri() override;
267 
272  void UpdateHeartbeat(const std::string& identifier) override;
273 
283  const std::string& data_stream_identifier) override;
284 
291  bool CheckPublisherExists(const std::string& data_stream_identifier) override;
292 
301  const std::string& remote_broker_uri,
302  const std::string& data_stream_identifier) override;
303 
310  void UpdateStatistics(const std::string& data_stream_identifier,
311  const int32_t datavec_size,
312  const uint64_t source_timestamp) override;
313 
319  int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override;
320 
325  std::vector<std::string> GetRegisteredStreams() override;
326 
331  std::vector<std::string> GetConnectedBrokers() override;
332 
333  protected:
337  void LoadDefaults();
338 
344  std::string GetConfigPath() const;
345 
349  void ReadIni();
350 
357  std::string CreateSubscriptionUri(const std::string publishing_uri,
358  const std::string remote_broker_uri) const;
359 
367  bool CheckStreamIdInUse(const std::string& data_stream_identifier);
368 
375  void CreateStatistics(const std::string& data_stream_identifier,
376  const int32_t number_of_samples);
377 
382  void ResetStatistics(const std::string& data_stream_identifier);
383 
389  const int32_t SHM_TIMEOUT_DEFAULT = 10;
390 
395  const int32_t WAITING_TIME_DEFAULT = 1000;
396 
401  const int32_t REPLY_TIME_DEFAULT = 6;
402 
407  const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1;
408 
414  const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10;
415 
419  int32_t shm_timeout;
420 
424  int32_t waiting_time;
425 
429  int32_t reply_time;
430 
435 
440 
445  std::set<std::string> registered_publishers;
446 
452  std::map<std::string, DdtStatistics> statistics_map;
453 
454  private:
462  void Init(DdtMemoryManager* mmgr, DdtLogger* ddt_logger,
463  const std::string uri_string, const std::string config);
464 
468  void PrintConfigValues();
469 
473  void StartHeartbeat();
474 
478  void StopHeartbeat();
479 
485  void HeartbeatThread();
486 
492  void NotifySubscribers(const std::string& data_stream_identifier,
493  std::unique_lock<std::mutex>& producer_lock);
494 
502  int32_t CheckPublisherUsingStream(const std::string data_stream_identifier,
503  const std::string remote_broker_uri);
504 
520  void CreateDataConsumer(std::unique_lock<std::mutex>& consumer_lock,
521  const std::string subscriber_uuid,
522  const std::string data_stream_identifier,
523  const std::string remote_broker_uri,
524  const std::string originating_broker,
525  const int32_t latency, const int32_t deadline,
526  const std::string subscription_uri,
527  const int32_t number_of_samples);
528 
534  void SearchAndUnregSubscriber(const std::string identifier);
535 
545  int32_t RegisterLocalSubscriber(const std::string subscriber_uuid,
546  const std::string data_stream_identifier,
547  const int32_t latency,
548  const int32_t deadline);
549 
560  int32_t RegisterLocalSubscriberRemote(
561  const std::string subscriber_uuid,
562  const std::string data_stream_identifier,
563  const std::string remote_broker_uri, const int32_t latency,
564  const int32_t deadline);
565 
572  void ProcessNotificationEvent(const datatransfer::NotificationType type,
573  const std::string& data_stream_identifier);
574 
584  void FreeShmThread(const std::string& data_stream_identifier,
585  const int shm_timeout);
586 
593  bool CheckPublisherReregistration(const std::string& data_stream_identifier);
594 
604  bool CheckSharedMemoryRecreation(const std::string& data_stream_identifier,
605  const int32_t max_data_sample_size,
606  const int32_t number_of_samples);
607 
612  void Publish(const std::string& data_stream_identifier);
613 
618  void PubRegNotification(const std::string& data_stream_identifier);
619 
624  void PubUnregNotification(const std::string& data_stream_identifier);
625 
631  std::map<std::string, DdtDataProducer*> producer_map;
632 
638  std::map<std::string, DdtDataConsumer*> consumer_map;
639 
646  std::map<std::string,
647  std::chrono::time_point<std::chrono::high_resolution_clock>>
648  client_map;
649 
655  std::map<std::string, std::string> connected_brokers_map;
656 
660  DdtMemoryManager* memory_manager;
661 
667  std::map<std::string, DdtClient*> ddt_clients;
668 
672  std::atomic<bool> stop_threads;
673 
677  std::atomic<int> thread_counter;
678 
682  std::mutex producer_mutex;
683 
687  std::mutex consumer_mutex;
688 
692  std::mutex client_mutex;
693 
697  std::mutex statistics_mutex;
698 
702  std::mutex ddt_clients_mutex;
703 
707  std::mutex registered_publishers_mutex;
708 
712  std::mutex connected_brokers_mutex;
713 
717  DdtLogger* logger;
718 
722  std::promise<void> exit_signal;
723 
727  std::future<void> future_object;
728 
732  std::atomic<bool> heartbeat_active;
733 
737  std::string broker_uri;
738 
742  std::string config_file;
743 
747  boost::signals2::connection connection;
748 
752  const int32_t SHM_TIMEOUT_MIN = 2;
753 
757  const int32_t WAITING_TIME_MIN = 1000;
758 
762  const int32_t REPLY_TIME_MIN = 2;
763 
767  const int32_t HEARTBEAT_INTERVAL_MIN = 0;
768 
772  const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
773 
777  const int32_t NUM_RETRIES = 30;
778 };
779 
780 } // namespace ddt
781 
782 #endif /* DDTCONNECTIONMANAGER_HPP_ */
783 
Definition: ddtConnectionManager.hpp:52
int32_t get_heartbeat_timeout() override
Definition: ddtConnectionManager.cpp:1482
int32_t waiting_time
Definition: ddtConnectionManager.hpp:424
std::string GetConfigPath() const
Definition: ddtConnectionManager.cpp:86
int32_t RegisterRemoteSubscriber(const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:1135
const int32_t REPLY_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:401
std::string CreateSubscriptionUri(const std::string publishing_uri, const std::string remote_broker_uri) const
Definition: ddtConnectionManager.cpp:1500
int32_t get_heartbeat_interval() override
Definition: ddtConnectionManager.cpp:1478
int32_t UnregisterSubscribers()
Definition: ddtConnectionManager.cpp:1273
void UpdateHeartbeat(const std::string &identifier) override
Definition: ddtConnectionManager.cpp:353
bool get_compute_checksum(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1486
const int32_t WAITING_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:395
std::string get_broker_uri() override
Definition: ddtConnectionManager.cpp:1498
std::map< std::string, DdtStatistics > statistics_map
Definition: ddtConnectionManager.hpp:452
int32_t UnregisterPublisher(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:524
int32_t RegisterPublisher(const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
Definition: ddtConnectionManager.cpp:362
const int32_t HEARTBEAT_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:414
std::vector< std::string > GetRegisteredStreams() override
Definition: ddtConnectionManager.cpp:1633
void UpdateStatistics(const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
Definition: ddtConnectionManager.cpp:1566
std::string get_publishing_uri(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1401
bool CheckRemotePublisherExists(const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1352
void CreateStatistics(const std::string &data_stream_identifier, const int32_t number_of_samples)
Definition: ddtConnectionManager.cpp:1588
void PublishData(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:652
bool CheckPubRegistrationPermitted(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1304
std::vector< std::string > get_statistics(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1425
void ReadIni()
Definition: ddtConnectionManager.cpp:103
~DdtConnectionManager() override
Definition: ddtConnectionManager.cpp:42
void LoadDefaults()
Definition: ddtConnectionManager.cpp:78
int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override
Definition: ddtConnectionManager.cpp:1628
const int32_t HEARTBEAT_INTERVAL_DEFAULT
Definition: ddtConnectionManager.hpp:407
bool CheckPublisherExists(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1339
const int32_t SHM_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:389
int32_t UnregisterSubscriber(const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
Definition: ddtConnectionManager.cpp:1189
int32_t shm_timeout
Definition: ddtConnectionManager.hpp:419
int32_t heartbeat_interval
Definition: ddtConnectionManager.hpp:434
int32_t get_notification_port(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1413
int32_t heartbeat_timeout
Definition: ddtConnectionManager.hpp:439
int32_t UnregisterPublishers()
Definition: ddtConnectionManager.cpp:628
int32_t reply_time
Definition: ddtConnectionManager.hpp:429
std::vector< std::string > GetConnectedBrokers() override
Definition: ddtConnectionManager.cpp:1658
std::string get_shm_id(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1493
bool CheckStreamIdInUse(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1323
int32_t get_number_of_samples(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1396
int32_t RegisterSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:742
std::set< std::string > registered_publishers
Definition: ddtConnectionManager.hpp:445
DdtConnectionManager(DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
Definition: ddtConnectionManager.cpp:24
void ResetStatistics(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1614
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1391
Definition: ddtLogger.hpp:51
Definition: ddtMemoryManager.hpp:72
Client class for the connection to remote brokers. This class creates MAL clients to connect to remot...
Data Consumer. This class provides the functionality to subscribe to a data stream,...
Data Producer. This class provides the functionality to publish data over network and enables sending...
Class to wrap the usage of log4cplus as logging utility. This file provides a wrapper class for the u...
Manager for shared memories. This class manages the handling of shared memories.
Definition: ddtClient.hpp:39