ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
testDaqController.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdpm
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief Unit tests for daq::dpm::DaqControllerImpl
9 */
10#include <daq/dpm/scheduler.hpp>
11
12#include <fmt/format.h>
13#include <log4cplus/loggingmacros.h>
14#include <utility>
15
18#include "mock/mockWorkspace.hpp"
19
20#include <gmock/gmock.h>
21#include <gtest/gtest.h>
22
23namespace daq::dpm {
24
25using namespace ::testing;
26
28 using Hook = std::function<void(MockRsyncAsyncProcess&)>;
29 std::unique_ptr<RsyncAsyncProcessIf> operator()(boost::asio::io_context&,
30 std::string, // NOLINT
31 std::string, // NOLINT
32 RsyncOptions const&,
34 auto rsync = std::make_unique<MockRsyncAsyncProcess>();
35 if (hook) {
36 hook(*rsync);
37 } else {
39 }
40 procs.push_back(rsync.get());
41 return rsync;
42 }
44 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
45 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
46 }
// Stores the hook; operator() invokes it, when set, on each mock it creates.
47 void SetHook(Hook f) {
48 hook = std::move(f);
49 }
50 std::vector<MockRsyncAsyncProcess*> procs;
52 std::vector<std::string> rsync_dummy_args;
53};
54
56 using Hook = std::function<void(MockAsyncProcess&)>;
// Factory call operator: creates a mock process, runs the optional hook on it, records a
// non-owning pointer in `procs` and returns ownership of the mock to the caller.
// Both arguments are ignored.
// NOTE(review): this instantiates MockRsyncAsyncProcess although `Hook` and `procs` are typed on
// MockAsyncProcess -- confirm the types are convertible or whether MockAsyncProcess was intended.
57 // NOLINTNEXTLINE
58 std::unique_ptr<AsyncProcessIf> operator()(boost::asio::io_context&, std::vector<std::string>) {
59 auto rsync = std::make_unique<MockRsyncAsyncProcess>();
60 if (hook) {
61 hook(*rsync);
62 }
63 procs.push_back(rsync.get());
64 return rsync;
65 }
// Stores the hook; operator() invokes it, when set, on each mock it creates.
66 void SetHook(Hook f) {
67 hook = std::move(f);
68 }
69 std::vector<MockAsyncProcess*> procs;
71};
72
// Base fixture that owns the io_context shared by the tests.
// TearDown() polls the io_context once more and expects that no handler throws.
73class TestDaqControllerBase : public ::testing::Test {
74public:
76 }
77
78 void SetUp() override {
79 }
80
81 void TearDown() override {
82 // Execute possibly pending completion handlers.
83 EXPECT_NO_THROW(m_io_ctx.poll());
84 }
85
86protected:
88 boost::asio::io_context m_io_ctx;
91};
92
94
96public:
98 }
// Populates the default status and data product specification used by the tests:
// one target source, one additional source and two OLAS receivers (one with an empty host
// and path "/local/olas/", one with host "host" and path "/remote/olas/").
99 void SetUp() override {
101 // Set default status. This can be modified by each test-case before calling PostSetUp()
102 m_initial_status.id = "TEST.ID";
103 m_initial_status.file_id = "TEST.2023-08-22T14:40:40.245";
105 m_initial_status.timestamp = Status::TimePoint::clock::now();
107 m_dpspec.target.file_id = "TEST.ID";
108 m_dpspec.target.source = json::FitsFileSource{"source1", "host:target.fits", {}};
109 m_dpspec.sources.push_back(json::FitsFileSource{"source2", "host:source.fits", {}});
110 m_dpspec.receivers.push_back(json::OlasReceiver{/*.host*/ "", /* .path */ "/local/olas/"});
111 m_dpspec.receivers.push_back(
112 json::OlasReceiver{/*.host*/ "host", /* .path */ "/remote/olas/"});
113 }
114
115 /**
116 * Specifically sets up expectations and other things based on current state.
117 * It considers:
118 * - m_dpspec for sources and populates m_resolver based on that.
119 * - m_initial_status controls which state to start from.
 *
 * It also derives m_arcfile_path from m_initial_status.file_id, creates the workspace mock
 * (kept accessible through m_ws_mock_ptr) with default expectations for its Load*() and
 * Get*Path() members, and finally constructs the DaqControllerImpl under test and calls
 * Start() on it. Test cases must therefore modify m_dpspec / m_initial_status before calling
 * this function.
120 */
121 void PostSetUp() {
122 m_arcfile_path = fmt::format("/local/olas/{}.fits", m_initial_status.file_id);
123
124 // Set up resolver so that it matches m_dpspec sources.
125 std::filesystem::path sources_root = "sources";
126 if (m_dpspec.target.source.has_value()) {
128 {m_dpspec.target.source->source_name, m_dpspec.target.source->location},
129 sources_root / json::ParseSourceLocation(m_dpspec.target.source->location).path);
130 }
131 for (auto const& s : m_dpspec.sources) {
132 if (std::holds_alternative<json::FitsFileSource>(s)) {
133 auto const& source = std::get<json::FitsFileSource>(s);
134 m_resolver.Add({source.source_name, source.location},
135 sources_root / json::ParseSourceLocation(source.location).path);
136 }
137 }
// Keep a non-owning pointer to the workspace mock; ownership is moved into the controller.
138 auto ws = std::make_unique<daq::dpm::MockDaqWorkspace>();
139 m_ws_mock_ptr = ws.get();
140 EXPECT_CALL(*ws, LoadStatus()).WillRepeatedly(Return(m_initial_status));
141 EXPECT_CALL(*ws, LoadSourceLookup()).WillRepeatedly(Return(m_resolver.GetMapping()));
142 EXPECT_CALL(*ws, LoadSpecification()).WillRepeatedly(Return(m_dpspec));
143 EXPECT_CALL(*ws, GetSourcesPath()).WillRepeatedly(Return(std::filesystem::path("sources")));
144 EXPECT_CALL(*ws, GetResultPath()).WillRepeatedly(Return(std::filesystem::path("")));
145 EXPECT_CALL(*ws, GetSourceLookupPath())
146 .WillRepeatedly(Return(std::filesystem::path("sources.json")));
147 EXPECT_CALL(*ws, GetPath())
148 .Times(AnyNumber())
149 .WillRepeatedly(Return(std::filesystem::path("")));
150 EXPECT_CALL(*ws, GetLogsPath())
151 .Times(AnyNumber())
152 .WillRepeatedly(Return(std::filesystem::path("logs")));
153 m_daq = std::make_unique<DaqControllerImpl>(m_executor,
154 std::move(ws),
156 std::reference_wrapper(m_rsync_factory),
157 std::reference_wrapper(m_proc_factory),
158 m_options);
159 m_daq->Start();
160 }
161
// Destroys the controller under test.
162 void TearDown() override {
163 m_daq.reset();
165 }
166
167 /**
168 * Poll one handler at a time until predicate has been satisfied or io_context runs out of work.
 *
 * The io_context is restarted before every poll_one() call. The predicate @a p is only
 * evaluated after a handler was executed. Exceptions derived from std::exception that
 * propagate out of a handler are logged and rethrown.
169 */
170 template <class Pred>
171 void PollUntil(Pred&& p) {
172 while (true) {
173 m_io_ctx.restart();
174 try {
175 if (m_io_ctx.poll_one() == 0 || p()) {
176 break;
177 }
178 } catch (std::exception const& ex) {
179 LOG4CPLUS_ERROR("test", "Exception bubbled up through ASIO: " << ex.what());
180 throw;
181 }
182 }
183 }
184
185protected:
189 // Not-owned pointer to a DaqWorkspace
191 std::unique_ptr<DaqControllerImpl> m_daq;
194 // Expected path with name of the archived file
195 std::filesystem::path m_arcfile_path;
196};
197
198using namespace ::testing;
199
// Starting from the fixture's default initial status (presumably State::Scheduled, per the test
// name -- the assignment is not visible here), the controller is expected to store the source
// lookup and reach State::Collecting.
200TEST_F(TestDaqController, ScheduledTransitionsToCollecting) {
201 // Additional setup
202 PostSetUp();
203 EXPECT_CALL(*m_ws_mock_ptr, StoreSourceLookup(_));
204 EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Collecting)));
205
206 // Run
207 // Only run handlers until we reach State::Collecting
208 PollUntil([&] { return m_daq->GetState() == State::Collecting; });
209 EXPECT_EQ(m_daq->GetState(), State::Collecting);
210}
211
// With neither a target source nor any other sources in the specification, a controller started
// in State::Collecting is expected to go to State::Merging.
212TEST_F(TestDaqController, CollectingWithoutFilesTransitionsToMerging) {
213 // Additional setup
214 m_dpspec.target.source = std::nullopt;
215 m_dpspec.sources.clear();
216 m_initial_status.state = State::Collecting;
217 PostSetUp();
218
219 EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
220
221 // Run
222 // Only run handlers until we reach State::Merging
223 PollUntil([&] { return m_daq->GetState() == State::Merging; });
224 EXPECT_EQ(m_daq->GetState(), State::Merging);
225}
226
// Starts in State::Collecting with source files reported as missing, so the controller stays in
// Collecting until the rsync mocks complete successfully and the files are reported to exist,
// after which it is expected to reach State::Merging.
227TEST_F(TestDaqController, CollectingStartsTransfersAndWhenCompletedTransitionsToMerging) {
228 // Additional setup
229 ASSERT_TRUE(m_dpspec.target.source);
230 ASSERT_TRUE(m_dpspec.sources.size() == 1);
231 m_initial_status.state = State::Collecting;
// NOTE(review): meaning of a limit of 0 is not visible here (presumably "unlimited") -- confirm.
232 m_resources.net_receive.SetLimit(0);
233 PostSetUp();
234
235 EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
236 << "Setup should have added target and one extra source";
// Initially neither file exists in the workspace.
237 EXPECT_CALL(
238 *m_ws_mock_ptr,
239 Exists(std::filesystem::path("sources/target.fits"), std::filesystem::file_type::regular))
240 .Times(AnyNumber())
241 .WillRepeatedly(Return(false));
242 EXPECT_CALL(
243 *m_ws_mock_ptr,
244 Exists(std::filesystem::path("sources/source.fits"), std::filesystem::file_type::regular))
245 .Times(AnyNumber())
246 .WillRepeatedly(Return(false));
247
248 // Run
249 m_io_ctx.poll();
250 EXPECT_EQ(m_daq->GetState(), State::Collecting);
251 // After promise is completed the files will exist
252 EXPECT_CALL(
253 *m_ws_mock_ptr,
254 Exists(std::filesystem::path("sources/source.fits"), std::filesystem::file_type::regular))
255 .WillOnce(Return(true));
256 EXPECT_CALL(
257 *m_ws_mock_ptr,
258 Exists(std::filesystem::path("sources/target.fits"), std::filesystem::file_type::regular))
259 .WillOnce(Return(true));
260 EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
261
262 // Complete async file operations by setting promises
263 for (auto& proc : m_rsync_factory.procs) {
264 proc->promise.set_value(0);
265 }
// Execute the handlers that became ready once the promises were set.
266 PollUntil([&] { return m_daq->GetState() == State::Merging; });
267
268 // The controller should now be merging.
269 EXPECT_EQ(m_daq->GetState(), State::Merging);
270}
271
// When an rsync transfer completes with a non-zero exit code the controller is expected to stop,
// remain in State::Collecting and store a status that has the error flag set.
272TEST_F(TestDaqController, CollectingFailsDoesNotTransition) {
273 // Additional setup
274 ASSERT_TRUE(m_dpspec.target.source);
275 ASSERT_TRUE(m_dpspec.sources.size() == 1);
276 m_initial_status.state = State::Collecting;
277 m_resources.net_receive.SetLimit(0);
278 PostSetUp();
279
280 EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
281 << "Setup should have added target and one extra source";
282 EXPECT_CALL(
283 *m_ws_mock_ptr,
284 Exists(std::filesystem::path("sources/target.fits"), std::filesystem::file_type::regular))
285 .WillOnce(Return(false));
286 EXPECT_CALL(
287 *m_ws_mock_ptr,
288 Exists(std::filesystem::path("sources/source.fits"), std::filesystem::file_type::regular))
289 .WillOnce(Return(false));
290
291 // Test
292 // Initiates transfers
293 m_io_ctx.poll();
294
295 EXPECT_EQ(m_daq->GetState(), State::Collecting);
296 EXPECT_CALL(
297 *m_ws_mock_ptr,
298 StoreStatus(AllOf(Field(&Status::state, State::Collecting), ResultOf(HasError, true))));
299
300 // Emulate a successful first transfer (exit code 0) followed by failures (non-zero exit codes).
301 // Complete async file operations by setting promises
302 int error = 0;
303 for (auto& proc : m_rsync_factory.procs) {
304 proc->promise.set_value(error++);
305 }
306 // Execute pending handlers that should have been registered after promise was set.
307 m_io_ctx.restart();
308 m_io_ctx.poll();
309
310 EXPECT_TRUE(m_daq->IsStopped());
311 EXPECT_EQ(m_daq->GetState(), State::Collecting);
312}
313
314/**
315 * [recovery]
316 * If a transfer previously failed and the DAQ then stopped, this should be automatically
317 * recoverable if the DAQ is started again and the requested files are available on the FS.
 *
 * The test starts in State::Collecting with a COLLECTING_RSYNC alert already present, reports
 * all files as existing and expects a transition to State::Merging without the error flag.
318 */
319TEST_F(TestDaqController, RecoverAutomaticallyFromCollectingIfFilesExist) {
320 // Additional setup
321 m_initial_status.state = State::Collecting;
322 // Emulate alert from previous failure (note that source mapping must match)
323 m_initial_status.alerts.push_back(
324 MakeAlert(alert::COLLECTING_RSYNC, "sources/source.fits", ""));
325 PostSetUp();
326
327 // Files exist!
328 EXPECT_CALL(*m_ws_mock_ptr, Exists(_, _)).WillRepeatedly(Return(true));
329 EXPECT_CALL(
330 *m_ws_mock_ptr,
331 StoreStatus(AllOf(Field(&Status::state, State::Merging), ResultOf(HasError, false))));
332
333 // Run
334 // Only run handlers until we reach State::Merging
335 PollUntil([&] { return m_daq->GetState() == State::Merging; });
336 EXPECT_EQ(m_daq->GetState(), State::Merging);
337}
338
// Starts in State::Merging, lets the merge process emit one JSON alert and one invalid line on
// stdout and then complete with exit code 0. The alert is expected to be stored in the status
// and the controller is expected to create the result symlink and reach State::Releasing.
339TEST_F(TestDaqController, MergingSuccessful) {
340 // Setup
341 m_initial_status.state = State::Merging;
342 PostSetUp();
343
344 EXPECT_CALL(*m_ws_mock_ptr, GetLogsPath()).WillRepeatedly(Return("logs"));
345 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
346 .WillRepeatedly(Return("specification.json"));
// The result file does not exist yet, so a merge process has to be started.
347 EXPECT_CALL(*m_ws_mock_ptr,
348 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
349 .WillOnce(Return(false));
350
351 // Run
352 // First the process will be created
353 m_io_ctx.poll();
354
355 ASSERT_EQ(m_proc_factory.procs.size(), 1u);
// From now on the result file is reported as existing.
356 EXPECT_CALL(*m_ws_mock_ptr,
357 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
358 .WillRepeatedly(Return(true));
359
360 // expect alert to be set
361 EXPECT_CALL(*m_ws_mock_ptr,
362 StoreStatus(AllOf(
363 Field(&Status::alerts,
364 ElementsAre(Field(&Alert::description,
365 HasSubstr("Writing keywords required resizing")))),
367 ResultOf(HasError, true))));
368 EXPECT_CALL(
369 *m_ws_mock_ptr,
370 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, true))));
371 EXPECT_CALL(*m_ws_mock_ptr, MakeResultSymlink(std::filesystem::path("TEST.ID.fits")));
372
373 // Emit alert from merger
374 std::string message =
375 R"({"content":{"id":"primary_hdu_resize","message":"Writing keywords required resizing of primary HDU: Add space for at least 73 keywords to avoid resize"},"timestamp":1650964356522093759,"type":"alert"})";
376 m_proc_factory.procs[0]->stdout(8, message);
377
378 // Invalid messages should be ignored
379 m_proc_factory.procs[0]->stdout(8, "This is invalid JSON");
380
381 // Then we complete
382 m_proc_factory.procs[0]->promise.set_value(0);
383
384 // Only run handlers until we reach State::Releasing
385 PollUntil([&] { return m_daq->GetState() == State::Releasing; });
386 EXPECT_EQ(m_daq->GetState(), State::Releasing);
387}
388
389/**
390 * [recovery]
391 * - User manually invokes daqDpmMerge to create output.
 *
 * The test starts in State::Merging with a MERGING_MERGE alert already present and reports the
 * result file as existing. No merge process is expected to be created and the controller is
 * expected to reach State::Completed without the error flag.
392 */
393TEST_F(TestDaqController, RecoverAutomaticallyFromMergeFailureIfResultExists) {
394 // Setup
395 // Start in Merging with failure.
396 m_initial_status.state = State::Merging;
397 m_initial_status.alerts.push_back(MakeAlert(alert::MERGING_MERGE, "", ""));
398 PostSetUp();
399
400 EXPECT_CALL(*m_ws_mock_ptr,
401 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
402 .WillRepeatedly(Return(true));
403 EXPECT_CALL(
404 *m_ws_mock_ptr,
405 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, false))));
406
407 // Run
408 // No merge process should be created since the result already exists
409 m_io_ctx.poll();
410 ASSERT_EQ(m_proc_factory.procs.size(), 0u);
411 EXPECT_EQ(m_daq->GetState(), State::Completed);
412}
413
// With no receivers configured, a controller started in State::Releasing is expected to go
// to State::Completed without the error flag.
414TEST_F(TestDaqController, NoopReleasingCompletes) {
415 // Setup
416 // Start in Releasing without errors.
417 m_initial_status.state = State::Releasing;
418 // Remove default receivers
419 m_dpspec.receivers.clear();
420 PostSetUp();
421
422 EXPECT_CALL(
423 *m_ws_mock_ptr,
424 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, false))));
425 // Run
426 m_io_ctx.poll();
427 EXPECT_EQ(m_daq->GetState(), State::Completed);
428}
429
// Releasing with the two default receivers: the hard link for the first receiver is mocked as
// successful so it completes immediately, while the second receiver uses one rsync process.
// When that process completes with exit code 0 the controller is expected to reach
// State::Completed without the error flag.
430TEST_F(TestDaqController, ReleasingToOlasIsSuccessfulWithHardLink) {
431 // Setup
432 m_initial_status.state = State::Releasing;
433 m_initial_status.result = "TEST.ID.fits";
434 std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
435 m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
436 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
437 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
438 });
439 PostSetUp();
440
441 EXPECT_CALL(
442 *m_ws_mock_ptr,
443 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, false))))
444 .Times(AtLeast(1));
445
446 {
447 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
448 .WillRepeatedly(Return("specification.json"));
449 EXPECT_CALL(
450 *m_ws_mock_ptr,
451 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
452 .WillRepeatedly(Return(false));
453
454 // Only the OLAS receiver without host will attempt to be hard-linked, which we mock as
455 // successful.
456 EXPECT_CALL(*m_ws_mock_ptr,
457 MakeHardLink(std::filesystem::path("TEST.ID.fits"),
458 std::filesystem::path(m_arcfile_path)))
459 .WillOnce(Return(std::error_code()));
460
461 // Run
462 // First the first receiver should have used hard-link and second rsync transfer
463 // process will be created
464 m_io_ctx.poll();
465 }
466
467 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Success)
468 << "Expected first receiver, which is local, to have used hard links which completes "
469 "immediately";
470 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
471
472 // When rsync completes we expect to transition to completed.
473 EXPECT_CALL(
474 *m_ws_mock_ptr,
475 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, false))));
476
477 // Then we complete rsync process
478 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
479 m_rsync_factory.procs[0]->promise.set_value(0);
480
481 // Run handlers until we reach State::Completed
482 PollUntil([&] { return m_daq->GetState() == State::Completed; });
483
484 EXPECT_EQ(m_daq->GetState(), State::Completed);
485}
486
// Releasing with the two default receivers: the hard link for the first receiver is mocked to
// fail with not_supported and the subsequent symlink is mocked as successful, so the first
// receiver completes immediately. The second receiver uses one rsync process; when it completes
// with exit code 0 the controller is expected to reach State::Completed without the error flag.
487TEST_F(TestDaqController, ReleasingToOlasIsSuccessfulWithSymlink) {
488 // Setup
489 m_initial_status.state = State::Releasing;
490 m_initial_status.result = "TEST.ID.fits";
491 std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
492 m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
493 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
494 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
495 });
496 PostSetUp();
497
498 EXPECT_CALL(
499 *m_ws_mock_ptr,
500 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, false))))
501 .Times(AtLeast(1));
502
503 {
504 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
505 .WillRepeatedly(Return("specification.json"));
506 EXPECT_CALL(
507 *m_ws_mock_ptr,
508 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
509 .WillRepeatedly(Return(false));
510
511 // For the local OLAS receiver the hard link is mocked to fail with not_supported and
512 // the symlink that is attempted afterwards is mocked as successful.
513 EXPECT_CALL(*m_ws_mock_ptr,
514 MakeHardLink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
515 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
516 EXPECT_CALL(*m_ws_mock_ptr,
517 MakeSymlink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
518 .WillOnce(Return(std::error_code()));
519
520 // Run
521 // First the first receiver should have used a symlink and second rsync transfer
522 // process will be created
523 m_io_ctx.poll();
524 }
525
526 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Success)
527 << "Expected first receiver, which is local, to have used hard links which completes "
528 "immediately";
529 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
530
531 // When rsync completes we expect to transition to completed.
532 EXPECT_CALL(
533 *m_ws_mock_ptr,
534 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, false))));
535
536 // Then we complete rsync process
537 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
538 m_rsync_factory.procs[0]->promise.set_value(0);
539
540 // Run handlers until we reach State::Completed
541 PollUntil([&] { return m_daq->GetState() == State::Completed; });
542
543 EXPECT_EQ(m_daq->GetState(), State::Completed);
544}
545
// Releasing with a single local receiver that has options.allow_symlink set to false: the hard
// link is mocked to fail, MakeSymlink must never be called, and the receiver is instead served
// by one rsync process. When that completes with exit code 0 the controller is expected to
// reach State::Completed without the error flag.
546TEST_F(TestDaqController, ReleasingToOlasHonorsOptionAllowSymlink) {
547 // Setup
548 json::OlasReceiver receiver{};
549 receiver.path = "/local/olas/";
550 receiver.options.allow_symlink = false; // note: default is true
551 m_dpspec.receivers.clear();
552 m_dpspec.receivers.push_back(receiver);
553
554 m_initial_status.state = State::Releasing;
555 m_initial_status.result = "TEST.ID.fits";
556 std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
557 m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
558 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
559 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
560 });
561 PostSetUp();
562
563 EXPECT_CALL(
564 *m_ws_mock_ptr,
565 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, false))))
566 .Times(AtLeast(1));
567
568 {
569 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
570 .WillRepeatedly(Return("specification.json"));
571 EXPECT_CALL(
572 *m_ws_mock_ptr,
573 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
574 .WillRepeatedly(Return(false));
575
576 // The hard link is mocked to fail with not_supported. Because allow_symlink is false,
577 // no symlink may be attempted afterwards.
578 EXPECT_CALL(*m_ws_mock_ptr,
579 MakeHardLink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
580 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
581 EXPECT_CALL(*m_ws_mock_ptr, MakeSymlink(_, _)).Times(0);
582
583 // Run
584 // The hard link attempt fails, so an rsync transfer process is expected to be
585 // created for the receiver
586 m_io_ctx.poll();
587 }
588 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Started);
589
590 // When rsync completes we expect to transition to completed.
591 EXPECT_CALL(
592 *m_ws_mock_ptr,
593 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, false))));
594
595 // Then we complete rsync process
596 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
597 m_rsync_factory.procs[0]->promise.set_value(0);
598
599 // Run handlers until we reach State::Completed
600 PollUntil([&] { return m_daq->GetState() == State::Completed; });
601
602 EXPECT_EQ(m_daq->GetState(), State::Completed);
603}
604
// Releasing with the two default receivers where both the hard link and the symlink for the
// local receiver are mocked to fail: both receivers are then served by rsync (two processes).
// When both complete with exit code 0 the controller is expected to reach State::Completed
// without the error flag.
605TEST_F(TestDaqController, ReleasingToOlasFailingHardLinkRevertsToRsync) {
606 // Setup
607 m_initial_status.state = State::Releasing;
608 m_initial_status.result = "TEST.ID.fits";
609 std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
610 m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
611 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
612 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
613 });
614 PostSetUp();
615
616 EXPECT_CALL(
617 *m_ws_mock_ptr,
618 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, false))))
619 .Times(AtLeast(1));
620
621 {
622 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
623 .WillRepeatedly(Return("specification.json"));
624 EXPECT_CALL(
625 *m_ws_mock_ptr,
626 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
627 .WillRepeatedly(Return(false));
628
629 // For the local OLAS receiver both the hard link and the symlink are mocked to fail
630 // with not_supported.
631 EXPECT_CALL(*m_ws_mock_ptr,
632 MakeHardLink(std::filesystem::path("TEST.ID.fits"),
633 std::filesystem::path(m_arcfile_path)))
634 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
635 EXPECT_CALL(*m_ws_mock_ptr,
636 MakeSymlink(std::filesystem::path("TEST.ID.fits"),
637 std::filesystem::path(m_arcfile_path)))
638 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
639
640 // Run
641 // Linking fails for the first receiver, so rsync transfer processes are expected to
642 // be created for both receivers
643 m_io_ctx.poll();
644 }
645
646 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Started);
647 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
648
649 // When rsync completes we expect to transition to completed.
650 EXPECT_CALL(
651 *m_ws_mock_ptr,
652 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, false))));
653
654 // Then we complete both rsync processes
655 ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
656 m_rsync_factory.procs[0]->promise.set_value(0);
657 m_rsync_factory.procs[1]->promise.set_value(0);
658
659 // Run handlers until we reach State::Completed
660 PollUntil([&] { return m_daq->GetState() == State::Completed; });
661
662 EXPECT_EQ(m_daq->GetState(), State::Completed);
663}
664
// Releasing with the two default receivers where linking fails and the first rsync process
// exits with code 1 while the second exits with 0. The controller is still expected to reach
// State::Completed, but with the error flag set and exactly one alert.
665TEST_F(TestDaqController, ReleasingToOlasCompletesEvenIfTransferFails) {
666 // Tests that failure to release file is not fatal (but DAQ should have error flag set).
667 // Setup
668 m_initial_status.state = State::Releasing;
669 m_initial_status.result = "TEST.ID.fits";
670 std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
671 m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
672 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
673 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
674 });
675 PostSetUp();
676
677 EXPECT_CALL(
678 *m_ws_mock_ptr,
679 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, false))))
680 .Times(AtLeast(1));
681 EXPECT_CALL(
682 *m_ws_mock_ptr,
683 StoreStatus(AllOf(Field(&Status::state, State::Releasing), ResultOf(HasError, true))))
684 .Times(AtLeast(1));
685
686 {
687 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
688 .WillRepeatedly(Return("specification.json"));
689 EXPECT_CALL(
690 *m_ws_mock_ptr,
691 Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
692 .WillRepeatedly(Return(false));
693
694 // For the local OLAS receiver both the hard link and the symlink are mocked to fail
695 // with not_supported.
696 EXPECT_CALL(*m_ws_mock_ptr,
697 MakeHardLink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
698 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
699
700 EXPECT_CALL(*m_ws_mock_ptr,
701 MakeSymlink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
702 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
703 // Run
704 // Linking fails for the first receiver, so rsync transfer processes are expected to
705 // be created for both receivers
706 m_io_ctx.poll();
707 }
708
709 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Started);
710 EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
711
712 // When rsync completes we expect to transition to completed (with error flag set).
713 EXPECT_CALL(
714 *m_ws_mock_ptr,
715 StoreStatus(AllOf(Field(&Status::state, State::Completed), ResultOf(HasError, true))));
716
717 // Then we complete the rsync processes: the first fails (exit code 1), the second succeeds
718 ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
719 m_rsync_factory.procs[0]->promise.set_value(1);
720 m_rsync_factory.procs[1]->promise.set_value(0);
721
722 // Run handlers until we reach State::Completed
723 PollUntil([&] { return m_daq->GetState() == State::Completed; });
724
725 EXPECT_EQ(m_daq->GetState(), State::Completed);
726 EXPECT_EQ(1, m_daq->GetStatus().GetAlerts().size()) << "Expected one alert from failed rsync";
727}
728} // namespace daq::dpm
Provides location of fits source file.
void Add(SourceFile const &source, std::filesystem::path const &path)
Adds path so it is resolved using source_name and location.
auto GetMapping() const noexcept -> Mapping const &
Get native representation of source mapping for serialization.
boost::asio::io_context m_io_ctx
std::unique_ptr< DaqControllerImpl > m_daq
daq::dpm::MockDaqWorkspace * m_ws_mock_ptr
std::filesystem::path m_arcfile_path
void PollUntil(Pred &&p)
Poll one handler at a time until predicate has been satisfied or io_context runs out of work.
void PostSetUp()
Specifically sets up expectations and other things based on current state.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Mocks for daq::RsyncAsyncProcessIf.
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:38
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:53
TEST_F(TestDaqController, ScheduledTransitionsToCollecting)
Options for DaqController.
Definition: scheduler.hpp:164
Limited resources.
Definition: scheduler.hpp:172
std::filesystem::path path
Absolute path to the OLAS "incoming directory" where DPM will drop files.
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Definition: dpSpec.cpp:90
Target target
Describes the target which will become the data product.
Definition: dpSpec.hpp:49
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
Definition: dpSpec.hpp:59
std::vector< SourceTypes > sources
List of sources to create data product from.
Definition: dpSpec.hpp:54
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
std::string description
Definition: status.hpp:100
bool HasError(Status const &status) noexcept
Definition: status.cpp:179
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
Options controlling rsync invocation.
Definition: main.cpp:24
daq::dpm::Scheduler and related class declarations.
Combined fake/mock.
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
State state
Definition: status.hpp:186
std::string id
Definition: status.hpp:184
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
Definition: status.hpp:197
std::string file_id
Definition: status.hpp:185
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:190
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:207
std::unique_ptr< AsyncProcessIf > operator()(boost::asio::io_context &, std::vector< std::string >)
std::vector< MockAsyncProcess * > procs
std::function< void(MockAsyncProcess &)> Hook
std::function< void(MockRsyncAsyncProcess &)> Hook
std::vector< std::string > rsync_dummy_args
std::unique_ptr< RsyncAsyncProcessIf > operator()(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)
void AddDefaultExpectations(MockRsyncAsyncProcess &mock) const
std::vector< MockRsyncAsyncProcess * > procs
std::filesystem::path path
Definition: dpSpec.hpp:79
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))