ifw-daq  3.0.1
IFW Data Acquisition modules
dpmClient.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief `daq::DpmClient`
7  */
8 #ifndef DAQ_DPMCLIENT_HPP
9 #define DAQ_DPMCLIENT_HPP
10 #include "config.hpp"
11 
12 #include <filesystem>
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/asio/steady_timer.hpp>
16 #include <boost/signals2/signal.hpp>
17 #include <boost/thread/future.hpp>
18 #include <rad/ioExecutor.hpp>
19 #include <log4cplus/logger.h>
20 
21 #include <daq/status.hpp>
22 #include <daq/resourceToken.hpp>
23 
24 namespace daqif {
25 class DpmDaqControlAsync;
26 }
27 
28 namespace elt::mal {
29 class Mal;
30 }
31 namespace mal = ::elt::mal;
32 
33 namespace daq {
34 
35 /**
36  * Interface to DPM server.
37  *
38  * Any implementation is expected to also provide a subscriber for DPM status and storage change
39  * signals.
    *
    * Every member function declared here is pure virtual and the class declares no data members.
    *
    * NOTE(review): no virtual destructor is declared in this class. If an implementation is ever
    * destroyed through a `DpmClient` pointer/reference-owning handle, a
    * `virtual ~DpmClient() = default;` is required -- confirm how implementations are owned.
40  */
41 class DpmClient {
42 public:
43  using StorageSignal = boost::signals2::signal<void(std::filesystem::space_info)>; ///< Signal carrying a `std::filesystem::space_info`.
44  using StatusSignal = boost::signals2::signal<void(Status)>; ///< Signal carrying a `Status`.
45 
46  /**
47  * Schedule merging.
    *
    * @param spec Passed by const reference; presumably the merge specification text --
    *             TODO confirm expected format against the implementation.
    * @return Future that yields a `State`.
48  */
49  virtual auto ScheduleAsync(std::string const& spec) -> boost::future<State> = 0;
50 
51  /**
52  * Abort merging.
    *
    * @param id Presumably the identifier of the DAQ whose merge is aborted -- TODO confirm.
    * @return Future that yields a `State`.
53  */
54  virtual auto AbortAsync(std::string const& id) -> boost::future<State> = 0;
55 
56  /**
57  * Start monitoring DAQ status.
58  *
59  * Status is requested:
60  * - Initially.
61  * - Periodically.
62  *
63  * Changes are reported via the StatusSignal.
    *
    * @param id Presumably the identifier of the DAQ to monitor -- TODO confirm.
64  */
65  virtual void StartMonitorStatus(std::string const& id) = 0;
66 
67  /**
68  * Stop monitoring DAQ status.
    *
    * @param id Presumably the same identifier that was given to StartMonitorStatus() --
    *           TODO confirm.
69  */
70  virtual void StopMonitorStatus(std::string const& id) = 0;
71 
72  /**
73  * Connect slot to status change signal.
74  *
75  * DpmClient ensures that signals are only emitted from main thread.
    *
    * @param slot Slot to connect; its type is `StatusSignal::slot_type`.
    * @return The `boost::signals2::connection` for the connected slot.
76  */
77  virtual auto
78  ConnectStatusSignal(StatusSignal::slot_type const& slot) -> boost::signals2::connection = 0;
79 
80  /**
81  * Connect slot to storage change signal.
82  *
83  * DpmClient ensures that signals are only emitted from main thread.
    *
    * @param slot Slot to connect; its type is `StorageSignal::slot_type`.
    * @return The `boost::signals2::connection` for the connected slot.
84  */
85  virtual auto
86  ConnectStorageSignal(StorageSignal::slot_type const& slot) -> boost::signals2::connection = 0;
87 };
88 
89 /**
90  * Connection parameters for DPM
91  */
93  std::string rr_uri;
94  std::string ps_uri;
95  std::chrono::seconds timeout = std::chrono::seconds(5);
96  std::chrono::seconds status_retry_interval = std::chrono::seconds(15);
97 };
98 
/**
 * Concrete `DpmClient`, constructed from an io_context, a MAL instance and `DpmClientParams`.
 *
 * Only declarations are visible here; the member functions are defined in dpmClient.cpp.
 *
 * NOTE(review): this listing skips source lines 104-105 and 109, so at least one declaration
 * of this class is not shown below. Consult the real header before editing.
 */
