20#ifndef DDTCONNECTIONMANAGER_HPP_
21#define DDTCONNECTIONMANAGER_HPP_
23#define BOOST_BIND_GLOBAL_PLACEHOLDERS
25#include <boost/bind/bind.hpp>
26#include <boost/property_tree/ini_parser.hpp>
27#include <boost/property_tree/ptree.hpp>
28#include <boost/signals2/signal.hpp>
29#include <boost/bind/bind.hpp>
31#include <mal/rr/ServerAmi.hpp>
32#include <mal/rr/ServerContextProvider.hpp>
33#include <mal/rr/qos/ReplyTime.hpp>
42namespace mal = ::elt::mal;
43namespace datatransfer = ::elt::ddt::datatransfer;
52 :
public virtual datatransfer::DataBrokerRegistration {
61 const std::string uri_string,
62 const std::string config);
73 const std::string uri_string,
74 const std::string config);
99 const int32_t latency, int32_t deadline,
100 const int32_t max_data_sample_size,
101 const int32_t number_of_samples,
102 const bool compute_checksum,
103 const std::string& publishing_uri)
override;
112 const std::string& data_stream_identifier)
override;
129 void PublishData(
const std::string& data_stream_identifier)
override;
152 const std::string& data_stream_identifier,
153 const std::string& remote_broker_uri,
154 const int32_t latency,
155 const int32_t deadline)
override;
171 const std::string& subscriber_uuid,
172 const std::string& data_stream_identifier,
173 const int32_t latency,
174 const int32_t deadline)
override;
188 const std::string& subscriber_uuid)
override;
202 const std::string& data_stream_identifier)
override;
210 const std::string& data_stream_identifier)
override;
218 const std::string& data_stream_identifier)
override;
226 const std::string& data_stream_identifier)
override;
234 const std::string& data_stream_identifier)
override;
260 std::string
get_shm_id(
const std::string& data_stream_identifier)
override;
283 const std::string& data_stream_identifier)
override;
301 const std::string& remote_broker_uri,
302 const std::string& data_stream_identifier)
override;
311 const int32_t datavec_size,
312 const uint64_t source_timestamp)
override;
358 const std::string remote_broker_uri)
const;
376 const int32_t number_of_samples);
463 const std::string uri_string,
const std::string config);
468 void PrintConfigValues();
473 void StartHeartbeat();
478 void StopHeartbeat();
485 void HeartbeatThread();
492 void NotifySubscribers(
const std::string& data_stream_identifier,
493 std::unique_lock<std::mutex>& producer_lock);
502 int32_t CheckPublisherUsingStream(
const std::string data_stream_identifier,
503 const std::string remote_broker_uri);
520 void CreateDataConsumer(std::unique_lock<std::mutex>& consumer_lock,
521 const std::string subscriber_uuid,
522 const std::string data_stream_identifier,
523 const std::string remote_broker_uri,
524 const std::string originating_broker,
525 const int32_t latency,
const int32_t deadline,
526 const std::string subscription_uri,
527 const int32_t number_of_samples);
534 void SearchAndUnregSubscriber(
const std::string identifier);
545 int32_t RegisterLocalSubscriber(
const std::string subscriber_uuid,
546 const std::string data_stream_identifier,
547 const int32_t latency,
548 const int32_t deadline);
560 int32_t RegisterLocalSubscriberRemote(
561 const std::string subscriber_uuid,
562 const std::string data_stream_identifier,
563 const std::string remote_broker_uri,
const int32_t latency,
564 const int32_t deadline);
572 void ProcessNotificationEvent(
const datatransfer::NotificationType type,
573 const std::string& data_stream_identifier);
584 void FreeShmThread(
const std::string& data_stream_identifier,
593 bool CheckPublisherReregistration(
const std::string& data_stream_identifier);
604 bool CheckSharedMemoryRecreation(
const std::string& data_stream_identifier,
605 const int32_t max_data_sample_size,
606 const int32_t number_of_samples);
612 void Publish(
const std::string& data_stream_identifier);
618 void PubRegNotification(
const std::string& data_stream_identifier);
624 void PubUnregNotification(
const std::string& data_stream_identifier);
631 std::map<std::string, DdtDataProducer*> producer_map;
638 std::map<std::string, DdtDataConsumer*> consumer_map;
646 std::map<std::string,
647 std::chrono::time_point<std::chrono::high_resolution_clock>>
655 std::map<std::string, std::string> connected_brokers_map;
667 std::map<std::string, DdtClient*> ddt_clients;
672 std::atomic<bool> stop_threads;
677 std::atomic<int> thread_counter;
682 std::mutex producer_mutex;
687 std::mutex consumer_mutex;
692 std::mutex client_mutex;
697 std::mutex statistics_mutex;
702 std::mutex ddt_clients_mutex;
707 std::mutex registered_publishers_mutex;
712 std::mutex connected_brokers_mutex;
722 std::promise<void> exit_signal;
727 std::future<void> future_object;
732 std::atomic<bool> heartbeat_active;
737 std::string broker_uri;
742 std::string config_file;
747 boost::signals2::connection connection;
752 const int32_t SHM_TIMEOUT_MIN = 2;
757 const int32_t WAITING_TIME_MIN = 1000;
762 const int32_t REPLY_TIME_MIN = 2;
767 const int32_t HEARTBEAT_INTERVAL_MIN = 0;
772 const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
777 const int32_t NUM_RETRIES = 30;
Definition: ddtConnectionManager.hpp:52
int32_t get_heartbeat_timeout() override
Definition: ddtConnectionManager.cpp:1482
int32_t waiting_time
Definition: ddtConnectionManager.hpp:424
std::string GetConfigPath() const
Definition: ddtConnectionManager.cpp:86
int32_t RegisterRemoteSubscriber(const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:1135
const int32_t REPLY_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:401
std::string CreateSubscriptionUri(const std::string publishing_uri, const std::string remote_broker_uri) const
Definition: ddtConnectionManager.cpp:1500
int32_t get_heartbeat_interval() override
Definition: ddtConnectionManager.cpp:1478
int32_t UnregisterSubscribers()
Definition: ddtConnectionManager.cpp:1273
void UpdateHeartbeat(const std::string &identifier) override
Definition: ddtConnectionManager.cpp:353
bool get_compute_checksum(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1486
const int32_t WAITING_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:395
std::string get_broker_uri() override
Definition: ddtConnectionManager.cpp:1498
std::map< std::string, DdtStatistics > statistics_map
Definition: ddtConnectionManager.hpp:452
int32_t UnregisterPublisher(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:524
int32_t RegisterPublisher(const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
Definition: ddtConnectionManager.cpp:362
const int32_t HEARTBEAT_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:414
std::vector< std::string > GetRegisteredStreams() override
Definition: ddtConnectionManager.cpp:1633
void UpdateStatistics(const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
Definition: ddtConnectionManager.cpp:1566
std::string get_publishing_uri(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1401
bool CheckRemotePublisherExists(const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1352
void CreateStatistics(const std::string &data_stream_identifier, const int32_t number_of_samples)
Definition: ddtConnectionManager.cpp:1588
void PublishData(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:652
bool CheckPubRegistrationPermitted(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1304
std::vector< std::string > get_statistics(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1425
void ReadIni()
Definition: ddtConnectionManager.cpp:103
~DdtConnectionManager() override
Definition: ddtConnectionManager.cpp:42
void LoadDefaults()
Definition: ddtConnectionManager.cpp:78
int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override
Definition: ddtConnectionManager.cpp:1628
const int32_t HEARTBEAT_INTERVAL_DEFAULT
Definition: ddtConnectionManager.hpp:407
bool CheckPublisherExists(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1339
const int32_t SHM_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:389
int32_t UnregisterSubscriber(const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
Definition: ddtConnectionManager.cpp:1189
int32_t shm_timeout
Definition: ddtConnectionManager.hpp:419
int32_t heartbeat_interval
Definition: ddtConnectionManager.hpp:434
int32_t get_notification_port(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1413
int32_t heartbeat_timeout
Definition: ddtConnectionManager.hpp:439
int32_t UnregisterPublishers()
Definition: ddtConnectionManager.cpp:628
int32_t reply_time
Definition: ddtConnectionManager.hpp:429
std::vector< std::string > GetConnectedBrokers() override
Definition: ddtConnectionManager.cpp:1658
std::string get_shm_id(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1493
bool CheckStreamIdInUse(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1323
int32_t get_number_of_samples(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1396
int32_t RegisterSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:742
std::set< std::string > registered_publishers
Definition: ddtConnectionManager.hpp:445
void ResetStatistics(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1614
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1391
Definition: ddtLogger.hpp:51
Definition: ddtMemoryManager.hpp:72
Client class for the connection to remote brokers. This class creates MAL clients to connect to remot...
Data Consumer. This class provides the functionality to subscribe to a data stream,...
Data Producer. This class provides the functionality to publish data over network and enables sending...
Class to wrap the usage of log4cplus as logging utility. This file provides a wrapper class for the u...
Manager for shared memories. This class manages the handling of shared memories.
Definition: ddtClient.hpp:39