ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
testAsyncOpAwaitPrim.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Unit test for op::AwaitPrimAsync
7  */
8 #include <daq/config.hpp>
9 
10 #include <gmock/gmock.h>
11 #include <gtest/gtest.h>
12 #include <log4cplus/logger.h>
13 
14 #include <daq/error.hpp>
15 #include <daq/op/awaitPrim.hpp>
16 #include <daq/op/initiate.hpp>
17 
18 #include "testAsyncOpBase.hpp"
19 
20 using namespace daq;
21 using namespace ::testing;
22 
23 namespace {
24 
25 class RecStatusFake : public recif::RecStatus {
26 public:
27  RecStatusFake(std::string id, recif::RecStatusNames status, std::vector<std::string> files)
28  : m_id(std::move(id)), m_status(status), m_files(std::move(files)) {
29  }
30  virtual std::vector<std::string> getDpFiles() const override {
31  return m_files;
32  }
33  virtual void setDpFiles(const std::vector<std::string>& dp_files) override {
34  m_files = dp_files;
35  }
36 
37  virtual double getEndTime() const override {
38  return 0.0;
39  }
40  virtual void setEndTime(double end_time) override {
41  }
42 
43  virtual int32_t getFilesGenerated() const override {
44  return m_files.size();
45  }
46  virtual void setFilesGenerated(int32_t files_generated) override {
47  }
48 
49  virtual int32_t getFramesProcessed() const override {
50  return 1;
51  }
52  virtual void setFramesProcessed(int32_t frames_processed) override {
53  }
54 
55  virtual int32_t getFramesRemaining() const override {
56  return 0;
57  }
58  virtual void setFramesRemaining(int32_t frames_remaining) override {
59  }
60 
61  virtual std::string getId() const override {
62  return m_id;
63  }
64  virtual void setId(const std::string& id) override {
65  m_id = id;
66  }
67 
68  virtual std::string getInfo() const override {
69  return {};
70  }
71  virtual void setInfo(const std::string& info) override {
72  }
73 
74  virtual double getRemainingTime() const override {
75  return 1.0;
76  }
77  virtual void setRemainingTime(double remaining_time) override {
78  }
79 
80  virtual int32_t getSizeRecorded() const override {
81  return 1;
82  }
83  virtual void setSizeRecorded(int32_t size_recorded) override {
84  }
85 
86  virtual double getStartTime() const override {
87  return 1.0;
88  }
89  virtual void setStartTime(double start_time) override {
90  }
91 
92  virtual ::recif::RecStatusNames getStatus() const override {
93  return m_status;
94  }
95  virtual void setStatus(::recif::RecStatusNames status) override {
96  m_status = status;
97  }
98 
99  virtual double getTimeElapsed() const override {
100  return 1.0;
101  }
102  virtual void setTimeElapsed(double time_elapsed) override {
103  }
104  bool hasKey() const override {
105  return false;
106  }
107  std::unique_ptr<recif::RecStatus> cloneKey() const override {
108  throw std::runtime_error("not clonable");
109  }
110  std::unique_ptr<recif::RecStatus> clone() const override {
111  throw std::runtime_error("not clonable");
112  }
113  bool keyEquals(const recif::RecStatus& other) const override {
114  return false;
115  }
116 
117 private:
118  std::string m_id;
119  recif::RecStatusNames m_status;
120  std::vector<std::string> m_files;
121 };
122 
123 class RecWaitStatusFake : public recif::RecWaitStatus {
124 public:
125  RecWaitStatusFake(recif::RecWaitStatusNames wait_status,
126  std::shared_ptr<recif::RecStatus> rec_status)
127  : m_wait_status{wait_status}, m_rec_status{std::move(rec_status)} {
128  }
129  std::shared_ptr<recif::RecStatus> getRecStatus() const override {
130  return m_rec_status;
131  }
132  // Does not exist in MAL for Devenv 3
133  // NOLINTNEXTLINE
134  void setRecStatus(const std::shared_ptr<::recif::RecStatus>& status) {
135  m_rec_status = status;
136  }
137 
138  recif::RecWaitStatusNames getStatus() const override {
139  return m_wait_status;
140  }
141  void setStatus(recif::RecWaitStatusNames wait_status) override {
142  m_wait_status = wait_status;
143  }
144  bool hasKey() const override {
145  return false;
146  }
147  std::unique_ptr<recif::RecWaitStatus> cloneKey() const override {
148  throw std::runtime_error("not clonable");
149  }
150  std::unique_ptr<recif::RecWaitStatus> clone() const override {
151  throw std::runtime_error("not clonable");
152  }
153  bool keyEquals(const recif::RecWaitStatus& other) const override {
154  return false;
155  }
156 
157 private:
158  recif::RecWaitStatusNames m_wait_status;
159  std::shared_ptr<recif::RecStatus> m_rec_status;
160 };
161 } // namespace
162 /**
163  * @ingroup daq_ocm_libdaq_test
164  */
166  std::shared_ptr<recif::RecWaitStatus> MakeWaitNotCompletedStatus() {
167  auto status = std::make_shared<RecWaitStatusFake>(
168  recif::Timeout,
169  std::make_shared<RecStatusFake>(
170  m_id, recif::RecStatusNames::Active, std::vector<std::string>()));
171  return status;
172  }
173  std::shared_ptr<recif::RecWaitStatus> MakeWaitCompletedStatus() {
174  auto status = std::make_shared<RecWaitStatusFake>(
175  recif::Success,
176  std::make_shared<RecStatusFake>(
177  m_id,
178  recif::RecStatusNames::Completed,
179  std::vector<std::string>{"/file1.fits", "/file2.fits"}));
180  return status;
181  }
182 
184  return daq::op::AwaitOpParams(MakeParams(), std::chrono::milliseconds(0));
185  }
186 };
187 
188 TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnFirstRequest) {
189  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
190  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2;
191  auto reply_1 = MakeWaitCompletedStatus();
192  auto reply_2 = MakeWaitCompletedStatus();
193  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
194  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
195  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
196  .WillOnce(Return(ByMove(prim_promise_2.get_future())));
197 
198  // Run
199  auto fut = op::InitiateOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
200  // "Send replies"
201  prim_promise_1.set_value(reply_1);
202  prim_promise_2.set_value(reply_2);
203 
204  // Execute handlers
205  MakeTestProgress(m_io_ctx, &fut);
206 
207  ASSERT_TRUE(fut.is_ready());
208  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
209  auto result = fut.get();
210  EXPECT_FALSE(result.error) << "There should have been no errors as each request was successful";
211  EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
212 }
213 
214 TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnSecondRequest) {
215  // Only the second source is completed on second attempt
216  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
217  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
218  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
219  auto reply_1 = MakeWaitCompletedStatus();
220  auto reply_2_1 = MakeWaitNotCompletedStatus();
221  auto reply_2_2 = MakeWaitCompletedStatus();
222  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
223  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
224  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
225  .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
226  .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
227 
228  // Run
229  auto fut = op::InitiateOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
230  // "Send replies"
231  prim_promise_1.set_value(reply_1);
232  prim_promise_2_1.set_value(reply_2_1);
233 
234  // Run ready handlers
235  m_io_ctx.poll();
236  ASSERT_FALSE(fut.is_ready());
237 
238  prim_promise_2_2.set_value(reply_2_2);
239 
240  // Execute handlers
241  MakeTestProgress(m_io_ctx, &fut);
242 
243  ASSERT_TRUE(fut.is_ready());
244  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
245  auto result = fut.get();
246  EXPECT_FALSE(result.error) << "There should have been no errors as each request was successful";
247  EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
248 }
249 
250 TEST_F(TestAsyncOpAwaitPrim, SourceThrowsExceptionIsRetriedSuccessfully) {
251  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
252  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
253  // Source 2 will first throw exception, which should be retried and second attempt will be
254  // succesful.
255  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
256  auto reply_1 = MakeWaitCompletedStatus();
257  auto reply_2_2 = MakeWaitCompletedStatus();
258  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
259  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
260  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
261  .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
262  .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
263 
264  // Run
265  ASSERT_TRUE(m_status->GetAlerts().empty());
266 
267  auto fut = op::InitiateOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
268  // "Send replies"
269  prim_promise_1.set_value(reply_1);
270  prim_promise_2_1.set_exception(recif::ExceptionErr("random error", 1));
271 
272  // Run ready handlers
273  m_io_ctx.poll();
274  ASSERT_FALSE(fut.is_ready());
275  // Status should now contain alert
276  ASSERT_EQ(m_status->GetAlerts().size(), 1u);
277  auto alert = m_status->GetAlerts()[0];
278  EXPECT_THAT(alert.description, HasSubstr("prim-source-2"));
279 
280  prim_promise_2_2.set_value(reply_2_2);
281 
282  // Execute handlers
283  MakeTestProgress(m_io_ctx, &fut);
284 
285  ASSERT_TRUE(fut.is_ready());
286  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
287  auto result = fut.get();
288  EXPECT_TRUE(result.error) << "A erors as each request was successful";
289  EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
290  EXPECT_TRUE(m_status->GetAlerts().empty())
291  << "Alert should have cleared after successful retry";
292 }
293 
294 TEST_F(TestAsyncOpAwaitPrim, SourceThrowsExceptionIsAbortedSuccessfully) {
295  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
296  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
297  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
298  auto reply_1 = MakeWaitCompletedStatus();
299  auto reply_2_2 = MakeWaitCompletedStatus();
300  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
301  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
302  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
303  .WillOnce(Return(ByMove(prim_promise_2_1.get_future())));
304 
305  // Run
306  auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
307  // "Send replies"
308  prim_promise_1.set_value(reply_1);
309  prim_promise_2_1.set_exception(recif::ExceptionErr("random error", 1));
310 
311  // Run ready handlers
312  m_io_ctx.poll();
313  ASSERT_FALSE(fut.is_ready());
314 
315  // Abort operation. This should set the promise (but might not be ready immedately due to
316  // continuations)
317  EXPECT_TRUE(abort());
318  // Execute handlers
319  MakeTestProgress(m_io_ctx, &fut);
320  EXPECT_TRUE(fut.is_ready()) << "aborting operation should immediately set operation result";
321 
322  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
323  auto result = fut.get();
324  EXPECT_TRUE(result.error) << "Operation was completed with error (exception from source)";
325  EXPECT_EQ(result.result.size(), 2u) << "Only result from first source was received before "
326  "being aborted";
327 
328  // Send reply to trigger cleanup of pending continuations
329  prim_promise_2_2.set_value(reply_1);
330  MakeTestProgress(m_io_ctx);
331 }
Contains declaration for the AwaitPrimAsync operation.
Contains error related declarations for DAQ.
std::shared_ptr< recif::RecWaitStatus > MakeWaitCompletedStatus()
daq::op::AwaitOpParams MakeAwaitOpParams()
std::shared_ptr< recif::RecWaitStatus > MakeWaitNotCompletedStatus()
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
Base fixture for async operation tests.
Contains declarations for the helper functions to initiate operations.
TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds)
Await specific parameters that is not provided with AsyncOpParams.
Contains declaration for async operations shared base class.
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)