ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
util.hpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains declaration for the async op utilities
7 */
8#ifndef OCF_DAQ_OP_UTIL_HPP_
9#define OCF_DAQ_OP_UTIL_HPP_
10#include "../config.hpp"
11
12#include <algorithm>
13#include <functional>
14#include <optional>
15#include <vector>
16
17#include <Metadaqif.hpp>
18#include <boost/thread/future.hpp>
19#include <fmt/format.h>
20#include <fmt/ostream.h>
21#include <log4cplus/logger.h>
22#include <log4cplus/loggingmacros.h>
23
24#include "../error.hpp"
25#include "../state.hpp"
26#include "asyncOpParams.hpp"
27
28namespace daq::op {
29namespace eventlog {
30template <class SourceType>
31void Reply(AsyncOpParams& params, SourceType const& source, std::string description, bool error) {
33 params.id,
34 fmt::format(!error ? "{}: Got reply from '{}'" : "{}: Got error or timeout from '{}'",
35 description,
36 source),
37 params.status.GetStatus()));
38}
39template <class SourceType>
40void Request(AsyncOpParams& params, SourceType const& source, std::string description) {
41 params.event_log.AddEvent(
42 GenericEvent(params.id,
43 fmt::format("{}: Sending request to'{}'", description, source),
44 params.status.GetStatus()));
45}
46
47template <class F>
48size_t CountExceptions(std::vector<F> const& futures) noexcept {
49 return std::count_if(futures.begin(), futures.end(), [](F const& future) noexcept -> bool {
50 return future.has_exception();
51 });
52}
53} // namespace eventlog
54
55/**
56 * Utility function to Send requests and collect replies.
57 *
58 * @todo: Need to generalize based on requestor reply type.
59 *
60 * @param begin Beginning of sequence of sources to send request.
61 * @param end End of sequence of sources to send request.
62 * @param filter_pred Unary predicate in the form `(Source const&) -> bool` returning true for those
63 * source a request should be sent.
64 * @param params Parameters for this async operation.
65 * @param sender Functor that sends request and returns future.
66 * @param reply_handler Handler invoked once for each reply received. The handler may transform the
67 * reply and return a value containing a different type than the input. It should not return a
68 * future.
69 * @param replies_handler Handler invoked once when all replies are received. It will be passed the
70 * vector of replies returned from reply_handler. To communicate failure it shall throw an
71 * exception.
72 *
73 * @return Future of R, as returned by replies handler
74 *
75 * Type requirement:
76 *
77 * with `T` being the requestor reply type.
78 *
79 * `Sender`: boost::future<T> (Source<S>&)
80 * `ReplyHandler`: R (AsyncOpParams, Source<S>, boost::future<T>)
81 *
82 * @ingroup daq_common_libdaq
83 */
84template <class R, class Iterator, class Pred, class Sender, class ReplyHandler>
85boost::future<std::vector<boost::future<R>>>
86SendRequestAndCollectReplies(Iterator begin,
87 Iterator end,
88 Pred filter_pred,
89 AsyncOpParams params,
90 Sender sender,
91 ReplyHandler reply_handler,
92 std::string_view logging_description);
93
94template <class T>
95std::vector<std::exception_ptr> ExtractExceptions(std::vector<boost::future<T>>& futures) {
96 std::vector<std::exception_ptr> exceptions;
97 for (auto& f : futures) {
98 try {
99 f.get();
100 } catch (std::exception const& e) {
101 LOG4CPLUS_DEBUG("TEST", "E:" << e.what());
102 exceptions.push_back(std::current_exception());
103 }
104 }
105 return exceptions;
106}
107
108/**
109 * Unwrap futures to extract errors.
110 *
111 * @throws DaqSourceErrors containing all exceptions.
112 *
113 * @ingroup daq_common_libdaq
114 */
115void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>> futures);
116
117/**
118 * Unwrap replies
119 *
120 * @ingroup daq_common_libdaq
121 */
122template <class R>
123std::vector<R> UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
124 // @todo: Implement
125 return {};
126}
127
128/**
129 * Reply handler that checks for exceptions in reply
130 *
131 * @throws Exception contained in reply.
132 *
133 * @return fut
134 *
135 * @param expected_state The state the source is expected to be in when receving the reply.
136 * @param success_state The state the source is set to on success.
137 * @param daq Data acquisition the operation belongs to.
138 * @param source The source to handle reply from.
139 * @param fut Reply.
140 * @param func Callback invoked with reply which if it throws exception will generate alert.
141 *
142 * @ingroup daq_common_libdaq
143 */
144template <class ReplyType>
145std::optional<ReplyType> HandleMetaDaqReply(char const* request,
146 std::optional<State> expected_state,
147 State success_state,
148 std::optional<State> error_state,
149 AsyncOpParams params,
150 Source<MetaSource>& source,
151 boost::future<ReplyType>&& fut,
152 std::function<void(ReplyType const&)> func = {});
153
154/**
155 * Reply handler that checks for exceptions in reply
156 *
157 * @throws Exception contained in reply.
158 *
159 * @return fut contained value or std::nullopt of expected state is already satisifed
160 *
161 * @param expected_state The state the source is expected to be in when receving the reply.
162 * @param success_state The state the source is set to on success.
163 * @param daq Data acquisition the operation belongs to.
164 * @param source The source to handle reply from.
165 * @param fut Reply.
166 *
167 * @ingroup daq_common_libdaq
168 */
169template <class ReplyType>
170std::optional<ReplyType> HandlePrimDaqReply(char const* request,
171 std::optional<State> expected_state,
172 State success_state,
173 std::optional<State> error_state,
174 AsyncOpParams params,
175 Source<PrimSource>& source,
176 boost::future<ReplyType>&& fut);
177
178// Implementation
179template <class R, class Iterator, class Pred, class Sender, class ReplyHandler>
180boost::future<std::vector<boost::future<R>>>
182 Iterator end,
183 Pred filter_pred,
184 AsyncOpParams params,
185 Sender sender,
186 ReplyHandler reply_handler,
187 std::string_view logging_description) {
188 LOG4CPLUS_INFO(params.logger,
189 fmt::format("{}: {}: Sending requests.", params.status, logging_description));
190 using Sourceype = typename Iterator::value_type;
191
192 std::vector<boost::future<R>> replies;
193 bool stop = false;
194
195 // Function that sends request and returns a processed (by reply_handler) reply
196 // This transforms the type from the raw MAL future to whatever reply_handler returns.
197 auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
198 Sourceype& source) -> boost::future<R> {
199 try {
200 // continuation
201 auto cont =
202 [=,
203 // The source vector should be considered immutable so it's ok to
204 // take ref.
205 & source = source,
206 reply_token = params.pending_replies.Acquire(
207 std::string(source.GetSource().GetName()), std::string(logging_description)),
208 reply_handler = std::move(reply_handler)](auto fut) mutable -> R {
209 eventlog::Reply(params, source, descr, fut.has_exception());
210 // Got reply, release reply_token.
211 reply_token.Release();
212 return std::invoke(reply_handler, params, source, std::move(fut));
213 };
214
215 eventlog::Request(params, source, descr);
216
217 // Send request
218 auto fut =
219 std::invoke(std::move(sender), source).then(params.executor, std::move(cont));
220 return fut;
221 } catch (std::exception const& e) {
222 // @todo: Do I need the error flag?
223 source.SetErrorFlag();
224 stop = true; // just an optimiazation to not Send futher requests
225
226 eventlog::Request(params, source, descr);
227
228 LOG4CPLUS_ERROR(params.logger,
229 fmt::format("{}: {}: "
230 "Error sending request to source '{}': {}",
231 params.status,
232 descr,
233 source,
234 e.what()));
235 return boost::make_exceptional_future<R>();
236 }
237 };
238
239 // Send
240 for (auto it = begin; it != end; ++it) {
241 if (stop) {
242 break;
243 }
244 if (!filter_pred(*it)) {
245 continue;
246 }
247
248 replies.emplace_back(send_func(*it));
249 }
250
251 LOG4CPLUS_INFO(
252 params.logger,
253 fmt::format(
254 "{}: {}: Awaiting {} replies.", params.status, logging_description, replies.size()));
255
256 // Join replies
257 // note: when_all returns boost::future<vector<boost::future<R>>>
258 return boost::when_all(replies.begin(), replies.end())
259 .then(params.executor,
260 [params, descr = std::string(logging_description)](
261 boost::future<std::vector<boost::future<R>>>&& replies_fut)
262 -> std::vector<boost::future<R>> {
263 auto replies = replies_fut.get(); // should never throw
264 LOG4CPLUS_INFO(
265 params.logger,
266 fmt::format("{}: {}: All replies received or timed out (num errors {}/{})",
267 params.status,
268 descr,
269 eventlog::CountExceptions(replies),
270 replies.size()));
271 return replies;
272 });
273}
274
275} // namespace daq::op
276#endif // #ifndef OCF_DAQ_OP_UTIL_HPP_
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:56
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:371
ReplyToken Acquire(std::string source_id, std::string request)
Acquire token.
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:31
size_t CountExceptions(std::vector< F > const &futures) noexcept
Definition: util.hpp:48
void Request(AsyncOpParams &params, SourceType const &source, std::string description)
Definition: util.hpp:40
std::vector< R > UnwrapReplies(boost::future< std::vector< boost::future< R > > > &&futures)
Unwrap replies.
Definition: util.hpp:123
std::optional< ReplyType > HandlePrimDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:165
std::vector< std::exception_ptr > ExtractExceptions(std::vector< boost::future< T > > &futures)
Definition: util.hpp:95
boost::future< std::vector< boost::future< R > > > SendRequestAndCollectReplies(Iterator begin, Iterator end, Pred filter_pred, AsyncOpParams params, Sender sender, ReplyHandler reply_handler, std::string_view logging_description)
Utility function to Send requests and collect replies.
Definition: util.hpp:181
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void > > > futures)
Unwrap futures to extract errors.
Definition: util.cpp:58
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut, std::function< void(ReplyType const &)> func={})
Reply handler that checks for exceptions in reply.
Definition: util.cpp:70
State
Observable states of the data acquisition process.
Definition: state.hpp:41
Definition: main.cpp:24
Represents a generic event if a more specific event is not usable.
Definition: eventLog.hpp:29
Simple class that holds the source and associated state.
Definition: source.hpp:30
void SetErrorFlag()
Definition: source.hpp:66
T & GetSource()
Definition: source.hpp:78
Parameters required for each async operation.
std::string const & id
ObservableEventLog & event_log
PendingReplies & pending_replies
rad::IoExecutor & executor
ObservableStatus & status
Async operations should not modify state directly DaqController does that.
log4cplus::Logger & logger