99 class DpmClientImpl : public DpmClient {
100 public:
101  /**
102  * Retry delay when requesting status.
     *
     * NOTE(review): the declaration this comment documents (source lines 104-105) is missing
     * from this listing.
103  */
106 
     /**
      * @param io_ctx Stored by reference in `m_io_ctx`-typed member; must outlive this object.
      * @param mal    Stored by reference in `m_mal`-typed member; must outlive this object.
      * @param params Taken by value; the class keeps a `DpmClientParams` member.
      *
      * NOTE(review): which members the arguments initialise is inferred from the matching
      * member types -- confirm in dpmClient.cpp.
      */
107  DpmClientImpl(boost::asio::io_context& io_ctx, mal::Mal& mal, DpmClientParams params);
108 
110 
111  /**
112  * Schedule merging.
     *
     * @param spec Presumably the merge specification text -- TODO confirm expected format.
     * @return Future that yields a `State`.
113  */
114  virtual auto ScheduleAsync(std::string const& spec) -> boost::future<State> override;
115 
116  /**
117  * Abort merging.
     *
     * @param id Presumably the identifier of the DAQ whose merge is aborted -- TODO confirm.
     * @return Future that yields a `State`.
118  */
119  virtual auto AbortAsync(std::string const& id) -> boost::future<State> override;
120 
121  /**
122  * Start monitoring DAQ status.
123  *
124  * Status is requested:
125  * - Initially
126  * - and rescheduled if reply times out (DPM not running)
127  *
128  * Changes are reported via the StatusSignal.
     *
     * @param id Presumably the identifier of the DAQ to monitor -- TODO confirm.
129  */
130  virtual void StartMonitorStatus(std::string const& id) override;
131 
132  /**
133  * Stop monitoring DAQ status.
     *
     * @param id Presumably the same identifier that was given to StartMonitorStatus() --
     *           TODO confirm.
134  */
135  virtual void StopMonitorStatus(std::string const& id) override;
136 
137  /**
138  * Connect slot to status change signal.
139  *
140  * DpmClient ensures that signals are only emitted from main thread.
141  *
142  * @note Status may come from different sources and no total order is guaranteed. Observers of
143  * status changes should make an effort to discard/ignore changes that occurred before an
144  * already recorded status update.
     *
     * @param slot Slot to connect; its type is `StatusSignal::slot_type`.
     * @return The `boost::signals2::connection` for the connected slot.
145  */
146  virtual auto ConnectStatusSignal(StatusSignal::slot_type const& slot)
147  -> boost::signals2::connection override;
148 
149  /**
150  * Connect slot to storage change signal.
151  *
152  * DpmClient ensures that signals are only emitted from main thread.
     *
     * @param slot Slot to connect; its type is `StorageSignal::slot_type`.
     * @return The `boost::signals2::connection` for the connected slot.
153  */
154  virtual auto ConnectStorageSignal(StorageSignal::slot_type const& slot)
155  -> boost::signals2::connection override;
156 
157 private:
     // The three helpers below are only declared here. Judging by their names they issue a
     // status request, schedule a later one, and publish a received status -- TODO confirm
     // against dpmClient.cpp.
158  void RequestStatus(std::string const& id);
159  void ScheduleRequestStatus(std::string const& id);
160  void UpdateStatus(Status const& status);
161 
     // Forward declaration only; the type is completed outside this header.
162  struct Subscriptions;
163 
164  boost::asio::io_context& m_io_ctx;
     // rad::IoExecutor adapts a boost::asio::io_context into a boost::thread-compatible executor.
165  rad::IoExecutor m_executor;
166 
167  mal::Mal& m_mal;
168  DpmClientParams m_params;
169  log4cplus::Logger m_logger;
170 
     // Signal objects; presumably the ones the Connect*Signal() functions attach slots to.
171  StatusSignal m_status_signal;
172  StorageSignal m_storage_signal;
173 
174  std::shared_ptr<daqif::DpmDaqControlAsync> m_dpmif;
     // Held through a pointer because Subscriptions is an incomplete type in this header.
175  std::unique_ptr<Subscriptions> m_subs;
176 
177 
     // One steady_timer per string key; presumably keyed by DAQ id and used for the status
     // retry scheduling -- TODO confirm.
178  std::unordered_map<std::string, boost::asio::steady_timer> m_timers;
179 
     // Initialised to a shared `true`. NOTE(review): looks like a liveness flag for pending
     // asynchronous callbacks -- confirm how it is used in dpmClient.cpp.
180  std::shared_ptr<bool> m_liveness = std::make_shared<bool>(true);
181  /**
182  * Limit number of concurrent status requests.
     *
     * Constructed as `Resource(5)`; presumably 5 is the maximum number of concurrent
     * requests -- TODO confirm against daq/resourceToken.hpp.
183  */
184  Resource m_res_status = Resource(5);
185 };
186 
187 } // namespace daq
188 #endif
virtual auto ScheduleAsync(std::string const &spec) -> boost::future< State > override
Schedule merging.
Definition: dpmClient.cpp:96
virtual void StopMonitorStatus(std::string const &id) override
Stop monitoring DAQ status.
Definition: dpmClient.cpp:193
DpmClientImpl(boost::asio::io_context &io_ctx, mal::Mal &mal, DpmClientParams params)
Definition: dpmClient.cpp:30
virtual auto ConnectStorageSignal(StorageSignal::slot_type const &slot) -> boost::signals2::connection override
Connect slot to storage change signal.
Definition: dpmClient.cpp:202
virtual auto ConnectStatusSignal(StatusSignal::slot_type const &slot) -> boost::signals2::connection override
Connect slot to status change signal.
Definition: dpmClient.cpp:197
virtual void StartMonitorStatus(std::string const &id) override
Start monitoring DAQ status.
Definition: dpmClient.cpp:114
virtual auto AbortAsync(std::string const &id) -> boost::future< State > override
Abort merging.
Definition: dpmClient.cpp:105
Interface to DPM server.
Definition: dpmClient.hpp:41
virtual auto ConnectStatusSignal(StatusSignal::slot_type const &slot) -> boost::signals2::connection=0
Connect slot to status change signal.
virtual void StopMonitorStatus(std::string const &id)=0
Stop monitoring DAQ status.
boost::signals2::signal< void(Status)> StatusSignal
Definition: dpmClient.hpp:44
virtual void StartMonitorStatus(std::string const &id)=0
Start monitoring DAQ status.
virtual auto ConnectStorageSignal(StorageSignal::slot_type const &slot) -> boost::signals2::connection=0
Connect slot to storage change signal.
virtual auto AbortAsync(std::string const &id) -> boost::future< State >=0
Abort merging.
virtual auto ScheduleAsync(std::string const &spec) -> boost::future< State >=0
Schedule merging.
boost::signals2::signal< void(std::filesystem::space_info)> StorageSignal
Definition: dpmClient.hpp:43
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
std::chrono::seconds timeout
Definition: dpmClient.hpp:95
std::string rr_uri
Definition: dpmClient.hpp:93
std::string ps_uri
Definition: dpmClient.hpp:94
std::chrono::seconds status_retry_interval
Definition: dpmClient.hpp:96
Connection parameters for DPM.
Definition: dpmClient.hpp:92
Config class header file.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:153