ifw-daq  3.0.1
IFW Data Acquisition modules
awaitPrim.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the AwaitPrimAsync operation
7  */
8 #include <daq/op/awaitPrim.hpp>
9 
10 #include <fmt/format.h>
11 #include <fmt/ostream.h>
12 #include <log4cplus/loggingmacros.h>
13 
14 #include <daq/error/report.hpp>
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 
18 namespace {
19 /**
20  * Process all replies together to decide if more awaits are necessary.
21  *
22  * @note Errors from sources are logged, but otherwise ignored.
23  *
24  * @returns true if all awaited-on sources have completed.
25  * @returns false if one or more sources are not completed.
26  */
27 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
28  std::vector<boost::future<bool>> values = futures.get();
29  if (values.empty()) {
30  // No requests were sent (because they were all completed)
31  return true;
32  }
33  std::vector<std::exception_ptr> exceptions;
34  size_t num_ok = 0;
35  size_t num_exceptions = 0;
36  for (auto& f : values) {
37  try {
38  if (f.get()) {
39  num_ok++;
40  }
41  } catch (std::exception const& e) {
42  num_exceptions++;
43  LOG4CPLUS_INFO(
44  "daq",
45  fmt::format("daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
46  e.what()));
47  }
48  }
49  LOG4CPLUS_DEBUG(
50  "daq",
51  fmt::format("daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
52  "replies report success",
53  num_exceptions,
54  values.size(),
55  num_ok,
56  values.size()));
57 
58  return num_ok == values.size();
59 }
60 
61 class RecWaitSpec : public recif::RecWaitSpec {
62 public:
63  RecWaitSpec(float timeout) : m_timeout{timeout}, m_info{} {
64  }
65  std::string getInfo() const override {
66  return m_info;
67  }
68  void setInfo(const std::string& info) override {
69  m_info = info;
70  }
71 
72  float getTimeout() const override {
73  return m_timeout;
74  }
75  void setTimeout(float timeout) override {
76  m_timeout = timeout;
77  }
78  bool hasKey() const override {
79  return false;
80  }
81  std::unique_ptr<recif::RecWaitSpec> cloneKey() const override {
82  throw std::runtime_error("not clonable");
83  }
84  std::unique_ptr<recif::RecWaitSpec> clone() const override {
85  throw std::runtime_error("not clonable");
86  }
87  bool keyEquals(const recif::RecWaitSpec& other) const override {
88  return false;
89  }
90 
91 private:
92  float m_timeout;
93  std::string m_info;
94 };
95 } // namespace
96 
97 namespace daq::op {
98 
100  : m_params(m_params), m_error(false), m_parts(), m_abort_requested(false) {
101 }
102 
103 [[nodiscard]] boost::future<Result<DpParts>> AwaitPrimAsync::Initiate() {
104  LOG4CPLUS_DEBUG(m_params.common.logger,
105  fmt::format("AwaitPrimAsync::Initiate: Operation initiating"));
106  InitiateAwait();
107  return m_promise.get_future();
108 }
109 
110 void AwaitPrimAsync::InitiateAwait() {
111  // If the last await was processed before the configured interval expired wait until that time
112  // has elapsed.
113  MakeInterval()
114  .then(m_params.common.executor, [this](auto) { return AwaitOnceAsync(); })
115  .unwrap()
116  .then(m_params.common.executor, [this](boost::future<bool> fut) -> void {
117  if (m_abort_requested) {
118  LOG4CPLUS_DEBUG(
119  m_params.common.logger,
120  "AwaitPrimAsync::InitiateAwait: Operation aborted -> setting promise");
121  try {
122  m_promise.set_value({m_error, std::move(m_parts)});
123  } catch (boost::promise_already_satisfied const&) {
124  LOG4CPLUS_DEBUG(m_params.common.logger,
125  "AwaitPrimAsync::InitiateAwait: Promise already satisfied");
126  }
127  return;
128  }
129  // If condition is fulfilled, set reply value or exception.
130  try {
131  // This
132  auto is_ok = fut.get();
133  if (is_ok) {
134  // All done, set promise
135  m_promise.set_value({m_error, std::move(m_parts)});
136  } else {
137  // If condition is not fulfilled we do it again.
138  InitiateAwait();
139  }
140  } catch (boost::promise_already_satisfied const&) {
141  LOG4CPLUS_DEBUG(m_params.common.logger,
142  "AwaitPrimAsync::InitiateAwait: Promise already satisfied");
143  } catch (...) {
144  // Fatal error. Report to operation invoker.
145  m_promise.set_exception(boost::current_exception());
146  }
147  });
148 }
149 
150 boost::future<void> AwaitPrimAsync::MakeInterval() {
151  using std::chrono::duration_cast;
152  using std::chrono::milliseconds;
153  using std::chrono::steady_clock;
154 
155  if (!m_last_start) {
156  // First time -> return ready future
157  return boost::make_ready_future();
158  }
159 
160  auto now = steady_clock::now();
161  auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
162  if (now >= next_start) {
163  // Interval already expired -> return ready future
164  return boost::make_ready_future();
165  }
166  LOG4CPLUS_DEBUG(m_params.common.logger,
167  fmt::format("AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
168  duration_cast<milliseconds>(next_start - now).count()));
169  // Wait until
170  m_interval.reset();
171  m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
172  m_interval->timer.async_wait([this](boost::system::error_code const& ec) {
173  // We don't care if we're cancelled, we have to set promise anyway
174  // to avoid broken promise
175  m_interval->promise.set_value();
176  });
177  return m_interval->promise.get_future();
178 }
179 
180 void AwaitPrimAsync::Abort() noexcept {
181  if (m_abort_requested) {
182  // Already requested
183  return;
184  }
185  LOG4CPLUS_INFO(m_params.common.logger,
186  fmt::format("AwaitPrimAsync::Abort: Requested to abort! "
187  "Number of files received so far: {}",
188  m_parts.size()));
189  m_abort_requested = true;
190  if (m_interval) {
191  m_interval->timer.cancel();
192  }
193 }
194 
195 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
196  using Reply = std::shared_ptr<recif::RecWaitStatus>;
197  using Seconds = std::chrono::duration<float>;
198  using std::chrono::duration_cast;
199  if (m_abort_requested) {
200  LOG4CPLUS_DEBUG(
201  m_params.common.logger,
202  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Operation requested to abort"));
203  return boost::make_ready_future<bool>(true);
204  }
205  LOG4CPLUS_DEBUG(m_params.common.logger,
206  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
207 
208  m_last_start = std::chrono::steady_clock::now();
209 
210  return SendRequestAndCollectReplies<bool>(
211  m_params.common.prim_sources.begin(),
212  m_params.common.prim_sources.end(),
213  [](Source<PrimSource> const& source) -> bool {
214  /* only send to sources that are not already stopped */
215  return IsSubsequentState(State::Stopped, source.GetState());
216  },
217  m_params.common,
218  // Sender
219  [this](Source<PrimSource>& s) -> boost::future<Reply> {
220  auto& client = s.GetSource().GetRrClient();
221  auto seconds = duration_cast<Seconds>(this->m_params.wait_interval).count();
222 #if defined(UNIT_TEST)
223  // ECII-783: MAL suddenly requires that interface *must* be implemented by
224  // matching middleware otherwise it fails.
225  // To avoid complicated mocking we still use our local implementation for unit
226  // testing.
227  auto spec = std::make_shared<RecWaitSpec>(seconds);
228 #else
229  auto mal = client.getMal();
230  BOOST_ASSERT_MSG(mal, "MAL RR client returned invalid MAL pointer");
231  auto spec = mal->createDataEntity<recif::RecWaitSpec>();
232  // Initialize all members by assigning from local implementation
233  *spec = RecWaitSpec(seconds);
234 #endif
235  return client.RecWait(spec);
236  },
237  // reply handler
238  [this](AsyncOpParams params, Source<PrimSource>& source, boost::future<Reply>&& fut)
239  -> bool { return HandleRecWaitReply(source, std::move(fut)); },
240  std::string_view("AwaitPrimAsync: await primary data acquisition"))
241  .then(m_params.common.executor, AwaitUnwrapReplies);
242 }
243 
244 bool AwaitPrimAsync::HandleRecWaitReply(
245  Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
246  // Note: Alerts are set and cleared immedately rather than deferring to
247  // a later point as the await operation can take a very long time.
248  auto alert_id =
249  MakeAlertId(alert::REQUEST, std::string("RecWait") + source.GetSource().GetName());
250  try {
251  auto reply = fut.get();
252  // Request did not throw -> clear any alert
253  m_params.common.status.ClearAlert(alert_id);
254 
255  auto status = reply->getStatus();
256  if (status == recif::Success) {
257  // Await returned
258  source.SetState(State::Stopped);
259 
260  auto rec_status = reply->getRecStatus();
261  LOG4CPLUS_INFO(m_params.common.logger,
262  fmt::format("Data source '{}' replied successfully and provides {} "
263  "number of files",
264  source.GetSource(),
265  rec_status->getDpFiles().size()));
266  if (rec_status->getDpFiles().empty()) {
267  LOG4CPLUS_WARN(m_params.common.logger,
268  fmt::format("Data source '{}' replied successfully for "
269  "RecWait but did not produce any files!",
270  source.GetSource()));
271  }
272  for (auto const& file : rec_status->getDpFiles()) {
273  m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
274  }
275  // All ok, return true
276  return true;
277  } else {
278  // Wait timed out, return false to resend
279  return false;
280  }
281  } catch (recif::ExceptionErr const& e) {
282  m_params.common.status.SetAlert(
283  MakeAlert(alert_id,
284  fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
285  "with ICD error: ({}) {}",
286  source.GetSource().GetName(),
287  e.getCode(),
288  e.getDesc())));
289  m_error = true;
290  throw boost::enable_current_exception(
291  DaqSourceError("RecWait", std::string(source.GetSource().GetName()), e.what()));
292  } catch (...) {
293  auto what = error::FormatException(std::current_exception());
294  m_params.common.status.SetAlert(
295  MakeAlert(alert_id,
296  fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
297  "with non-ICD error: {}",
298  source.GetSource().GetName(),
299  what)));
300  m_error = true;
301  throw boost::enable_current_exception(DaqSourceError(
302  "RecWait", std::string(source.GetSource().GetName()), "unknown exception"));
303  }
304 }
305 } // namespace daq::op
Contains declaration for the AwaitPrimAsync operation.
Declares daq::State and related functions.
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
Definition: report.cpp:79
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:30
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
@ Stopped
All data sources have reported they have stopped acquiring data.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Simple class that holds the source and associated state.
Definition: source.hpp:29
rad::IoExecutor & executor
log4cplus::Logger & logger
Await-specific parameters that are not provided with AsyncOpParams.
boost::future< Result< DpParts > > Initiate()
Initiates operation that await acquisition completion.
Definition: awaitPrim.cpp:103
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs operation with the provided parameters.
Definition: awaitPrim.cpp:99
Contains declaration for the async op utilities.