ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
testDpmDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Unit test for `daq::DpmDaqController`
7  */
8 #include <gtest/gtest.h>
9 
10 #include <daq/daqController.hpp>
11 #include <mal/MalException.hpp>
12 
13 #include "mock/dpmClientMock.hpp"
14 #include "statusObserver.hpp"
15 #include "utils.hpp"
16 
17 using namespace ::testing;
18 using namespace std::chrono;
19 
20 namespace daq {
21 /**
22  * Fixture for daq::DpmDaqController life cycle tests
23  *
24  * @ingroup daq_ocm_libdaq_test
25  */
26 class TestDpmDaqController : public ::testing::Test {
    // NOTE(review): this listing jumps from line 27 to line 29; the constructor declaration
    // that owns the member-initializer list below is presumably on the missing line 28.
27 public:
29  : m_io_ctx()
30  , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
31  , m_event_log(std::make_shared<ObservableEventLog>()) {
    // Populate the DAQ context handed to the controller: id "id", one metadata source
    // named "meta" and one result entry constructed from ("location", "path").
32  m_props.id = "id";
33  auto& s = m_props.meta_sources.emplace_back();
34  s.name = "meta";
35  s.rr_uri = "zpb.rr://127.0.0.1/daq";
36  m_props.results.emplace_back("location", "path");
37 
38  // Default state
39  m_status->SetState(State::NotScheduled);
40  }
41 
    // gtest hook: seeds the fake DPM status sample from the current content of m_status
    // and creates a fresh DPM client mock.
42  void SetUp() override {
43  m_dpm_status = *m_status;
44  m_dpm_client = std::make_shared<DpmClientMock>();
45  }
46 
    // Creates the controller under test from the fixture members. It is not invoked by
    // SetUp(); every test calls it explicitly, which lets a test adjust m_status and
    // m_dpm_status before the controller is constructed.
47  void PostSetUp() {
48  m_controller = std::make_shared<DpmDaqController>(
49  m_io_ctx, m_props, m_status, m_event_log, m_dpm_client);
50  }
51 
52  boost::asio::io_context m_io_ctx; // NOLINT
53  std::shared_ptr<ObservableStatus> m_status; // NOLINT
54  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
55  DaqContext m_props; // NOLINT
56  std::shared_ptr<DpmClientMock> m_dpm_client; // NOLINT
57  /**
58  * Fake status update sample from DPM
59  */
60  Status m_dpm_status = Status("id", "fileid");
61 
    // Controller under test; only valid after PostSetUp() has been called.
62  std::shared_ptr<DpmDaqController> m_controller;
63 };
64 
/**
 * Verifies that a status sample published on the DPM client's status signal moves the
 * controller state from NotScheduled to Scheduled.
 */
65 TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds) {
66  // Setup
67  // New state will be Scheduled
68  m_dpm_status.state = State::Scheduled;
    // Create the controller under test from the fixture members.
69  PostSetUp();
70 
71  // Test
    // Before the signal the controller still reports the fixture's default state.
72  EXPECT_EQ(m_controller->GetState(), State::NotScheduled);
    // Publish the fake DPM status sample through the mock's status signal.
73  m_dpm_client->status_signal(m_dpm_status);
74  EXPECT_EQ(m_controller->GetState(), State::Scheduled)
75  << "Expected new state from status update signal";
76 }
77 
78 /**
79  * Test that DPM can override status of OCM as it is the authoritative source of DAQ status.
80  */
81 TEST_F(TestDpmDaqController, StatusOverrideFromOcmAbortedToDpmMergingSucceeds) {
82  // Setup
83  // Put the DAQ in state Aborted with error, as seen from the OCM point of view.
84  m_status->SetState(State::Aborted, true);
85  // The DPM state is Merging however.
86  m_dpm_status.state = State::Merging;
87  PostSetUp();
88 
89  // Test
    // The controller starts out with the OCM-side state.
90  EXPECT_EQ(m_controller->GetState(), State::Aborted);
    // Publish the fake DPM status sample through the mock's status signal.
91  m_dpm_client->status_signal(m_dpm_status);
92  EXPECT_EQ(m_controller->GetState(), State::Merging)
93  << "Expected new state from status update signal";
    // The DPM sample is also expected to replace the error flag set above.
94  EXPECT_FALSE(m_controller->GetErrorFlag());
95 }
96 
/**
 * Verifies that StartAsync() returns a future that is already ready and holds a
 * std::runtime_error.
 */
