8 #include <gtest/gtest.h>
11 #include <mal/MalException.hpp>
17 using namespace ::testing;
18 using namespace std::chrono;
33 auto& s = m_props.meta_sources.emplace_back();
35 s.rr_uri =
"zpb.rr://127.0.0.1/daq";
36 m_props.results.emplace_back(
"location",
"path");
39 m_status->SetState(State::NotScheduled);
43 m_dpm_status = *m_status;
44 m_dpm_client = std::make_shared<DpmClientMock>();
48 m_controller = std::make_shared<DpmDaqController>(
49 m_io_ctx, m_props, m_status, m_event_log, m_dpm_client);
68 m_dpm_status.state = State::Scheduled;
72 EXPECT_EQ(m_controller->GetState(), State::NotScheduled);
73 m_dpm_client->status_signal(m_dpm_status);
74 EXPECT_EQ(m_controller->GetState(), State::Scheduled)
75 <<
"Expected new state from status update signal";
84 m_status->SetState(State::Aborted,
true);
86 m_dpm_status.state = State::Merging;
87 m_dpm_status.timestamp = Status::Clock::now();
92 m_dpm_status.timestamp = Status::Clock::now();
93 EXPECT_EQ(m_controller->GetState(), State::Aborted);
94 m_dpm_client->status_signal(m_dpm_status);
95 EXPECT_EQ(m_controller->GetState(), State::Merging)
96 <<
"Expected new state from status update signal";
97 EXPECT_FALSE(m_controller->GetErrorFlag());
103 auto fut = m_controller->StartAsync();
105 EXPECT_THROW(fut.get(), std::runtime_error);
111 auto fut = m_controller->StopAsync(ErrorPolicy::Tolerant);
113 EXPECT_THROW(fut.get(), std::runtime_error);
120 EXPECT_THROW(m_controller->UpdateKeywords({}), std::runtime_error);
128 boost::promise<R> reply;
129 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
130 return reply.get_future();
134 auto fut = m_controller->ScheduleMergeAsync();
137 reply.set_value(State::Scheduled);
140 EXPECT_TRUE(fut.is_ready());
141 auto result = fut.get();
143 EXPECT_EQ(State::Scheduled, m_controller->GetState());
147 ScheduleMergeAsyncSucceedsIfDpmSucceedsWithStatusSignalReceivedBeforeReply) {
152 boost::promise<R> reply;
153 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
154 return reply.get_future();
158 auto fut = m_controller->ScheduleMergeAsync();
161 m_dpm_status.state = State::Scheduled;
162 m_dpm_client->status_signal(m_dpm_status);
163 EXPECT_EQ(State::Scheduled, m_controller->GetState());
166 reply.set_value(State::Scheduled);
169 EXPECT_TRUE(fut.is_ready());
170 auto result = fut.get();
172 EXPECT_EQ(State::Scheduled, m_controller->GetState());
180 boost::promise<R> reply;
181 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
182 return reply.get_future();
186 auto fut = m_controller->ScheduleMergeAsync();
189 reply.set_exception(std::runtime_error(
"some_error"));
192 EXPECT_TRUE(fut.is_ready());
193 EXPECT_THROW(fut.get(), std::exception);
194 EXPECT_EQ(State::NotScheduled, m_controller->GetState());
195 EXPECT_TRUE(m_controller->GetStatus()->GetError());
203 boost::promise<R> reply;
204 EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
205 return reply.get_future();
209 auto fut = m_controller->ScheduleMergeAsync();
210 reply.set_exception(elt::mal::TimeoutException(
"TIMEOUT"));
212 EXPECT_TRUE(fut.is_ready());
215 EXPECT_THROW(fut.get(), elt::mal::TimeoutException);
216 EXPECT_EQ(State::NotScheduled, m_controller->GetState()) <<
"State shoudln't have changed";
217 EXPECT_FALSE(m_controller->GetStatus()->GetError())
218 <<
"Error flag should not be set for timeouts";
223 m_status->SetState(State::Scheduled);
227 auto fut = m_controller->ScheduleMergeAsync();
228 EXPECT_TRUE(fut.is_ready());
231 EXPECT_THROW(fut.get(), std::runtime_error);
232 EXPECT_EQ(State::Scheduled, m_controller->GetState());
233 EXPECT_FALSE(m_controller->GetStatus()->GetError());
241 auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
243 EXPECT_TRUE(fut.is_ready());
244 auto result = fut.get();
246 EXPECT_FALSE(result.error);
247 EXPECT_EQ(State::Aborted, m_controller->GetState());
253 m_status->SetState(State::Scheduled);
257 boost::promise<R> reply;
258 EXPECT_CALL(*m_dpm_client, AbortAsync(
"id"))
259 .WillOnce(InvokeWithoutArgs([&] {
return reply.get_future(); }));
262 auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
265 reply.set_value(State::Aborted);
270 EXPECT_TRUE(fut.is_ready());
271 auto result = fut.get();
273 EXPECT_FALSE(result.error);
274 EXPECT_EQ(State::Aborted, m_controller->GetState());
279 m_status->SetState(State::Scheduled);
283 boost::promise<R> reply;
284 EXPECT_CALL(*m_dpm_client, AbortAsync(
"id"))
285 .WillOnce(InvokeWithoutArgs([&] {
return reply.get_future(); }));
288 auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
291 reply.set_exception(std::runtime_error(
"ERROR"));
296 EXPECT_TRUE(fut.is_ready());
297 EXPECT_THROW(fut.get(), std::runtime_error);
298 EXPECT_EQ(State::Scheduled, m_controller->GetState());
299 EXPECT_FALSE(m_controller->GetStatus()->GetError())
300 <<
"Failed to abort is not a condition for marking DAQ as error";
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
std::shared_ptr< ObservableStatus > m_status
boost::asio::io_context m_io_ctx
std::shared_ptr< DpmClientMock > m_dpm_client
std::shared_ptr< DpmDaqController > m_controller
std::shared_ptr< ObservableEventLog > m_event_log
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyDoesNothingIfDpmAbortFails)
State
Observable states of the data acquisition process.
Contains declaration for for DaqController.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Non observable status object that keeps stores status of data acquisition.
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.