ddt 1.1.0
Loading...
Searching...
No Matches
ddtConnectionManager.hpp
Go to the documentation of this file.
1
20#ifndef DDTCONNECTIONMANAGER_HPP_
21#define DDTCONNECTIONMANAGER_HPP_
22
23#define BOOST_BIND_GLOBAL_PLACEHOLDERS
24
25#include <boost/bind/bind.hpp>
26#include <boost/property_tree/ini_parser.hpp>
27#include <boost/property_tree/ptree.hpp>
28#include <boost/signals2/signal.hpp>
29#include <boost/bind/bind.hpp>
30#include <chrono>
31#include <mal/rr/ServerAmi.hpp>
32#include <mal/rr/ServerContextProvider.hpp>
33#include <mal/rr/qos/ReplyTime.hpp>
34#include <memory>
35
36#include "ddt/ddtClient.hpp"
39#include "ddt/ddtLogger.hpp"
41
42namespace mal = ::elt::mal;
43namespace datatransfer = ::elt::ddt::datatransfer;
44
45namespace ddt {
46
52 : public virtual datatransfer::DataBrokerRegistration {
53 public:
60 explicit DdtConnectionManager(DdtLogger* ddt_logger,
61 const std::string uri_string,
62 const std::string config);
63
71 explicit DdtConnectionManager(DdtMemoryManager* const mmgr,
72 DdtLogger* ddt_logger,
73 const std::string uri_string,
74 const std::string config);
75
79 ~DdtConnectionManager() override;
80
98 int32_t RegisterPublisher(const std::string& data_stream_identifier,
99 const int32_t latency, int32_t deadline,
100 const int32_t max_data_sample_size,
101 const int32_t number_of_samples,
102 const bool compute_checksum,
103 const std::string& publishing_uri) override;
104
111 int32_t UnregisterPublisher(
112 const std::string& data_stream_identifier) override;
113
118 int32_t UnregisterPublishers();
119
129 void PublishData(const std::string& data_stream_identifier) override;
130
151 int32_t RegisterSubscriber(const std::string& subscriber_uuid,
152 const std::string& data_stream_identifier,
153 const std::string& remote_broker_uri,
154 const int32_t latency,
155 const int32_t deadline) override;
156
170 int32_t RegisterRemoteSubscriber(const std::string& remote_broker,
171 const std::string& subscriber_uuid,
172 const std::string& data_stream_identifier,
173 const int32_t latency,
174 const int32_t deadline) override;
175
187 int32_t UnregisterSubscriber(const std::string& data_stream_identifier,
188 const std::string& subscriber_uuid) override;
189
194 int32_t UnregisterSubscribers();
195
202 const std::string& data_stream_identifier) override;
203
209 int32_t get_number_of_samples(
210 const std::string& data_stream_identifier) override;
211
217 std::string get_publishing_uri(
218 const std::string& data_stream_identifier) override;
219
225 int32_t get_notification_port(
226 const std::string& data_stream_identifier) override;
227
233 std::vector<std::string> get_statistics(
234 const std::string& data_stream_identifier) override;
235
240 int32_t get_heartbeat_interval() override;
241
246 int32_t get_heartbeat_timeout() override;
247
253 bool get_compute_checksum(const std::string& data_stream_identifier) override;
254
260 std::string get_shm_id(const std::string& data_stream_identifier) override;
261
266 std::string get_broker_uri() override;
267
272 void UpdateHeartbeat(const std::string& identifier) override;
273
283 const std::string& data_stream_identifier) override;
284
291 bool CheckPublisherExists(const std::string& data_stream_identifier) override;
292
301 const std::string& remote_broker_uri,
302 const std::string& data_stream_identifier) override;
303
310 void UpdateStatistics(const std::string& data_stream_identifier,
311 const int32_t datavec_size,
312 const uint64_t source_timestamp) override;
313
319 int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override;
320
325 std::vector<std::string> GetRegisteredStreams() override;
326
331 std::vector<std::string> GetConnectedBrokers() override;
332
333 protected:
337 void LoadDefaults();
338
344 std::string GetConfigPath() const;
345
349 void ReadIni();
350
357 std::string CreateSubscriptionUri(const std::string publishing_uri,
358 const std::string remote_broker_uri) const;
359
367 bool CheckStreamIdInUse(const std::string& data_stream_identifier);
368
375 void CreateStatistics(const std::string& data_stream_identifier,
376 const int32_t number_of_samples);
377
382 void ResetStatistics(const std::string& data_stream_identifier);
383
389 const int32_t SHM_TIMEOUT_DEFAULT = 10;
390
395 const int32_t WAITING_TIME_DEFAULT = 1000;
396
401 const int32_t REPLY_TIME_DEFAULT = 6;
402
407 const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1;
408
414 const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10;
415
419 int32_t shm_timeout;
420
425
429 int32_t reply_time;
430
435
440
445 std::set<std::string> registered_publishers;
446
452 std::map<std::string, DdtStatistics> statistics_map;
453
454 private:
462 void Init(DdtMemoryManager* mmgr, DdtLogger* ddt_logger,
463 const std::string uri_string, const std::string config);
464
468 void PrintConfigValues();
469
473 void StartHeartbeat();
474
478 void StopHeartbeat();
479
485 void HeartbeatThread();
486
492 void NotifySubscribers(const std::string& data_stream_identifier,
493 std::unique_lock<std::mutex>& producer_lock);
494
502 int32_t CheckPublisherUsingStream(const std::string data_stream_identifier,
503 const std::string remote_broker_uri);
504
520 void CreateDataConsumer(std::unique_lock<std::mutex>& consumer_lock,
521 const std::string subscriber_uuid,
522 const std::string data_stream_identifier,
523 const std::string remote_broker_uri,
524 const std::string originating_broker,
525 const int32_t latency, const int32_t deadline,
526 const std::string subscription_uri,
527 const int32_t number_of_samples);
528
534 void SearchAndUnregSubscriber(const std::string identifier);
535
545 int32_t RegisterLocalSubscriber(const std::string subscriber_uuid,
546 const std::string data_stream_identifier,
547 const int32_t latency,
548 const int32_t deadline);
549
560 int32_t RegisterLocalSubscriberRemote(
561 const std::string subscriber_uuid,
562 const std::string data_stream_identifier,
563 const std::string remote_broker_uri, const int32_t latency,
564 const int32_t deadline);
565
572 void ProcessNotificationEvent(const datatransfer::NotificationType type,
573 const std::string& data_stream_identifier);
574
584 void FreeShmThread(const std::string& data_stream_identifier,
585 const int shm_timeout);
586
593 bool CheckPublisherReregistration(const std::string& data_stream_identifier);
594
604 bool CheckSharedMemoryRecreation(const std::string& data_stream_identifier,
605 const int32_t max_data_sample_size,
606 const int32_t number_of_samples);
607
612 void Publish(const std::string& data_stream_identifier);
613
618 void PubRegNotification(const std::string& data_stream_identifier);
619
624 void PubUnregNotification(const std::string& data_stream_identifier);
625
631 std::map<std::string, DdtDataProducer*> producer_map;
632
638 std::map<std::string, DdtDataConsumer*> consumer_map;
639
646 std::map<std::string,
647 std::chrono::time_point<std::chrono::high_resolution_clock>>
648 client_map;
649
655 std::map<std::string, std::string> connected_brokers_map;
656
660 DdtMemoryManager* memory_manager;
661
667 std::map<std::string, DdtClient*> ddt_clients;
668
672 std::atomic<bool> stop_threads;
673
677 std::atomic<int> thread_counter;
678
682 std::mutex producer_mutex;
683
687 std::mutex consumer_mutex;
688
692 std::mutex client_mutex;
693
697 std::mutex statistics_mutex;
698
702 std::mutex ddt_clients_mutex;
703
707 std::mutex registered_publishers_mutex;
708
712 std::mutex connected_brokers_mutex;
713
717 DdtLogger* logger;
718
722 std::promise<void> exit_signal;
723
727 std::future<void> future_object;
728
732 std::atomic<bool> heartbeat_active;
733
737 std::string broker_uri;
738
742 std::string config_file;
743
747 boost::signals2::connection connection;
748
752 const int32_t SHM_TIMEOUT_MIN = 2;
753
757 const int32_t WAITING_TIME_MIN = 1000;
758
762 const int32_t REPLY_TIME_MIN = 2;
763
767 const int32_t HEARTBEAT_INTERVAL_MIN = 0;
768
772 const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
773
777 const int32_t NUM_RETRIES = 30;
778};
779
780} // namespace ddt
781
782#endif /* DDTCONNECTIONMANAGER_HPP_ */
783
Definition: ddtConnectionManager.hpp:52
int32_t get_heartbeat_timeout() override
Definition: ddtConnectionManager.cpp:1482
int32_t waiting_time
Definition: ddtConnectionManager.hpp:424
std::string GetConfigPath() const
Definition: ddtConnectionManager.cpp:86
int32_t RegisterRemoteSubscriber(const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:1135
const int32_t REPLY_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:401
std::string CreateSubscriptionUri(const std::string publishing_uri, const std::string remote_broker_uri) const
Definition: ddtConnectionManager.cpp:1500
int32_t get_heartbeat_interval() override
Definition: ddtConnectionManager.cpp:1478
int32_t UnregisterSubscribers()
Definition: ddtConnectionManager.cpp:1273
void UpdateHeartbeat(const std::string &identifier) override
Definition: ddtConnectionManager.cpp:353
bool get_compute_checksum(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1486
const int32_t WAITING_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:395
std::string get_broker_uri() override
Definition: ddtConnectionManager.cpp:1498
std::map< std::string, DdtStatistics > statistics_map
Definition: ddtConnectionManager.hpp:452
int32_t UnregisterPublisher(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:524
int32_t RegisterPublisher(const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
Definition: ddtConnectionManager.cpp:362
const int32_t HEARTBEAT_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:414
std::vector< std::string > GetRegisteredStreams() override
Definition: ddtConnectionManager.cpp:1633
void UpdateStatistics(const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
Definition: ddtConnectionManager.cpp:1566
std::string get_publishing_uri(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1401
bool CheckRemotePublisherExists(const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1352
void CreateStatistics(const std::string &data_stream_identifier, const int32_t number_of_samples)
Definition: ddtConnectionManager.cpp:1588
void PublishData(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:652
bool CheckPubRegistrationPermitted(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1304
std::vector< std::string > get_statistics(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1425
void ReadIni()
Definition: ddtConnectionManager.cpp:103
~DdtConnectionManager() override
Definition: ddtConnectionManager.cpp:42
void LoadDefaults()
Definition: ddtConnectionManager.cpp:78
int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override
Definition: ddtConnectionManager.cpp:1628
const int32_t HEARTBEAT_INTERVAL_DEFAULT
Definition: ddtConnectionManager.hpp:407
bool CheckPublisherExists(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1339
const int32_t SHM_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:389
int32_t UnregisterSubscriber(const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
Definition: ddtConnectionManager.cpp:1189
int32_t shm_timeout
Definition: ddtConnectionManager.hpp:419
int32_t heartbeat_interval
Definition: ddtConnectionManager.hpp:434
int32_t get_notification_port(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1413
int32_t heartbeat_timeout
Definition: ddtConnectionManager.hpp:439
int32_t UnregisterPublishers()
Definition: ddtConnectionManager.cpp:628
int32_t reply_time
Definition: ddtConnectionManager.hpp:429
std::vector< std::string > GetConnectedBrokers() override
Definition: ddtConnectionManager.cpp:1658
std::string get_shm_id(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1493
bool CheckStreamIdInUse(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1323
int32_t get_number_of_samples(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1396
int32_t RegisterSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:742
std::set< std::string > registered_publishers
Definition: ddtConnectionManager.hpp:445
void ResetStatistics(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1614
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1391
Definition: ddtLogger.hpp:51
Definition: ddtMemoryManager.hpp:72
Client class for the connection to remote brokers. This class creates MAL clients to connect to remot...
Data Consumer. This class provides the functionality to subscribe to a data stream,...
Data Producer. This class provides the functionality to publish data over network and enables sending...
Class to wrap the usage of log4cplus as logging utility. This file provides a wrapper class for the u...
Manager for shared memories. This class manages the handling of shared memories.
Definition: ddtClient.hpp:39