ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
dpmClient.hpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm_libdaq_test
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief `daq::DpmClient`
7 */
8#ifndef DAQ_DPMCLIENT_HPP
9#define DAQ_DPMCLIENT_HPP
10#include "config.hpp"
11
12#include <filesystem>
13
14#include <boost/asio/io_context.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/signals2/signal.hpp>
17#include <boost/thread/future.hpp>
18#include <log4cplus/logger.h>
19#include <rad/ioExecutor.hpp>
20
21#include <daq/resourceToken.hpp>
22#include <daq/status.hpp>
23
24namespace daqif {
25class DpmDaqControlAsync;
26}
27
28namespace elt::mal {
29class Mal;
30}
31namespace mal = ::elt::mal;
32
33namespace daq {
34
35/**
36 * Interface to DPM server.
37 *
38 * Any implementation is expected to also provide a subscriber for DPM status and storage change
39 * signals.
40 */
41class DpmClient {
42public:
43 using StorageSignal = boost::signals2::signal<void(std::filesystem::space_info)>;
44 using StatusSignal = boost::signals2::signal<void(Status)>;
45
46 /**
47 * Schedule merging.
48 */
49 virtual auto ScheduleAsync(std::string const& spec, std::optional<std::string> const& status)
50 -> boost::future<State> = 0;
51
52 /**
53 * Abort merging.
54 */
55 virtual auto AbortAsync(std::string const& id) -> boost::future<State> = 0;
56
57 /**
58 * Start monitoring DAQ status.
59 *
60 * Status is requested:
61 * - Initially.
62 * - Periodically.
63 *
64 * Changes are reported via the StatusSignal.
65 */
66 virtual void StartMonitorStatus(std::string const& id) = 0;
67
68 /**
69 * Stop monitoring DAQ status.
70 */
71 virtual void StopMonitorStatus(std::string const& id) = 0;
72
73 /**
74 * Connect slot to status change signal.
75 *
76 * DpmClient ensures that signals are only emitted from main thread.
77 */
78 virtual auto
79 ConnectStatusSignal(StatusSignal::slot_type const& slot) -> boost::signals2::connection = 0;
80
81 /**
82 * Connect slot to storage change signal.
83 *
84 * DpmClient ensures that signals are only emitted from main thread.
85 */
86 virtual auto
87 ConnectStorageSignal(StorageSignal::slot_type const& slot) -> boost::signals2::connection = 0;
88};
89
/**
 * Connection parameters for DPM.
 */
struct DpmClientParams {
    /** URI string. NOTE(review): `rr` presumably means request/reply endpoint -- confirm. */
    std::string rr_uri;
    /** URI string. NOTE(review): `ps` presumably means publish/subscribe endpoint -- confirm. */
    std::string ps_uri;
    /** Timeout, defaults to 5 seconds. NOTE(review): presumably the request reply timeout. */
    std::chrono::seconds timeout = std::chrono::seconds(5);
    /**
     * Status retry interval, defaults to 15 seconds.
     * NOTE(review): presumably the delay before a status request is retried -- confirm.
     */
    std::chrono::seconds status_retry_interval = std::chrono::seconds(15);
};
99
100class DpmClientImpl : public DpmClient {
101public:
102 /**
103 * Retry delay when requesting status.
104 */
107
108 DpmClientImpl(boost::asio::io_context& io_ctx, mal::Mal& mal, DpmClientParams params);
109
111
112 /**
113 * Schedule merging.
114 */
115 virtual auto ScheduleAsync(std::string const& spec, std::optional<std::string> const& status)
116 -> boost::future<State> override;
117
118 /**
119 * Abort merging.
120 */
121 virtual auto AbortAsync(std::string const& id) -> boost::future<State> override;
122
123 /**
124 * Start monitoring DAQ status.
125 *
126 * Status is requested:
127 * - Initially
128 * - and rescheduled if reply times out (DPM not running)
129 *
130 * Changes are reported via the StatusSignal.
131 */
132 virtual void StartMonitorStatus(std::string const& id) override;
133
134 /**
135 * Stop monitoring DAQ status.
136 */
137 virtual void StopMonitorStatus(std::string const& id) override;
138
139 /**
140 * Connect slot to status change signal.
141 *
142 * DpmClient ensures that signals are only emitted from main thread.
143 *
144 * @note Status may come from different sources and no total order is guaranteed. Observers of
145 * status changes should make effort to discard/ignore changes that occurred before already
146 * recorded status updated.
147 */
148 virtual auto ConnectStatusSignal(StatusSignal::slot_type const& slot)
149 -> boost::signals2::connection override;
150
151 /**
152 * Connect slot to storage change signal.
153 *
154 * DpmClient ensures that signals are only emitted from main thread.
155 */
156 virtual auto ConnectStorageSignal(StorageSignal::slot_type const& slot)
157 -> boost::signals2::connection override;
158
159private:
160 void RequestStatus(std::string const& id);
161 void ScheduleRequestStatus(std::string const& id);
162 void UpdateStatus(Status const& status);
163
164 struct Subscriptions;
165
166 boost::asio::io_context& m_io_ctx;
167 rad::IoExecutor m_executor;
168
169 mal::Mal& m_mal;
170 DpmClientParams m_params;
171 log4cplus::Logger m_logger;
172
173 StatusSignal m_status_signal;
174 StorageSignal m_storage_signal;
175
176 std::shared_ptr<daqif::DpmDaqControlAsync> m_dpmif;
177 std::unique_ptr<Subscriptions> m_subs;
178
179 std::unordered_map<std::string, boost::asio::steady_timer> m_timers;
180
181 std::shared_ptr<bool> m_liveness = std::make_shared<bool>(true);
182 /**
183 * Limit number of concurrent status requests.
184 */
185 Resource m_res_status = Resource(5);
186};
187
188} // namespace daq
189#endif
DpmClient::StorageSignal StorageSignal
Retry delay when requesting status.
Definition: dpmClient.hpp:105
virtual void StopMonitorStatus(std::string const &id) override
Stop monitoring DAQ status.
Definition: dpmClient.cpp:197
virtual auto ScheduleAsync(std::string const &spec, std::optional< std::string > const &status) -> boost::future< State > override
Schedule merging.
Definition: dpmClient.cpp:97
virtual auto ConnectStorageSignal(StorageSignal::slot_type const &slot) -> boost::signals2::connection override
Connect slot to storage change signal.
Definition: dpmClient.cpp:206
virtual auto ConnectStatusSignal(StatusSignal::slot_type const &slot) -> boost::signals2::connection override
Connect slot to status change signal.
Definition: dpmClient.cpp:201
virtual void StartMonitorStatus(std::string const &id) override
Start monitoring DAQ status.
Definition: dpmClient.cpp:117
DpmClient::StatusSignal StatusSignal
Definition: dpmClient.hpp:106
virtual auto AbortAsync(std::string const &id) -> boost::future< State > override
Abort merging.
Definition: dpmClient.cpp:108
Interface to DPM server.
Definition: dpmClient.hpp:41
virtual auto ConnectStatusSignal(StatusSignal::slot_type const &slot) -> boost::signals2::connection=0
Connect slot to status change signal.
virtual void StopMonitorStatus(std::string const &id)=0
Stop monitoring DAQ status.
virtual auto ScheduleAsync(std::string const &spec, std::optional< std::string > const &status) -> boost::future< State >=0
Schedule merging.
boost::signals2::signal< void(Status)> StatusSignal
Definition: dpmClient.hpp:44
virtual void StartMonitorStatus(std::string const &id)=0
Start monitoring DAQ status.
virtual auto ConnectStorageSignal(StorageSignal::slot_type const &slot) -> boost::signals2::connection=0
Connect slot to storage change signal.
virtual auto AbortAsync(std::string const &id) -> boost::future< State >=0
Abort merging.
boost::signals2::signal< void(std::filesystem::space_info)> StorageSignal
Definition: dpmClient.hpp:43
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
std::chrono::seconds timeout
Definition: dpmClient.hpp:96
std::string rr_uri
Definition: dpmClient.hpp:94
std::string ps_uri
Definition: dpmClient.hpp:95
std::chrono::seconds status_retry_interval
Definition: dpmClient.hpp:97
Connection parameters for DPM.
Definition: dpmClient.hpp:93
Config class header file.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164