13 #include <gtest/gtest.h>
30 using namespace ::testing;
31 using namespace std::chrono;
44 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
45 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
46 m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>,
op::AsyncOpParams>;
48 boost::future<Result<DpParts>>,
52 boost::future<Result<void>>,
56 boost::future<Result<DpParts>>,
70 template <
class Iterator>
73 for (; it != end; ++it) {
102 std::shared_ptr<OcmDaqController>
m_daq;
115 m_files.emplace_back(
"foo",
"bar");
119 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
120 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
121 m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
123 m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
129 MetaSource s1(
"meta-source-1", m_meta_rr_client);
130 MetaSource s2(
"meta-source-2", m_meta_rr_client2);
135 PreDaqControllerHook();
136 m_daq = std::make_shared<daq::OcmDaqController>(m_io_ctx,
141 std::get<OcmAsyncOperations>(tup));
144 ASSERT_EQ(m_status->GetState(), m_daq->GetState());
149 m_meta_rr_client.reset();
150 m_meta_rr_client2.reset();
151 m_prim_rr_client.reset();
165 boost::promise<void> reply_promise;
166 std::optional<op::AsyncOpParams> params;
167 EXPECT_CALL(*m_mock_ops, Start(_))
168 .WillOnce(DoAll(Invoke([&](
auto p) { params.emplace(p); }),
169 Return(ByMove(reply_promise.get_future()))));
173 auto fut = m_daq->StartAsync();
175 EXPECT_FALSE(fut.is_ready());
178 reply_promise.set_value();
192 std::optional<op::AsyncOpParams> params;
193 boost::promise<Result<void>> reply_promise;
195 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
196 Return(ByMove(reply_promise.get_future()))));
202 <<
"Expected state to be in Stopping after requesting to abort";
205 reply_promise.set_value({
false});
213 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
214 auto result = fut.get();
216 EXPECT_FALSE(result.error);
221 std::optional<op::AsyncOpParams> params;
222 boost::promise<Result<DpParts>> reply_promise;
225 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
226 Return(ByMove(reply_promise.get_future()))));
232 <<
"Expected state to be in Stopping after requesting to stop";
239 reply_promise.set_value(reply);
245 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
246 auto status = fut.get();
248 <<
"Expected state to be Stopped since there were no errors";
249 EXPECT_FALSE(status.error);
276 PrimSource s1(
"prim-source-1", m_prim_rr_client);
277 m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{s1};
280 EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
281 .WillOnce(Return(ByMove(m_await_promise.get_future())));
289 EXPECT_THROW(boost::make_ready_future()
290 .then([](
auto f) -> boost::future<void> {
292 throw std::runtime_error(
"Meow");
294 return boost::make_exceptional_future<void>();
305 std::invalid_argument);
310 m_sources.GetMetadataSources() = {s};
311 m_context.id =
"not-id";
314 std::invalid_argument);
319 m_sources.GetMetadataSources() = {s};
325 std::invalid_argument);
332 std::invalid_argument);
339 std::invalid_argument);
346 std::invalid_argument);
352 m_sources.GetMetadataSources() = {s};
358 m_sources.GetMetadataSources() = {s};
359 boost::future<State> fut;
363 fut =
daq->AwaitAsync({
"source-id"}, 100ms);
364 ASSERT_FALSE(fut.is_ready());
369 <<
"Future should have been cancelled since daq should have been deleted.";
370 EXPECT_TRUE(fut.has_exception());
379 auto status_ptr = m_daq->GetStatus();
380 EXPECT_EQ(status_ptr.get(), m_status.get());
384 SCOPED_TRACE(
"CannotStopStoppedOcmDaqController");
392 EXPECT_TRUE(fut.has_exception());
393 EXPECT_THROW(fut.get(), std::exception);
397 SCOPED_TRACE(
"CannotAbortStoppedOcmDaqController");
405 EXPECT_TRUE(fut.has_exception());
406 EXPECT_THROW(fut.get(), std::exception);
409 TEST_F(
TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
411 boost::promise<void> reply_promise;
412 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
415 auto fut = m_daq->StartAsync();
419 reply_promise.set_exception(std::runtime_error(
"Fake test failure"));
425 EXPECT_TRUE(fut.has_exception()) <<
"Expected future to contain exception";
426 EXPECT_THROW(fut.get(), std::exception) <<
"Expected exception to derive from std::exception";
432 boost::promise<void> reply_promise;
433 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
435 auto fut = m_daq->StartAsync();
437 EXPECT_FALSE(fut.is_ready());
441 auto fut2 = m_daq->StartAsync();
443 EXPECT_THROW(fut2.get(), std::exception)
444 <<
"Multiple simultaneous start operations are not supported and an exception "
449 reply_promise.set_value();
460 EXPECT_THROW(fut.get(), std::exception)
461 <<
"It should not be possible to stop a data acquisition that has not started";
468 std::optional<op::AsyncOpParams> params;
469 boost::promise<Result<DpParts>> reply_promise;
471 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
472 Return(ByMove(reply_promise.get_future()))));
475 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).Times(0);
481 <<
"Expected state to be in Stopping after requesting to stop";
485 reply_promise.set_value(reply);
494 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
495 auto status = fut.get();
497 <<
"Expected state to be Stopped since there were no errors";
498 EXPECT_TRUE(status.error) <<
"Error flag should be set since the reply_promise had an error";
508 <<
"Aborting a NotStarted data acquisition should be ready immediately";
509 EXPECT_FALSE(fut.has_exception()) <<
"Future should not have failed";
511 auto result = fut.get();
513 EXPECT_FALSE(result.error);
522 boost::promise<void> reply_promise;
523 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
525 auto fut = m_daq->StartAsync();
527 <<
"Setup failed, unexpected state, aborting test";
528 ASSERT_FALSE(fut.is_ready());
533 EXPECT_TRUE(fut.has_exception())
534 <<
"Cannot stop unless DAQ is in State::Acquiring, current state: "
535 << m_daq->GetState();
537 EXPECT_THROW(fut.get(), std::exception)
538 <<
"Cannot stop if data acquisition is `Starting`. An exeption was expected";
542 reply_promise.set_value();
557 SCOPED_TRACE(
"AbortingIsOkWhenStarting");
560 boost::promise<void> start_promise;
561 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
566 auto start_fut = m_daq->StartAsync();
568 EXPECT_FALSE(start_fut.is_ready());
572 boost::promise<Result<void>> abort_promise;
574 .WillOnce(Return(ByMove(abort_promise.get_future())));
583 start_promise.set_value();
588 ASSERT_TRUE(start_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
589 EXPECT_FALSE(start_fut.has_exception())
590 <<
"Mock did not simulate failure so future should be ok";
595 abort_promise.set_value({
false});
600 ASSERT_TRUE(abort_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
601 EXPECT_FALSE(abort_fut.has_exception())
602 <<
"Mock didn't simulate failure so future should be OK";
603 auto result = abort_fut.get();
605 EXPECT_FALSE(result.error);
610 SCOPED_TRACE(
"Acquiring");
617 SCOPED_TRACE(
"AbortOcmDaqControllerInStateAborting");
628 EXPECT_THROW(fut.get(), std::runtime_error);
633 SCOPED_TRACE(
"AbortOcmDaqControllerInStateStarting");
644 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
647 boost::promise<Result<void>> abort_promise_1;
651 .WillOnce(Return(ByMove(abort_promise_1.get_future())));
657 <<
"Expected state to be in Stopping after requesting to abort";
660 abort_promise_1.set_value({
true});
665 auto result1 = fut1.get();
667 EXPECT_TRUE(result1.error);
678 SCOPED_TRACE(
"NewAbortSupersedesSuccessfulAbort");
682 boost::promise<Result<void>> abort_promise_1;
684 boost::promise<Result<void>> abort_promise_2;
689 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
690 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
698 <<
"Expected state to be in Stopping after requesting to abort";
701 abort_promise_1.set_value({
false});
705 ASSERT_FALSE(fut1.has_exception()) <<
"Future has unexpected exception!";
706 auto result1 = fut1.get();
708 EXPECT_FALSE(result1.error);
712 abort_promise_2.set_value({
false});
714 auto result2 = fut2.get();
716 EXPECT_FALSE(result2.error);
722 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
725 boost::promise<Result<void>> abort_promise_1;
727 boost::promise<Result<void>> abort_promise_2;
732 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
733 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
739 <<
"Expected state to be in Stopping after requesting to abort";
742 abort_promise_1.set_exception(
DaqSourceErrors(std::vector<std::exception_ptr>()));
746 ASSERT_TRUE(fut1.has_exception()) <<
"Future has unexpected exception!";
754 abort_promise_2.set_value({
false});
759 auto result2 = fut2.get();
761 EXPECT_FALSE(result2.error);
767 SCOPED_TRACE(
"StopOcmDaqControllerSuccessfully");
777 auto fut = m_daq->AwaitAsync({
"non-existant"}, 0ms);
779 EXPECT_THROW(fut.get(), std::invalid_argument);
784 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 1ms);
792 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
794 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 300ms);
795 EXPECT_FALSE(fut.is_ready())
796 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
800 EXPECT_FALSE(fut.is_ready())
801 <<
"Wait condition not fulfilled, so future should not be ready yet";
803 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(
true));
807 ASSERT_FALSE(fut.has_exception());
808 auto state = fut.get();
810 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
811 "Stopping or Stopped, depending on the order of the source)";
815 SCOPED_TRACE(
"AwaitAbortAllMetadataSources");
817 auto fut = m_daq->AwaitAsync({
"meta-source-1",
"meta-source-2"}, 150ms);
818 EXPECT_FALSE(fut.is_ready())
819 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
823 EXPECT_FALSE(fut.is_ready());
826 EXPECT_CALL(m_mock_ops->mock_abort, Abort()).WillOnce(Return(
true));
829 EXPECT_TRUE(fut.is_ready());
830 ASSERT_FALSE(fut.has_exception());
831 auto state = fut.get();
833 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
834 "Aborting or Aborted, depending on the order of the source)";
838 SCOPED_TRACE(
"AwaitStopSingleSourceWhenConditionIsFulfilled");
844 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 150ms);
846 EXPECT_TRUE(fut.is_ready()) <<
"Condition already fulfilled so future should be ready";
847 ASSERT_FALSE(fut.has_exception());
855 m_daq->UpdateKeywords(m_keywords);
857 EXPECT_EQ(m_keywords, m_daq->GetContext().keywords);
864 EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
868 SCOPED_TRACE(
"StartWillAwait");
878 SCOPED_TRACE(
"AutomaticStop");
885 DpPart prim_part{
"s1",
"/tmp/file.fits"};
886 m_await_promise.set_value({
false, {prim_part}});
891 .WillOnce(Return(ByMove(boost::make_ready_future<
Result<DpParts>>(stop_op_reply))));
893 EXPECT_EQ(0u, m_daq->GetContext().results.size());
897 m_io_ctx, [
this]() ->
bool {
return this->m_status->GetState() ==
State::Stopped; });
898 EXPECT_FALSE(m_status->GetError());
899 EXPECT_EQ(2u, m_daq->GetContext().results.size()) <<
"One from m_files (metadata), "
901 EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Started operation was aborted.
Started operation timed out.
Exception thrown to carry reply errors.
Data acquisition sources.
std::vector< MetaSource > const & GetMetadataSources() const noexcept
std::vector< PrimSource > const & GetPrimarySources() const noexcept
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Keeps relevant state to be able to communicate with a primary data source.
Contains error related declarations for DAQ.
virtual void PreDaqControllerHook()
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
std::vector< DpPart > m_files
std::shared_ptr< ObservableEventLog > m_event_log
fits::KeywordVector m_keywords
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
StatusObserverMock m_observer
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
std::shared_ptr< OcmDaqController > m_daq
TestOcmDaqControllerLifeCycle()
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
boost::asio::io_context m_io_ctx
void StartDaq()
Executes a successful StartAsync() call.
boost::asio::io_context m_io_ctx
virtual void PreStartAsyncHook()
std::shared_ptr< ObservableStatus > m_status
std::unique_ptr< MockAsyncOperations > m_mock_ops
std::shared_ptr< ObservableStatus > m_status
std::shared_ptr< ObservableEventLog > m_event_log
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progresses the test by executing pending jobs and optionally waits for a future to be r...
Simple observer used for testing.
Developer notes: OcmDaqController uses boost::when_all to compose futures.
Contains declarations for the helper functions to initiate operations.
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::OcmAsyncOperations > CreateMockAsyncOperations()
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
R InitiateOperation(Params &&... params)
Constructs and initiates Op and return the future result.
std::pair< R, std::function< bool()> > InitiateAbortableOperation(Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
TEST_F(TestDpmClient, StartMonitoringSendsRequestAndReceivesReply)
ErrorPolicy
Error policy supported by certain operations.
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and are reported as ...
State
Observable states of the data acquisition process.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
TEST(TestDaqContext, Files)
Utility class that represents a result and an error.
Mockup of metadaqif classes.
Contains declaration for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
void PreDaqControllerHook()
boost::promise< Result< DpParts > > m_await_promise
void PreDaqControllerHook()
void PreDaqControllerHook()
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::string id
DAQ identifier, possibly provided by user.
A type-safe version of LiteralKeyword that consists of the three basic components of a FITS keyword ke...
A composite async operation that aborts a DAQ.
Parameters required for each async operation.
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Await-specific parameters that are not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
A composite async operation that starts DAQ.
void SetAllSourceState(op::AsyncOpParams &params, State state)
void SetSourceState(Iterator begin, Iterator end, State state)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.