10#include <gmock/gmock.h>
11#include <gtest/gtest.h>
12#include <log4cplus/logger.h>
22using namespace ::testing;
26class RecStatusFake :
public recif::RecStatus {
28 RecStatusFake(std::string
id, recif::RecStatusNames status, std::vector<std::string> files)
29 : m_id(std::move(id)), m_status(status), m_files(std::move(files)) {
31 virtual std::vector<std::string> getDpFiles()
const override {
34 virtual void setDpFiles(
const std::vector<std::string>& dp_files)
override {
38 virtual double getEndTime()
const override {
41 virtual void setEndTime(
double end_time)
override {
44 virtual int32_t getFilesGenerated()
const override {
45 return m_files.size();
47 virtual void setFilesGenerated(int32_t files_generated)
override {
50 virtual int32_t getFramesProcessed()
const override {
53 virtual void setFramesProcessed(int32_t frames_processed)
override {
56 virtual int32_t getFramesRemaining()
const override {
59 virtual void setFramesRemaining(int32_t frames_remaining)
override {
62 virtual std::string getId()
const override {
65 virtual void setId(
const std::string&
id)
override {
69 virtual std::string getInfo()
const override {
72 virtual void setInfo(
const std::string& info)
override {
75 virtual double getRemainingTime()
const override {
78 virtual void setRemainingTime(
double remaining_time)
override {
81 virtual int64_t getSizeRecorded()
const override {
84 virtual void setSizeRecorded(int64_t size_recorded)
override {
87 virtual double getStartTime()
const override {
90 virtual void setStartTime(
double start_time)
override {
93 virtual ::recif::RecStatusNames getStatus()
const override {
96 virtual void setStatus(::recif::RecStatusNames status)
override {
100 virtual double getTimeElapsed()
const override {
103 virtual void setTimeElapsed(
double time_elapsed)
override {
105 bool hasKey()
const override {
108 std::unique_ptr<recif::RecStatus> cloneKey()
const override {
109 throw std::runtime_error(
"not clonable");
111 std::unique_ptr<recif::RecStatus> clone()
const override {
112 throw std::runtime_error(
"not clonable");
114 bool keyEquals(
const recif::RecStatus& other)
const override {
120 recif::RecStatusNames m_status;
121 std::vector<std::string> m_files;
124class RecWaitStatusFake :
public recif::RecWaitStatus {
126 RecWaitStatusFake(recif::RecWaitStatusNames wait_status,
127 std::shared_ptr<recif::RecStatus> rec_status)
128 : m_wait_status{wait_status}, m_rec_status{std::move(rec_status)} {
130 std::shared_ptr<recif::RecStatus> getRecStat()
const override {
135 void setRecStat(
const std::shared_ptr<::recif::RecStatus>& status) {
136 m_rec_status = status;
139 recif::RecWaitStatusNames getStatus()
const override {
140 return m_wait_status;
142 void setStatus(recif::RecWaitStatusNames wait_status)
override {
143 m_wait_status = wait_status;
145 bool hasKey()
const override {
148 std::unique_ptr<recif::RecWaitStatus> cloneKey()
const override {
149 throw std::runtime_error(
"not clonable");
151 std::unique_ptr<recif::RecWaitStatus> clone()
const override {
152 throw std::runtime_error(
"not clonable");
154 bool keyEquals(
const recif::RecWaitStatus& other)
const override {
159 recif::RecWaitStatusNames m_wait_status;
160 std::shared_ptr<recif::RecStatus> m_rec_status;
168 auto status = std::make_shared<RecWaitStatusFake>(
170 std::make_shared<RecStatusFake>(
171 m_id, recif::RecStatusNames::Active, std::vector<std::string>()));
175 auto status = std::make_shared<RecWaitStatusFake>(
177 std::make_shared<RecStatusFake>(
179 recif::RecStatusNames::Completed,
180 std::vector<std::string>{
"/file1.fits",
"/file2.fits"}));
190 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
191 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2;
192 auto reply_1 = MakeWaitCompletedStatus();
193 auto reply_2 = MakeWaitCompletedStatus();
194 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
195 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
196 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
197 .WillOnce(Return(ByMove(prim_promise_2.get_future())));
200 auto fut = op::InitiateOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
202 prim_promise_1.set_value(reply_1);
203 prim_promise_2.set_value(reply_2);
209 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
210 auto result = fut.get();
211 EXPECT_FALSE(result.error) <<
"There should have been no errors as each request was successful";
212 EXPECT_EQ(result.result.size(), 4u) <<
"Each primary source returned two files by default";
217 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
218 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
219 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
220 auto reply_1 = MakeWaitCompletedStatus();
221 auto reply_2_1 = MakeWaitNotCompletedStatus();
222 auto reply_2_2 = MakeWaitCompletedStatus();
223 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
224 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
225 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
226 .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
227 .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
230 auto fut = op::InitiateOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
232 prim_promise_1.set_value(reply_1);
233 prim_promise_2_1.set_value(reply_2_1);
237 ASSERT_FALSE(fut.is_ready());
239 prim_promise_2_2.set_value(reply_2_2);
245 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
246 auto result = fut.get();
247 EXPECT_FALSE(result.error) <<
"There should have been no errors as each request was successful";
248 EXPECT_EQ(result.result.size(), 4u) <<
"Each primary source returned two files by default";
252 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
253 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
256 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
257 auto reply_1 = MakeWaitCompletedStatus();
258 auto reply_2_2 = MakeWaitCompletedStatus();
259 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
260 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
261 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
262 .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
263 .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
268 auto fut = op::InitiateOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
270 prim_promise_1.set_value(reply_1);
271 prim_promise_2_1.set_exception(recif::ExceptionErr(
"random error", 1));
275 ASSERT_FALSE(fut.is_ready());
277 ASSERT_EQ(m_status->GetAlerts().size(), 1u);
278 auto alert = m_status->GetAlerts()[0];
279 EXPECT_THAT(alert.description, HasSubstr(
"prim-source-2"));
281 prim_promise_2_2.set_value(reply_2_2);
287 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
288 auto result = fut.get();
289 EXPECT_TRUE(result.error) <<
"A erors as each request was successful";
290 EXPECT_EQ(result.result.size(), 4u) <<
"Each primary source returned two files by default";
291 EXPECT_TRUE(m_status->GetAlerts().empty())
292 <<
"Alert should have cleared after successful retry";
296 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
297 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
298 boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
299 auto reply_1 = MakeWaitCompletedStatus();
300 auto reply_2_2 = MakeWaitCompletedStatus();
301 EXPECT_CALL(*m_prim_rr_client, RecWait(_))
302 .WillOnce(Return(ByMove(prim_promise_1.get_future())));
303 EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
304 .WillOnce(Return(ByMove(prim_promise_2_1.get_future())));
308 op::InitiateAbortableOperation<op::AwaitPrimAsync>(m_executor, MakeAwaitOpParams());
310 prim_promise_1.set_value(reply_1);
311 prim_promise_2_1.set_exception(recif::ExceptionErr(
"random error", 1));
315 ASSERT_FALSE(fut.is_ready());
319 EXPECT_TRUE(abort());
322 EXPECT_TRUE(fut.is_ready()) <<
"aborting operation should immediately set operation result";
324 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
325 auto result = fut.get();
326 EXPECT_TRUE(result.error) <<
"Operation was completed with error (exception from source)";
327 EXPECT_EQ(result.result.size(), 2u) <<
"Only result from first source was received before "
331 prim_promise_2_2.set_value(reply_1);
Contains declaration for the AwaitPrimAsync operation.
Contains error related declarations for DAQ.
daq::op::AsyncOpParams MakeParams(daq::fits::KeywordFormatter const *fmt=nullptr)
Make async params using m_std_formatter if no fmt is not provided.
std::shared_ptr< recif::RecWaitStatus > MakeWaitCompletedStatus()
daq::op::AwaitOpParams MakeAwaitOpParams()
std::shared_ptr< recif::RecWaitStatus > MakeWaitNotCompletedStatus()
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Base fixture for async operation tests.
Contains declarations for the helper functions to initiate operations.
Await specific parameters that is not provided with AsyncOpParams.
TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnFirstRequest)
Contains declaration for async operations shared base class.
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.