ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
util.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_op
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains definitions of the async op utilities
7 */
8#include <daq/op/util.hpp>
9
10#include <daq/error/report.hpp>
11
12namespace daq::op {
13
14using metadaqif::DaqReply;
15using metadaqif::DaqStopReply;
16using recif::RecStatus;
17
18template std::optional<std::shared_ptr<DaqReply>> HandleMetaDaqReply<std::shared_ptr<DaqReply>>(
19 char const* request,
20 std::optional<State> expected_state,
21 State success_state,
22 std::optional<State> error_state,
23 AsyncOpParams params,
24 Source<MetaSource>& source,
25 boost::future<std::shared_ptr<DaqReply>>&& fut,
26 std::function<void(std::shared_ptr<DaqReply> const&)>);
27
28template std::optional<std::shared_ptr<DaqStopReply>>
29HandleMetaDaqReply<std::shared_ptr<DaqStopReply>>(
30 char const* request,
31 std::optional<State> expected_state,
32 State success_state,
33 std::optional<State> error_state,
34 AsyncOpParams params,
35 Source<MetaSource>& source,
36 boost::future<std::shared_ptr<DaqStopReply>>&& fut,
37 std::function<void(std::shared_ptr<DaqStopReply> const&)>);
38
39// RecAbort return std::string
40template std::optional<std::string>
41HandlePrimDaqReply<std::string>(char const* request,
42 std::optional<State> expected_state,
43 State success_state,
44 std::optional<State> error_state,
45 AsyncOpParams params,
46 Source<PrimSource>& source,
47 boost::future<std::string>&& fut);
48
49template std::optional<std::shared_ptr<RecStatus>>
50HandlePrimDaqReply<std::shared_ptr<RecStatus>>(char const* request,
51 std::optional<State> expected_state,
52 State success_state,
53 std::optional<State> error_state,
54 AsyncOpParams params,
55 Source<PrimSource>& source,
56 boost::future<std::shared_ptr<RecStatus>>&& fut);
57
58void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>> futures) {
59 LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Entered.");
60 std::vector<boost::future<void>> values = futures.get();
61 auto exceptions = ExtractExceptions<void>(values);
62 if (!exceptions.empty()) {
63 LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Throwing gathered exceptions.");
64 throw boost::enable_current_exception(DaqSourceErrors(std::move(exceptions)));
65 }
66 LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Done.");
67}
68
69template <class ReplyType>
70std::optional<ReplyType> HandleMetaDaqReply(char const* request,
71 std::optional<State> expected_state,
72 State success_state,
73 std::optional<State> error_state,
74 AsyncOpParams params,
75 Source<MetaSource>& source,
76 boost::future<ReplyType>&& fut,
77 std::function<void(ReplyType const&)> func) {
78 if (source.GetState() == success_state) {
79 // Condition already satisfied.
80 LOG4CPLUS_INFO(params.logger,
81 fmt::format("{}: State of source '{}' is already satisfied. "
82 "Will ignore reply.",
83 params.status,
84 source));
85 return std::nullopt;
86 }
87 if (expected_state) {
88 if (source.GetState() != *expected_state) {
89 LOG4CPLUS_WARN(params.logger,
90 fmt::format("{}: State of source '{}' is in an unexpected state. "
91 "Expected '{}'. Proceeding normally anyway.",
92 params.status,
93 source,
94 *expected_state));
95 }
96 }
97
98 auto set_error_state = [&] {
99 if (error_state) {
100 source.SetState(*error_state, true);
101 } else {
102 source.SetErrorFlag();
103 }
104 };
105
106 auto alert_id =
107 MakeAlertId(alert::REQUEST, std::string(request) + source.GetSource().GetName());
108
109 try {
110 ReplyType r = fut.get();
111 if (func) {
112 func(r);
113 }
114 // Success
115 source.SetState(success_state, false);
116 params.alerts.Clear(alert_id);
117 return r;
118 } catch (metadaqif::DaqException const& e) {
119 auto msg = fmt::format(
120 "{}: Metadata source '{}' replied with ICD exception: "
121 "id='{}'"
122 "message='{}'",
123 params.status,
124 source,
125 e.getId(),
126 e.getMessage());
127 LOG4CPLUS_WARN(params.logger, msg);
128 params.alerts.Set(MakeAlert(alert_id, msg));
129 set_error_state();
130 // source reported error
131 throw boost::enable_current_exception(
132 DaqSourceError(request, std::string(source.GetSource().GetName()), e.getMessage()));
133 } catch (std::exception const& e) {
134 auto msg = fmt::format(
135 "{}: Metadata source '{}' reply contains error for id='{}': {}",
136 params.status,
137 params.id,
138 source,
139 e.what());
140 LOG4CPLUS_WARN(params.logger, msg);
141 params.alerts.Set(MakeAlert(alert_id, msg));
142 set_error_state();
143 // source reported error
144 throw boost::enable_current_exception(
145 DaqSourceError(request, std::string(source.GetSource().GetName()), e.what()));
146 } catch (...) {
147 auto what = error::FormatException(std::current_exception());
148 LOG4CPLUS_WARN(
149 params.logger,
150 fmt::format(
151 "{}: Metadata source '{}' reply contains error:\n{}", params.status, source, what));
152 params.alerts.Set(MakeAlert(alert_id,
153 fmt::format("Metadata source '{}' request '{}' failed: {}",
154 source.GetSource().GetName(),
155 request,
156 what)));
157 set_error_state();
158 // source reported error
159 throw boost::enable_current_exception(
160 DaqSourceError(request, std::string(source.GetSource().GetName()), what));
161 }
162}
163
164template <class ReplyType>
165std::optional<ReplyType> HandlePrimDaqReply(char const* request,
166 std::optional<State> expected_state,
167 State success_state,
168 std::optional<State> error_state,
169 AsyncOpParams params,
170 Source<PrimSource>& source,
171 boost::future<ReplyType>&& fut) {
172 if (source.GetState() == success_state) {
173 // Condition already satisfied.
174 // Since multiple concurrent requests are possible, it may be that another request completed
175 // the operation.
176 LOG4CPLUS_INFO(params.logger,
177 fmt::format("{}: State of source '{}' is already satisfied. "
178 "Will ignore reply.",
179 params.status,
180 source));
181 return std::nullopt;
182 }
183
184 if (expected_state) {
185 if (source.GetState() != *expected_state) {
186 // @todo It is not likely that we want to proceed normally here. If
187 // the expected state changed from e.g. Stopping to Aborting, it doesn't make sense to
188 // continue. At the same time, if reply is not handled it might mean data is lost.
189 LOG4CPLUS_WARN(params.logger,
190 fmt::format("{}: State of source '{}' is in an unexpected state. "
191 "Expected '{}'. Proceeding normally anyway.",
192 params.status,
193 source,
194 *expected_state));
195 }
196 }
197
198 auto set_error_state = [&] {
199 if (error_state) {
200 source.SetState(*error_state, true);
201 } else {
202 source.SetErrorFlag();
203 }
204 };
205
206 auto alert_id =
207 MakeAlertId(alert::REQUEST, std::string(request) + source.GetSource().GetName());
208
209 try {
210 auto r = fut.get();
211 // Success
212 source.SetState(success_state, false);
213 params.alerts.Clear(alert_id);
214 return r;
215 } catch (recif::ExceptionErr const& e) {
216 LOG4CPLUS_WARN(params.logger,
217 fmt::format("{}: Primdata source '{}' replied with "
218 "id='{}'"
219 "message='{}'",
220 params.status,
221 source,
222 e.getCode(),
223 e.getDesc()));
224 params.alerts.Set(MakeAlert(alert_id,
225 fmt::format("Primary source '{}' request '{}' replied "
226 "with error: ({}) {}",
227 source.GetSource().GetName(),
228 request,
229 e.getCode(),
230 e.getDesc())));
231 set_error_state();
232 // source reported error
233 throw boost::enable_current_exception(
234 DaqSourceError(request, std::string(source.GetSource().GetName()), e.getDesc()));
235 } catch (...) {
236 auto what = error::FormatException(std::current_exception());
237 LOG4CPLUS_WARN(
238 params.logger,
239 fmt::format("{}: Primdata source '{}' failed: {}", params.status, source, what));
240 params.alerts.Set(MakeAlert(alert_id,
241 fmt::format("Primary source '{}' request '{}' failed: {}",
242 source.GetSource().GetName(),
243 request,
244 what)));
245 set_error_state();
246 // source reported error
247 throw boost::enable_current_exception(
248 DaqSourceError(request, std::string(source.GetSource().GetName()), what));
249 }
250}
251
252} // namespace daq::op
Represents error in single source.
Definition: error.hpp:68
Exception thrown to carry reply errors.
Definition: error.hpp:85
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
Definition: report.cpp:79
std::optional< ReplyType > HandlePrimDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:165
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void > > > futures)
Unwrap futures to extract errors.
Definition: util.cpp:58
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut, std::function< void(ReplyType const &)> func={})
Reply handler that checks for exceptions in reply.
Definition: util.cpp:70
template std::optional< std::string > HandlePrimDaqReply< std::string >(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< std::string > &&fut)
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:55
State
Observable states of the data acquisition process.
Definition: state.hpp:41
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
Simple class that holds the source and associated state.
Definition: source.hpp:30
void SetErrorFlag()
Definition: source.hpp:66
State GetState() const
Definition: source.hpp:74
T & GetSource()
Definition: source.hpp:78
void SetState(State state, std::optional< bool > error_flag={})
Definition: source.hpp:53
void Set(Alert alert)
void Clear(AlertId id)
Parameters required for each async operation.
std::string const & id
AlertState & alerts
Alerts to be merged only after completion of async operation.
ObservableStatus & status
Async operations should not modify state directly; DaqController does that.
log4cplus::Logger & logger
Contains declaration for the async op utilities.