ifw-daq  3.0.1
IFW Data Acquisition modules
testDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief Unit tests for daq::dpm::DaqControllerImpl
9  */
10 #include <daq/dpm/scheduler.hpp>
11 
12 #include <fmt/format.h>
13 #include <log4cplus/loggingmacros.h>
14 #include <utility>
15 
17 #include "mock/mockScheduler.hpp"
18 #include "mock/mockWorkspace.hpp"
19 
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
22 
23 namespace daq::dpm {
24 
25 using namespace ::testing;
26 
28  using Hook = std::function<void(MockRsyncAsyncProcess&)>;
29  std::unique_ptr<RsyncAsyncProcessIf> operator()(boost::asio::io_context&,
30  std::string, // NOLINT
31  std::string, // NOLINT
32  RsyncOptions const&,
34  auto rsync = std::make_unique<MockRsyncAsyncProcess>();
35  if (hook) {
36  hook(*rsync);
37  }
38  procs.push_back(rsync.get());
39  return rsync;
40  }
41  void SetHook(Hook f) {
42  hook = std::move(f);
43  }
44  std::vector<MockRsyncAsyncProcess*> procs;
46 };
47 
49  using Hook = std::function<void(MockAsyncProcess&)>;
50  // NOLINTNEXTLINE
51  std::unique_ptr<AsyncProcessIf> operator()(boost::asio::io_context&, std::vector<std::string>) {
52  auto rsync = std::make_unique<MockRsyncAsyncProcess>();
53  if (hook) {
54  hook(*rsync);
55  }
56  procs.push_back(rsync.get());
57  return rsync;
58  }
59  void SetHook(Hook f) {
60  hook = std::move(f);
61  }
62  std::vector<MockAsyncProcess*> procs;
64 };
65 
66 class TestDaqControllerBase : public ::testing::Test {
67 public:
69  }
70 
71  void SetUp() override {
72  }
73 
74  void TearDown() override {
75  // Execute possibly pending completions handlers.
76  EXPECT_NO_THROW(m_io_ctx.poll());
77  }
78 
79 protected:
81  boost::asio::io_context m_io_ctx;
84 };
85 
87 
89 public:
91  }
92  void SetUp() override {
94  // Set default status. This can be modified by each test-case before calling PostSetup()
95  m_initial_status.id = "TEST.ID";
96  m_initial_status.file_id = "TEST.2023-08-22T14:40:40.245";
98  m_initial_status.error = false;
99  m_initial_status.timestamp = Status::TimePoint::clock::now();
101  m_dpspec.target.file_id = "TEST.ID";
102  m_dpspec.target.source = json::FitsFileSource{"source1", "host:target.fits", {}};
103  m_dpspec.sources.push_back(json::FitsFileSource{"source2", "host:source.fits", {}});
104  m_dpspec.receivers.push_back(json::OlasReceiver{/*.host*/ "", /* .path */ "/local/olas/"});
105  m_dpspec.receivers.push_back(
106  json::OlasReceiver{/*.host*/ "host", /* .path */ "/remote/olas/"});
107  }
108 
109  /**
110  * Specifically sets up expectations and other things based on current state.
111  * It considers:
112  * - m_dpspec for sources and populates m_resolver based on that.
113  * - m_initial_status controls which state to start from.
114  */
115  void PostSetUp() {
116  m_arcfile_path = fmt::format("/local/olas/{}.fits", m_initial_status.file_id);
117 
118  // Set up resolver so that it matches m_dpspec sources.
119  std::filesystem::path sources_root = "sources";
120  if (m_dpspec.target.source.has_value()) {
121  m_resolver.Add(
122  {m_dpspec.target.source->source_name, m_dpspec.target.source->location},
123  sources_root / json::ParseSourceLocation(m_dpspec.target.source->location).path);
124  }
125  for (auto const& s : m_dpspec.sources) {
126  if (std::holds_alternative<json::FitsFileSource>(s)) {
127  auto const& source = std::get<json::FitsFileSource>(s);
128  m_resolver.Add({source.source_name, source.location},
129  sources_root / json::ParseSourceLocation(source.location).path);
130  }
131  }
132  auto ws = std::make_unique<daq::dpm::MockDaqWorkspace>();
133  m_ws_mock_ptr = ws.get();
134  EXPECT_CALL(*ws, LoadStatus()).WillRepeatedly(Return(m_initial_status));
135  EXPECT_CALL(*ws, LoadSourceLookup()).WillRepeatedly(Return(m_resolver.GetMapping()));
136  EXPECT_CALL(*ws, LoadSpecification()).WillRepeatedly(Return(m_dpspec));
137  EXPECT_CALL(*ws, GetSourcesPath()).WillRepeatedly(Return(std::filesystem::path("sources")));
138  EXPECT_CALL(*ws, GetResultPath()).WillRepeatedly(Return(std::filesystem::path("")));
139  EXPECT_CALL(*ws, GetSourceLookupPath())
140  .WillRepeatedly(Return(std::filesystem::path("sources.json")));
141  EXPECT_CALL(*ws, GetPath())
142  .Times(AnyNumber())
143  .WillRepeatedly(Return(std::filesystem::path("")));
144  m_daq = std::make_unique<DaqControllerImpl>(m_executor,
145  std::move(ws),
146  m_resources,
147  std::reference_wrapper(m_rsync_factory),
148  std::reference_wrapper(m_proc_factory),
149  m_options);
150  m_daq->Start();
151  }
152 
153  void TearDown() override {
154  m_daq.reset();
156  }
157 
158  /**
159  * Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
160  */
161  template <class Pred>
162  void PollUntil(Pred&& p) {
163  while (true) {
164  m_io_ctx.restart();
165  try {
166  if (m_io_ctx.poll_one() == 0 || p()) {
167  break;
168  }
169  } catch (std::exception const& ex) {
170  LOG4CPLUS_ERROR("test", "Exception bubbled up through ASIO: " << ex.what());
171  throw;
172  }
173  }
174  }
175 
176 protected:
180  // Not-owned pointer to a DaqWorkspace
182  std::unique_ptr<DaqControllerImpl> m_daq;
185  // Expected path with name of the archived file
186  std::filesystem::path m_arcfile_path;
187 };
188 
189 using namespace ::testing;
190 
191 TEST_F(TestDaqController, ScheduledTransitionsToCollecting) {
192  // Additional setup
193  PostSetUp();
194  EXPECT_CALL(*m_ws_mock_ptr, StoreSourceLookup(_));
195  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Collecting)));
196 
197  // Run
198  // Only run handlers until we reach State::Collecting
199  PollUntil([&] { return m_daq->GetState() == State::Collecting; });
200  EXPECT_EQ(m_daq->GetState(), State::Collecting);
201 }
202 
203 TEST_F(TestDaqController, CollectingWithoutFilesTransitionsToMerging) {
204  // Additional setup
205  m_dpspec.target.source = std::nullopt;
206  m_dpspec.sources.clear();
207  m_initial_status.state = State::Collecting;
208  PostSetUp();
209 
210  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
211 
212  // Run
213  // Only run handlers until we reach State::Merging
214  PollUntil([&] { return m_daq->GetState() == State::Merging; });
215  EXPECT_EQ(m_daq->GetState(), State::Merging);
216 }
217 
218 TEST_F(TestDaqController, CollectingStartsTransfersAndWhenCompletedTransitionsToMerging) {
219  // Additional setup
220  ASSERT_TRUE(m_dpspec.target.source);
221  ASSERT_TRUE(m_dpspec.sources.size() == 1);
222  m_initial_status.state = State::Collecting;
223  m_resources.net_receive.SetLimit(0);
224  PostSetUp();
225 
226  EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
227  << "Setup should have added target and one extra source";
228  EXPECT_CALL(
229  *m_ws_mock_ptr,
230  Exists(std::filesystem::path("sources/target.fits"), std::filesystem::file_type::regular))
231  .Times(AnyNumber())
232  .WillRepeatedly(Return(false));
233  EXPECT_CALL(
234  *m_ws_mock_ptr,
235  Exists(std::filesystem::path("sources/source.fits"), std::filesystem::file_type::regular))
236  .Times(AnyNumber())
237  .WillRepeatedly(Return(false));
238 
239  // Run
240  m_io_ctx.poll();
241  EXPECT_EQ(m_daq->GetState(), State::Collecting);
242  // After promise is completed the files will exist
243  EXPECT_CALL(
244  *m_ws_mock_ptr,
245  Exists(std::filesystem::path("sources/source.fits"), std::filesystem::file_type::regular))
246  .WillOnce(Return(true));
247  EXPECT_CALL(
248  *m_ws_mock_ptr,
249  Exists(std::filesystem::path("sources/target.fits"), std::filesystem::file_type::regular))
250  .WillOnce(Return(true));
251  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
252 
253  // Complete async file operations by setting promises
254  for (auto& proc : m_rsync_factory.procs) {
255  proc->promise.set_value(0);
256  }
257  PollUntil([&] { return m_daq->GetState() == State::Merging; });
258 
259  // Execute pending handlers that should have been registered after promise was set.
260  EXPECT_EQ(m_daq->GetState(), State::Merging);
261 }
262 
263 TEST_F(TestDaqController, CollectingFailsDoesNotTransition) {
264  // Additional setup
265  ASSERT_TRUE(m_dpspec.target.source);
266  ASSERT_TRUE(m_dpspec.sources.size() == 1);
267  m_initial_status.state = State::Collecting;
268  m_resources.net_receive.SetLimit(0);
269  PostSetUp();
270 
271  EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
272  << "Setup should have added target and one extra source";
273  EXPECT_CALL(
274  *m_ws_mock_ptr,
275  Exists(std::filesystem::path("sources/target.fits"), std::filesystem::file_type::regular))
276  .WillOnce(Return(false));
277  EXPECT_CALL(
278  *m_ws_mock_ptr,
279  Exists(std::filesystem::path("sources/source.fits"), std::filesystem::file_type::regular))
280  .WillOnce(Return(false));
281 
282  // Test
283  // Initiates transfers
284  m_io_ctx.poll();
285 
286  EXPECT_EQ(m_daq->GetState(), State::Collecting);
287  EXPECT_CALL(*m_ws_mock_ptr,
288  StoreStatus(AllOf(Field(&Status::state, State::Collecting),
289  Field(&Status::error, true))));
290 
291  // Run test that emulate a successful first file transfer and then subsequent failures.
292  // Complete async file operations by setting promises
293  int error = 0;
294  for (auto& proc : m_rsync_factory.procs) {
295  proc->promise.set_value(error++);
296  }
297  // Execute pending handlers that should have been registered after promise was set.
298  m_io_ctx.restart();
299  m_io_ctx.poll();
300 
301  EXPECT_TRUE(m_daq->IsStopped());
302  EXPECT_EQ(m_daq->GetState(), State::Collecting);
303 }
304 
305 /**
306  * [recovery]
307  * If transfer previously failed and then stopped this should be automatically recoverable if
308  * DAQ is started again and requested files are available on FS.
309  */
310 TEST_F(TestDaqController, RecoverAutomaticallyFromCollectingIfFilesExist) {
311  // Additional setup
312  m_initial_status.state = State::Collecting;
313  m_initial_status.error = true;
314  PostSetUp();
315 
316  // Files exist!
317  EXPECT_CALL(*m_ws_mock_ptr, Exists(_, _)).WillRepeatedly(Return(true));
318  EXPECT_CALL(
319  *m_ws_mock_ptr,
320  StoreStatus(AllOf(Field(&Status::state, State::Merging), Field(&Status::error, false))));
321 
322  // Run
323  // Only run handlers until we reach State::Merging
324  PollUntil([&] { return m_daq->GetState() == State::Merging; });
325  EXPECT_EQ(m_daq->GetState(), State::Merging);
326 }
327 
328 TEST_F(TestDaqController, MergingSuccessful) {
329  // Setup
330  m_initial_status.state = State::Merging;
331  PostSetUp();
332 
333  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
334  .WillRepeatedly(Return("specification.json"));
335  EXPECT_CALL(*m_ws_mock_ptr,
336  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
337  .WillOnce(Return(false));
338 
339  // Run
340  // First the process will be created
341  m_io_ctx.poll();
342 
343  ASSERT_EQ(m_proc_factory.procs.size(), 1u);
344  EXPECT_CALL(*m_ws_mock_ptr,
345  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
346  .WillRepeatedly(Return(true));
347 
348  // expect alert to be set
349  EXPECT_CALL(*m_ws_mock_ptr,
350  StoreStatus(AllOf(
351  Field(&Status::alerts,
352  ElementsAre(Field(&Alert::description,
353  HasSubstr("Writing keywords required resizing")))),
354  Field(&Status::state, State::Merging),
355  Field(&Status::error, false))));
356  EXPECT_CALL(
357  *m_ws_mock_ptr,
358  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, false))));
359  EXPECT_CALL(*m_ws_mock_ptr, MakeResultSymlink(std::filesystem::path("TEST.ID.fits")));
360 
361  // Emit alert from merger
362  std::string message =
363  R"({"content":{"id":"primary_hdu_resize","message":"Writing keywords required resizing of primary HDU: Add space for at least 73 keywords to avoid resize"},"timestamp":1650964356522093759,"type":"alert"})";
364  m_proc_factory.procs[0]->stdout(8, message);
365 
366  // Invalid messages should be ignored
367  m_proc_factory.procs[0]->stdout(8, "This is invalid JSON");
368 
369  // Then we complete
370  m_proc_factory.procs[0]->promise.set_value(0);
371 
372  // Only run handlers until we reach State::Merging
373  PollUntil([&] { return m_daq->GetState() == State::Releasing; });
374  EXPECT_EQ(m_daq->GetState(), State::Releasing);
375 }
376 
377 /**
378  * [recovery]
379  * - User manually invokes daqDpmMerge to create output.
380  */
381 TEST_F(TestDaqController, RecoverAutomaticallyFromMergeFailureIfResultExists) {
382  // Setup
383  // Start in Merging with failure.
384  m_initial_status.state = State::Merging;
385  m_initial_status.error = true;
386  PostSetUp();
387 
388  EXPECT_CALL(*m_ws_mock_ptr,
389  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
390  .WillRepeatedly(Return(true));
391  EXPECT_CALL(
392  *m_ws_mock_ptr,
393  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
394 
395  // Run
396  // First the process will be created
397  m_io_ctx.poll();
398  ASSERT_EQ(m_proc_factory.procs.size(), 0u);
399  EXPECT_EQ(m_daq->GetState(), State::Completed);
400 }
401 
402 TEST_F(TestDaqController, NoopReleasingCompletes) {
403  // Setup
404  // Start in Releasing without errors.
405  m_initial_status.state = State::Releasing;
406  m_initial_status.error = false;
407  // Remove default receivers
408  m_dpspec.receivers.clear();
409  PostSetUp();
410 
411  EXPECT_CALL(
412  *m_ws_mock_ptr,
413  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
414  // Run
415  m_io_ctx.poll();
416  EXPECT_EQ(m_daq->GetState(), State::Completed);
417 }
418 
419 TEST_F(TestDaqController, ReleasingToOlasIsSuccessfulWithHardLink) {
420  // Setup
421  m_initial_status.state = State::Releasing;
422  m_initial_status.result = "TEST.ID.fits";
423  std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
424  m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
425  EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
426  EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
427  });
428  PostSetUp();
429 
430  EXPECT_CALL(
431  *m_ws_mock_ptr,
432  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, false))))
433  .Times(AtLeast(1));
434 
435  {
436  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
437  .WillRepeatedly(Return("specification.json"));
438  EXPECT_CALL(
439  *m_ws_mock_ptr,
440  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
441  .WillRepeatedly(Return(false));
442 
443  // Only the OLAS receiver without host will attempt to be hard-linked, which we mock as
444  // successful.
445  EXPECT_CALL(*m_ws_mock_ptr,
446  MakeHardLink(std::filesystem::path("TEST.ID.fits"),
447  std::filesystem::path(m_arcfile_path)))
448  .WillOnce(Return(std::error_code()));
449 
450  // Run
451  // First the first receiver should have have used hard-link and second rsync transfer
452  // process will be created
453  m_io_ctx.poll();
454  }
455 
456  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Success)
457  << "Expected first receiver, which is local, to have used hard links which completes "
458  "immediately";
459  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
460 
461  // When rsync completes we expect to transition to completed.
462  EXPECT_CALL(
463  *m_ws_mock_ptr,
464  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
465 
466  // Then we complete rsync process
467  ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
468  m_rsync_factory.procs[0]->promise.set_value(0);
469 
470  // Run handlers until we reach State::Completed
471  PollUntil([&] { return m_daq->GetState() == State::Completed; });
472 
473  EXPECT_EQ(m_daq->GetState(), State::Completed);
474 }
475 
476 TEST_F(TestDaqController, ReleasingToOlasIsSuccessfulWithSymlink) {
477  // Setup
478  m_initial_status.state = State::Releasing;
479  m_initial_status.result = "TEST.ID.fits";
480  std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
481  m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
482  EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
483  EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
484  });
485  PostSetUp();
486 
487  EXPECT_CALL(
488  *m_ws_mock_ptr,
489  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, false))))
490  .Times(AtLeast(1));
491 
492  {
493  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
494  .WillRepeatedly(Return("specification.json"));
495  EXPECT_CALL(
496  *m_ws_mock_ptr,
497  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
498  .WillRepeatedly(Return(false));
499 
500  // Only the OLAS receiver without host will attempt to be hard-linked, which we mock as
501  // successful.
502  EXPECT_CALL(*m_ws_mock_ptr,
503  MakeHardLink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
504  .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
505  EXPECT_CALL(*m_ws_mock_ptr,
506  MakeSymlink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
507  .WillOnce(Return(std::error_code()));
508 
509  // Run
510  // First the first receiver should have have used hard-link and second rsync transfer
511  // process will be created
512  m_io_ctx.poll();
513  }
514 
515  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Success)
516  << "Expected first receiver, which is local, to have used hard links which completes "
517  "immediately";
518  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
519 
520  // When rsync completes we expect to transition to completed.
521  EXPECT_CALL(
522  *m_ws_mock_ptr,
523  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
524 
525  // Then we complete rsync process
526  ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
527  m_rsync_factory.procs[0]->promise.set_value(0);
528 
529  // Run handlers until we reach State::Completed
530  PollUntil([&] { return m_daq->GetState() == State::Completed; });
531 
532  EXPECT_EQ(m_daq->GetState(), State::Completed);
533 }
534 
535 TEST_F(TestDaqController, ReleasingToOlasHonorsOptionAllowSymlink) {
536  // Setup
537  json::OlasReceiver receiver{};
538  receiver.path = "/local/olas/";
539  receiver.options.allow_symlink = false; // note: default is true
540  m_dpspec.receivers.clear();
541  m_dpspec.receivers.push_back(receiver);
542 
543  m_initial_status.state = State::Releasing;
544  m_initial_status.result = "TEST.ID.fits";
545  std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
546  m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
547  EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
548  EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
549  });
550  PostSetUp();
551 
552  EXPECT_CALL(
553  *m_ws_mock_ptr,
554  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, false))))
555  .Times(AtLeast(1));
556 
557  {
558  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
559  .WillRepeatedly(Return("specification.json"));
560  EXPECT_CALL(
561  *m_ws_mock_ptr,
562  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
563  .WillRepeatedly(Return(false));
564 
565  // Only the OLAS receiver without host will attempt to be hard-linked, which we mock as
566  // successful.
567  EXPECT_CALL(*m_ws_mock_ptr,
568  MakeHardLink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
569  .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
570  EXPECT_CALL(*m_ws_mock_ptr, MakeSymlink(_, _)).Times(0);
571 
572  // Run
573  // First the first receiver should have have used hard-link and second rsync transfer
574  // process will be created
575  m_io_ctx.poll();
576  }
577  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Started);
578 
579  // When rsync completes we expect to transition to completed.
580  EXPECT_CALL(
581  *m_ws_mock_ptr,
582  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
583 
584  // Then we complete rsync process
585  ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
586  m_rsync_factory.procs[0]->promise.set_value(0);
587 
588  // Run handlers until we reach State::Completed
589  PollUntil([&] { return m_daq->GetState() == State::Completed; });
590 
591  EXPECT_EQ(m_daq->GetState(), State::Completed);
592 }
593 
594 TEST_F(TestDaqController, ReleasingToOlasFailingHardLinkRevertsToRsync) {
595  // Setup
596  m_initial_status.state = State::Releasing;
597  m_initial_status.result = "TEST.ID.fits";
598  std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
599  m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
600  EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
601  EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
602  });
603  PostSetUp();
604 
605  EXPECT_CALL(
606  *m_ws_mock_ptr,
607  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, false))))
608  .Times(AtLeast(1));
609 
610  {
611  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
612  .WillRepeatedly(Return("specification.json"));
613  EXPECT_CALL(
614  *m_ws_mock_ptr,
615  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
616  .WillRepeatedly(Return(false));
617 
618  // Only the OLAS receiver without host will attempt to be hard-linked, which we mock as
619  // successful.
620  EXPECT_CALL(*m_ws_mock_ptr,
621  MakeHardLink(std::filesystem::path("TEST.ID.fits"),
622  std::filesystem::path(m_arcfile_path)))
623  .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
624  EXPECT_CALL(*m_ws_mock_ptr,
625  MakeSymlink(std::filesystem::path("TEST.ID.fits"),
626  std::filesystem::path(m_arcfile_path)))
627  .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
628 
629  // Run
630  // First the first receiver should have have used hard-link and second rsync transfer
631  // process will be created
632  m_io_ctx.poll();
633  }
634 
635  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Started);
636  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
637 
638  // When rsync completes we expect to transition to completed.
639  EXPECT_CALL(
640  *m_ws_mock_ptr,
641  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
642 
643  // Then we complete rsync process
644  ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
645  m_rsync_factory.procs[0]->promise.set_value(0);
646  m_rsync_factory.procs[1]->promise.set_value(0);
647 
648  // Run handlers until we reach State::Completed
649  PollUntil([&] { return m_daq->GetState() == State::Completed; });
650 
651  EXPECT_EQ(m_daq->GetState(), State::Completed);
652 }
653 
654 TEST_F(TestDaqController, ReleasingToOlasCompletesEvenIfTransferFails) {
655  // Tests that failure to release file is not fatal (but DAQ should have error flag set).
656  // Setup
657  m_initial_status.state = State::Releasing;
658  m_initial_status.result = "TEST.ID.fits";
659  std::vector<std::string> rsync_dummy_args = {"rsync", "from", "to"};
660  m_rsync_factory.SetHook([&](MockRsyncAsyncProcess& mock) {
661  EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
662  EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
663  });
664  PostSetUp();
665 
666  EXPECT_CALL(
667  *m_ws_mock_ptr,
668  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, false))))
669  .Times(AtLeast(1));
670  EXPECT_CALL(
671  *m_ws_mock_ptr,
672  StoreStatus(AllOf(Field(&Status::state, State::Releasing), Field(&Status::error, true))))
673  .Times(AtLeast(1));
674 
675  {
676  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
677  .WillRepeatedly(Return("specification.json"));
678  EXPECT_CALL(
679  *m_ws_mock_ptr,
680  Exists(std::filesystem::path("TEST.ID.fits"), std::filesystem::file_type::regular))
681  .WillRepeatedly(Return(false));
682 
683  // Only the OLAS receiver without host will attempt to be hard-linked, which we mock as
684  // successful.
685  EXPECT_CALL(*m_ws_mock_ptr,
686  MakeHardLink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
687  .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
688 
689  EXPECT_CALL(*m_ws_mock_ptr,
690  MakeSymlink(std::filesystem::path("TEST.ID.fits"), m_arcfile_path))
691  .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
692  // Run
693  // First the first receiver should have have used hard-link and second rsync transfer
694  // process will be created
695  m_io_ctx.poll();
696  }
697 
698  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(0).state, ReceiverStatus::State::Started);
699  EXPECT_EQ(m_daq->GetStatus().GetReceiverStatus(1).state, ReceiverStatus::State::Started);
700 
701  // When rsync completes we expect to transition to completed.
702  EXPECT_CALL(
703  *m_ws_mock_ptr,
704  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, true))));
705 
706  // Then we complete rsync process
707  ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
708  m_rsync_factory.procs[0]->promise.set_value(1);
709  m_rsync_factory.procs[1]->promise.set_value(0);
710 
711  // Run handlers until we reach State::Completed
712  PollUntil([&] { return m_daq->GetState() == State::Completed; });
713 
714  EXPECT_EQ(m_daq->GetState(), State::Completed);
715  EXPECT_EQ(1, m_daq->GetStatus().GetAlerts().size()) << "Expected one alert from failed rsync";
716 }
717 } // namespace daq::dpm
Provides location of fits source file.
void Add(SourceFile const &source, std::filesystem::path const &path)
Adds path so it is resolved using source_name and location.
auto GetMapping() const noexcept -> Mapping const &
Get native representation of source mapping for serialization.
boost::asio::io_context m_io_ctx
std::unique_ptr< DaqControllerImpl > m_daq
daq::dpm::MockDaqWorkspace * m_ws_mock_ptr
std::filesystem::path m_arcfile_path
void PollUntil(Pred &&p)
Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
void PostSetUp()
Specifically sets up expectations and other things based on current state.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Mocks for daq::RsyncAsyncProcessIf.
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
TEST_F(TestDaqController, ScheduledTransitionsToCollecting)
Options for DaqController.
Definition: scheduler.hpp:163
Limited resources.
Definition: scheduler.hpp:171
std::filesystem::path path
Absolute path to the OLAS "incoming directory" where DPM will drop files.
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Definition: dpSpec.cpp:90
Target target
Describes target which will become the data produtc.
Definition: dpSpec.hpp:49
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
Definition: dpSpec.hpp:59
std::vector< SourceTypes > sources
List of sources to create data product from.
Definition: dpSpec.hpp:54
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
std::string description
Definition: status.hpp:91
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Options controlling rsync invocation.
Definition: main.cpp:23
daq::dpm::Scheduler and related class declarations.
Combined fake/mock.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:153
State state
Definition: status.hpp:176
std::string id
Definition: status.hpp:174
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
Definition: status.hpp:188
std::string file_id
Definition: status.hpp:175
bool error
Definition: status.hpp:177
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:181
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:198
std::vector< MockAsyncProcess * > procs
std::unique_ptr< AsyncProcessIf > operator()(boost::asio::io_context &, std::vector< std::string >)
std::function< void(MockAsyncProcess &)> Hook
std::unique_ptr< RsyncAsyncProcessIf > operator()(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)
std::function< void(MockRsyncAsyncProcess &)> Hook
std::vector< MockRsyncAsyncProcess * > procs
std::filesystem::path path
Definition: dpSpec.hpp:79
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))