ifw-daq  3.0.1
IFW Data Acquisition modules
testScheduler.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief Unit tests for daq::dpm::SchedulerImpl
9  */
10 
11 #include <daq/dpm/scheduler.hpp>
12 
13 #include <utility>
14 
15 #include "mock/mockScheduler.hpp"
16 #include "mock/mockWorkspace.hpp"
17 
18 #include <gmock/gmock.h>
19 #include <gtest/gtest.h>
20 
21 namespace daq::dpm {
22 
24  using Hook = std::function<void(MockDaqController&)>;
25  std::unique_ptr<DaqController> operator()(std::unique_ptr<DaqWorkspace>, Resources&) {
26  auto daq = std::make_unique<MockDaqController>();
27  if (hook) {
28  hook(*daq);
29  }
30  daq_controllers.push_back(daq.get());
31  return daq;
32  }
33  void SetHook(Hook f) {
34  hook = std::move(f);
35  }
36  std::vector<DaqController*> daq_controllers;
38 };
39 
40 class TestSchedulerBase : public ::testing::Test {
41 public:
43  }
44 
45  void SetUp() override {
46  }
47  void TearDown() override {
48  // Execute possibly pending completions handlers.
49  EXPECT_NO_THROW(m_io_ctx.poll());
50  }
51 
52 protected:
53  boost::asio::io_context m_io_ctx;
55  std::vector<std::string> m_queue;
56 
58 };
59 
61 
63 public:
64  TestScheduler() : m_status("TEST.ID", "TEST.FILEID") {
65  }
66  void SetUp() override {
68  m_spec_str = R"(
69  {
70  "id": "TEST.ID",
71  "target": {
72  "fileId": "TEST.FILEID",
73  "source": {
74  "sourceName": "dcs",
75  "location": "dcs-host:/path/to/somefile.fits",
76  "path": "dcs/somefile.fits"
77  }
78  },
79  "sources": [
80  {
81  "type": "fitsFile",
82  "sourceName": "fcs",
83  "location": "fcs-host:/path/to/somefile.fits",
84  "path": "fcs/somefile.fits"
85  }
86  ],
87  "receivers": [
88  {
89  "type": "olasReceiver",
90  "host": "1.2.3.4",
91  "path": "/remote/olas/"
92  },
93  {
94  "type": "olasReceiver",
95  "path": "/local/olas/"
96  }
97  ]
98  }
99  )";
100  }
101  void PostSetup() {
102  EXPECT_CALL(m_ws_mock, LoadQueue()).WillOnce(::testing::Return(m_queue));
103 
104  m_scheduler = std::make_unique<SchedulerImpl>(
105  m_executor, m_ws_mock, std::reference_wrapper(m_daq_controller_factory), m_options);
106  }
107  void TearDown() override {
108  m_scheduler.reset();
110  }
111 
112 protected:
114  std::unique_ptr<SchedulerImpl> m_scheduler;
115  std::string m_spec_str;
118 };
119 
120 using namespace ::testing;
121 
122 TEST_F(TestSchedulerInit, Construction) {
123  SchedulerOptions m_options;
124  EXPECT_CALL(m_ws_mock, LoadQueue()).WillOnce(Return(m_queue));
125  EXPECT_NO_THROW(SchedulerImpl(m_executor, m_ws_mock, FakeDaqControllerFactory(), m_options));
126 }
127 
128 TEST_F(TestScheduler, QueueDaqIsSuccessful) {
129  // Setup
130  auto daq_ws = std::make_unique<MockDaqWorkspace>();
131  Status status = {};
132  status.id = "TEST.ID";
133  status.file_id = "TEST.FILEID";
134  status.state = State::Scheduled;
135  status.error = false;
136 
137  EXPECT_CALL(*daq_ws, StoreStatus(status));
138  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
139  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
140  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}));
141 
142  PostSetup();
143 
144  // Test
145  m_scheduler->QueueDaq(m_spec_str);
146  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
147 
148  // Queue same DAQ should fail
149  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::invalid_argument);
150  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
151 }
152 
153 TEST_F(TestScheduler, QueueDaqRollbackIfStoreQueueFails) {
154  // Setup
155  auto daq_ws = std::make_unique<MockDaqWorkspace>();
156  Status status = {};
157  status.id = "TEST.ID";
158  status.file_id = "TEST.FILEID";
159  status.state = State::Scheduled;
160 
161  Sequence init, rollback;
162 
163  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
164 
165  EXPECT_CALL(*daq_ws, StoreStatus(status));
166  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
167  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
168  .InSequence(init, rollback)
169  .WillOnce(Return(ByMove(std::move(daq_ws))));
170  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}))
171  .InSequence(init, rollback)
172  .WillRepeatedly(Throw(std::runtime_error("FAILED")));
173 
174  // Rollback
175  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
176 
177  PostSetup();
178 
179  // Test
180  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::exception);
181  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
182 }
183 
184 TEST_F(TestScheduler, QueueDaqRollbackIfStoreSpecificationFails) {
185  // Setup
186  auto daq_ws = std::make_unique<MockDaqWorkspace>();
187  Status status = {};
188  status.id = "TEST.ID";
189  status.file_id = "TEST.FILEID";
190  status.state = State::Scheduled;
191 
192  Sequence init, rollback;
193 
194  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
195 
196  EXPECT_CALL(*daq_ws, StoreStatus(status));
197  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str))
198  .WillOnce(Throw(std::runtime_error("ERROR")));
199 
200  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
201  .InSequence(init, rollback)
202  .WillOnce(Return(ByMove(std::move(daq_ws))));
203 
204  // Rollback
205  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
206 
207  PostSetup();
208 
209  // Test
210  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::exception);
211  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
212 }
213 
214 TEST_F(TestScheduler, QueueDaqRollbackIfInitializeDaqWorkspaceFails) {
215  // Setup
216  Sequence init, rollback;
217  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
218 
219  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
220  .InSequence(init, rollback)
221  .WillRepeatedly(Throw(std::runtime_error("ERROR")));
222 
223  // Rollback
224  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
225 
226  PostSetup();
227 
228  // Test
229  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::exception);
230  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
231 }
232 
233 TEST_F(TestScheduler, QueueDaqFailsIfParseDpFails) {
234  // Setup
235  Sequence init, rollback;
236  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
237 
238  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).Times(0);
239 
240  PostSetup();
241 
242  // Test
243  EXPECT_THROW(m_scheduler->QueueDaq("not a specification"), std::invalid_argument);
244  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
245 }
246 
247 TEST_F(TestScheduler, PollActivatesDaqWhichStartsTransfer) {
248  // Setup
249  PostSetup();
250  m_scheduler->Start();
251 
252  auto daq_ws = std::make_unique<MockDaqWorkspace>();
253  Status status = {};
254  status.id = "TEST.ID";
255  status.file_id = "TEST.FILEID";
256  status.state = State::Scheduled;
257 
258  EXPECT_CALL(*daq_ws, StoreStatus(status));
259  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
260  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
261  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}));
262 
263  m_scheduler->QueueDaq(m_spec_str);
264  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
265 
266  // Queue same DAQ should fail
267  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::invalid_argument);
268  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
269 
270  // We expect Poll() should load from workspace and create DaqController from factory
271  // and then start that.
272  daq_ws = std::make_unique<MockDaqWorkspace>();
273  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
274  m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
275  EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
276  EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
277  EXPECT_CALL(mock, GetState()).WillRepeatedly(Return(State::Collecting));
278  EXPECT_CALL(mock, Start());
279  });
280 
281  // Test[
282  m_io_ctx.poll();
283 }
284 
285 TEST_F(TestScheduler, PollCompletesDaq) {
286  // Setup
287  m_queue = {"TEST.ID"};
288  PostSetup();
289 
290  auto daq_ws = std::make_unique<MockDaqWorkspace>();
291 
292  m_status.SetState(State::Collecting);
293  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
294 
295  // We expect Poll() should load from workspace
296  daq_ws = std::make_unique<MockDaqWorkspace>();
297 
298  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
299  EXPECT_CALL(m_ws_mock, ArchiveDaq("TEST.ID"));
300  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>()));
301 
302  m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
303  EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
304  EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
305  EXPECT_CALL(mock, GetState())
306  .WillRepeatedly(Invoke(&m_status, &ObservableStatus::GetState));
307 
308  // When Start() is invoked on DaqController we set the state to Completed
309  // so that it will be recognized as completed and archived (Scheduler should monitor
310  // DaqControllers via signals).
311  EXPECT_CALL(mock, Start()).WillOnce(Invoke([&]() { m_status.SetState(State::Completed); }));
312  });
313 
314  // Test
315  // First poll initiates all operations.
316  m_scheduler->Start();
317  m_io_ctx.poll();
318 }
319 
320 TEST_F(TestScheduler, AbortDaqSucceedsIfNotActive) {
321  // Setup
322  m_queue = {"TEST.ID"};
323 
324  auto daq_ws = std::make_unique<MockDaqWorkspace>();
325  Status status = {};
326  status.id = "TEST.ID";
327  status.file_id = "TEST.FILEID";
328  status.state = State::Scheduled;
329  status.error = false;
330 
331  EXPECT_CALL(*daq_ws, LoadStatus()).WillOnce(Return(status));
332  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
333  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{}));
334  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID"));
335 
336  PostSetup();
337 
338  // Test
339  ASSERT_TRUE(m_scheduler->IsQueued(status.id));
340  m_scheduler->AbortDaq(status.id);
341  EXPECT_FALSE(m_scheduler->IsQueued(status.id));
342 }
343 
344 TEST_F(TestScheduler, AbortDaqSucceedsIfActive) {
345  // Setup
346  m_queue = {"TEST.ID"};
347  m_status.SetState(State::Scheduled);
348  auto daq_ws_init = std::make_unique<MockDaqWorkspace>();
349  auto daq_ws_abort = std::make_unique<MockDaqWorkspace>();
350 
351  EXPECT_CALL(*daq_ws_init, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
352  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws_init))));
353 
354  m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
355  EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
356  EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
357  EXPECT_CALL(mock, GetState()).WillRepeatedly(Return(State::Collecting));
358  EXPECT_CALL(mock, Start());
359  });
360 
361  PostSetup();
362 
363  // Poll to activate DAQ
364  m_scheduler->Start();
365  m_io_ctx.poll();
366 
367  EXPECT_CALL(*daq_ws_abort, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
368  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws_abort))));
369  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID"));
370  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{}));
371 
372  // Test
373  ASSERT_TRUE(m_scheduler->IsQueued("TEST.ID"));
374  m_scheduler->AbortDaq("TEST.ID");
375  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
376 }
377 
378 TEST_F(TestScheduler, AbortDaqFailsIfDaqDoesNotExit) {
379  // Setup
380  PostSetup();
381 
382  // Test
383  EXPECT_THROW(m_scheduler->AbortDaq("UNKOWN"), std::runtime_error);
384 }
385 
386 TEST_F(TestScheduler, GetStatusSucceeds) {
387  // Setup
388  m_queue = {"TEST.ID"};
389  auto daq_ws = std::make_unique<MockDaqWorkspace>();
390 
391  EXPECT_CALL(*daq_ws, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
392  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
393 
394  PostSetup();
395 
396  // Test
397  ASSERT_TRUE(m_scheduler->IsQueued("TEST.ID"));
398  auto status = m_scheduler->GetDaqStatus("TEST.ID");
399  EXPECT_EQ(status.id, "TEST.ID");
400  EXPECT_EQ(status.state, m_status.GetState());
401 }
402 
403 TEST_F(TestScheduler, GetStatusSucceedsForArchivedDaq) {
404  // Setup
405  m_queue = {"TEST.ID"};
406  Status archived_status("TEST.ID.ARCHIVED", "TEST.ID.ARCHIVED");
407 
408  EXPECT_CALL(m_ws_mock, LoadArchivedStatus("TEST.ID.ARCHIVED"))
409  .WillOnce(Return(archived_status));
410 
411  PostSetup();
412 
413  // Test
414  ASSERT_FALSE(m_scheduler->IsQueued("TEST.ID.ARCHIVED"));
415  auto status = m_scheduler->GetDaqStatus("TEST.ID.ARCHIVED");
416  EXPECT_EQ(status, archived_status);
417 }
418 
419 } // namespace daq::dpm
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
State GetState() const noexcept
Definition: status.cpp:270
daq::dpm::MockWorkspace m_ws_mock
boost::asio::io_context m_io_ctx
std::vector< std::string > m_queue
SchedulerOptions m_options
std::unique_ptr< SchedulerImpl > m_scheduler
ObservableStatus m_status
FakeDaqControllerFactory m_daq_controller_factory
void TearDown() override
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
TEST_F(TestDaqController, ScheduledTransitionsToCollecting)
Limited resources.
Definition: scheduler.hpp:171
Options controlling scheduler operations.
Definition: scheduler.hpp:133
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Collecting
Input files are being collected.
daq::dpm::Scheduler and related class declarations.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:153
State state
Definition: status.hpp:176
std::string id
Definition: status.hpp:174
std::string file_id
Definition: status.hpp:175
bool error
Definition: status.hpp:177
std::unique_ptr< DaqController > operator()(std::unique_ptr< DaqWorkspace >, Resources &)
std::vector< DaqController * > daq_controllers
std::function< void(MockDaqController &)> Hook
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))