ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
testAsyncOpAwaitPrim.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm_libdaq_test
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Unit test for op::AwaitPrimAsync
7 */
8#include <daq/config.hpp>
9
10#include <gmock/gmock.h>
11#include <gtest/gtest.h>
12#include <log4cplus/logger.h>
13
14#include <daq/error.hpp>
15#include <daq/op/awaitPrim.hpp>
16#include <daq/op/initiate.hpp>
17
18#include "testAsyncOpBase.hpp"
19#include "utils.hpp"
20
21using namespace daq;
22using namespace ::testing;
23
24namespace {
25
26class RecStatusFake : public recif::RecStatus {
27public:
28 RecStatusFake(std::string id, recif::RecStatusNames status, std::vector<std::string> files)
29 : m_id(std::move(id)), m_status(status), m_files(std::move(files)) {
30 }
31 virtual std::vector<std::string> getDpFiles() const override {
32 return m_files;
33 }
34 virtual void setDpFiles(const std::vector<std::string>& dp_files) override {
35 m_files = dp_files;
36 }
37
38 virtual double getEndTime() const override {
39 return 0.0;
40 }
41 virtual void setEndTime(double end_time) override {
42 }
43
44 virtual int32_t getFilesGenerated() const override {
45 return m_files.size();
46 }
47 virtual void setFilesGenerated(int32_t files_generated) override {
48 }
49
50 virtual int32_t getFramesProcessed() const override {
51 return 1;
52 }
53 virtual void setFramesProcessed(int32_t frames_processed) override {
54 }
55
56 virtual int32_t getFramesRemaining() const override {
57 return 0;
58 }
59 virtual void setFramesRemaining(int32_t frames_remaining) override {
60 }
61
62 virtual std::string getId() const override {
63 return m_id;
64 }
65 virtual void setId(const std::string& id) override {
66 m_id = id;
67 }
68
69 virtual std::string getInfo() const override {
70 return {};
71 }
72 virtual void setInfo(const std::string& info) override {
73 }
74
75 virtual double getRemainingTime() const override {
76 return 1.0;
77 }
78 virtual void setRemainingTime(double remaining_time) override {
79 }
80
81 virtual int64_t getSizeRecorded() const override {
82 return 1;
83 }
84 virtual void setSizeRecorded(int64_t size_recorded) override {
85 }
86
87 virtual double getStartTime() const override {
88 return 1.0;
89 }
90 virtual void setStartTime(double start_time) override {
91 }
92
93 virtual ::recif::RecStatusNames getStatus() const override {
94 return m_status;
95 }
96 virtual void setStatus(::recif::RecStatusNames status) override {
97 m_status = status;
98 }
99
100 virtual double getTimeElapsed() const override {
101 return 1.0;
102 }
103 virtual void setTimeElapsed(double time_elapsed) override {
104 }
105 bool hasKey() const override {
106 return false;
107 }
108 std::unique_ptr<recif::RecStatus> cloneKey() const override {
109 throw std::runtime_error("not clonable");
110 }
111 std::unique_ptr<recif::RecStatus> clone() const override {
112 throw std::runtime_error("not clonable");
113 }
114 bool keyEquals(const recif::RecStatus& other) const override {
115 return false;
116 }
117
118private:
119 std::string m_id;
120 recif::RecStatusNames m_status;
121 std::vector<std::string> m_files;
122};
123
124class RecWaitStatusFake : public recif::RecWaitStatus {
125public:
126 RecWaitStatusFake(recif::RecWaitStatusNames wait_status,
127 std::shared_ptr<recif::RecStatus> rec_status)
128 : m_wait_status{wait_status}, m_rec_status{std::move(rec_status)} {
129 }
130 std::shared_ptr<recif::RecStatus> getRecStat() const override {
131 return m_rec_status;
132 }
133 // Does not exist in MAL for Devenv 3
134 // NOLINTNEXTLINE
135 void setRecStat(const std::shared_ptr<::recif::RecStatus>& status) {
136 m_rec_status = status;
137 }
138
139 recif::RecWaitStatusNames getStatus() const override {
140 return m_wait_status;
141 }
142 void setStatus(recif::RecWaitStatusNames wait_status) override {
143 m_wait_status = wait_status;
144 }
145 bool hasKey() const override {
146 return false;
147 }
148 std::unique_ptr<recif::RecWaitStatus> cloneKey() const override {
149 throw std::runtime_error("not clonable");
150 }
151 std::unique_ptr<recif::RecWaitStatus> clone() const override {
152 throw std::runtime_error("not clonable");
153 }
154 bool keyEquals(const recif::RecWaitStatus& other) const override {
155 return false;
156 }
157
158private:
159 recif::RecWaitStatusNames m_wait_status;
160 std::shared_ptr<recif::RecStatus> m_rec_status;
161};
162} // namespace
163/**
164 * @ingroup daq_ocm_libdaq_test
165 */
167 std::shared_ptr<recif::RecWaitStatus> MakeWaitNotCompletedStatus() {
168 auto status = std::make_shared<RecWaitStatusFake>(
169 recif::Timeout,
170 std::make_shared<RecStatusFake>(
171 m_id, recif::RecStatusNames::Active, std::vector<std::string>()));
172 return status;
173 }
174 std::shared_ptr<recif::RecWaitStatus> MakeWaitCompletedStatus() {
175 auto status = std::make_shared<RecWaitStatusFake>(
176 recif::Success,
177 std::make_shared<RecStatusFake>(
178 m_id,
179 recif::RecStatusNames::Completed,
180 std::vector<std::string>{"/file1.fits", "/file2.fits"}));
181 return status;
182 }
183
185 return daq::op::AwaitOpParams(MakeParams(), std::chrono::milliseconds(0));
186 }
187};
188
189TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnFirstRequest) {
190 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
191 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2;
192 auto reply_1 = MakeWaitCompletedStatus();
193 auto reply_2 = MakeWaitCompletedStatus();
194 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
195 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
196 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
197 .WillOnce(Return(ByMove(prim_promise_2.get_future())));
198
199 // Run
200 auto fut = op::InitiateOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
201 // "Send replies"
202 prim_promise_1.set_value(reply_1);
203 prim_promise_2.set_value(reply_2);
204
205 // Execute handlers
206 MakeTestProgress(m_io_ctx, &fut);
207
208 ASSERT_TRUE(fut.is_ready());
209 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
210 auto result = fut.get();
211 EXPECT_FALSE(result.error) << "There should have been no errors as each request was successful";
212 EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
213}
214
215TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnSecondRequest) {
216 // Only the second source is completed on second attempt
217 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
218 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
219 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
220 auto reply_1 = MakeWaitCompletedStatus();
221 auto reply_2_1 = MakeWaitNotCompletedStatus();
222 auto reply_2_2 = MakeWaitCompletedStatus();
223 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
224 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
225 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
226 .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
227 .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
228
229 // Run
230 auto fut = op::InitiateOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
231 // "Send replies"
232 prim_promise_1.set_value(reply_1);
233 prim_promise_2_1.set_value(reply_2_1);
234
235 // Run ready handlers
236 m_io_ctx.poll();
237 ASSERT_FALSE(fut.is_ready());
238
239 prim_promise_2_2.set_value(reply_2_2);
240
241 // Execute handlers
242 MakeTestProgress(m_io_ctx, &fut);
243
244 ASSERT_TRUE(fut.is_ready());
245 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
246 auto result = fut.get();
247 EXPECT_FALSE(result.error) << "There should have been no errors as each request was successful";
248 EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
249}
250
251TEST_F(TestAsyncOpAwaitPrim, SourceThrowsExceptionIsRetriedSuccessfully) {
252 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
253 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
254 // Source 2 will first throw exception, which should be retried and second attempt will be
255 // succesful.
256 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
257 auto reply_1 = MakeWaitCompletedStatus();
258 auto reply_2_2 = MakeWaitCompletedStatus();
259 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
260 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
261 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
262 .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
263 .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
264
265 // Run
266 ASSERT_TRUE(m_status->GetAlerts().empty());
267
268 auto fut = op::InitiateOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
269 // "Send replies"
270 prim_promise_1.set_value(reply_1);
271 prim_promise_2_1.set_exception(recif::ExceptionErr("random error", 1));
272
273 // Run ready handlers
274 m_io_ctx.poll();
275 ASSERT_FALSE(fut.is_ready());
276 // Status should now contain alert
277 ASSERT_EQ(m_status->GetAlerts().size(), 1u);
278 auto alert = m_status->GetAlerts()[0];
279 EXPECT_THAT(alert.description, HasSubstr("prim-source-2"));
280
281 prim_promise_2_2.set_value(reply_2_2);
282
283 // Execute handlers
284 MakeTestProgress(m_io_ctx, &fut);
285
286 ASSERT_TRUE(fut.is_ready());
287 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
288 auto result = fut.get();
289 EXPECT_TRUE(result.error) << "A erors as each request was successful";
290 EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
291 EXPECT_TRUE(m_status->GetAlerts().empty())
292 << "Alert should have cleared after successful retry";
293}
294
295TEST_F(TestAsyncOpAwaitPrim, SourceThrowsExceptionIsAbortedSuccessfully) {
296 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
297 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
298 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
299 auto reply_1 = MakeWaitCompletedStatus();
300 auto reply_2_2 = MakeWaitCompletedStatus();
301 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
302 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
303 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
304 .WillOnce(Return(ByMove(prim_promise_2_1.get_future())));
305
306 // Run
307 auto [fut, abort] =
308 op::InitiateAbortableOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
309 // "Send replies"
310 prim_promise_1.set_value(reply_1);
311 prim_promise_2_1.set_exception(recif::ExceptionErr("random error", 1));
312
313 // Run ready handlers
314 m_io_ctx.poll();
315 ASSERT_FALSE(fut.is_ready());
316
317 // Abort operation. This should set the promise (but might not be ready immedately due to
318 // continuations)
319 EXPECT_TRUE(abort());
320 // Execute handlers
321 MakeTestProgress(m_io_ctx, &fut);
322 EXPECT_TRUE(fut.is_ready()) << "aborting operation should immediately set operation result";
323
324 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
325 auto result = fut.get();
326 EXPECT_TRUE(result.error) << "Operation was completed with error (exception from source)";
327 EXPECT_EQ(result.result.size(), 2u) << "Only result from first source was received before "
328 "being aborted";
329
330 // Send reply to trigger cleanup of pending continuations
331 prim_promise_2_2.set_value(reply_1);
332 MakeTestProgress(m_io_ctx);
333}
Contains declaration for the AwaitPrimAsync operation.
Contains error related declarations for DAQ.
daq::op::AsyncOpParams MakeParams(daq::fits::KeywordFormatter const *fmt=nullptr)
Make async params using m_std_formatter if no fmt is provided.
std::shared_ptr< recif::RecWaitStatus > MakeWaitCompletedStatus()
daq::op::AwaitOpParams MakeAwaitOpParams()
std::shared_ptr< recif::RecWaitStatus > MakeWaitNotCompletedStatus()
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progresses the test by executing pending jobs and optionally waits for a future to be r...
Definition: utils.hpp:44
Base fixture for async operation tests.
Contains declarations for the helper functions to initiate operations.
Await specific parameters that is not provided with AsyncOpParams.
TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnFirstRequest)
Contains declaration for async operations shared base class.
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.