ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
awaitPrim.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_op
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains definition for the AwaitPrimAsync operation
7 */
9
10#include <cstddef>
11
12#include <fmt/format.h>
13#include <fmt/ostream.h>
14#include <log4cplus/loggingmacros.h>
15
16#include <daq/error/report.hpp>
17#include <daq/op/util.hpp>
18#include <daq/state.hpp>
19
20namespace {
21/**
22 * Process all replies together to decide if more awaits are necessary.
23 *
24 * @note Errors from sources are logged, but otherwise ignored.
25 *
26 * @returns true if all awaited-on sources have completed.
27 * @returns false if one or more sources are not completed.
28 */
29bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
30 std::vector<boost::future<bool>> values = futures.get();
31 if (values.empty()) {
32 // No requests were sent (because they were all completed)
33 return true;
34 }
35 std::vector<std::exception_ptr> exceptions;
36 std::size_t num_ok = 0;
37 std::size_t num_exceptions = 0;
38 for (auto& f : values) {
39 try {
40 if (f.get()) {
41 num_ok++;
42 }
43 } catch (std::exception const& e) {
44 num_exceptions++;
45 LOG4CPLUS_INFO(
46 "daq",
47 fmt::format("daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
48 e.what()));
49 }
50 }
51 LOG4CPLUS_DEBUG(
52 "daq",
53 fmt::format("daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
54 "replies report success",
55 num_exceptions,
56 values.size(),
57 num_ok,
58 values.size()));
59
60 return num_ok == values.size();
61}
62
63class RecWaitSpec : public recif::RecWaitSpec {
64public:
65 RecWaitSpec(float timeout) : m_timeout{timeout}, m_info{} {
66 }
67 std::string getInfo() const override {
68 return m_info;
69 }
70 void setInfo(const std::string& info) override {
71 m_info = info;
72 }
73
74 float getTimeout() const override {
75 return m_timeout;
76 }
77 void setTimeout(float timeout) override {
78 m_timeout = timeout;
79 }
80 bool hasKey() const override {
81 return false;
82 }
83 std::unique_ptr<recif::RecWaitSpec> cloneKey() const override {
84 throw std::runtime_error("not clonable");
85 }
86 std::unique_ptr<recif::RecWaitSpec> clone() const override {
87 throw std::runtime_error("not clonable");
88 }
89 bool keyEquals(const recif::RecWaitSpec& other) const override {
90 return false;
91 }
92
93private:
94 float m_timeout;
95 std::string m_info;
96};
97} // namespace
98
99namespace daq::op {
100
102 : m_params(m_params), m_error(false), m_parts(), m_abort_requested(false) {
103}
104
105[[nodiscard]] boost::future<Result<DpParts>> AwaitPrimAsync::Initiate() {
106 LOG4CPLUS_DEBUG(m_params.common.logger,
107 fmt::format("AwaitPrimAsync::Initiate: Operation initiating"));
108 InitiateAwait();
109 return m_promise.get_future();
110}
111
112void AwaitPrimAsync::InitiateAwait() {
113 // If the last await was processed before the configured interval expired wait until that time
114 // has elapsed.
115 MakeInterval()
116 .then(m_params.common.executor, [this](auto) { return AwaitOnceAsync(); })
117 .unwrap()
118 .then(m_params.common.executor, [this](boost::future<bool> fut) -> void {
119 if (m_abort_requested) {
120 LOG4CPLUS_DEBUG(
121 m_params.common.logger,
122 "AwaitPrimAsync::InitiateAwait: Operation aborted -> setting promise");
123 try {
124 m_promise.set_value({m_error, std::move(m_parts)});
125 } catch (boost::promise_already_satisfied const&) {
126 LOG4CPLUS_DEBUG(m_params.common.logger,
127 "AwaitPrimAsync::InitiateAwait: Promise already satisfied");
128 }
129 return;
130 }
131 // If condition is fulfilled, set reply value or exception.
132 try {
133 // This
134 auto is_ok = fut.get();
135 if (is_ok) {
136 // All done, set promise
137 m_promise.set_value({m_error, std::move(m_parts)});
138 } else {
139 // If condition is not fulfilled we do it again.
140 InitiateAwait();
141 }
142 } catch (boost::promise_already_satisfied const&) {
143 LOG4CPLUS_DEBUG(m_params.common.logger,
144 "AwaitPrimAsync::InitiateAwait: Promise already satisfied");
145 } catch (...) {
146 // Fatal error. Report to operation invoker.
147 m_promise.set_exception(boost::current_exception());
148 }
149 });
150}
151
152boost::future<void> AwaitPrimAsync::MakeInterval() {
153 using std::chrono::duration_cast;
154 using std::chrono::milliseconds;
155 using std::chrono::steady_clock;
156
157 if (!m_last_start) {
158 // First time -> return ready future
159 return boost::make_ready_future();
160 }
161
162 auto now = steady_clock::now();
163 auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
164 if (now >= next_start) {
165 // Interval already expired -> return ready future
166 return boost::make_ready_future();
167 }
168 LOG4CPLUS_DEBUG(m_params.common.logger,
169 fmt::format("AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
170 duration_cast<milliseconds>(next_start - now).count()));
171 // Wait until
172 m_interval.reset();
173 m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
174 m_interval->timer.async_wait([this](boost::system::error_code const& ec) {
175 // We don't care if we're cancelled, we have to set promise anyway
176 // to avoid broken promise
177 m_interval->promise.set_value();
178 });
179 return m_interval->promise.get_future();
180}
181
182void AwaitPrimAsync::Abort() noexcept {
183 if (m_abort_requested) {
184 // Already requested
185 return;
186 }
187 LOG4CPLUS_INFO(m_params.common.logger,
188 fmt::format("AwaitPrimAsync::Abort: Requested to abort! "
189 "Number of files received so far: {}",
190 m_parts.size()));
191 m_abort_requested = true;
192 if (m_interval) {
193 m_interval->timer.cancel();
194 }
195}
196
197boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
198 using Reply = std::shared_ptr<recif::RecWaitStatus>;
199 using Seconds = std::chrono::duration<float>;
200 using std::chrono::duration_cast;
201 if (m_abort_requested) {
202 LOG4CPLUS_DEBUG(
203 m_params.common.logger,
204 fmt::format("AwaitPrimAsync::AwaitOnceAsync: Operation requested to abort"));
205 return boost::make_ready_future<bool>(true);
206 }
207 LOG4CPLUS_DEBUG(m_params.common.logger,
208 fmt::format("AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
209
210 m_last_start = std::chrono::steady_clock::now();
211
212 return SendRequestAndCollectReplies<bool>(
213 m_params.common.prim_sources.begin(),
214 m_params.common.prim_sources.end(),
215 [](Source<PrimSource> const& source) -> bool {
216 /* only send to sources that are not already stopped */
217 return IsSubsequentState(State::Stopped, source.GetState());
218 },
219 m_params.common,
220 // Sender
221 [this](Source<PrimSource>& s) -> boost::future<Reply> {
222 auto& client = s.GetSource().GetRrClient();
223 auto seconds = duration_cast<Seconds>(this->m_params.wait_interval).count();
224#if defined(UNIT_TEST)
225 // ECII-783: MAL suddenly requires that interface *must* be implemented by
226 // matching middleware otherwise it fails.
227 // To avoid complicated mocking we still use our local implementation for unit
228 // testing.
229 auto spec = std::make_shared<RecWaitSpec>(seconds);
230#else
231 auto mal = client.getMal();
232 BOOST_ASSERT_MSG(mal, "MAL RR client returned invalid MAL pointer");
233 auto spec = mal->createDataEntity<recif::RecWaitSpec>();
234 // Initialize all members by assigning from local implementation
235 *spec = RecWaitSpec(seconds);
236#endif
237 return client.RecWait(spec);
238 },
239 // reply handler
240 [this](AsyncOpParams params, Source<PrimSource>& source, boost::future<Reply>&& fut)
241 -> bool { return HandleRecWaitReply(source, std::move(fut)); },
242 std::string_view("AwaitPrimAsync: await primary data acquisition"))
243 .then(m_params.common.executor, AwaitUnwrapReplies);
244}
245
246bool AwaitPrimAsync::HandleRecWaitReply(
247 Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
248 // Note: Alerts are set and cleared immedately rather than deferring to
249 // a later point as the await operation can take a very long time.
250 auto alert_id =
251 MakeAlertId(alert::REQUEST, std::string("RecWait") + source.GetSource().GetName());
252 try {
253 auto reply = fut.get();
254 // Request did not throw -> clear any alert
255 m_params.common.status.ClearAlert(alert_id);
256
257 auto status = reply->getStatus();
258 if (status == recif::Success) {
259 // Await returned
260 source.SetState(State::Stopped);
261
262 auto rec_status = reply->getRecStat();
263 LOG4CPLUS_INFO(m_params.common.logger,
264 fmt::format("Data source '{}' replied successfully and provides {} "
265 "number of files",
266 source.GetSource(),
267 rec_status->getDpFiles().size()));
268 if (rec_status->getDpFiles().empty()) {
269 LOG4CPLUS_WARN(m_params.common.logger,
270 fmt::format("Data source '{}' replied successfully for "
271 "RecWait but did not produce any files!",
272 source.GetSource()));
273 }
274 for (auto const& file : rec_status->getDpFiles()) {
275 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
276 }
277 // All ok, return true
278 return true;
279 } else {
280 // Wait timed out, return false to resend
281 return false;
282 }
283 } catch (recif::ExceptionErr const& e) {
284 m_params.common.status.SetAlert(
285 MakeAlert(alert_id,
286 fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
287 "with ICD error: ({}) {}",
288 source.GetSource().GetName(),
289 e.getCode(),
290 e.getDesc())));
291 m_error = true;
292 throw boost::enable_current_exception(
293 DaqSourceError("RecWait", std::string(source.GetSource().GetName()), e.what()));
294 } catch (...) {
295 auto what = error::FormatException(std::current_exception());
296 m_params.common.status.SetAlert(
297 MakeAlert(alert_id,
298 fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
299 "with non-ICD error: {}",
300 source.GetSource().GetName(),
301 what)));
302 m_error = true;
303 throw boost::enable_current_exception(DaqSourceError(
304 "RecWait", std::string(source.GetSource().GetName()), "unknown exception"));
305 }
306}
307} // namespace daq::op
Contains declaration for the AwaitPrimAsync operation.
Declares daq::State and related functions.
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
Definition: report.cpp:79
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:55
@ Stopped
All data sources have reported they have stopped acquiring data.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
Simple class that holds the source and associated state.
Definition: source.hpp:30
rad::IoExecutor & executor
log4cplus::Logger & logger
Await specific parameters that are not provided with AsyncOpParams.
boost::future< Result< DpParts > > Initiate()
Initiates operation that await acquisition completion.
Definition: awaitPrim.cpp:105
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs operation with the provided parameters.
Definition: awaitPrim.cpp:101
Contains declaration for the async op utilities.