ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
testScheduler.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdpm
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief Unit tests for daq::dpm::SchedulerImpl
9 */
10
11#include <daq/dpm/scheduler.hpp>
12
13#include <utility>
14
15#include <daq/json.hpp>
16
18#include "mock/mockWorkspace.hpp"
19
20#include <gmock/gmock.h>
21#include <gtest/gtest.h>
22
23namespace daq::dpm {
24
26 using Hook = std::function<void(MockDaqController&)>;
27 std::unique_ptr<DaqController> operator()(std::unique_ptr<DaqWorkspace>, Resources&) {
28 auto daq = std::make_unique<MockDaqController>();
29 if (hook) {
30 hook(*daq);
31 }
32 daq_controllers.push_back(daq.get());
33 return daq;
34 }
35 void SetHook(Hook f) {
36 hook = std::move(f);
37 }
38 std::vector<DaqController*> daq_controllers;
40};
41
42class TestSchedulerBase : public ::testing::Test {
43public:
45 }
46
47 void SetUp() override {
48 }
49 void TearDown() override {
50 // Execute possibly pending completions handlers.
51 EXPECT_NO_THROW(m_io_ctx.poll());
52 }
53
54 auto SerializeStatus(Status const& status) const -> std::string {
55 nlohmann::json j;
56 j = status;
57 return j.dump(2);
58 }
59
60protected:
61 boost::asio::io_context m_io_ctx;
63 std::vector<std::string> m_queue;
64
66};
67
69
71public:
72 TestScheduler() : m_status("TEST.ID", "TEST.FILEID") {
73 }
74 void SetUp() override {
76 m_spec_str = R"(
77 {
78 "id": "TEST.ID",
79 "target": {
80 "fileId": "TEST.FILEID",
81 "source": {
82 "sourceName": "dcs",
83 "location": "dcs-host:/path/to/somefile.fits",
84 "path": "dcs/somefile.fits"
85 }
86 },
87 "sources": [
88 {
89 "type": "fitsFile",
90 "sourceName": "fcs",
91 "location": "fcs-host:/path/to/somefile.fits",
92 "path": "fcs/somefile.fits"
93 }
94 ],
95 "receivers": [
96 {
97 "type": "olasReceiver",
98 "host": "1.2.3.4",
99 "path": "/remote/olas/"
100 },
101 {
102 "type": "olasReceiver",
103 "path": "/local/olas/"
104 }
105 ]
106 }
107 )";
108 }
109 void PostSetup() {
110 EXPECT_CALL(m_ws_mock, LoadQueue()).WillOnce(::testing::Return(m_queue));
111
112 m_scheduler = std::make_unique<SchedulerImpl>(
113 m_executor, m_ws_mock, std::reference_wrapper(m_daq_controller_factory), m_options);
114 }
115 void TearDown() override {
116 m_scheduler.reset();
118 }
119
120protected:
122 std::unique_ptr<SchedulerImpl> m_scheduler;
123 std::string m_spec_str;
126};
127
128using namespace ::testing;
129
130TEST_F(TestSchedulerInit, Construction) {
131 SchedulerOptions m_options;
132 EXPECT_CALL(m_ws_mock, LoadQueue()).WillOnce(Return(m_queue));
133 EXPECT_NO_THROW(SchedulerImpl(m_executor, m_ws_mock, FakeDaqControllerFactory(), m_options));
134}
135
136TEST_F(TestScheduler, QueueDaqIsSuccessful) {
137 // Setup
138 auto daq_ws = std::make_unique<MockDaqWorkspace>();
139 Status status = {};
140 status.id = "TEST.ID";
141 status.file_id = "TEST.FILEID";
142 status.state = State::Scheduled;
143
144 EXPECT_CALL(*daq_ws, StoreStatus(status));
145 EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
146 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
147 EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}));
148
149 PostSetup();
150
151 // Test
152 m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status));
153 EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
154
155 // Queue same DAQ should fail
156 EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status)), std::invalid_argument);
157 EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
158}
159
160TEST_F(TestScheduler, QueueDaqRollbackIfStoreQueueFails) {
161 // Setup
162 auto daq_ws = std::make_unique<MockDaqWorkspace>();
163 Status status = {};
164 status.id = "TEST.ID";
165 status.file_id = "TEST.FILEID";
166 status.state = State::Scheduled;
167
168 Sequence init, rollback;
169
170 EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
171
172 EXPECT_CALL(*daq_ws, StoreStatus(status));
173 EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
174 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
175 .InSequence(init, rollback)
176 .WillOnce(Return(ByMove(std::move(daq_ws))));
177 EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}))
178 .InSequence(init, rollback)
179 .WillRepeatedly(Throw(std::runtime_error("FAILED")));
180
181 // Rollback
182 EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
183
184 PostSetup();
185
186 // Test
187 EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status)), std::exception);
188 EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
189}
190
191TEST_F(TestScheduler, QueueDaqRollbackIfStoreSpecificationFails) {
192 // Setup
193 auto daq_ws = std::make_unique<MockDaqWorkspace>();
194 Status status = {};
195 status.id = "TEST.ID";
196 status.file_id = "TEST.FILEID";
197 status.state = State::Scheduled;
198
199 Sequence init, rollback;
200
201 EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
202
203 EXPECT_CALL(*daq_ws, StoreStatus(status));
204 EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str))
205 .WillOnce(Throw(std::runtime_error("ERROR")));
206
207 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
208 .InSequence(init, rollback)
209 .WillOnce(Return(ByMove(std::move(daq_ws))));
210
211 // Rollback
212 EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
213
214 PostSetup();
215
216 // Test
217 EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status)), std::exception);
218 EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
219}
220
221TEST_F(TestScheduler, QueueDaqRollbackIfInitializeDaqWorkspaceFails) {
222 // Setup
223 Status status = {};
224 status.id = "TEST.ID";
225 status.file_id = "TEST.FILEID";
226 status.state = State::Scheduled;
227 Sequence init, rollback;
228 EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
229
230 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
231 .InSequence(init, rollback)
232 .WillRepeatedly(Throw(std::runtime_error("ERROR")));
233
234 // Rollback
235 EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
236
237 PostSetup();
238
239 // Test
240 EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status)), std::exception);
241 EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
242}
243
244TEST_F(TestScheduler, QueueDaqFailsIfParseDpFails) {
245 // Setup
246 Status status = {};
247 status.id = "TEST.ID";
248 status.file_id = "TEST.FILEID";
249 status.state = State::Scheduled;
250 Sequence init, rollback;
251 EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
252
253 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).Times(0);
254
255 PostSetup();
256
257 // Test
258 EXPECT_THROW(m_scheduler->QueueDaq("not a specification", SerializeStatus(status)),
259 std::invalid_argument);
260 EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
261}
262
263TEST_F(TestScheduler, QueueDaqFailsIfStatusParsingFails) {
264 // Setup
265 Sequence init, rollback;
266 EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
267
268 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).Times(0);
269
270 PostSetup();
271
272 // Test
273 EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str, "not a status"), std::invalid_argument);
274 EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
275}
276
277TEST_F(TestScheduler, PollActivatesDaqWhichStartsTransfer) {
278 // Setup
279 PostSetup();
280 m_scheduler->Start();
281
282 auto daq_ws = std::make_unique<MockDaqWorkspace>();
283 Status status = {};
284 status.id = "TEST.ID";
285 status.file_id = "TEST.FILEID";
286 status.state = State::Scheduled;
287
288 EXPECT_CALL(*daq_ws, StoreStatus(status));
289 EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
290 EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
291 EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}));
292
293 m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status));
294 EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
295
296 // Queue same DAQ should fail
297 EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str, SerializeStatus(status)), std::invalid_argument);
298 EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
299
300 // We expect Poll() should load from workspace and create DaqController from factory
301 // and then start that.
302 daq_ws = std::make_unique<MockDaqWorkspace>();
303 EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
304 m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
305 EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
306 EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
307 EXPECT_CALL(mock, GetState()).WillRepeatedly(Return(State::Collecting));
308 EXPECT_CALL(mock, Start());
309 });
310
311 // Test[
312 m_io_ctx.poll();
313}
314
315TEST_F(TestScheduler, PollCompletesDaq) {
316 // Setup
317 m_queue = {"TEST.ID"};
318 PostSetup();
319
320 auto daq_ws = std::make_unique<MockDaqWorkspace>();
321
322 m_status.SetState(State::Collecting);
323 EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
324
325 // We expect Poll() should load from workspace
326 daq_ws = std::make_unique<MockDaqWorkspace>();
327
328 EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
329 EXPECT_CALL(m_ws_mock, ArchiveDaq("TEST.ID"));
330 EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>()));
331
332 m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
333 EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
334 EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
335 EXPECT_CALL(mock, GetState())
336 .WillRepeatedly(Invoke(&m_status, &ObservableStatus::GetState));
337
338 // When Start() is invoked on DaqController we set the state to Completed
339 // so that it will be recognized as completed and archived (Scheduler should monitor
340 // DaqControllers via signals).
341 EXPECT_CALL(mock, Start()).WillOnce(Invoke([&]() { m_status.SetState(State::Completed); }));
342 });
343
344 // Test
345 // First poll initiates all operations.
346 m_scheduler->Start();
347 m_io_ctx.poll();
348}
349
350TEST_F(TestScheduler, AbortDaqSucceedsIfNotActive) {
351 // Setup
352 m_queue = {"TEST.ID"};
353
354 auto daq_ws = std::make_unique<MockDaqWorkspace>();
355 Status status = {};
356 status.id = "TEST.ID";
357 status.file_id = "TEST.FILEID";
358 status.state = State::Scheduled;
359
360 EXPECT_CALL(*daq_ws, LoadStatus()).WillOnce(Return(status));
361 EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
362 EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{}));
363 EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID"));
364
365 PostSetup();
366
367 // Test
368 ASSERT_TRUE(m_scheduler->IsQueued(status.id));
369 m_scheduler->AbortDaq(status.id);
370 EXPECT_FALSE(m_scheduler->IsQueued(status.id));
371}
372
373TEST_F(TestScheduler, AbortDaqSucceedsIfActive) {
374 // Setup
375 m_queue = {"TEST.ID"};
376 m_status.SetState(State::Scheduled);
377 auto daq_ws_init = std::make_unique<MockDaqWorkspace>();
378 auto daq_ws_abort = std::make_unique<MockDaqWorkspace>();
379
380 EXPECT_CALL(*daq_ws_init, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
381 EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws_init))));
382
383 m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
384 EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
385 EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
386 EXPECT_CALL(mock, GetState()).WillRepeatedly(Return(State::Collecting));
387 EXPECT_CALL(mock, Start());
388 });
389
390 PostSetup();
391
392 // Poll to activate DAQ
393 m_scheduler->Start();
394 m_io_ctx.poll();
395
396 EXPECT_CALL(*daq_ws_abort, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
397 EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws_abort))));
398 EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID"));
399 EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{}));
400
401 // Test
402 ASSERT_TRUE(m_scheduler->IsQueued("TEST.ID"));
403 m_scheduler->AbortDaq("TEST.ID");
404 EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
405}
406
407TEST_F(TestScheduler, AbortDaqFailsIfDaqDoesNotExit) {
408 // Setup
409 PostSetup();
410
411 // Test
412 EXPECT_THROW(m_scheduler->AbortDaq("UNKOWN"), std::runtime_error);
413}
414
415TEST_F(TestScheduler, GetStatusSucceeds) {
416 // Setup
417 m_queue = {"TEST.ID"};
418 auto daq_ws = std::make_unique<MockDaqWorkspace>();
419
420 EXPECT_CALL(*daq_ws, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
421 EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
422
423 PostSetup();
424
425 // Test
426 ASSERT_TRUE(m_scheduler->IsQueued("TEST.ID"));
427 auto status = m_scheduler->GetDaqStatus("TEST.ID");
428 EXPECT_EQ(status.id, "TEST.ID");
429 EXPECT_EQ(status.state, m_status.GetState());
430}
431
432TEST_F(TestScheduler, GetStatusSucceedsForArchivedDaq) {
433 // Setup
434 m_queue = {"TEST.ID"};
435 Status archived_status("TEST.ID.ARCHIVED", "TEST.ID.ARCHIVED");
436
437 EXPECT_CALL(m_ws_mock, LoadArchivedStatus("TEST.ID.ARCHIVED"))
438 .WillOnce(Return(archived_status));
439
440 PostSetup();
441
442 // Test
443 ASSERT_FALSE(m_scheduler->IsQueued("TEST.ID.ARCHIVED"));
444 auto status = m_scheduler->GetDaqStatus("TEST.ID.ARCHIVED");
445 EXPECT_EQ(status, archived_status);
446}
447
448} // namespace daq::dpm
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
State GetState() const noexcept
Definition: status.cpp:297
daq::dpm::MockWorkspace m_ws_mock
boost::asio::io_context m_io_ctx
auto SerializeStatus(Status const &status) const -> std::string
std::vector< std::string > m_queue
SchedulerOptions m_options
std::unique_ptr< SchedulerImpl > m_scheduler
ObservableStatus m_status
FakeDaqControllerFactory m_daq_controller_factory
void TearDown() override
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Declares JSON support for serialization.
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
TEST_F(TestDaqController, ScheduledTransitionsToCollecting)
Limited resources.
Definition: scheduler.hpp:172
Options controlling scheduler operations.
Definition: scheduler.hpp:134
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Collecting
Input files are being collected.
daq::dpm::Scheduler and related class declarations.
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
State state
Definition: status.hpp:186
std::string id
Definition: status.hpp:184
std::string file_id
Definition: status.hpp:185
std::unique_ptr< DaqController > operator()(std::unique_ptr< DaqWorkspace >, Resources &)
std::vector< DaqController * > daq_controllers
std::function< void(MockDaqController &)> Hook
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))