ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
testOcmDaqController.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm_libdaq_test
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Unit test for `daq::OcmDaqController`
7 */
8// NOLINT
9#include <memory>
10#include <stdexcept>
11#include <thread>
12
13#include <gtest/gtest.h>
14
15#include <daq/daqController.hpp>
16#include <daq/error.hpp>
17#include <daq/op/abort.hpp>
18#include <daq/op/awaitPrim.hpp>
19#include <daq/op/initiate.hpp>
20#include <daq/op/start.hpp>
21#include <daq/op/stop.hpp>
22
26#include "mock/recifMock.hpp"
27#include "statusObserver.hpp"
28#include "utils.hpp"
29
30using namespace daq;
31using namespace ::testing;
32using namespace std::chrono;
33
34/**
35 * Fixture for daq::DaqController life cycle tests
36 *
37 * @ingroup daq_ocm_libdaq_test
38 */
39class TestOcmDaqControllerLifeCycle : public ::testing::Test {
40public:
42 : m_io_ctx()
44 , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
45 , m_event_log(std::make_shared<ObservableEventLog>())
46 , m_ops(m_executor) {
47 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
48 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
49#if 0
50 m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>, op::AsyncOpParams>;
52 boost::future<Result<DpParts>>,
56 boost::future<Result<void>>,
60 boost::future<Result<DpParts>>,
62#endif
63 m_context.id = "id";
64 }
65 boost::asio::io_context m_io_ctx; // NOLINT
68 std::shared_ptr<ObservableStatus> m_status; // NOLINT
69 std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
71 std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
72 std::shared_ptr<MetaSource::RrClient> m_meta_rr_client; // NOLINT
75};
76
77template <class Iterator>
78void SetSourceState(Iterator begin, Iterator end, State state) {
79 auto it = begin;
80 for (; it != end; ++it) {
81 it->SetState(state);
82 }
83}
84
86 SetSourceState(params.meta_sources.begin(), params.meta_sources.end(), state);
87 SetSourceState(params.prim_sources.begin(), params.prim_sources.end(), state);
88}
89
90/**
91 *
92 * Developer notes:
93 * OcmDaqController use boost::when_all to compose futures. This does not support executors and will
94 * spawn a thread to perform the work. This means that the tests will either have to block
95 * indefinitely with future::get() or use a timeout.
96 *
97 * @ingroup daq_ocm_libdaq_test
98 */
99struct TestState : ::testing::Test {
100 std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
101 std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client; // NOLINT
102 std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client2; // NOLINT
103 std::unique_ptr<MockAsyncOperations> m_mock_ops; // NOLINT
105
106 boost::asio::io_context m_io_ctx; // NOLINT
109 std::shared_ptr<ObservableStatus> m_status; // NOLINT
110 std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
111 std::shared_ptr<OcmDaqController> m_daq; // NOLINT
114 std::vector<DpPart> m_files; // NOLINT
116
118 : m_io_ctx()
120 , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
121 , m_event_log(std::make_shared<ObservableEventLog>()) {
122 }
123
124 void SetUp() override {
125 m_files.emplace_back("foo", "bar");
126 m_keywords.emplace_back(fits::EsoKeyword("FOO", "BAR"));
127 m_keywords.emplace_back(fits::ValueKeyword("FOO", "BAR"));
128
129 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
130 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
131 m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
133 m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
134
135 // Connect listener
136 // @todo: Add expectations for observer
137 // m_status->ConnectObserver(std::reference_wrapper(m_observer));
138
139 MetaSource s1("meta-source-1", m_meta_rr_client);
140 MetaSource s2("meta-source-2", m_meta_rr_client2);
141 m_context.id = "id";
142 m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{},
143 m_sources.GetMetadataSources() = std::vector<daq::MetaSource>{s1, s2},
144
146 m_daq = std::make_shared<daq::OcmDaqController>(m_io_ctx,
148 m_context,
149 m_sources,
150 m_status,
152 std::get<OcmAsyncOperations>(tup));
153
155 ASSERT_EQ(m_status->GetState(), m_daq->GetState());
156 }
157
158 void TearDown() override {
159 m_daq.reset();
160 m_meta_rr_client.reset();
161 m_meta_rr_client2.reset();
162 m_prim_rr_client.reset();
163 }
164
165 virtual void PreDaqControllerHook() {
166 }
167 virtual void PreStartAsyncHook() {
168 }
169
170 /**
171 * Executes a successful StartAsync() call
172 */
173 void StartDaq() {
174 // Setup
175 // Set up mock so that op::StartAsync invocation returns the future from our promise.
176 boost::promise<void> reply_promise;
177 std::optional<op::AsyncOpParams> params;
178 EXPECT_CALL(*m_mock_ops, Start(_))
179 .WillOnce(DoAll(Invoke([&](auto p) { params.emplace(p); }),
180 Return(ByMove(reply_promise.get_future()))));
182
183 // Run
184 auto fut = m_daq->StartAsync();
185 EXPECT_EQ(State::Starting, m_daq->GetState());
186 EXPECT_FALSE(fut.is_ready());
187
188 // "Send reply"
189 reply_promise.set_value();
190 ASSERT_TRUE(params);
191 SetAllSourceState(*params, State::Acquiring);
192
193 // Execute scheduled handlers
195
196 ASSERT_TRUE(fut.is_ready());
197 EXPECT_EQ(State::Acquiring, fut.get());
198 }
199
200 void AbortDaq() {
201 // Setup
202 // Set up mock so that op::StartAsync invocation returns the future from our promise.
203 std::optional<op::AsyncOpParams> params;
204 boost::promise<Result<void>> reply_promise;
205 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
206 .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
207 Return(ByMove(reply_promise.get_future()))));
208
209 // Run
210 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
211
212 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
213 << "Expected state to be in Stopping after requesting to abort";
214
215 // "Send reply"
216 reply_promise.set_value({false});
217 ASSERT_TRUE(params);
218 SetAllSourceState(*params, State::Aborted);
219
220 // Execute handlers
222
223 ASSERT_TRUE(fut.is_ready());
224 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
225 auto result = fut.get();
226 EXPECT_EQ(State::Aborted, result.state);
227 EXPECT_FALSE(HasError(result));
228 EXPECT_EQ(State::Aborted, m_daq->GetState());
229 }
230
231 void StopDaq() {
232 std::optional<op::AsyncOpParams> params;
233 boost::promise<Result<DpParts>> reply_promise;
234
235 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
236 .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
237 Return(ByMove(reply_promise.get_future()))));
238
239 // Run
240 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
241
242 EXPECT_EQ(State::Stopping, m_daq->GetState())
243 << "Expected state to be in Stopping after requesting to stop";
244
245 ASSERT_TRUE(params);
246 SetAllSourceState(*params, State::Stopped);
247
248 // "Send reply"
249 Result<DpParts> reply{false, m_files};
250 reply_promise.set_value(reply);
251
252 // Execute handlers
254
255 ASSERT_TRUE(fut.is_ready());
256 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
257 auto status = fut.get();
258 EXPECT_EQ(State::Stopped, status.state)
259 << "Expected state to be Stopped since there were no errors";
260 EXPECT_FALSE(HasError(status));
261 EXPECT_EQ(State::Stopped, m_daq->GetState());
262 }
263};
264
266
268
271 m_status->SetState(State::Acquiring);
272 }
273};
274
277 m_status->SetState(State::Stopped);
278 }
279};
280
281/**
282 * Fixture for
283 */
286 // Add a primary source, which was not needed for other tests
287 PrimSource s1("prim-source-1", m_prim_rr_client);
288 m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{s1};
289 }
291 EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
292 .WillOnce(Return(ByMove(m_await_promise.get_future())));
293 }
294
295 boost::promise<Result<DpParts>> m_await_promise; // NOLINT
296};
297
298// Simple test to understand boost::future::unwrap()
299TEST(TestBoost, Unwrap) {
300 EXPECT_THROW(boost::make_ready_future()
301 .then([](auto f) -> boost::future<void> {
302 try {
303 throw std::runtime_error("Meow");
304 } catch (...) {
305 return boost::make_exceptional_future<void>();
306 }
307 })
308 .unwrap()
309 .get(),
310 std::runtime_error);
311}
312
313TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfNoSourcesAreProvided) {
314 ASSERT_THROW(
316 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops),
317 std::invalid_argument);
318}
319
320TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsObservableStatusIdDoesNotMatchDaqContextId) {
321 MetaSource s("source-id", m_meta_rr_client);
322 m_sources.GetMetadataSources() = {s};
323 m_context.id = "not-id";
324 ASSERT_THROW(
326 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops),
327 std::invalid_argument);
328}
329
330TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfAsyncOperationIsInvalid) {
331 MetaSource s("source-id", m_meta_rr_client);
332 m_sources.GetMetadataSources() = {s};
333 {
334 auto ops = m_ops;
335 ops.start = {};
336 ASSERT_THROW(
338 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
339 std::invalid_argument);
340 }
341 {
342 auto ops = m_ops;
343 ops.stop = {};
344 ASSERT_THROW(
346 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
347 std::invalid_argument);
348 }
349 {
350 auto ops = m_ops;
351 ops.abort = {};
352 ASSERT_THROW(
354 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
355 std::invalid_argument);
356 }
357 {
358 auto ops = m_ops;
359 ops.await_prim = {};
360 ASSERT_THROW(
362 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
363 std::invalid_argument);
364 }
365}
366
367TEST_F(TestOcmDaqControllerLifeCycle, ConstructorSucceedsIfSingleMetadataSourceIsUsed) {
368 MetaSource s("source-id", m_meta_rr_client);
369 m_sources.GetMetadataSources() = {s};
370 ASSERT_FALSE(m_sources.GetMetadataSources().empty());
372 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops);
373}
374
375TEST_F(TestOcmDaqControllerLifeCycle, DestructionAbortsAsyncWait) {
376 MetaSource s("source-id", m_meta_rr_client);
377 m_sources.GetMetadataSources() = {s};
378 boost::future<State> fut;
379 {
381 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops);
382 fut = daq->AwaitAsync({"source-id"}, 100ms);
383 ASSERT_FALSE(fut.is_ready());
384 }
385
386 MakeTestProgress(m_io_ctx, &fut);
387 ASSERT_TRUE(fut.is_ready())
388 << "Future should have been cancelled since daq should have been deleted.";
389 EXPECT_TRUE(fut.has_exception());
390 EXPECT_THROW(fut.get(), DaqOperationAborted);
391}
392
394 ASSERT_EQ(State::NotStarted, m_daq->GetState()) << "The initial state should be NotStarted";
395}
396
397TEST_F(TestState, GetStatusReturnsSameStatusObject) {
398 auto status_ptr = m_daq->GetStatus();
399 EXPECT_EQ(status_ptr.get(), m_status.get());
400}
401
402TEST_F(TestState, CannotStopStoppedOcmDaqController) {
403 SCOPED_TRACE("CannotStopStoppedOcmDaqController");
404 StartDaq();
405 StopDaq();
406
407 ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
408
409 // Try to stop again
410 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
411 EXPECT_TRUE(fut.has_exception());
412 EXPECT_THROW(fut.get(), std::exception);
413}
414
415TEST_F(TestState, CannotAbortStoppedOcmDaqController) {
416 SCOPED_TRACE("CannotAbortStoppedOcmDaqController");
417 StartDaq();
418 StopDaq();
419
420 ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
421
422 // Try to stop again
423 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
424 EXPECT_TRUE(fut.has_exception());
425 EXPECT_THROW(fut.get(), std::exception);
426}
427
428TEST_F(TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
429 // Setup
430 boost::promise<void> reply_promise;
431 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
432
433 // Run
434 auto fut = m_daq->StartAsync();
435 EXPECT_EQ(State::Starting, m_daq->GetState());
436
437 // Set up mock future so that it results in an exception exception.
438 reply_promise.set_exception(std::runtime_error("Fake test failure"));
439
440 // Run async handlers
441 MakeTestProgress(m_io_ctx, &fut);
442
443 ASSERT_TRUE(fut.is_ready());
444 EXPECT_TRUE(fut.has_exception()) << "Expected future to contain exception";
445 EXPECT_THROW(fut.get(), std::exception) << "Expected exception to derive from std::exception";
446 EXPECT_EQ(true, m_daq->GetErrorFlag());
447}
448
449TEST_F(TestState, StartAsyncReturnsExceptionalFutureInStateStarting) {
450 // Setup
451 boost::promise<void> reply_promise;
452 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
453
454 auto fut = m_daq->StartAsync();
455 ASSERT_EQ(State::Starting, m_daq->GetState());
456 EXPECT_FALSE(fut.is_ready());
457
458 // Run
459 // @todo: Shouldn't this be communicated through the future?
460 auto fut2 = m_daq->StartAsync();
461 ASSERT_TRUE(fut2.has_exception());
462 EXPECT_THROW(fut2.get(), std::exception)
463 << "Multiple simultaneous start operations are not supported and an exception "
464 "was exected";
465
466 // Complete pending operations to avoid "leaking" mock objects
467 // "Send reply"
468 reply_promise.set_value();
469
470 // Make progress
471 MakeTestProgress(m_io_ctx, &fut);
472}
473
474/**
475 * It's possible to abort but not stop (and keep)
476 */
477TEST_F(TestState, StopAsyncThrowsIfNotStarted) {
478 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
479 EXPECT_THROW(fut.get(), std::exception)
480 << "It should not be possible to stop a data acquisition that has not started";
481}
482
483TEST_F(TestState, StopAsyncDoesNotThrowWithTolerantPolicy) {
484 // Setup
485 StartDaq();
486
487 std::optional<op::AsyncOpParams> params;
488 boost::promise<Result<DpParts>> reply_promise;
489 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Tolerant, _))
490 .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
491 Return(ByMove(reply_promise.get_future()))));
492
493 // Since there are no primary sources in TestState fixture we expect no abort
494 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).Times(0);
495
496 // Run
497 auto fut = m_daq->StopAsync(ErrorPolicy::Tolerant);
498
499 EXPECT_EQ(State::Stopping, m_daq->GetState())
500 << "Expected state to be in Stopping after requesting to stop";
501
502 // "Send reply" with error set
503 Result<DpParts> reply{true, {}};
504 reply_promise.set_value(reply);
505 ASSERT_TRUE(params);
506 // Since we are forcing stop it should be acceptable that sources are not stopped.
507 SetAllSourceState(*params, State::Stopping);
508
509 // Execute handlers
510 MakeTestProgress(m_io_ctx, &fut);
511
512 ASSERT_TRUE(fut.is_ready());
513 ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
514 auto status = fut.get();
515 EXPECT_EQ(State::Stopped, status.state)
516 << "Expected state to be Stopped since there were no errors";
517 EXPECT_TRUE(HasError(status))
518 << "Error flag should be set since the reply_promise had an error";
519 EXPECT_EQ(State::Stopped, m_daq->GetState());
520}
521
522/**
523 * It should be possible to abort a data acquisition even if it's not started.
524 */
525TEST_F(TestState, AbortAsyncIsOkIfNotStarted) {
526 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
527 ASSERT_TRUE(fut.is_ready())
528 << "Aborting a NotStarted data acquisition should be ready immediately";
529 EXPECT_FALSE(fut.has_exception()) << "Future should not have failed";
530
531 auto result = fut.get();
532 EXPECT_EQ(State::Aborted, result.state) << "Unexpected state";
533 EXPECT_FALSE(HasError(result));
534 EXPECT_EQ(State::Aborted, m_daq->GetState());
535}
536
537/**
538 * It's possible to abort but not stop (and keep) if data acquisition is starting
539 */
540TEST_F(TestState, StopAsyncThrowsIfStarting) {
541 // Setup
542 boost::promise<void> reply_promise;
543 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
544
545 auto fut = m_daq->StartAsync();
546 ASSERT_EQ(State::Starting, m_daq->GetState())
547 << "Setup failed, unexpected state, aborting test";
548 ASSERT_FALSE(fut.is_ready());
549
550 // Run
551 {
552 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
553 EXPECT_TRUE(fut.has_exception())
554 << "Cannot stop unless DAQ is in State::Acquiring, current state: "
555 << m_daq->GetState();
556
557 EXPECT_THROW(fut.get(), std::exception)
558 << "Cannot stop if data acquisition is `Starting`. An exeption was expected";
559 }
560 // Complete pending operations to avoid "leaking" mock objects
561 // "Send reply"
562 reply_promise.set_value();
563
564 // Make progress
565 MakeTestProgress(m_io_ctx, &fut);
566}
567
568/**
569 * Test sequence:
570 *
571 * 1. Send StartDaq
572 * 2. Send AbortDaq
573 * 3. StartDaq still succeeds in this case (simulates serial handling of client requests at source).
574 * 4. AbortDaq suceeds.
575 */
576TEST_F(TestState, AbortingIsOkWhenStarting) {
577 SCOPED_TRACE("AbortingIsOkWhenStarting");
578 // Setup
579 // Set up mock so that StartDaq invocation returns the future from our promise.
580 boost::promise<void> start_promise;
581 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
582
583 // Run
584 //
585 // Start data acquisition
586 auto start_fut = m_daq->StartAsync();
587 ASSERT_EQ(State::Starting, m_daq->GetState());
588 EXPECT_FALSE(start_fut.is_ready());
589
590 // Setup
591 // And ditto for Abort
592 boost::promise<Result<void>> abort_promise;
593 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
594 .WillOnce(Return(ByMove(abort_promise.get_future())));
595
596 // Run
597 //
598 // Abort data acquisition
599 auto abort_fut = m_daq->AbortAsync(ErrorPolicy::Strict);
600
601 // Complete pending operations to avoid "leaking" mock objects
602 // "Send reply"
603 start_promise.set_value();
604
605 // Make progress
606 MakeTestProgress(m_io_ctx, &start_fut);
607
608 ASSERT_TRUE(start_fut.is_ready()) << "Cannot proceed with test since future is not ready";
609 EXPECT_FALSE(start_fut.has_exception())
610 << "Mock did not simulate failure so future should be ok";
611 // @todo: What state do we expect to be in?
612
613 // Complete pending operations to avoid "leaking" mock objects
614 // "Send reply"
615 abort_promise.set_value({false});
616
617 // Make progress
618 MakeTestProgress(m_io_ctx, &abort_fut);
619
620 ASSERT_TRUE(abort_fut.is_ready()) << "Cannot proceed with test since future is not ready";
621 EXPECT_FALSE(abort_fut.has_exception())
622 << "Mock didn't simulate failure so future should be OK";
623 auto result = abort_fut.get();
624 EXPECT_EQ(State::Aborted, result.state);
625 EXPECT_FALSE(HasError(result));
626 EXPECT_EQ(State::Aborted, m_daq->GetState());
627}
628
629TEST_F(TestState, StartAsyncCompletesSuccessfully) {
630 SCOPED_TRACE("Acquiring");
631
632 StartDaq();
633}
634
635TEST_F(TestState, AbortOcmDaqControllerInStateAborting) {
636 // Setup
637 SCOPED_TRACE("AbortOcmDaqControllerInStateAborting");
638 StartDaq();
639
640 ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
641
642 AbortDaq();
643 ASSERT_EQ(State::Aborted, m_daq->GetState()) << "Test setup failed";
644
645 // Test that abort fails if daq is already aborted
646 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
647 ASSERT_TRUE(fut.is_ready());
648 EXPECT_THROW(fut.get(), std::runtime_error);
649}
650
651TEST_F(TestState, AbortOcmDaqControllerInStateStarting) {
652 // Setup
653 SCOPED_TRACE("AbortOcmDaqControllerInStateStarting");
654 StartDaq();
655 ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
656
657 // Test
658 AbortDaq();
659 EXPECT_EQ(State::Aborted, m_daq->GetState());
660}
661
662TEST_F(TestState, AbortAsyncReturnsWithErrorInsteadOfExceptionForTolerantPolicy) {
663 // Setup
664 SCOPED_TRACE("NewAbortSupersedesFailedAbort");
665 StartDaq();
666
667 boost::promise<Result<void>> abort_promise_1;
668
669 // Expect two calls to abort since the first one will fail
670 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Tolerant, _))
671 .WillOnce(Return(ByMove(abort_promise_1.get_future())));
672
673 // Run
674 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Tolerant);
675
676 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
677 << "Expected state to be in Stopping after requesting to abort";
678
679 // "Send reply1" where first source fails and second is ok.
680 abort_promise_1.set_value({true});
681 MakeTestProgress(m_io_ctx, &fut1);
682
683 ASSERT_TRUE(fut1.has_value());
684
685 auto result1 = fut1.get();
686 EXPECT_EQ(State::Aborted, result1.state);
687 EXPECT_TRUE(HasError(result1));
688 EXPECT_EQ(State::Aborted, m_daq->GetState());
689}
690
691/**
692 * It is possible to abort even though an abort operation has already been started.
693 * Nothing special happens in this case though.
694 * @todo The command bein superseeded should probably fail.
695 */
696TEST_F(TestState, NewAbortSupersedesSuccessfulAbort) {
697 // Setup
698 SCOPED_TRACE("NewAbortSupersedesSuccessfulAbort");
699 StartDaq();
700
701 // First abort
702 boost::promise<Result<void>> abort_promise_1;
703 // Second abort
704 boost::promise<Result<void>> abort_promise_2;
705
706 // Expect two calls to abort
707 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
708 .Times(2)
709 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
710 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
711
712 // Run
713 // Launch async operations concurrently
714 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
715 auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
716
717 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
718 << "Expected state to be in Stopping after requesting to abort";
719
720 // "Send reply1" to cause interleaving
721 abort_promise_1.set_value({false});
722 MakeTestProgress(m_io_ctx, &fut1);
723
724 ASSERT_TRUE(fut1.is_ready());
725 ASSERT_FALSE(fut1.has_exception()) << "Future has unexpected exception!";
726 auto result1 = fut1.get();
727 EXPECT_EQ(State::Aborted, result1.state);
728 EXPECT_FALSE(HasError(result1));
729 EXPECT_EQ(State::Aborted, m_daq->GetState());
730
731 // "Send reply2"
732 abort_promise_2.set_value({false});
733 MakeTestProgress(m_io_ctx, &fut2);
734 auto result2 = fut2.get();
735 EXPECT_EQ(State::Aborted, result2.state);
736 EXPECT_FALSE(HasError(result2));
737 EXPECT_EQ(State::Aborted, m_daq->GetState());
738}
739
740TEST_F(TestState, NewAbortSupersedesFailedAbortWithStrictPolicy) {
741 // Setup
742 SCOPED_TRACE("NewAbortSupersedesFailedAbort");
743 StartDaq();
744
745 boost::promise<Result<void>> abort_promise_1;
746 // Second abort
747 boost::promise<Result<void>> abort_promise_2;
748
749 // Expect two calls to abort since the first one will fail
750 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
751 .Times(2)
752 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
753 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
754
755 // Run
756 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
757
758 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
759 << "Expected state to be in Stopping after requesting to abort";
760
761 // "Send reply1" where first source fails and second is ok.
762 abort_promise_1.set_exception(DaqSourceErrors(std::vector<std::exception_ptr>()));
763 MakeTestProgress(m_io_ctx, &fut1);
764
765 ASSERT_TRUE(fut1.is_ready());
766 ASSERT_TRUE(fut1.has_exception()) << "Future has unexpected exception!";
767 EXPECT_THROW(fut1.get(), DaqSourceErrors);
768 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState());
769
770 // Abort again, this time it works.
771 auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
772
773 // "Send reply2"
774 abort_promise_2.set_value({false});
775
776 MakeTestProgress(m_io_ctx, &fut2);
777 ASSERT_TRUE(fut2.has_value());
778
779 auto result2 = fut2.get();
780 EXPECT_EQ(State::Aborted, result2.state);
781 EXPECT_FALSE(HasError(result2));
782 EXPECT_EQ(State::Aborted, m_daq->GetState());
783}
784
785TEST_F(TestState, StopOcmDaqControllerSuccessfully) {
786 // Setup
787 SCOPED_TRACE("StopOcmDaqControllerSuccessfully");
788 StartDaq();
789
790 ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
791
792 StopDaq();
793}
794
795TEST_F(TestAwait, AwaitNonExistantSourceFails) {
796 // Run
797 auto fut = m_daq->AwaitAsync({"non-existant"}, 0ms);
798 ASSERT_TRUE(fut.has_exception());
799 EXPECT_THROW(fut.get(), std::invalid_argument);
800}
801
802TEST_F(TestAwait, AwaitTimeout) {
803 // Run
804 auto fut = m_daq->AwaitAsync({"meta-source-1"}, 1ms);
805 MakeTestProgress(m_io_ctx, &fut);
806
807 ASSERT_TRUE(fut.has_exception());
808 EXPECT_THROW(fut.get(), DaqOperationTimeout);
809}
810
811TEST_F(TestAwaitWithPrimSource, AwaitStopSingleSourceIsOk) {
812 SCOPED_TRACE("AwaitSingleSourceIsOk");
813 // Setup
814 auto fut = m_daq->AwaitAsync({"meta-source-1"}, 300ms); // large timeout needed under valgrind
815 EXPECT_FALSE(fut.is_ready())
816 << "The future shouldn't be ready yet as we haven't started the data acquisition!";
817
818 // Run
819 StartDaq();
820 EXPECT_FALSE(fut.is_ready())
821 << "Wait condition not fulfilled, so future should not be ready yet";
822 // Stop should cancel await op
823 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(true));
824 StopDaq();
825
826 ASSERT_TRUE(fut.is_ready());
827 ASSERT_FALSE(fut.has_exception());
828 auto state = fut.get();
829 EXPECT_TRUE(state == State::Stopping || state == State::Stopped)
830 << "Await condition should have been ready once source is stopped (meaning that DAQ is "
831 "Stopping or Stopped, depending on the order of the source)";
832}
833
834TEST_F(TestAwaitWithPrimSource, AwaitAbortAllSources) {
835 SCOPED_TRACE("AwaitAbortAllMetadataSources");
836 // Setup
837 auto fut = m_daq->AwaitAsync({"meta-source-1", "meta-source-2"}, 150ms);
838 EXPECT_FALSE(fut.is_ready())
839 << "The future shouldn't be ready yet as we haven't started the data acquisition!";
840
841 // Run
842 StartDaq();
843 EXPECT_FALSE(fut.is_ready());
844
845 // Abort should cancel await op
846 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(true));
847 AbortDaq();
848
849 EXPECT_TRUE(fut.is_ready());
850 ASSERT_FALSE(fut.has_exception());
851 auto state = fut.get();
852 EXPECT_TRUE(state == State::AbortingAcquiring || state == State::Aborted)
853 << "Await condition should have been ready once source is stopped (meaning that DAQ is "
854 "Aborting or Aborted, depending on the order of the source)";
855}
856
857TEST_F(TestAwait, AwaitStopSingleSourceWhenConditionIsFulfilled) {
858 SCOPED_TRACE("AwaitStopSingleSourceWhenConditionIsFulfilled");
859 // Setup
860 // Run
861 StartDaq();
862 StopDaq();
863
864 auto fut = m_daq->AwaitAsync({"meta-source-1"}, 150ms);
865
866 EXPECT_TRUE(fut.is_ready()) << "Condition already fulfilled so future should be ready";
867 ASSERT_FALSE(fut.has_exception());
868 EXPECT_EQ(State::Stopped, fut.get());
869}
870
871TEST_F(TestNotStarted, CanUpdateKeywords) {
872 // Setup
873 EXPECT_CALL(m_mock_formatter, Format(_))
874 .Times(m_keywords.size())
875 .WillRepeatedly(Invoke(
876 static_cast<fits::LiteralKeyword (*)(fits::KeywordVariant const&)>(&fits::Format)));
877
878 // Run
879 m_daq->UpdateKeywords(m_keywords);
880 fits::KeywordVector expected;
881 expected.emplace_back(std::in_place_type<fits::LiteralKeyword>, "HIERARCH ESO FOO = 'BAR' /");
882 expected.emplace_back(std::in_place_type<fits::LiteralKeyword>,
883 "FOO = 'BAR' /");
884 EXPECT_EQ(expected, m_daq->GetContext().keywords);
885}
886
887TEST_F(TestNotStarted, UpdateKeywordFailureDoesNotUpdateKeywords) {
888 // Setup
890 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
891 "DATE-OBS",
892 "2019-12-12T04:25:48.0068",
893 "Observing date");
894 kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "BAR", "BAZ");
895 EXPECT_CALL(m_mock_formatter, Format(_))
896 .WillOnce(
897 Return(fits::LiteralKeyword("DATE-OBS= '2019-12-12T04:25:48.0068' / Observing date")))
898 .WillOnce(Throw(fits::InvalidKeyword("FOO", "reason")));
899
900 // Run
901 EXPECT_THROW(m_daq->UpdateKeywords(kws), fits::InvalidKeyword);
902
903 EXPECT_EQ(m_daq->GetContext().keywords.size(), 0u)
904 << "No keywords should be added if any failed";
905}
906
907TEST_F(TestStopped, CannotUpdateKeywordsInStopped) {
908 // Setup
909
910 // Run
911 EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
912}
913
915 SCOPED_TRACE("StartWillAwait");
916 // Setup
917 // Run
918 StartDaq();
919}
920
921/**
922 * Tests that DaqController automatically stops DAQ when the await-op completes.
923 */
925 SCOPED_TRACE("AutomaticStop");
926 // Setup
927 StartDaq();
928
929 // Run
930 // DaqController is monitoring the completion of all primary data sources.
931 // By setting the value we simulate that completion. This should then trigger StopDaq.
932 DpPart prim_part{"s1", "/tmp/file.fits"};
933 m_await_promise.set_value({false, {prim_part}});
934
935 // Setup expectations for stopping
936 Result<DpParts> stop_op_reply{false, m_files};
937 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
938 .WillOnce(Return(ByMove(boost::make_ready_future<Result<DpParts>>(stop_op_reply))));
939 ASSERT_EQ(State::Acquiring, this->m_status->GetState());
940 EXPECT_EQ(0u, m_daq->GetContext().results.size());
941
942 // There's no future to await-on, so we run until the observed state changes instead.
944 m_io_ctx, [this]() -> bool { return this->m_status->GetState() == State::Stopped; });
945 EXPECT_FALSE(m_status->HasError());
946 EXPECT_EQ(2u, m_daq->GetContext().results.size()) << "One from m_files (metadata), "
947 "one from primary";
948 EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));
949}
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Started operation was aborted.
Definition: error.hpp:48
Started operation timed out.
Definition: error.hpp:58
Exception thrown to carry reply errors.
Definition: error.hpp:85
Data acquisition sources.
Definition: source.hpp:186
std::vector< MetaSource > const & GetMetadataSources() const noexcept
Definition: source.hpp:200
std::vector< PrimSource > const & GetPrimarySources() const noexcept
Definition: source.hpp:192
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:142
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, fits::KeywordFormatter const &kw_formatter, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:98
Indicates keyword is invalid for some reason.
Definition: keyword.hpp:534
Represents the literal 80-character FITS keyword record.
Definition: keyword.hpp:129
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Contains error related declarations for DAQ.
virtual void PreDaqControllerHook()
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
void SetUp() override
std::vector< DpPart > m_files
std::shared_ptr< ObservableEventLog > m_event_log
fits::KeywordVector m_keywords
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
StatusObserverMock m_observer
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
rad::IoExecutor m_executor
std::shared_ptr< OcmDaqController > m_daq
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
DaqContext m_context
boost::asio::io_context m_io_ctx
KeywordFormatterMock m_mock_formatter
void StartDaq()
Executes a successful StartAsync() call.
void TearDown() override
boost::asio::io_context m_io_ctx
virtual void PreStartAsyncHook()
std::shared_ptr< ObservableStatus > m_status
std::unique_ptr< MockAsyncOperations > m_mock_ops
std::shared_ptr< ObservableStatus > m_status
std::shared_ptr< ObservableEventLog > m_event_log
DaqSources m_sources
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:44
Simple observer used for testing.
Developer notes: OcmDaqController use boost::when_all to compose futures.
Contains declarations for the helper functions to initiate operations.
Mockup of metadaqif classes.
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::OcmAsyncOperations > CreateMockAsyncOperations(rad::IoExecutor &ex)
LiteralKeyword Format(KeywordVariant const &keyword)
Definition: keyword.cpp:782
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
std::variant< ValueKeyword, EsoKeyword, LiteralKeyword > KeywordVariant
The different variants of keywords that are supported.
Definition: keyword.hpp:409
R InitiateOperation(rad::IoExecutor &ex, Params &&... params)
Constructs and initiates Op and return the future result.
Definition: initiate.hpp:68
std::pair< R, std::function< bool()> > InitiateAbortableOperation(rad::IoExecutor &ex, Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
Definition: initiate.hpp:78
bool HasError(Status const &status) noexcept
Definition: status.cpp:179
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ NotStarted
Initial state of data acquisition.
Utility class that represents a result and an error.
Definition: utility.hpp:17
Mockup of metadaqif classes.
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
boost::promise< Result< DpParts > > m_await_promise
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:58
OCM Async operations.
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
std::function< boost::future< void >(op::AsyncOpParams)> start
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
A type safe version of LiteralKeyword that consist of the three basic components of a FITS keyword ke...
Definition: keyword.hpp:275
A composite async operation that aborts a DAQ.
Definition: abort.hpp:25
Parameters required for each async operation.
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!a.
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Await specific parameters that is not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
Definition: awaitPrim.hpp:54
A composite async operation that starts DAQ.
Definition: stop.hpp:26
TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfNoSourcesAreProvided)
void SetAllSourceState(op::AsyncOpParams &params, State state)
void SetSourceState(Iterator begin, Iterator end, State state)
TEST(TestBoost, Unwrap)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.
Definition: utils.hpp:22