ifw-daq  3.0.1
IFW Data Acquisition modules
testOcmDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Unit test for `daq::OcmDaqController`
7  */
8 // NOLINT
9 #include <memory>
10 #include <stdexcept>
11 #include <thread>
12 
13 #include <gtest/gtest.h>
14 
15 #include <daq/daqController.hpp>
16 #include <daq/error.hpp>
17 #include <daq/op/abort.hpp>
18 #include <daq/op/awaitPrim.hpp>
19 #include <daq/op/initiate.hpp>
20 #include <daq/op/start.hpp>
21 #include <daq/op/stop.hpp>
22 
23 #include "mock/metadaqifMock.hpp"
25 #include "mock/recifMock.hpp"
26 #include "statusObserver.hpp"
27 #include "utils.hpp"
28 
29 using namespace daq;
30 using namespace ::testing;
31 using namespace std::chrono;
32 
33 /**
34  * Fixture for daq::DaqController life cycle tests
35  *
36  * @ingroup daq_ocm_libdaq_test
37  */
38 class TestOcmDaqControllerLifeCycle : public ::testing::Test {
39 public:
41  : m_io_ctx()
42  , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
43  , m_event_log(std::make_shared<ObservableEventLog>()) {
44  m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
45  m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
46  m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>, op::AsyncOpParams>;
48  boost::future<Result<DpParts>>,
52  boost::future<Result<void>>,
56  boost::future<Result<DpParts>>,
58  m_context.id = "id";
59  }
60  boost::asio::io_context m_io_ctx; // NOLINT
61  std::shared_ptr<ObservableStatus> m_status; // NOLINT
62  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
63  std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
64  std::shared_ptr<MetaSource::RrClient> m_meta_rr_client; // NOLINT
66  DaqContext m_context; // NOLINT
68 };
69 
70 template <class Iterator>
71 void SetSourceState(Iterator begin, Iterator end, State state) {
72  auto it = begin;
73  for (; it != end; ++it) {
74  it->SetState(state);
75  }
76 }
77 
79  SetSourceState(params.meta_sources.begin(), params.meta_sources.end(), state);
80  SetSourceState(params.prim_sources.begin(), params.prim_sources.end(), state);
81 }
82 
83 /**
84  *
85  * Developer notes:
86  * OcmDaqController uses boost::when_all to compose futures. This does not support executors and will
87  * spawn a thread to perform the work. This means that the tests will either have to block
88  * indefinitely with future::get() or use a timeout.
89  *
90  * @ingroup daq_ocm_libdaq_test
91  */
92 struct TestState : ::testing::Test {
93  std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
94  std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client; // NOLINT
95  std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client2; // NOLINT
96  std::unique_ptr<MockAsyncOperations> m_mock_ops; // NOLINT
97 
98  boost::asio::io_context m_io_ctx; // NOLINT
100  std::shared_ptr<ObservableStatus> m_status; // NOLINT
101  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
102  std::shared_ptr<OcmDaqController> m_daq; // NOLINT
105  std::vector<DpPart> m_files; // NOLINT
107 
109  : m_io_ctx()
110  , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
111  , m_event_log(std::make_shared<ObservableEventLog>()) {
112  }
113 
114  void SetUp() override {
115  m_files.emplace_back("foo", "bar");
116  m_keywords.emplace_back(fits::EsoKeyword("FOO", "BAR"));
117  m_keywords.emplace_back(fits::ValueKeyword("FOO", "BAR"));
118 
119  m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
120  m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
121  m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
122  auto tup = CreateMockAsyncOperations();
123  m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
124 
125  // Connect listener
126  // @todo: Add expectations for observer
127  // m_status->ConnectObserver(std::reference_wrapper(m_observer));
128 
129  MetaSource s1("meta-source-1", m_meta_rr_client);
130  MetaSource s2("meta-source-2", m_meta_rr_client2);
131  m_context.id = "id";
132  m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{},
133  m_sources.GetMetadataSources() = std::vector<daq::MetaSource>{s1, s2},
134 
135  PreDaqControllerHook();
136  m_daq = std::make_shared<daq::OcmDaqController>(m_io_ctx,
137  m_context,
138  m_sources,
139  m_status,
140  m_event_log,
141  std::get<OcmAsyncOperations>(tup));
142 
143  ASSERT_TRUE(m_daq);
144  ASSERT_EQ(m_status->GetState(), m_daq->GetState());
145  }
146 
147  void TearDown() override {
148  m_daq.reset();
149  m_meta_rr_client.reset();
150  m_meta_rr_client2.reset();
151  m_prim_rr_client.reset();
152  }
153 
154  virtual void PreDaqControllerHook() {
155  }
156  virtual void PreStartAsyncHook() {
157  }
158 
159  /**
160  * Executes a successful StartAsync() call
161  */
162  void StartDaq() {
163  // Setup
164  // Set up mock so that op::StartAsync invocation returns the future from our promise.
165  boost::promise<void> reply_promise;
166  std::optional<op::AsyncOpParams> params;
167  EXPECT_CALL(*m_mock_ops, Start(_))
168  .WillOnce(DoAll(Invoke([&](auto p) { params.emplace(p); }),
169  Return(ByMove(reply_promise.get_future()))));
170  PreStartAsyncHook();
171 
172  // Run
173  auto fut = m_daq->StartAsync();
174  EXPECT_EQ(State::Starting, m_daq->GetState());
175  EXPECT_FALSE(fut.is_ready());
176 
177  // "Send reply"
178  reply_promise.set_value();
179  ASSERT_TRUE(params);
181 
182  // Execute scheduled handlers
183  MakeTestProgress(m_io_ctx, &fut);
184 
185  ASSERT_TRUE(fut.is_ready());
186  EXPECT_EQ(State::Acquiring, fut.get());
187  }
188 
189  void AbortDaq() {
190  // Setup
191  // Set up mock so that op::StartAsync invocation returns the future from our promise.
192  std::optional<op::AsyncOpParams> params;
193  boost::promise<Result<void>> reply_promise;
194  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
195  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
196  Return(ByMove(reply_promise.get_future()))));
197 
198  // Run
199  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
200 
201  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
202  << "Expected state to be in Stopping after requesting to abort";
203 
204  // "Send reply"
205  reply_promise.set_value({false});
206  ASSERT_TRUE(params);
208 
209  // Execute handlers
210  MakeTestProgress(m_io_ctx, &fut);
211 
212  ASSERT_TRUE(fut.is_ready());
213  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
214  auto result = fut.get();
215  EXPECT_EQ(State::Aborted, result.state);
216  EXPECT_FALSE(result.error);
217  EXPECT_EQ(State::Aborted, m_daq->GetState());
218  }
219 
220  void StopDaq() {
221  std::optional<op::AsyncOpParams> params;
222  boost::promise<Result<DpParts>> reply_promise;
223 
224  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
225  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
226  Return(ByMove(reply_promise.get_future()))));
227 
228  // Run
229  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
230 
231  EXPECT_EQ(State::Stopping, m_daq->GetState())
232  << "Expected state to be in Stopping after requesting to stop";
233 
234  ASSERT_TRUE(params);
236 
237  // "Send reply"
238  Result<DpParts> reply{false, m_files};
239  reply_promise.set_value(reply);
240 
241  // Execute handlers
242  MakeTestProgress(m_io_ctx, &fut);
243 
244  ASSERT_TRUE(fut.is_ready());
245  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
246  auto status = fut.get();
247  EXPECT_EQ(State::Stopped, status.state)
248  << "Expected state to be Stopped since there were no errors";
249  EXPECT_FALSE(status.error);
250  EXPECT_EQ(State::Stopped, m_daq->GetState());
251  }
252 };
253 
254 struct TestAwait : TestState {};
255 
257 
260  m_status->SetState(State::Acquiring);
261  }
262 };
263 
266  m_status->SetState(State::Stopped);
267  }
268 };
269 
270 /**
271  * Fixture for tests that need a primary data source in addition to the metadata sources
272  */
275  // Add a primary source, which was not needed for other tests
276  PrimSource s1("prim-source-1", m_prim_rr_client);
277  m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{s1};
278  }
280  EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
281  .WillOnce(Return(ByMove(m_await_promise.get_future())));
282  }
283 
284  boost::promise<Result<DpParts>> m_await_promise; // NOLINT
285 };
286 
287 // Simple test to understand boost::future::unwrap()
288 TEST(TestBoost, Unwrap) {
289  EXPECT_THROW(boost::make_ready_future()
290  .then([](auto f) -> boost::future<void> {
291  try {
292  throw std::runtime_error("Meow");
293  } catch (...) {
294  return boost::make_exceptional_future<void>();
295  }
296  })
297  .unwrap()
298  .get(),
299  std::runtime_error);
300 }
301 
302 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfNoSourcesAreProvided) {
303  ASSERT_THROW(
304  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops),
305  std::invalid_argument);
306 }
307 
308 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsObservableStatusIdDoesNotMatchDaqContextId) {
309  MetaSource s("source-id", m_meta_rr_client);
310  m_sources.GetMetadataSources() = {s};
311  m_context.id = "not-id";
312  ASSERT_THROW(
313  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops),
314  std::invalid_argument);
315 }
316 
317 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfAsyncOperationIsInvalid) {
318  MetaSource s("source-id", m_meta_rr_client);
319  m_sources.GetMetadataSources() = {s};
320  {
321  auto ops = m_ops;
322  ops.start = {};
323  ASSERT_THROW(
324  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
325  std::invalid_argument);
326  }
327  {
328  auto ops = m_ops;
329  ops.stop = {};
330  ASSERT_THROW(
331  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
332  std::invalid_argument);
333  }
334  {
335  auto ops = m_ops;
336  ops.abort = {};
337  ASSERT_THROW(
338  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
339  std::invalid_argument);
340  }
341  {
342  auto ops = m_ops;
343  ops.await_prim = {};
344  ASSERT_THROW(
345  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
346  std::invalid_argument);
347  }
348 }
349 
350 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorSucceedsIfSingleMetadataSourceIsUsed) {
351  MetaSource s("source-id", m_meta_rr_client);
352  m_sources.GetMetadataSources() = {s};
353  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops);
354 }
355 
356 TEST_F(TestOcmDaqControllerLifeCycle, DestructionAbortsAsyncWait) {
357  MetaSource s("source-id", m_meta_rr_client);
358  m_sources.GetMetadataSources() = {s};
359  boost::future<State> fut;
360  {
361  auto daq =
362  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops);
363  fut = daq->AwaitAsync({"source-id"}, 100ms);
364  ASSERT_FALSE(fut.is_ready());
365  }
366 
367  MakeTestProgress(m_io_ctx, &fut);
368  ASSERT_TRUE(fut.is_ready())
369  << "Future should have been cancelled since daq should have been deleted.";
370  EXPECT_TRUE(fut.has_exception());
371  EXPECT_THROW(fut.get(), DaqOperationAborted);
372 }
373 
374 TEST_F(TestState, NotStarted) {
375  ASSERT_EQ(State::NotStarted, m_daq->GetState()) << "The initial state should be NotStarted";
376 }
377 
378 TEST_F(TestState, GetStatusReturnsSameStatusObject) {
379  auto status_ptr = m_daq->GetStatus();
380  EXPECT_EQ(status_ptr.get(), m_status.get());
381 }
382 
383 TEST_F(TestState, CannotStopStoppedOcmDaqController) {
384  SCOPED_TRACE("CannotStopStoppedOcmDaqController");
385  StartDaq();
386  StopDaq();
387 
388  ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
389 
390  // Try to stop again
391  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
392  EXPECT_TRUE(fut.has_exception());
393  EXPECT_THROW(fut.get(), std::exception);
394 }
395 
396 TEST_F(TestState, CannotAbortStoppedOcmDaqController) {
397  SCOPED_TRACE("CannotAbortStoppedOcmDaqController");
398  StartDaq();
399  StopDaq();
400 
401  ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
402 
403  // Try to stop again
404  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
405  EXPECT_TRUE(fut.has_exception());
406  EXPECT_THROW(fut.get(), std::exception);
407 }
408 
409 TEST_F(TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
410  // Setup
411  boost::promise<void> reply_promise;
412  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
413 
414  // Run
415  auto fut = m_daq->StartAsync();
416  EXPECT_EQ(State::Starting, m_daq->GetState());
417 
418  // Set up mock future so that it results in an exception exception.
419  reply_promise.set_exception(std::runtime_error("Fake test failure"));
420 
421  // Run async handlers
422  MakeTestProgress(m_io_ctx, &fut);
423 
424  ASSERT_TRUE(fut.is_ready());
425  EXPECT_TRUE(fut.has_exception()) << "Expected future to contain exception";
426  EXPECT_THROW(fut.get(), std::exception) << "Expected exception to derive from std::exception";
427  EXPECT_EQ(true, m_daq->GetErrorFlag());
428 }
429 
430 TEST_F(TestState, StartAsyncReturnsExceptionalFutureInStateStarting) {
431  // Setup
432  boost::promise<void> reply_promise;
433  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
434 
435  auto fut = m_daq->StartAsync();
436  ASSERT_EQ(State::Starting, m_daq->GetState());
437  EXPECT_FALSE(fut.is_ready());
438 
439  // Run
440  // @todo: Shouldn't this be communicated through the future?
441  auto fut2 = m_daq->StartAsync();
442  ASSERT_TRUE(fut2.has_exception());
443  EXPECT_THROW(fut2.get(), std::exception)
444  << "Multiple simultaneous start operations are not supported and an exception "
445  "was exected";
446 
447  // Complete pending operations to avoid "leaking" mock objects
448  // "Send reply"
449  reply_promise.set_value();
450 
451  // Make progress
452  MakeTestProgress(m_io_ctx, &fut);
453 }
454 
455 /**
456  * It's possible to abort but not stop (and keep)
457  */
458 TEST_F(TestState, StopAsyncThrowsIfNotStarted) {
459  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
460  EXPECT_THROW(fut.get(), std::exception)
461  << "It should not be possible to stop a data acquisition that has not started";
462 }
463 
464 TEST_F(TestState, StopAsyncDoesNotThrowWithTolerantPolicy) {
465  // Setup
466  StartDaq();
467 
468  std::optional<op::AsyncOpParams> params;
469  boost::promise<Result<DpParts>> reply_promise;
470  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Tolerant, _))
471  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
472  Return(ByMove(reply_promise.get_future()))));
473 
474  // Since there are no primary sources in TestState fixture we expect no abort
475  EXPECT_CALL(m_mock_ops->mock_abort, Abort()).Times(0);
476 
477  // Run
478  auto fut = m_daq->StopAsync(ErrorPolicy::Tolerant);
479 
480  EXPECT_EQ(State::Stopping, m_daq->GetState())
481  << "Expected state to be in Stopping after requesting to stop";
482 
483  // "Send reply"
484  Result<DpParts> reply{true, {}};
485  reply_promise.set_value(reply);
486  ASSERT_TRUE(params);
487  // Since we are forcing stop it should be acceptable that sources are not stopped.
489 
490  // Execute handlers
491  MakeTestProgress(m_io_ctx, &fut);
492 
493  ASSERT_TRUE(fut.is_ready());
494  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
495  auto status = fut.get();
496  EXPECT_EQ(State::Stopped, status.state)
497  << "Expected state to be Stopped since there were no errors";
498  EXPECT_TRUE(status.error) << "Error flag should be set since the reply_promise had an error";
499  EXPECT_EQ(State::Stopped, m_daq->GetState());
500 }
501 
502 /**
503  * It should be possible to abort a data acquisition even if it's not started.
504  */
505 TEST_F(TestState, AbortAsyncIsOkIfNotStarted) {
506  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
507  ASSERT_TRUE(fut.is_ready())
508  << "Aborting a NotStarted data acquisition should be ready immediately";
509  EXPECT_FALSE(fut.has_exception()) << "Future should not have failed";
510 
511  auto result = fut.get();
512  EXPECT_EQ(State::Aborted, result.state) << "Unexpected state";
513  EXPECT_FALSE(result.error);
514  EXPECT_EQ(State::Aborted, m_daq->GetState());
515 }
516 
517 /**
518  * It's possible to abort but not stop (and keep) if data acquisition is starting
519  */
520 TEST_F(TestState, StopAsyncThrowsIfStarting) {
521  // Setup
522  boost::promise<void> reply_promise;
523  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
524 
525  auto fut = m_daq->StartAsync();
526  ASSERT_EQ(State::Starting, m_daq->GetState())
527  << "Setup failed, unexpected state, aborting test";
528  ASSERT_FALSE(fut.is_ready());
529 
530  // Run
531  {
532  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
533  EXPECT_TRUE(fut.has_exception())
534  << "Cannot stop unless DAQ is in State::Acquiring, current state: "
535  << m_daq->GetState();
536 
537  EXPECT_THROW(fut.get(), std::exception)
538  << "Cannot stop if data acquisition is `Starting`. An exeption was expected";
539  }
540  // Complete pending operations to avoid "leaking" mock objects
541  // "Send reply"
542  reply_promise.set_value();
543 
544  // Make progress
545  MakeTestProgress(m_io_ctx, &fut);
546 }
547 
548 /**
549  * Test sequence:
550  *
551  * 1. Send StartDaq
552  * 2. Send AbortDaq
553  * 3. StartDaq still succeeds in this case (simulates serial handling of client requests at source).
554  * 4. AbortDaq suceeds.
555  */
556 TEST_F(TestState, AbortingIsOkWhenStarting) {
557  SCOPED_TRACE("AbortingIsOkWhenStarting");
558  // Setup
559  // Set up mock so that StartDaq invocation returns the future from our promise.
560  boost::promise<void> start_promise;
561  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
562 
563  // Run
564  //
565  // Start data acquisition
566  auto start_fut = m_daq->StartAsync();
567  ASSERT_EQ(State::Starting, m_daq->GetState());
568  EXPECT_FALSE(start_fut.is_ready());
569 
570  // Setup
571  // And ditto for Abort
572  boost::promise<Result<void>> abort_promise;
573  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
574  .WillOnce(Return(ByMove(abort_promise.get_future())));
575 
576  // Run
577  //
578  // Abort data acquisition
579  auto abort_fut = m_daq->AbortAsync(ErrorPolicy::Strict);
580 
581  // Complete pending operations to avoid "leaking" mock objects
582  // "Send reply"
583  start_promise.set_value();
584 
585  // Make progress
586  MakeTestProgress(m_io_ctx, &start_fut);
587 
588  ASSERT_TRUE(start_fut.is_ready()) << "Cannot proceed with test since future is not ready";
589  EXPECT_FALSE(start_fut.has_exception())
590  << "Mock did not simulate failure so future should be ok";
591  // @todo: What state do we expect to be in?
592 
593  // Complete pending operations to avoid "leaking" mock objects
594  // "Send reply"
595  abort_promise.set_value({false});
596 
597  // Make progress
598  MakeTestProgress(m_io_ctx, &abort_fut);
599 
600  ASSERT_TRUE(abort_fut.is_ready()) << "Cannot proceed with test since future is not ready";
601  EXPECT_FALSE(abort_fut.has_exception())
602  << "Mock didn't simulate failure so future should be OK";
603  auto result = abort_fut.get();
604  EXPECT_EQ(State::Aborted, result.state);
605  EXPECT_FALSE(result.error);
606  EXPECT_EQ(State::Aborted, m_daq->GetState());
607 }
608 
609 TEST_F(TestState, StartAsyncCompletesSuccessfully) {
610  SCOPED_TRACE("Acquiring");
611 
612  StartDaq();
613 }
614 
615 TEST_F(TestState, AbortOcmDaqControllerInStateAborting) {
616  // Setup
617  SCOPED_TRACE("AbortOcmDaqControllerInStateAborting");
618  StartDaq();
619 
620  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
621 
622  AbortDaq();
623  ASSERT_EQ(State::Aborted, m_daq->GetState()) << "Test setup failed";
624 
625  // Test that abort fails if daq is already aborted
626  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
627  ASSERT_TRUE(fut.is_ready());
628  EXPECT_THROW(fut.get(), std::runtime_error);
629 }
630 
631 TEST_F(TestState, AbortOcmDaqControllerInStateStarting) {
632  // Setup
633  SCOPED_TRACE("AbortOcmDaqControllerInStateStarting");
634  StartDaq();
635  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
636 
637  // Test
638  AbortDaq();
639  EXPECT_EQ(State::Aborted, m_daq->GetState());
640 }
641 
642 TEST_F(TestState, AbortAsyncReturnsWithErrorInsteadOfExceptionForTolerantPolicy) {
643  // Setup
644  SCOPED_TRACE("NewAbortSupersedesFailedAbort");
645  StartDaq();
646 
647  boost::promise<Result<void>> abort_promise_1;
648 
649  // Expect two calls to abort since the first one will fail
650  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Tolerant, _))
651  .WillOnce(Return(ByMove(abort_promise_1.get_future())));
652 
653  // Run
654  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Tolerant);
655 
656  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
657  << "Expected state to be in Stopping after requesting to abort";
658 
659  // "Send reply1" where first source fails and second is ok.
660  abort_promise_1.set_value({true});
661  MakeTestProgress(m_io_ctx, &fut1);
662 
663  ASSERT_TRUE(fut1.has_value());
664 
665  auto result1 = fut1.get();
666  EXPECT_EQ(State::Aborted, result1.state);
667  EXPECT_TRUE(result1.error);
668  EXPECT_EQ(State::Aborted, m_daq->GetState());
669 }
670 
671 /**
672  * It is possible to abort even though an abort operation has already been started.
673  * Nothing special happens in this case though.
674  * @todo The command bein superseeded should probably fail.
675  */
676 TEST_F(TestState, NewAbortSupersedesSuccessfulAbort) {
677  // Setup
678  SCOPED_TRACE("NewAbortSupersedesSuccessfulAbort");
679  StartDaq();
680 
681  // First abort
682  boost::promise<Result<void>> abort_promise_1;
683  // Second abort
684  boost::promise<Result<void>> abort_promise_2;
685 
686  // Expect two calls to abort
687  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
688  .Times(2)
689  .WillOnce(Return(ByMove(abort_promise_1.get_future())))
690  .WillOnce(Return(ByMove(abort_promise_2.get_future())));
691 
692  // Run
693  // Launch async operations concurrently
694  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
695  auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
696 
697  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
698  << "Expected state to be in Stopping after requesting to abort";
699 
700  // "Send reply1" to cause interleaving
701  abort_promise_1.set_value({false});
702  MakeTestProgress(m_io_ctx, &fut1);
703 
704  ASSERT_TRUE(fut1.is_ready());
705  ASSERT_FALSE(fut1.has_exception()) << "Future has unexpected exception!";
706  auto result1 = fut1.get();
707  EXPECT_EQ(State::Aborted, result1.state);
708  EXPECT_FALSE(result1.error);
709  EXPECT_EQ(State::Aborted, m_daq->GetState());
710 
711  // "Send reply2"
712  abort_promise_2.set_value({false});
713  MakeTestProgress(m_io_ctx, &fut2);
714  auto result2 = fut2.get();
715  EXPECT_EQ(State::Aborted, result2.state);
716  EXPECT_FALSE(result2.error);
717  EXPECT_EQ(State::Aborted, m_daq->GetState());
718 }
719 
720 TEST_F(TestState, NewAbortSupersedesFailedAbortWithStrictPolicy) {
721  // Setup
722  SCOPED_TRACE("NewAbortSupersedesFailedAbort");
723  StartDaq();
724 
725  boost::promise<Result<void>> abort_promise_1;
726  // Second abort
727  boost::promise<Result<void>> abort_promise_2;
728 
729  // Expect two calls to abort since the first one will fail
730  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
731  .Times(2)
732  .WillOnce(Return(ByMove(abort_promise_1.get_future())))
733  .WillOnce(Return(ByMove(abort_promise_2.get_future())));
734 
735  // Run
736  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
737 
738  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
739  << "Expected state to be in Stopping after requesting to abort";
740 
741  // "Send reply1" where first source fails and second is ok.
742  abort_promise_1.set_exception(DaqSourceErrors(std::vector<std::exception_ptr>()));
743  MakeTestProgress(m_io_ctx, &fut1);
744 
745  ASSERT_TRUE(fut1.is_ready());
746  ASSERT_TRUE(fut1.has_exception()) << "Future has unexpected exception!";
747  EXPECT_THROW(fut1.get(), DaqSourceErrors);
748  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState());
749 
750  // Abort again, this time it works.
751  auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
752 
753  // "Send reply2"
754  abort_promise_2.set_value({false});
755 
756  MakeTestProgress(m_io_ctx, &fut2);
757  ASSERT_TRUE(fut2.has_value());
758 
759  auto result2 = fut2.get();
760  EXPECT_EQ(State::Aborted, result2.state);
761  EXPECT_FALSE(result2.error);
762  EXPECT_EQ(State::Aborted, m_daq->GetState());
763 }
764 
765 TEST_F(TestState, StopOcmDaqControllerSuccessfully) {
766  // Setup
767  SCOPED_TRACE("StopOcmDaqControllerSuccessfully");
768  StartDaq();
769 
770  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
771 
772  StopDaq();
773 }
774 
775 TEST_F(TestAwait, AwaitNonExistantSourceFails) {
776  // Run
777  auto fut = m_daq->AwaitAsync({"non-existant"}, 0ms);
778  ASSERT_TRUE(fut.has_exception());
779  EXPECT_THROW(fut.get(), std::invalid_argument);
780 }
781 
782 TEST_F(TestAwait, AwaitTimeout) {
783  // Run
784  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 1ms);
785  MakeTestProgress(m_io_ctx, &fut);
786 
787  ASSERT_TRUE(fut.has_exception());
788  EXPECT_THROW(fut.get(), DaqOperationTimeout);
789 }
790 
791 TEST_F(TestAwaitWithPrimSource, AwaitStopSingleSourceIsOk) {
792  SCOPED_TRACE("AwaitSingleSourceIsOk");
793  // Setup
794  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 300ms); // large timeout needed under valgrind
795  EXPECT_FALSE(fut.is_ready())
796  << "The future shouldn't be ready yet as we haven't started the data acquisition!";
797 
798  // Run
799  StartDaq();
800  EXPECT_FALSE(fut.is_ready())
801  << "Wait condition not fulfilled, so future should not be ready yet";
802  // Stop should cancel await op
803  EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(true));
804  StopDaq();
805 
806  ASSERT_TRUE(fut.is_ready());
807  ASSERT_FALSE(fut.has_exception());
808  auto state = fut.get();
809  EXPECT_TRUE(state == State::Stopping || state == State::Stopped)
810  << "Await condition should have been ready once source is stopped (meaning that DAQ is "
811  "Stopping or Stopped, depending on the order of the source)";
812 }
813 
814 TEST_F(TestAwaitWithPrimSource, AwaitAbortAllSources) {
815  SCOPED_TRACE("AwaitAbortAllMetadataSources");
816  // Setup
817  auto fut = m_daq->AwaitAsync({"meta-source-1", "meta-source-2"}, 150ms);
818  EXPECT_FALSE(fut.is_ready())
819  << "The future shouldn't be ready yet as we haven't started the data acquisition!";
820 
821  // Run
822  StartDaq();
823  EXPECT_FALSE(fut.is_ready());
824 
825  // Abort should cancel await op
826  EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(true));
827  AbortDaq();
828 
829  EXPECT_TRUE(fut.is_ready());
830  ASSERT_FALSE(fut.has_exception());
831  auto state = fut.get();
832  EXPECT_TRUE(state == State::AbortingAcquiring || state == State::Aborted)
833  << "Await condition should have been ready once source is stopped (meaning that DAQ is "
834  "Aborting or Aborted, depending on the order of the source)";
835 }
836 
837 TEST_F(TestAwait, AwaitStopSingleSourceWhenConditionIsFulfilled) {
838  SCOPED_TRACE("AwaitStopSingleSourceWhenConditionIsFulfilled");
839  // Setup
840  // Run
841  StartDaq();
842  StopDaq();
843 
844  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 150ms);
845 
846  EXPECT_TRUE(fut.is_ready()) << "Condition already fulfilled so future should be ready";
847  ASSERT_FALSE(fut.has_exception());
848  EXPECT_EQ(State::Stopped, fut.get());
849 }
850 
851 TEST_F(TestNotStarted, CanUpdateKeywords) {
852  // Setup
853 
854  // Run
855  m_daq->UpdateKeywords(m_keywords);
856 
857  EXPECT_EQ(m_keywords, m_daq->GetContext().keywords);
858 }
859 
860 TEST_F(TestStopped, CannotUpdateKeywordsInStopped) {
861  // Setup
862 
863  // Run
864  EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
865 }
866 
867 TEST_F(TestAwaitWithPrimSource, StartWillAwait) {
868  SCOPED_TRACE("StartWillAwait");
869  // Setup
870  // Run
871  StartDaq();
872 }
873 
874 /**
875  * Tests that DaqController automatically stops DAQ when the await-op completes.
876  */
878  SCOPED_TRACE("AutomaticStop");
879  // Setup
880  StartDaq();
881 
882  // Run
883  // DaqController is monitoring the completion of all primary data sources.
884  // By setting the value we simulate that completion. This should then trigger StopDaq.
885  DpPart prim_part{"s1", "/tmp/file.fits"};
886  m_await_promise.set_value({false, {prim_part}});
887 
888  // Setup expectations for stopping
889  Result<DpParts> stop_op_reply{false, m_files};
890  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
891  .WillOnce(Return(ByMove(boost::make_ready_future<Result<DpParts>>(stop_op_reply))));
892  ASSERT_EQ(State::Acquiring, this->m_status->GetState());
893  EXPECT_EQ(0u, m_daq->GetContext().results.size());
894 
895  // There's no future to await-on, so we run until the observed state changes instead.
897  m_io_ctx, [this]() -> bool { return this->m_status->GetState() == State::Stopped; });
898  EXPECT_FALSE(m_status->GetError());
899  EXPECT_EQ(2u, m_daq->GetContext().results.size()) << "One from m_files (metadata), "
900  "one from primary";
901  EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));
902 }
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Started operation was aborted.
Definition: error.hpp:47
Started operation timed out.
Definition: error.hpp:57
Exception thrown to carry reply errors.
Definition: error.hpp:84
Data acquisition sources.
Definition: source.hpp:184
std::vector< MetaSource > const & GetMetadataSources() const noexcept
Definition: source.hpp:198
std::vector< PrimSource > const & GetPrimarySources() const noexcept
Definition: source.hpp:190
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:140
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96
Contains error related declarations for DAQ.
virtual void PreDaqControllerHook()
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
void SetUp() override
std::vector< DpPart > m_files
std::shared_ptr< ObservableEventLog > m_event_log
fits::KeywordVector m_keywords
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
StatusObserverMock m_observer
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
std::shared_ptr< OcmDaqController > m_daq
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
DaqContext m_context
boost::asio::io_context m_io_ctx
void StartDaq()
Executes a successful StartAsync() call.
void TearDown() override
boost::asio::io_context m_io_ctx
virtual void PreStartAsyncHook()
std::shared_ptr< ObservableStatus > m_status
std::unique_ptr< MockAsyncOperations > m_mock_ops
std::shared_ptr< ObservableStatus > m_status
std::shared_ptr< ObservableEventLog > m_event_log
DaqSources m_sources
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
Simple observer used for testing.
Developer notes: OcmDaqController use boost::when_all to compose futures.
Contains declarations for the helper functions to initiate operations.
Mockup of metadaqif classes.
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::OcmAsyncOperations > CreateMockAsyncOperations()
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
R InitiateOperation(Params &&... params)
Constructs and initiates Op and return the future result.
Definition: initiate.hpp:66
std::pair< R, std::function< bool()> > InitiateAbortableOperation(Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
Definition: initiate.hpp:75
TEST_F(TestDpmClient, StartMonitoringSendsRequestAndReceivesReply)
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
TEST(TestDaqContext, Files)
Utility class that represents a result and an error.
Definition: utility.hpp:17
Mockup of metadaqif classes.
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
boost::promise< Result< DpParts > > m_await_promise
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:60
OCM Async operations.
A type safe version of LiteralKeyword that consist of the three basic components of a FITS keyword ke...
Definition: keyword.hpp:266
A composite async operation that aborts a DAQ.
Definition: abort.hpp:26
Parameters required for each async operation.
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!a.
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Await specific parameters that is not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
Definition: awaitPrim.hpp:56
A composite async operation that starts DAQ.
Definition: stop.hpp:27
void SetAllSourceState(op::AsyncOpParams &params, State state)
void SetSourceState(Iterator begin, Iterator end, State state)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.
Definition: utils.hpp:21