ifw-daq  3.0.1
IFW Data Acquisition modules
testDpmDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Unit test for `daq::DpmDaqController`
7  */
8 #include <gtest/gtest.h>
9 
10 #include <daq/daqController.hpp>
11 #include <mal/MalException.hpp>
12 
13 #include "mock/dpmClientMock.hpp"
14 #include "statusObserver.hpp"
15 #include "utils.hpp"
16 
17 using namespace ::testing;
18 using namespace std::chrono;
19 
20 namespace daq {
21 /**
22  * Fixture for daq::DpmDaqController tests
23  *
24  * @ingroup daq_ocm_libdaq_test
25  */
26 class TestDpmDaqController : public ::testing::Test {
27 public:
29  : m_io_ctx()
30  , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
31  , m_event_log(std::make_shared<ObservableEventLog>()) {
32  m_props.id = "id";
33  auto& s = m_props.meta_sources.emplace_back();
34  s.name = "meta";
35  s.rr_uri = "zpb.rr://127.0.0.1/daq";
36  m_props.results.emplace_back("location", "path");
37 
38  // Default state
39  m_status->SetState(State::NotScheduled);
40  }
41 
42  void SetUp() override {
43  m_dpm_status = *m_status;
44  m_dpm_client = std::make_shared<DpmClientMock>();
45  }
46 
47  void PostSetUp() {
48  m_controller = std::make_shared<DpmDaqController>(
49  m_io_ctx, m_props, m_status, m_event_log, m_dpm_client);
50  }
51 
52  boost::asio::io_context m_io_ctx; // NOLINT
53  std::shared_ptr<ObservableStatus> m_status; // NOLINT
54  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
55  DaqContext m_props; // NOLINT
56  std::shared_ptr<DpmClientMock> m_dpm_client; // NOLINT
57  /**
58  * Fake status update sample from DPM
59  */
60  Status m_dpm_status = Status("id", "fileid");
61 
62  std::shared_ptr<DpmDaqController> m_controller;
63 };
64 
65 TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds) {
66  // Setup
67  // New state will be Scheduled
68  m_dpm_status.state = State::Scheduled;
69  PostSetUp();
70 
71  // Test
72  EXPECT_EQ(m_controller->GetState(), State::NotScheduled);
73  m_dpm_client->status_signal(m_dpm_status);
74  EXPECT_EQ(m_controller->GetState(), State::Scheduled)
75  << "Expected new state from status update signal";
76 }
77 
78 /**
79  * Test that DPM can override the status held by OCM, as DPM is the authoritative source of DAQ status.
80  */
81 TEST_F(TestDpmDaqController, StatusOverrideFromOcmAbortedToDpmMergingSucceeds) {
82  // Setup
83  // Put DAQ it in state Aborted with error from OCM point-of-view.
84  m_status->SetState(State::Aborted, true);
85  // The DPM state is Merging however.
86  m_dpm_status.state = State::Merging;
87  m_dpm_status.timestamp = Status::Clock::now();
88  PostSetUp();
89 
90  // Test
91  // Update timestamp of status so it looks new
92  m_dpm_status.timestamp = Status::Clock::now();
93  EXPECT_EQ(m_controller->GetState(), State::Aborted);
94  m_dpm_client->status_signal(m_dpm_status);
95  EXPECT_EQ(m_controller->GetState(), State::Merging)
96  << "Expected new state from status update signal";
97  EXPECT_FALSE(m_controller->GetErrorFlag());
98 }
99 
100 TEST_F(TestDpmDaqController, StartAsyncThrows) {
101  PostSetUp();
102 
103  auto fut = m_controller->StartAsync();
104  ASSERT_TRUE(fut.is_ready());
105  EXPECT_THROW(fut.get(), std::runtime_error);
106 }
107 
108 TEST_F(TestDpmDaqController, StopAsyncThrows) {
109  PostSetUp();
110 
111  auto fut = m_controller->StopAsync(ErrorPolicy::Tolerant);
112  ASSERT_TRUE(fut.is_ready());
113  EXPECT_THROW(fut.get(), std::runtime_error);
114 }
115 
116 
117 TEST_F(TestDpmDaqController, UpdateKeywordsThrows) {
118  PostSetUp();
119 
120  EXPECT_THROW(m_controller->UpdateKeywords({}), std::runtime_error);
121 }
122 
123 TEST_F(TestDpmDaqController, ScheduleMergeAsyncSucceedsIfDpmSucceeds) {
124  // Setup
125  PostSetUp();
126 
127  using R = State;
128  boost::promise<R> reply;
129  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
130  return reply.get_future();
131  }));
132 
133  // Test
134  auto fut = m_controller->ScheduleMergeAsync();
135 
136  // Fake reply
137  reply.set_value(State::Scheduled);
138  MakeTestProgress(m_io_ctx, &fut);
139 
140  EXPECT_TRUE(fut.is_ready());
141  auto result = fut.get();
142  EXPECT_EQ(State::Scheduled, result);
143  EXPECT_EQ(State::Scheduled, m_controller->GetState());
144 }
145 
147  ScheduleMergeAsyncSucceedsIfDpmSucceedsWithStatusSignalReceivedBeforeReply) {
148  // Setup
149  PostSetUp();
150 
151  using R = State;
152  boost::promise<R> reply;
153  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
154  return reply.get_future();
155  }));
156 
157  // Test
158  auto fut = m_controller->ScheduleMergeAsync();
159 
160  // Simulate status update from DPM before reply being received
161  m_dpm_status.state = State::Scheduled;
162  m_dpm_client->status_signal(m_dpm_status);
163  EXPECT_EQ(State::Scheduled, m_controller->GetState());
164 
165  // Fake reply
166  reply.set_value(State::Scheduled);
167  MakeTestProgress(m_io_ctx, &fut);
168 
169  EXPECT_TRUE(fut.is_ready());
170  auto result = fut.get();
171  EXPECT_EQ(State::Scheduled, result);
172  EXPECT_EQ(State::Scheduled, m_controller->GetState());
173 }
174 
175 TEST_F(TestDpmDaqController, ScheduleMergeAsyncFailsIfDpmFails) {
176  // Setup
177  PostSetUp();
178 
179  using R = State;
180  boost::promise<R> reply;
181  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
182  return reply.get_future();
183  }));
184 
185  // Test
186  auto fut = m_controller->ScheduleMergeAsync();
187 
188  // Fake reply
189  reply.set_exception(std::runtime_error("some_error"));
190  MakeTestProgress(m_io_ctx, &fut);
191 
192  EXPECT_TRUE(fut.is_ready());
193  EXPECT_THROW(fut.get(), std::exception);
194  EXPECT_EQ(State::NotScheduled, m_controller->GetState());
195  EXPECT_TRUE(m_controller->GetStatus()->GetError());
196 }
197 
198 TEST_F(TestDpmDaqController, ScheduleMergeAsyncFailsIfTimeout) {
199  // Setup
200  PostSetUp();
201 
202  using R = State;
203  boost::promise<R> reply;
204  EXPECT_CALL(*m_dpm_client, ScheduleAsync(_)).WillOnce(InvokeWithoutArgs([&] {
205  return reply.get_future();
206  }));
207 
208  // Test
209  auto fut = m_controller->ScheduleMergeAsync();
210  reply.set_exception(elt::mal::TimeoutException("TIMEOUT"));
211  MakeTestProgress(m_io_ctx, &fut);
212  EXPECT_TRUE(fut.is_ready());
213 
214  // Fake reply
215  EXPECT_THROW(fut.get(), elt::mal::TimeoutException);
216  EXPECT_EQ(State::NotScheduled, m_controller->GetState()) << "State shoudln't have changed";
217  EXPECT_FALSE(m_controller->GetStatus()->GetError())
218  << "Error flag should not be set for timeouts";
219 }
220 
221 TEST_F(TestDpmDaqController, ScheduleMergeAsyncFailsIfAlreadyScheduled) {
222  // Setup
223  m_status->SetState(State::Scheduled);
224  PostSetUp();
225 
226  // Test
227  auto fut = m_controller->ScheduleMergeAsync();
228  EXPECT_TRUE(fut.is_ready());
229 
230  // Fake reply
231  EXPECT_THROW(fut.get(), std::runtime_error);
232  EXPECT_EQ(State::Scheduled, m_controller->GetState());
233  EXPECT_FALSE(m_controller->GetStatus()->GetError());
234 }
235 
236 TEST_F(TestDpmDaqController, AbortAsyncAbortImmediatelyIfNoPendingRequestsExist) {
237  // Setup
238  PostSetUp();
239 
240  // Run
241  auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
242 
243  EXPECT_TRUE(fut.is_ready());
244  auto result = fut.get();
245  EXPECT_EQ(State::Aborted, result.state);
246  EXPECT_FALSE(result.error);
247  EXPECT_EQ(State::Aborted, m_controller->GetState());
248 }
249 
250 TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyAbortsIfDpmAborts) {
251  // Setup
252  // Put it in state Scheduled or after
253  m_status->SetState(State::Scheduled);
254  PostSetUp();
255 
256  using R = State;
257  boost::promise<R> reply;
258  EXPECT_CALL(*m_dpm_client, AbortAsync("id"))
259  .WillOnce(InvokeWithoutArgs([&] { return reply.get_future(); }));
260 
261  // Run
262  auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
263 
264  // Fake successful reply
265  reply.set_value(State::Aborted);
266 
267  // Progress test
268  MakeTestProgress(m_io_ctx, &fut);
269 
270  EXPECT_TRUE(fut.is_ready());
271  auto result = fut.get();
272  EXPECT_EQ(State::Aborted, result.state);
273  EXPECT_FALSE(result.error);
274  EXPECT_EQ(State::Aborted, m_controller->GetState());
275 }
276 
277 TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyDoesNothingIfDpmAbortFails) {
278  // Setup
279  m_status->SetState(State::Scheduled);
280  PostSetUp();
281 
282  using R = State;
283  boost::promise<R> reply;
284  EXPECT_CALL(*m_dpm_client, AbortAsync("id"))
285  .WillOnce(InvokeWithoutArgs([&] { return reply.get_future(); }));
286 
287  // Run
288  auto fut = m_controller->AbortAsync(ErrorPolicy::Strict);
289 
290  // Fake reply
291  reply.set_exception(std::runtime_error("ERROR"));
292 
293  // Progress test
294  MakeTestProgress(m_io_ctx, &fut);
295 
296  EXPECT_TRUE(fut.is_ready());
297  EXPECT_THROW(fut.get(), std::runtime_error);
298  EXPECT_EQ(State::Scheduled, m_controller->GetState());
299  EXPECT_FALSE(m_controller->GetStatus()->GetError())
300  << "Failed to abort is not a condition for marking DAQ as error";
301 }
302 
303 } // namespace daq
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
daq::DpmClient
std::shared_ptr< ObservableStatus > m_status
boost::asio::io_context m_io_ctx
std::shared_ptr< DpmClientMock > m_dpm_client
std::shared_ptr< DpmDaqController > m_controller
std::shared_ptr< ObservableEventLog > m_event_log
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
TEST_F(TestDpmDaqController, AbortAsyncWithStrictPolicyDoesNothingIfDpmAbortFails)
State
Observable states of the data acquisition process.
Definition: state.hpp:39
Contains declaration for DaqController.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:153
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.