97 TEST_F(TestDpmDaqController, StartAsyncThrows) {
98  PostSetUp();
99 
100  auto fut = m_controller->StartAsync();
     // ASSERT rather than EXPECT: get() is only called if the future is already ready.
101  ASSERT_TRUE(fut.is_ready());
102  EXPECT_THROW(fut.get(), std::runtime_error);
103 }
104 
/**
 * Verifies that StopAsync() with ErrorPolicy::Tolerant returns a future that is already
 * ready and holds a std::runtime_error.
 */
105 TEST_F(TestDpmDaqController, StopAsyncThrows) {
106  PostSetUp();
107 
108  auto fut = m_controller->StopAsync(ErrorPolicy::Tolerant);
     // ASSERT rather than EXPECT: get() is only called if the future is already ready.
109  ASSERT_TRUE(fut.is_ready());
110  EXPECT_THROW(fut.get(), std::runtime_error);
111 }
112 
113 
/**
 * Verifies that UpdateKeywords() throws std::runtime_error, here called with an empty
 * argument.
 */
114 TEST_F(TestDpmDaqController, UpdateKeywordsThrows) {
115  PostSetUp();
116 
117  EXPECT_THROW(m_controller->UpdateKeywords({}), std::runtime_error);
118 }
119 
/**
 * Verifies that ScheduleMergeAsync() completes with State::Scheduled, and that the
 * controller reports State::Scheduled, when the mocked DPM request succeeds.
 */
120 TEST_F(TestDpmDaqController, ScheduleMergeAsyncSucceedsIfDpmSucceeds) {
121  // Setup
122  PostSetUp();
123 
124  using R = State;
     // The promise lets the test decide when and how the mocked DPM request completes.
125  boost::promise<R> reply;
     // Exactly one ScheduleAsync call is expected; it returns the future of `reply`.
126  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
127  return reply.get_future();
128  }));
129 
130  // Test
131  auto fut = m_controller->ScheduleMergeAsync();
132 
133  // Fake reply
134  reply.set_value(State::Scheduled);
     // Run pending io_context work so the reply reaches the controller's future.
135  MakeTestProgress(m_io_ctx, &fut);
136 
137  EXPECT_TRUE(fut.is_ready());
138  auto result = fut.get();
139  EXPECT_EQ(State::Scheduled, result);
140  EXPECT_EQ(State::Scheduled, m_controller->GetState());
141 }
142 
144  ScheduleMergeAsyncSucceedsIfDpmSucceedsWithStatusSignalReceivedBeforeReply) {
     // NOTE(review): this listing jumps from line 141 to 144; the `TEST_F(` header that
     // opens this test is presumably on the missing line 143.
     //
     // Verifies that ScheduleMergeAsync() still succeeds when the DPM status signal
     // reporting Scheduled arrives before the reply to the request itself.
145  // Setup
146  PostSetUp();
147 
148  using R = State;
     // The promise lets the test decide when and how the mocked DPM request completes.
149  boost::promise<R> reply;
     // Exactly one ScheduleAsync call is expected; it returns the future of `reply`.
150  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
151  return reply.get_future();
152  }));
153 
154  // Test
155  auto fut = m_controller->ScheduleMergeAsync();
156 
157  // Simulate status update from DPM before reply being received
158  m_dpm_status.state = State::Scheduled;
159  m_dpm_client->status_signal(m_dpm_status);
160  EXPECT_EQ(State::Scheduled, m_controller->GetState());
161 
162  // Fake reply
163  reply.set_value(State::Scheduled);
     // Run pending io_context work so the reply reaches the controller's future.
