17 #ifndef DDTCONNECTIONMANAGER_HPP_
18 #define DDTCONNECTIONMANAGER_HPP_
20 #include <boost/bind.hpp>
21 #include <boost/property_tree/ini_parser.hpp>
22 #include <boost/property_tree/ptree.hpp>
23 #include <boost/signals2/signal.hpp>
24 #include <boost/bind/bind.hpp>
28 #include <mal/rr/ServerAmi.hpp>
29 #include <mal/rr/ServerContextProvider.hpp>
30 #include <mal/rr/qos/ReplyTime.hpp>
38 namespace mal = ::elt::mal;
39 namespace datatransfer = ::elt::ddt::datatransfer;
48 :
public virtual datatransfer::DataBrokerRegistration {
54 const std::string uri_string);
60 const std::string uri_string);
74 int32_t latency, int32_t deadline,
75 int32_t max_data_sample_size,
76 int32_t number_of_samples,
bool compute_checksum,
77 const std::string& publishing_uri)
override;
84 const std::string& data_stream_identifier)
override;
99 void PublishData(
const std::string& data_stream_identifier)
override;
114 const std::string& data_stream_identifier,
115 const std::string& remote_broker_uri,
116 int32_t latency, int32_t deadline)
override;
124 const std::string& subscriber_uuid,
125 const std::string& data_stream_identifier,
126 const int32_t latency,
127 const int32_t deadline)
override;
138 const std::string& subscriber_uuid)
override;
149 const std::string& data_stream_identifier)
override;
155 const std::string& data_stream_identifier)
override;
161 const std::string& data_stream_identifier)
override;
167 const std::string& data_stream_identifier)
override;
173 const std::string& data_stream_identifier)
override;
193 std::string
get_shm_id(
const std::string& data_stream_identifier)
override;
207 const std::string& data_stream_identifier)
override;
218 const std::string& remote_broker_uri,
219 const std::string& data_stream_identifier)
override;
228 const int32_t datavec_size,
229 const uint64_t source_timestamp)
override;
287 const std::string uri_string);
292 void PrintConfigValues();
297 void StartHeartbeat();
302 void StopHeartbeat();
309 void HeartbeatThread();
314 int32_t RegisterLocalSubscriber(
const std::string subscriber_uuid,
315 const std::string data_stream_identifier,
316 int32_t latency, int32_t deadline);
321 int32_t RegisterLocalSubscriberRemote(
322 const std::string subscriber_uuid,
323 const std::string data_stream_identifier, std::string remote_broker_uri,
324 int32_t latency, int32_t deadline);
330 void ProcessNotificationEvent(datatransfer::NotificationType type,
331 const std::string& data_stream_identifier);
336 std::string CreateSubscriptionUri(std::string publishing_uri,
337 std::string remote_broker_uri);
348 void FreeShmThread(
const std::string& data_stream_identifier,
357 bool CheckPublisherReregistration(
const std::string& data_stream_identifier);
366 bool CheckStreamIdInUse(
const std::string& data_stream_identifier);
377 bool CheckSharedMemoryRecreation(
const std::string& data_stream_identifier,
378 const int32_t max_data_sample_size,
379 const int32_t number_of_samples);
387 void CreateStatistics(
const std::string& data_stream_identifier,
388 const int32_t number_of_samples);
394 void ResetStatistics(
const std::string& data_stream_identifier);
396 std::map<std::string, DdtDataProducer*>
398 std::map<std::string, DdtDataConsumer*>
400 std::map<std::string,
401 std::chrono::time_point<std::chrono::high_resolution_clock>>
403 std::map<std::string, DdtStatistics>
405 std::map<std::string, std::string>
406 connected_brokers_map;
410 std::map<std::string, DdtClient*>
413 std::atomic<bool> stop_threads;
414 std::atomic<int> thread_counter;
416 std::mutex producer_mutex;
417 std::mutex consumer_mutex;
418 std::mutex client_mutex;
419 std::mutex statistics_mutex;
420 std::mutex ddt_clients_mutex;
421 std::mutex registered_publishers_mutex;
422 std::mutex connected_brokers_mutex;
429 std::promise<void> exit_signal;
430 std::future<void> future_object;
431 std::atomic<bool> heartbeat_active;
432 std::set<std::string> registered_publishers;
433 std::string broker_uri;
435 boost::signals2::connection connection;
437 const int32_t SHM_TIMEOUT_MIN = 2;
438 const int32_t WAITING_TIME_MIN = 1000;
439 const int32_t REPLY_TIME_MIN = 2;
440 const int32_t HEARTBEAT_INTERVAL_MIN = 0;
441 const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
442 const int32_t NUM_RETRIES = 30;