13#include <gtest/gtest.h>
31using namespace ::testing;
32using namespace std::chrono;
52 boost::future<Result<DpParts>>,
56 boost::future<Result<void>>,
60 boost::future<Result<DpParts>>,
77template <
class Iterator>
80 for (; it != end; ++it) {
111 std::shared_ptr<OcmDaqController>
m_daq;
125 m_files.emplace_back(
"foo",
"bar");
133 m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
152 std::get<OcmAsyncOperations>(tup));
176 boost::promise<void> reply_promise;
177 std::optional<op::AsyncOpParams> params;
179 .WillOnce(DoAll(Invoke([&](
auto p) { params.emplace(p); }),
180 Return(ByMove(reply_promise.get_future()))));
184 auto fut =
m_daq->StartAsync();
186 EXPECT_FALSE(fut.is_ready());
189 reply_promise.set_value();
203 std::optional<op::AsyncOpParams> params;
204 boost::promise<Result<void>> reply_promise;
205 EXPECT_CALL(*
m_mock_ops, Abort(ErrorPolicy::Strict, _))
206 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
207 Return(ByMove(reply_promise.get_future()))));
210 auto fut =
m_daq->AbortAsync(ErrorPolicy::Strict);
213 <<
"Expected state to be in Stopping after requesting to abort";
216 reply_promise.set_value({
false});
224 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
225 auto result = fut.get();
232 std::optional<op::AsyncOpParams> params;
233 boost::promise<Result<DpParts>> reply_promise;
235 EXPECT_CALL(*
m_mock_ops, Stop(ErrorPolicy::Strict, _))
236 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
237 Return(ByMove(reply_promise.get_future()))));
240 auto fut =
m_daq->StopAsync(ErrorPolicy::Strict);
243 <<
"Expected state to be in Stopping after requesting to stop";
250 reply_promise.set_value(reply);
256 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
257 auto status = fut.get();
259 <<
"Expected state to be Stopped since there were no errors";
271 m_status->SetState(State::Acquiring);
300 EXPECT_THROW(boost::make_ready_future()
301 .then([](
auto f) -> boost::future<void> {
303 throw std::runtime_error(
"Meow");
305 return boost::make_exceptional_future<void>();
316 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops),
317 std::invalid_argument);
322 m_sources.GetMetadataSources() = {s};
323 m_context.id =
"not-id";
326 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops),
327 std::invalid_argument);
332 m_sources.GetMetadataSources() = {s};
338 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
339 std::invalid_argument);
346 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
347 std::invalid_argument);
354 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
355 std::invalid_argument);
362 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, ops),
363 std::invalid_argument);
369 m_sources.GetMetadataSources() = {s};
370 ASSERT_FALSE(m_sources.GetMetadataSources().empty());
372 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops);
377 m_sources.GetMetadataSources() = {s};
378 boost::future<State> fut;
381 m_io_ctx, m_mock_formatter, m_context, m_sources, m_status, m_event_log, m_ops);
382 fut =
daq->AwaitAsync({
"source-id"}, 100ms);
383 ASSERT_FALSE(fut.is_ready());
388 <<
"Future should have been cancelled since daq should have been deleted.";
389 EXPECT_TRUE(fut.has_exception());
394 ASSERT_EQ(State::NotStarted, m_daq->GetState()) <<
"The initial state should be NotStarted";
398 auto status_ptr = m_daq->GetStatus();
399 EXPECT_EQ(status_ptr.get(), m_status.get());
403 SCOPED_TRACE(
"CannotStopStoppedOcmDaqController");
407 ASSERT_EQ(State::Stopped, m_daq->GetState()) <<
"Setup failed";
410 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
411 EXPECT_TRUE(fut.has_exception());
412 EXPECT_THROW(fut.get(), std::exception);
416 SCOPED_TRACE(
"CannotAbortStoppedOcmDaqController");
420 ASSERT_EQ(State::Stopped, m_daq->GetState()) <<
"Setup failed";
423 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
424 EXPECT_TRUE(fut.has_exception());
425 EXPECT_THROW(fut.get(), std::exception);
428TEST_F(
TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
430 boost::promise<void> reply_promise;
431 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
434 auto fut = m_daq->StartAsync();
435 EXPECT_EQ(State::Starting, m_daq->GetState());
438 reply_promise.set_exception(std::runtime_error(
"Fake test failure"));
444 EXPECT_TRUE(fut.has_exception()) <<
"Expected future to contain exception";
445 EXPECT_THROW(fut.get(), std::exception) <<
"Expected exception to derive from std::exception";
451 boost::promise<void> reply_promise;
452 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
454 auto fut = m_daq->StartAsync();
455 ASSERT_EQ(State::Starting, m_daq->GetState());
456 EXPECT_FALSE(fut.is_ready());
460 auto fut2 = m_daq->StartAsync();
462 EXPECT_THROW(fut2.get(), std::exception)
463 <<
"Multiple simultaneous start operations are not supported and an exception "
468 reply_promise.set_value();
478 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
479 EXPECT_THROW(fut.get(), std::exception)
480 <<
"It should not be possible to stop a data acquisition that has not started";
487 std::optional<op::AsyncOpParams> params;
488 boost::promise<Result<DpParts>> reply_promise;
489 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Tolerant, _))
490 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
491 Return(ByMove(reply_promise.get_future()))));
494 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).Times(0);
497 auto fut = m_daq->StopAsync(ErrorPolicy::Tolerant);
499 EXPECT_EQ(State::Stopping, m_daq->GetState())
500 <<
"Expected state to be in Stopping after requesting to stop";
504 reply_promise.set_value(reply);
513 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
514 auto status = fut.get();
516 <<
"Expected state to be Stopped since there were no errors";
518 <<
"Error flag should be set since the reply_promise had an error";
519 EXPECT_EQ(State::Stopped, m_daq->GetState());
526 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
528 <<
"Aborting a NotStarted data acquisition should be ready immediately";
529 EXPECT_FALSE(fut.has_exception()) <<
"Future should not have failed";
531 auto result = fut.get();
532 EXPECT_EQ(State::Aborted, result.state) <<
"Unexpected state";
534 EXPECT_EQ(State::Aborted, m_daq->GetState());
542 boost::promise<void> reply_promise;
543 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
545 auto fut = m_daq->StartAsync();
546 ASSERT_EQ(State::Starting, m_daq->GetState())
547 <<
"Setup failed, unexpected state, aborting test";
548 ASSERT_FALSE(fut.is_ready());
552 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
553 EXPECT_TRUE(fut.has_exception())
554 <<
"Cannot stop unless DAQ is in State::Acquiring, current state: "
555 << m_daq->GetState();
557 EXPECT_THROW(fut.get(), std::exception)
558 <<
"Cannot stop if data acquisition is `Starting`. An exeption was expected";
562 reply_promise.set_value();
577 SCOPED_TRACE(
"AbortingIsOkWhenStarting");
580 boost::promise<void> start_promise;
581 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
586 auto start_fut = m_daq->StartAsync();
587 ASSERT_EQ(State::Starting, m_daq->GetState());
588 EXPECT_FALSE(start_fut.is_ready());
592 boost::promise<Result<void>> abort_promise;
593 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
594 .WillOnce(Return(ByMove(abort_promise.get_future())));
599 auto abort_fut = m_daq->AbortAsync(ErrorPolicy::Strict);
603 start_promise.set_value();
608 ASSERT_TRUE(start_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
609 EXPECT_FALSE(start_fut.has_exception())
610 <<
"Mock did not simulate failure so future should be ok";
615 abort_promise.set_value({
false});
620 ASSERT_TRUE(abort_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
621 EXPECT_FALSE(abort_fut.has_exception())
622 <<
"Mock didn't simulate failure so future should be OK";
623 auto result = abort_fut.get();
626 EXPECT_EQ(State::Aborted, m_daq->GetState());
630 SCOPED_TRACE(
"Acquiring");
637 SCOPED_TRACE(
"AbortOcmDaqControllerInStateAborting");
640 ASSERT_EQ(State::Acquiring, m_daq->GetState()) <<
"Test Setup failed";
643 ASSERT_EQ(State::Aborted, m_daq->GetState()) <<
"Test setup failed";
646 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
648 EXPECT_THROW(fut.get(), std::runtime_error);
653 SCOPED_TRACE(
"AbortOcmDaqControllerInStateStarting");
655 ASSERT_EQ(State::Acquiring, m_daq->GetState()) <<
"Test Setup failed";
659 EXPECT_EQ(State::Aborted, m_daq->GetState());
664 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
667 boost::promise<Result<void>> abort_promise_1;
670 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Tolerant, _))
671 .WillOnce(Return(ByMove(abort_promise_1.get_future())));
674 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Tolerant);
676 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
677 <<
"Expected state to be in Stopping after requesting to abort";
680 abort_promise_1.set_value({
true});
685 auto result1 = fut1.get();
686 EXPECT_EQ(State::Aborted, result1.state);
688 EXPECT_EQ(State::Aborted, m_daq->GetState());
698 SCOPED_TRACE(
"NewAbortSupersedesSuccessfulAbort");
702 boost::promise<Result<void>> abort_promise_1;
704 boost::promise<Result<void>> abort_promise_2;
707 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
709 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
710 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
714 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
715 auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
717 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
718 <<
"Expected state to be in Stopping after requesting to abort";
721 abort_promise_1.set_value({
false});
725 ASSERT_FALSE(fut1.has_exception()) <<
"Future has unexpected exception!";
726 auto result1 = fut1.get();
727 EXPECT_EQ(State::Aborted, result1.state);
729 EXPECT_EQ(State::Aborted, m_daq->GetState());
732 abort_promise_2.set_value({
false});
734 auto result2 = fut2.get();
735 EXPECT_EQ(State::Aborted, result2.state);
737 EXPECT_EQ(State::Aborted, m_daq->GetState());
742 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
745 boost::promise<Result<void>> abort_promise_1;
747 boost::promise<Result<void>> abort_promise_2;
750 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
752 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
753 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
756 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
758 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
759 <<
"Expected state to be in Stopping after requesting to abort";
762 abort_promise_1.set_exception(
DaqSourceErrors(std::vector<std::exception_ptr>()));
766 ASSERT_TRUE(fut1.has_exception()) <<
"Future has unexpected exception!";
768 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState());
771 auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
774 abort_promise_2.set_value({
false});
779 auto result2 = fut2.get();
780 EXPECT_EQ(State::Aborted, result2.state);
782 EXPECT_EQ(State::Aborted, m_daq->GetState());
787 SCOPED_TRACE(
"StopOcmDaqControllerSuccessfully");
790 ASSERT_EQ(State::Acquiring, m_daq->GetState()) <<
"Test Setup failed";
797 auto fut = m_daq->AwaitAsync({
"non-existant"}, 0ms);
799 EXPECT_THROW(fut.get(), std::invalid_argument);
804 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 1ms);
812 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
814 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 300ms);
815 EXPECT_FALSE(fut.is_ready())
816 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
820 EXPECT_FALSE(fut.is_ready())
821 <<
"Wait condition not fulfilled, so future should not be ready yet";
823 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(
true));
827 ASSERT_FALSE(fut.has_exception());
828 auto state = fut.get();
829 EXPECT_TRUE(state == State::Stopping || state == State::Stopped)
830 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
831 "Stopping or Stopped, depending on the order of the source)";
835 SCOPED_TRACE(
"AwaitAbortAllMetadataSources");
837 auto fut = m_daq->AwaitAsync({
"meta-source-1",
"meta-source-2"}, 150ms);
838 EXPECT_FALSE(fut.is_ready())
839 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
843 EXPECT_FALSE(fut.is_ready());
846 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(
true));
849 EXPECT_TRUE(fut.is_ready());
850 ASSERT_FALSE(fut.has_exception());
851 auto state = fut.get();
852 EXPECT_TRUE(state == State::AbortingAcquiring || state == State::Aborted)
853 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
854 "Aborting or Aborted, depending on the order of the source)";
858 SCOPED_TRACE(
"AwaitStopSingleSourceWhenConditionIsFulfilled");
864 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 150ms);
866 EXPECT_TRUE(fut.is_ready()) <<
"Condition already fulfilled so future should be ready";
867 ASSERT_FALSE(fut.has_exception());
873 EXPECT_CALL(m_mock_formatter,
Format(_))
874 .Times(m_keywords.size())
875 .WillRepeatedly(Invoke(
879 m_daq->UpdateKeywords(m_keywords);
881 expected.emplace_back(std::in_place_type<fits::LiteralKeyword>,
"HIERARCH ESO FOO = 'BAR' /");
882 expected.emplace_back(std::in_place_type<fits::LiteralKeyword>,
884 EXPECT_EQ(expected, m_daq->GetContext().keywords);
890 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
892 "2019-12-12T04:25:48.0068",
894 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"BAR",
"BAZ");
895 EXPECT_CALL(m_mock_formatter,
Format(_))
903 EXPECT_EQ(m_daq->GetContext().keywords.size(), 0u)
904 <<
"No keywords should be added if any failed";
911 EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
915 SCOPED_TRACE(
"StartWillAwait");
925 SCOPED_TRACE(
"AutomaticStop");
932 DpPart prim_part{
"s1",
"/tmp/file.fits"};
933 m_await_promise.set_value({
false, {prim_part}});
937 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
938 .WillOnce(Return(ByMove(boost::make_ready_future<
Result<DpParts>>(stop_op_reply))));
939 ASSERT_EQ(State::Acquiring, this->m_status->GetState());
940 EXPECT_EQ(0u, m_daq->GetContext().results.size());
944 m_io_ctx, [
this]() ->
bool {
return this->m_status->GetState() == State::Stopped; });
945 EXPECT_FALSE(m_status->HasError());
946 EXPECT_EQ(2u, m_daq->GetContext().results.size()) <<
"One from m_files (metadata), "
948 EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Started operation was aborted.
Started operation timed out.
Exception thrown to carry reply errors.
Data acquisition sources.
std::vector< MetaSource > const & GetMetadataSources() const noexcept
std::vector< PrimSource > const & GetPrimarySources() const noexcept
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, fits::KeywordFormatter const &kw_formatter, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Keeps relevant state to be able to communicate with a primary data source.
Indicates keyword is invalid for some reason.
Represents the literal 80-character FITS keyword record.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Contains error related declarations for DAQ.
virtual void PreDaqControllerHook()
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
std::vector< DpPart > m_files
std::shared_ptr< ObservableEventLog > m_event_log
fits::KeywordVector m_keywords
KeywordFormatterMock m_mock_formatter
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
StatusObserverMock m_observer
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
rad::IoExecutor m_executor
std::shared_ptr< OcmDaqController > m_daq
TestOcmDaqControllerLifeCycle()
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
boost::asio::io_context m_io_ctx
KeywordFormatterMock m_mock_formatter
void StartDaq()
Executes a successful StartAsync() call.
boost::asio::io_context m_io_ctx
virtual void PreStartAsyncHook()
std::shared_ptr< ObservableStatus > m_status
std::unique_ptr< MockAsyncOperations > m_mock_ops
std::shared_ptr< ObservableStatus > m_status
std::shared_ptr< ObservableEventLog > m_event_log
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
rad::IoExecutor m_executor
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progresses the test by executing pending jobs and optionally waits for a future to be r...
Simple observer used for testing.
Developer notes: OcmDaqController uses boost::when_all to compose futures.
Contains declarations for the helper functions to initiate operations.
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::OcmAsyncOperations > CreateMockAsyncOperations(rad::IoExecutor &ex)
LiteralKeyword Format(KeywordVariant const &keyword)
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
std::variant< ValueKeyword, EsoKeyword, LiteralKeyword > KeywordVariant
The different variants of keywords that are supported.
R InitiateOperation(rad::IoExecutor &ex, Params &&... params)
Constructs and initiates Op and return the future result.
std::pair< R, std::function< bool()> > InitiateAbortableOperation(rad::IoExecutor &ex, Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
bool HasError(Status const &status) noexcept
ErrorPolicy
Error policy supported by certain operations.
State
Observable states of the data acquisition process.
@ NotStarted
Initial state of data acquisition.
Utility class that represents a result and an error.
Mockup of metadaqif classes.
Contains declaration for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
void PreDaqControllerHook()
boost::promise< Result< DpParts > > m_await_promise
void PreDaqControllerHook()
void PreDaqControllerHook()
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::string id
DAQ identifier, possibly provided by user.
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
std::function< boost::future< void >(op::AsyncOpParams)> start
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
A type-safe version of LiteralKeyword that consists of the three basic components of a FITS keyword ke...
A composite async operation that aborts a DAQ.
Parameters required for each async operation.
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Await-specific parameters that are not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
A composite async operation that starts DAQ.
TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfNoSourcesAreProvided)
void SetAllSourceState(op::AsyncOpParams &params, State state)
void SetSourceState(Iterator begin, Iterator end, State state)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.