164  MakeTestProgress(m_io_ctx, &fut);
165 
166  EXPECT_TRUE(fut.is_ready());
167  auto result = fut.get();
168  EXPECT_EQ(State::Scheduled, result);
169  EXPECT_EQ(State::Scheduled, m_controller->GetState());
170 }
171 
/**
 * Verifies that ScheduleMergeAsync() fails when the mocked DPM request fails with a
 * std::runtime_error: the state stays NotScheduled and the error flag is set.
 */
172 TEST_F(TestDpmDaqController, ScheduleMergeAsyncFailsIfDpmFails) {
173  // Setup
174  PostSetUp();
175 
176  using R = State;
     // The promise lets the test decide when and how the mocked DPM request completes.
177  boost::promise<R> reply;
     // Exactly one ScheduleAsync call is expected; it returns the future of `reply`.
178  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
179  return reply.get_future();
180  }));
181 
182  // Test
183  auto fut = m_controller->ScheduleMergeAsync();
184 
185  // Fake reply
186  reply.set_exception(std::runtime_error("some_error"));
     // Run pending io_context work so the failure reaches the controller's future.
187  MakeTestProgress(m_io_ctx, &fut);
188 
189  EXPECT_TRUE(fut.is_ready());
     // Any std::exception is accepted here; the concrete exception type is not pinned.
190  EXPECT_THROW(fut.get(), std::exception);
191  EXPECT_EQ(State::NotScheduled, m_controller->GetState());
192  EXPECT_TRUE(m_controller->GetStatus()->GetError());
193 }
194 
/**
 * Verifies that a elt::mal::TimeoutException from the mocked DPM request is propagated by
 * ScheduleMergeAsync() while the state stays NotScheduled and the error flag stays clear.
 */
195 TEST_F(TestDpmDaqController, ScheduleMergeAsyncFailsIfTimeout) {
196  // Setup
197  PostSetUp();
198 
199  using R = State;
     // The promise lets the test decide when and how the mocked DPM request completes.
200  boost::promise<R> reply;
     // Exactly one ScheduleAsync call is expected; it returns the future of `reply`.
201  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
202  return reply.get_future();
203  }));
204 
205  // Test
206  auto fut = m_controller->ScheduleMergeAsync();
     // Fake a timeout as the reply to the request.
207  reply.set_exception(elt::mal::TimeoutException("TIMEOUT"));
     // Run pending io_context work so the timeout reaches the controller's future.
208  MakeTestProgress(m_io_ctx, &fut);
209  EXPECT_TRUE(fut.is_ready());
210 
211  // Verify that the timeout is propagated without side effects on the status
212  EXPECT_THROW(fut.get(), elt::mal::TimeoutException);
213  EXPECT_EQ(State::NotScheduled, m_controller->GetState()) << "State shoudln't have changed";
214  EXPECT_FALSE(m_controller->GetStatus()->GetError())
215  << "Error flag should not be set for timeouts";
216 }
217 
/**
 * Verifies that ScheduleMergeAsync() fails with std::runtime_error when the DAQ is already
 * in State::Scheduled, leaving both the state and the error flag unchanged.
 */
218 TEST_F(TestDpmDaqController, ScheduleMergeAsyncFailsIfAlreadyScheduled) {
219  // Setup
220  m_status->SetState(State::Scheduled);
221  PostSetUp();
222 
223  // Test
     // No ScheduleAsync expectation is set up on the DPM mock and no reply is faked; the
     // returned future is expected to be ready immediately.
224  auto fut = m_controller->ScheduleMergeAsync();
     // ASSERT rather than EXPECT: calling get() on a future that is not ready would block
     // the test instead of failing it.
225  ASSERT_TRUE(fut.is_ready());
226 
227  // Verify
228  EXPECT_THROW(fut.get(), std::runtime_error);
229  EXPECT_EQ(State::Scheduled, m_controller->GetState());
230  EXPECT_FALSE(m_controller->GetStatus()->GetError());
231 }
232 
/**
 * Verifies that AbortAsync() with ErrorPolicy::Strict completes immediately with
 * State::Aborted and no error when the DAQ is still in the default NotScheduled state.
 */
