20#ifndef DDTCONNECTIONMANAGER_HPP_
21#define DDTCONNECTIONMANAGER_HPP_
23#define BOOST_BIND_GLOBAL_PLACEHOLDERS
25#include <boost/bind/bind.hpp>
26#include <boost/property_tree/ini_parser.hpp>
27#include <boost/property_tree/ptree.hpp>
28#include <boost/signals2/signal.hpp>
29#include <boost/bind/bind.hpp>
31#include <mal/rr/ServerAmi.hpp>
32#include <mal/rr/ServerContextProvider.hpp>
33#include <mal/rr/qos/ReplyTime.hpp>
42namespace mal = ::elt::mal;
43namespace datatransfer = ::elt::ddt::datatransfer;
52 :
public virtual datatransfer::DataBrokerRegistration {
61 const std::string uri_string,
62 const std::string config);
73 const std::string uri_string,
74 const std::string config);
99 const int32_t latency, int32_t deadline,
100 const int32_t max_data_sample_size,
101 const int32_t number_of_samples,
102 const bool compute_checksum,
103 const std::string& publishing_uri)
override;
112 const std::string& data_stream_identifier)
override;
129 void PublishData(
const std::string& data_stream_identifier)
override;
152 const std::string& data_stream_identifier,
153 const std::string& remote_broker_uri,
154 const int32_t latency,
155 const int32_t deadline)
override;
171 const std::string& subscriber_uuid,
172 const std::string& data_stream_identifier,
173 const int32_t latency,
174 const int32_t deadline)
override;
188 const std::string& subscriber_uuid)
override;
202 const std::string& data_stream_identifier)
override;
210 const std::string& data_stream_identifier)
override;
218 const std::string& data_stream_identifier)
override;
226 const std::string& data_stream_identifier)
override;
234 const std::string& data_stream_identifier)
override;
260 std::string
get_shm_id(
const std::string& data_stream_identifier)
override;
267 std::string
get_shm_full_path(
const std::string& data_stream_identifier)
override;
290 const std::string& data_stream_identifier)
override;
308 const std::string& remote_broker_uri,
309 const std::string& data_stream_identifier)
override;
318 const int32_t datavec_size,
319 const uint64_t source_timestamp)
override;
365 const std::string remote_broker_uri)
const;
383 const int32_t number_of_samples);
470 const std::string uri_string,
const std::string config);
475 void PrintConfigValues();
480 void StartHeartbeat();
485 void StopHeartbeat();
492 void HeartbeatThread();
499 void NotifySubscribers(
const std::string& data_stream_identifier,
500 std::unique_lock<std::mutex>& producer_lock);
509 int32_t CheckPublisherUsingStream(
const std::string data_stream_identifier,
510 const std::string remote_broker_uri);
527 void CreateDataConsumer(std::unique_lock<std::mutex>& consumer_lock,
528 const std::string subscriber_uuid,
529 const std::string data_stream_identifier,
530 const std::string remote_broker_uri,
531 const std::string originating_broker,
532 const int32_t latency,
const int32_t deadline,
533 const std::string subscription_uri,
534 const int32_t number_of_samples);
541 void SearchAndUnregSubscriber(
const std::string identifier);
552 int32_t RegisterLocalSubscriber(
const std::string subscriber_uuid,
553 const std::string data_stream_identifier,
554 const int32_t latency,
555 const int32_t deadline);
567 int32_t RegisterLocalSubscriberRemote(
568 const std::string subscriber_uuid,
569 const std::string data_stream_identifier,
570 const std::string remote_broker_uri,
const int32_t latency,
571 const int32_t deadline);
579 void ProcessNotificationEvent(
const datatransfer::NotificationType type,
580 const std::string& data_stream_identifier);
591 void FreeShmThread(
const std::string& data_stream_identifier,
600 bool CheckPublisherReregistration(
const std::string& data_stream_identifier);
611 bool CheckSharedMemoryRecreation(
const std::string& data_stream_identifier,
612 const int32_t max_data_sample_size,
613 const int32_t number_of_samples);
619 void Publish(
const std::string& data_stream_identifier);
625 void PubRegNotification(
const std::string& data_stream_identifier);
631 void PubUnregNotification(
const std::string& data_stream_identifier);
638 std::map<std::string, DdtDataProducer*> producer_map;
645 std::map<std::string, DdtDataConsumer*> consumer_map;
653 std::map<std::string,
654 std::chrono::time_point<std::chrono::high_resolution_clock>>
662 std::map<std::string, std::string> connected_brokers_map;
674 std::map<std::string, DdtClient*> ddt_clients;
679 std::atomic<bool> stop_threads;
684 std::atomic<int> thread_counter;
689 std::mutex producer_mutex;
694 std::mutex consumer_mutex;
699 std::mutex client_mutex;
704 std::mutex statistics_mutex;
709 std::mutex ddt_clients_mutex;
714 std::mutex registered_publishers_mutex;
719 std::mutex connected_brokers_mutex;
729 std::promise<void> exit_signal;
734 std::future<void> future_object;
739 std::atomic<bool> heartbeat_active;
744 std::string broker_uri;
749 std::string config_file;
754 boost::signals2::connection connection;
759 const int32_t SHM_TIMEOUT_MIN = 2;
764 const int32_t WAITING_TIME_MIN = 1000;
769 const int32_t REPLY_TIME_MIN = 2;
774 const int32_t HEARTBEAT_INTERVAL_MIN = 0;
779 const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
784 const int32_t NUM_RETRIES = 30;
Definition ddtConnectionManager.hpp:52
int32_t get_heartbeat_timeout() override
Definition ddtConnectionManager.cpp:1482
int32_t waiting_time
Definition ddtConnectionManager.hpp:431
std::string get_shm_full_path(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1498
std::string GetConfigPath() const
Definition ddtConnectionManager.cpp:86
int32_t RegisterRemoteSubscriber(const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
Definition ddtConnectionManager.cpp:1135
const int32_t REPLY_TIME_DEFAULT
Definition ddtConnectionManager.hpp:408
std::string CreateSubscriptionUri(const std::string publishing_uri, const std::string remote_broker_uri) const
Definition ddtConnectionManager.cpp:1505
int32_t get_heartbeat_interval() override
Definition ddtConnectionManager.cpp:1478
int32_t UnregisterSubscribers()
Definition ddtConnectionManager.cpp:1273
void UpdateHeartbeat(const std::string &identifier) override
Definition ddtConnectionManager.cpp:353
bool get_compute_checksum(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1486
const int32_t WAITING_TIME_DEFAULT
Definition ddtConnectionManager.hpp:402
std::string get_broker_uri() override
Definition ddtConnectionManager.cpp:1503
std::map< std::string, DdtStatistics > statistics_map
Definition ddtConnectionManager.hpp:459
int32_t UnregisterPublisher(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:524
int32_t RegisterPublisher(const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
Definition ddtConnectionManager.cpp:362
const int32_t HEARTBEAT_TIMEOUT_DEFAULT
Definition ddtConnectionManager.hpp:421
std::vector< std::string > GetRegisteredStreams() override
Definition ddtConnectionManager.cpp:1638
void UpdateStatistics(const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
Definition ddtConnectionManager.cpp:1571
std::string get_publishing_uri(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1401
bool CheckRemotePublisherExists(const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1352
void CreateStatistics(const std::string &data_stream_identifier, const int32_t number_of_samples)
Definition ddtConnectionManager.cpp:1593
void PublishData(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:652
bool CheckPubRegistrationPermitted(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1304
std::vector< std::string > get_statistics(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1425
void ReadIni()
Definition ddtConnectionManager.cpp:103
~DdtConnectionManager() override
Definition ddtConnectionManager.cpp:42
void LoadDefaults()
Definition ddtConnectionManager.cpp:78
int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override
Definition ddtConnectionManager.cpp:1633
const int32_t HEARTBEAT_INTERVAL_DEFAULT
Definition ddtConnectionManager.hpp:414
bool CheckPublisherExists(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1339
const int32_t SHM_TIMEOUT_DEFAULT
Definition ddtConnectionManager.hpp:396
int32_t UnregisterSubscriber(const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
Definition ddtConnectionManager.cpp:1189
int32_t shm_timeout
Definition ddtConnectionManager.hpp:426
int32_t heartbeat_interval
Definition ddtConnectionManager.hpp:441
int32_t get_notification_port(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1413
int32_t heartbeat_timeout
Definition ddtConnectionManager.hpp:446
int32_t UnregisterPublishers()
Definition ddtConnectionManager.cpp:628
int32_t reply_time
Definition ddtConnectionManager.hpp:436
std::vector< std::string > GetConnectedBrokers() override
Definition ddtConnectionManager.cpp:1663
std::string get_shm_id(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1493
bool CheckStreamIdInUse(const std::string &data_stream_identifier)
Definition ddtConnectionManager.cpp:1323
int32_t get_number_of_samples(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1396
int32_t RegisterSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
Definition ddtConnectionManager.cpp:742
std::set< std::string > registered_publishers
Definition ddtConnectionManager.hpp:452
DdtConnectionManager(DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
Definition ddtConnectionManager.cpp:24
void ResetStatistics(const std::string &data_stream_identifier)
Definition ddtConnectionManager.cpp:1619
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) override
Definition ddtConnectionManager.cpp:1391
Definition ddtLogger.hpp:51
Definition ddtMemoryManager.hpp:72
Client class for the connection to remote brokers. This class creates MAL clients to connect to remot...
Data Consumer. This class provides the functionality to subscribe to a data stream,...
Data Producer. This class provides the functionality to publish data over network and enables sending...
Class to wrap the usage of log4cplus as logging utility. This file provides a wrapper class for the u...
Manager for shared memories. This class manages the handling of shared memories.
Definition ddtClient.hpp:39