ddt  0.1
ddtDataTransferLib.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtDataTransferLib.hpp
8 // @brief Base class for DdtDataPublishers and DdtDataSubscribers.
9 //
10 // This is the base class for DdtDataPublishers and DdtDataSubscribers.
11 //
12 // @author Matthias Grimm, CGI
13 // @since 2020/01/16
14 //
15 
16 #ifndef DDTDATATRANSFERLIB_HPP_
17 #define DDTDATATRANSFERLIB_HPP_
18 
19 #include <Ddtdatatransfericd.hpp>
20 #include <boost/property_tree/ini_parser.hpp>
21 #include <boost/property_tree/ptree.hpp>
22 #include <iostream>
23 #include <mal/Cii.hpp>
24 #include <mal/rr/qos/ReplyTime.hpp>
25 #include <mal/utility/LoadMal.hpp>
26 
27 #include "ddt/ddtErrorCodes.hpp"
28 #include "ddt/ddtLogger.hpp"
29 #include "ddt/ddtMemoryAccessor.hpp"
30 
31 namespace mal = ::elt::mal;
32 namespace datatransfer = ::elt::ddt::datatransfer;
33 
34 namespace ddt {
35 
39 class DdtDataTransferLib {
40  public:
44  explicit DdtDataTransferLib(DdtLogger* ddt_logger);
45 
49  virtual ~DdtDataTransferLib();
50 
54  void SetQoS(const int ddt_latency, const int ddt_deadline);
55 
60  virtual int RegisterPublisher(std::string broker_uri,
61  std::string data_stream_identifier,
62  bool compute_checksum) {
63  return 0;
64  };
65 
69  virtual int UnregisterPublisher() { return 0; };
70 
74  virtual void PublishData(){};
75 
80  virtual int RegisterSubscriber(std::string broker_uri,
81  std::string data_stream_identifier,
82  std::string remote_broker_uri,
83  int32_t reading_interval = 10) {
84  return 0;
85  };
86 
90  virtual int UnregisterSubscriber() { return 0; };
91 
95  virtual DataSample* ReadData() { return nullptr; };
96 
97  protected:
101  void StartHeartbeat(const int32_t interval, const std::string id);
102 
106  void StopHeartbeat();
107 
112  void CheckHeartbeatTimeout(int32_t& new_reply_time);
113 
118  const std::string VerifyPathInBrokerUri(std::string broker_uri);
119 
125  const std::string GetConfigFilePath();
126 
127  int latency;
129  int deadline;
131 int32_t reply_time;
132 int32_t heartbeat_interval;
138  std::promise<void> exit_signal_heartbeat;
139 
143  std::future<void> future_object_heartbeat;
144 
149  std::atomic<bool> heartbeat_active;
150 
154  std::unique_ptr<
155  datatransfer::DataBrokerRegistrationSync,
156  std::default_delete<datatransfer::DataBrokerRegistrationSync> >
157  client;
158 
162  std::atomic<bool> connected_to_broker;
163 
167  elt::mal::rr::ListenerRegistration connection_listener;
168 
172  DdtLogger* logger;
173 
177  const int32_t REPLY_TIME_DEFAULT = 6;
178 
182  const int32_t REPLY_TIME_MIN = 2;
183 
184  private:
188  void Init(DdtLogger* ddt_logger);
189 
193  void HeartbeatThread();
194 
195  std::string identifier;
196 
197  const std::string BROKER_PATH{"/broker/Broker1"};
198  const int LATENCY_DEFAULT = 10000;
199  const int DEADLINE_DEFAULT = 10;
200  const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1;
201 };
202 
203 } // namespace ddt
204 
205 #endif /* DDTDATATRANSFERLIB_HPP_ */
ddt::DdtDataTransferLib::~DdtDataTransferLib
virtual ~DdtDataTransferLib()
ddt::DdtDataTransferLib::GetConfigFilePath
const std::string GetConfigFilePath()
Definition: ddtDataTransferLib.cpp:146
ddt::DdtDataTransferLib::connected_to_broker
std::atomic< bool > connected_to_broker
Definition: ddtDataTransferLib.hpp:162
ddt::DdtDataTransferLib::StartHeartbeat
void StartHeartbeat(const int32_t interval, const std::string id)
Definition: ddtDataTransferLib.cpp:40
ddt::DdtDataTransferLib::VerifyPathInBrokerUri
const std::string VerifyPathInBrokerUri(std::string broker_uri)
Definition: ddtDataTransferLib.cpp:131
ddtErrorCodes.hpp
ddt::DdtLogger
Definition: ddtLogger.hpp:71
ddt
Definition: ddtClient.hpp:36
ddt::DdtDataTransferLib::REPLY_TIME_MIN
const int32_t REPLY_TIME_MIN
Definition: ddtDataTransferLib.hpp:182
ddt::DdtDataTransferLib::latency
int latency
Definition: ddtDataTransferLib.hpp:127
ddt::DdtDataTransferLib::SetQoS
void SetQoS(const int ddt_latency, const int ddt_deadline)
Definition: ddtDataTransferLib.cpp:35
ddt::DdtDataTransferLib::UnregisterPublisher
virtual int UnregisterPublisher()
Definition: ddtDataTransferLib.hpp:69
ddt::DataSample
Definition: ddtMemoryAccessor.hpp:174
ddt::DdtDataTransferLib::heartbeat_active
std::atomic< bool > heartbeat_active
Definition: ddtDataTransferLib.hpp:149
ddt::DdtDataTransferLib::RegisterSubscriber
virtual int RegisterSubscriber(std::string broker_uri, std::string data_stream_identifier, std::string remote_broker_uri, int32_t reading_interval=10)
Definition: ddtDataTransferLib.hpp:80
ddt::DdtDataTransferLib::PublishData
virtual void PublishData()
Definition: ddtDataTransferLib.hpp:74
ddt::DdtDataTransferLib::exit_signal_heartbeat
std::promise< void > exit_signal_heartbeat
Definition: ddtDataTransferLib.hpp:138
ddt::DdtDataTransferLib::CheckHeartbeatTimeout
void CheckHeartbeatTimeout(int32_t &new_reply_time)
Definition: ddtDataTransferLib.cpp:97
ddt::DdtDataTransferLib::future_object_heartbeat
std::future< void > future_object_heartbeat
Definition: ddtDataTransferLib.hpp:143
ddt::DdtDataTransferLib::UnregisterSubscriber
virtual int UnregisterSubscriber()
Definition: ddtDataTransferLib.hpp:90
ddt::DdtDataTransferLib::DdtDataTransferLib
DdtDataTransferLib(DdtLogger *ddt_logger)
Definition: ddtDataTransferLib.cpp:20
ddt::DdtDataTransferLib::logger
DdtLogger * logger
Definition: ddtDataTransferLib.hpp:172
ddt::DdtDataTransferLib::REPLY_TIME_DEFAULT
const int32_t REPLY_TIME_DEFAULT
Definition: ddtDataTransferLib.hpp:177
ddtLogger.hpp
ddt::DdtDataTransferLib::reply_time
int32_t reply_time
Definition: ddtDataTransferLib.hpp:131
ddt::DdtDataTransferLib::RegisterPublisher
virtual int RegisterPublisher(std::string broker_uri, std::string data_stream_identifier, bool compute_checksum)
Definition: ddtDataTransferLib.hpp:60
ddt::DdtDataTransferLib::ReadData
virtual DataSample * ReadData()
Definition: ddtDataTransferLib.hpp:95
ddt::DdtDataTransferLib::client
std::unique_ptr< datatransfer::DataBrokerRegistrationSync, std::default_delete< datatransfer::DataBrokerRegistrationSync > > client
Definition: ddtDataTransferLib.hpp:157
ddt::DdtDataTransferLib::StopHeartbeat
void StopHeartbeat()
Definition: ddtDataTransferLib.cpp:59
ddt::DdtDataTransferLib::deadline
int deadline
Definition: ddtDataTransferLib.hpp:129
ddtMemoryAccessor.hpp
ddt::DdtDataTransferLib
Definition: ddtDataTransferLib.hpp:39
ddt::DdtDataTransferLib::connection_listener
elt::mal::rr::ListenerRegistration connection_listener
Definition: ddtDataTransferLib.hpp:167
ddt::DdtDataTransferLib::heartbeat_interval
int32_t heartbeat_interval
Definition: ddtDataTransferLib.hpp:132