233 TEST_F(TestDpmDaqController, AbortAsyncAbortImmediatelyIfNoPendingRequestsExist) {
234  // Setup
235  PostSetUp();
236 
237  // Run
238  auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
239 
     // ASSERT rather than EXPECT: calling get() on a future that is not ready would block
     // the test instead of failing it.
240  ASSERT_TRUE(fut.is_ready());
241  auto result = fut.get();
242  EXPECT_EQ(State::Aborted, result.state);
243  EXPECT_FALSE(result.error);
244  EXPECT_EQ(State::Aborted, m_controller->GetState());
245 }
246 
/**
 * Verifies that AbortAsync() with ErrorPolicy::Strict forwards the abort to DPM for a
 * Scheduled DAQ and ends in State::Aborted without error once DPM replies Aborted.
 */
247 TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyAbortsIfDpmAborts) {
248  // Setup
249  // Put it in state Scheduled or after
250  m_status->SetState(State::Scheduled);
251  PostSetUp();
252 
253  using R = State;
     // The promise lets the test decide when and how the mocked DPM request completes.
254  boost::promise<R> reply;
     // Exactly one AbortAsync call with the DAQ id "id" is expected.
255  EXPECT_CALL(*m_dpm_client, AbortAsync("id"))
256  .WillOnce(InvokeWithoutArgs([&] { return reply.get_future(); }));
257 
258  // Run
259  auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
260 
261  // Fake successful reply
262  reply.set_value(State::Aborted);
263 
264  // Progress test
265  MakeTestProgress(m_io_ctx, &fut);
266 
267  EXPECT_TRUE(fut.is_ready());
268  auto result = fut.get();
269  EXPECT_EQ(State::Aborted, result.state);
270  EXPECT_FALSE(result.error);
271  EXPECT_EQ(State::Aborted, m_controller->GetState());
272 }
273 
/**
 * Verifies that a failed DPM abort with ErrorPolicy::Strict propagates std::runtime_error
 * and leaves the DAQ in State::Scheduled with the error flag clear.
 */
274 TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyDoesNothingIfDpmAbortFails) {
275  // Setup
     // Start from Scheduled so that the abort is forwarded to DPM.
276  m_status->SetState(State::Scheduled);
277  PostSetUp();
278 
279  using R = State;
     // The promise lets the test decide when and how the mocked DPM request completes.
280  boost::promise<R> reply;
     // Exactly one AbortAsync call with the DAQ id "id" is expected.
281  EXPECT_CALL(*m_dpm_client, AbortAsync("id"))
282  .WillOnce(InvokeWithoutArgs([&] { return reply.get_future(); }));
283 
284  // Run
285  auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
286 
287  // Fake reply
288  reply.set_exception(std::runtime_error("ERROR"));
289 
290  // Progress test
291  MakeTestProgress(m_io_ctx, &fut);
292 
293  EXPECT_TRUE(fut.is_ready());
294  EXPECT_THROW(fut.get(), std::runtime_error);
295  EXPECT_EQ(State::Scheduled, m_controller->GetState());
296  EXPECT_FALSE(m_controller->GetStatus()->GetError())
297  << "Failed to abort is not a condition for marking DAQ as error";
298 }
299 
300 } // namespace daq
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
daq::DpmClient
std::shared_ptr< ObservableStatus > m_status
boost::asio::io_context m_io_ctx
std::shared_ptr< DpmClientMock > m_dpm_client
std::shared_ptr< DpmDaqController > m_controller
std::shared_ptr< ObservableEventLog > m_event_log
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyDoesNothingIfDpmAbortFails)
State
Observable states of the data acquisition process.
Definition: state.hpp:39
Contains declaration for DaqController.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:124
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
Defines shared test utilities.