8 #include <gtest/gtest.h>
11 #include <mal/MalException.hpp>
17 using namespace ::testing;
18 using namespace std::chrono;
33 auto& s = m_props.meta_sources.emplace_back();
35 s.rr_uri =
"zpb.rr://127.0.0.1/daq";
36 m_props.results.emplace_back(
"location",
"path");
39 m_status->SetState(State::NotScheduled);
43 m_dpm_status = *m_status;
44 m_dpm_client = std::make_shared<DpmClientMock>();
48 m_controller = std::make_shared<DpmDaqController>(
49 m_io_ctx, m_props, m_status, m_event_log, m_dpm_client);
68 m_dpm_status.state = State::Scheduled;
72 EXPECT_EQ(m_controller->GetState(), State::NotScheduled);
73 m_dpm_client->status_signal(m_dpm_status);
74 EXPECT_EQ(m_controller->GetState(), State::Scheduled)
75 <<
"Expected new state from status update signal";
84 m_status->SetState(State::Aborted,
true);
86 m_dpm_status.state = State::Merging;
90 EXPECT_EQ(m_controller->GetState(), State::Aborted);
91 m_dpm_client->status_signal(m_dpm_status);
92 EXPECT_EQ(m_controller->GetState(), State::Merging)
93 <<
"Expected new state from status update signal";
94 EXPECT_FALSE(m_controller->GetErrorFlag());
100 auto fut = m_controller->StartAsync();
101 ASSERT_TRUE(fut.is_ready());
102 EXPECT_THROW(fut.get(), std::runtime_error);
108 auto fut = m_controller->StopAsync(ErrorPolicy::Tolerant);
109 ASSERT_TRUE(fut.is_ready());
110 EXPECT_THROW(fut.get(), std::runtime_error);
117 EXPECT_THROW(m_controller->UpdateKeywords({}), std::runtime_error);
125 boost::promise<R> reply;
126 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
127 return reply.get_future();
131 auto fut = m_controller->ScheduleMergeAsync();
134 reply.set_value(State::Scheduled);
137 EXPECT_TRUE(fut.is_ready());
138 auto result = fut.get();
140 EXPECT_EQ(State::Scheduled, m_controller->GetState());
144 ScheduleMergeAsyncSucceedsIfDpmSucceedsWithStatusSignalReceivedBeforeReply) {
149 boost::promise<R> reply;
150 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
151 return reply.get_future();
155 auto fut = m_controller->ScheduleMergeAsync();
158 m_dpm_status.state = State::Scheduled;
159 m_dpm_client->status_signal(m_dpm_status);
160 EXPECT_EQ(State::Scheduled, m_controller->GetState());
163 reply.set_value(State::Scheduled);
166 EXPECT_TRUE(fut.is_ready());
167 auto result = fut.get();
169 EXPECT_EQ(State::Scheduled, m_controller->GetState());
177 boost::promise<R> reply;
178 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
179 return reply.get_future();
183 auto fut = m_controller->ScheduleMergeAsync();
186 reply.set_exception(std::runtime_error(
"some_error"));
189 EXPECT_TRUE(fut.is_ready());
190 EXPECT_THROW(fut.get(), std::exception);
191 EXPECT_EQ(State::NotScheduled, m_controller->GetState());
192 EXPECT_TRUE(m_controller->GetStatus()->GetError());
200 boost::promise<R> reply;
201 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
202 return reply.get_future();
206 auto fut = m_controller->ScheduleMergeAsync();
207 reply.set_exception(elt::mal::TimeoutException(
"TIMEOUT"));
209 EXPECT_TRUE(fut.is_ready());
212 EXPECT_THROW(fut.get(), elt::mal::TimeoutException);
213 EXPECT_EQ(State::NotScheduled, m_controller->GetState()) <<
"State shoudln't have changed";
214 EXPECT_FALSE(m_controller->GetStatus()->GetError())
215 <<
"Error flag should not be set for timeouts";
220 m_status->SetState(State::Scheduled);
224 auto fut = m_controller->ScheduleMergeAsync();
225 EXPECT_TRUE(fut.is_ready());
228 EXPECT_THROW(fut.get(), std::runtime_error);
229 EXPECT_EQ(State::Scheduled, m_controller->GetState());
230 EXPECT_FALSE(m_controller->GetStatus()->GetError());
238 auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
240 EXPECT_TRUE(fut.is_ready());
241 auto result = fut.get();
243 EXPECT_FALSE(result.error);
244 EXPECT_EQ(State::Aborted, m_controller->GetState());
250 m_status->SetState(State::Scheduled);
254 boost::promise<R> reply;
255 EXPECT_CALL(*m_dpm_client, AbortAsync(
"id"))
256 .WillOnce(InvokeWithoutArgs([&] {
return reply.get_future(); }));
259 auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
262 reply.set_value(State::Aborted);
267 EXPECT_TRUE(fut.is_ready());
268 auto result = fut.get();
270 EXPECT_FALSE(result.error);
271 EXPECT_EQ(State::Aborted, m_controller->GetState());
276 m_status->SetState(State::Scheduled);
280 boost::promise<R> reply;
281 EXPECT_CALL(*m_dpm_client, AbortAsync(
"id"))
282 .WillOnce(InvokeWithoutArgs([&] {
return reply.get_future(); }));
285 auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
288 reply.set_exception(std::runtime_error(
"ERROR"));
293 EXPECT_TRUE(fut.is_ready());
294 EXPECT_THROW(fut.get(), std::runtime_error);
295 EXPECT_EQ(State::Scheduled, m_controller->GetState());
296 EXPECT_FALSE(m_controller->GetStatus()->GetError())
297 <<
"Failed to abort is not a condition for marking DAQ as error";
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
std::shared_ptr< ObservableStatus > m_status
boost::asio::io_context m_io_ctx
std::shared_ptr< DpmClientMock > m_dpm_client
std::shared_ptr< DpmDaqController > m_controller
std::shared_ptr< ObservableEventLog > m_event_log
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyDoesNothingIfDpmAbortFails)
State
Observable states of the data acquisition process.
Contains declaration for for DaqController.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Non observable status object that keeps stores status of data acquisition.
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
Defines shared test utilities.