9 #include <fmt/ostream.h>
14 #include "mock/mockWorkspace.hpp"
17 #include <gtest/gtest.h>
18 #include <log4cplus/loggingmacros.h>
23 using namespace ::testing;
24 using namespace std::literals::string_view_literals;
25 using namespace std::chrono_literals;
31 ADD_FAILURE() <<
"Future is not ready";
32 throw std::runtime_error(
"test failure");
37 LOG4CPLUS_ERROR(
"test",
38 "Future contained exception\n"
64 , m_executor(m_io_ctx)
67 , m_manager(m_executor,
73 log4cplus::Logger::getInstance(
"test")) {
81 m_daq_ctx_1.id = m_daq_id_1;
82 m_daq_ctx_1.file_id =
"fileid1";
83 m_daq_ctx_2.id = m_daq_id_2;
84 m_daq_ctx_2.file_id =
"fileid2";
86 m_daq1_status = std::make_shared<ObservableStatus>(m_daq_id_1,
"fileid1");
87 m_daq2_status = std::make_shared<ObservableStatus>(m_daq_id_2,
"fileid2");
88 m_daq1 = std::make_shared<DaqControllerMock>();
89 m_daq2 = std::make_shared<DaqControllerMock>();
90 m_daq_factory.ocm_mocks[
"daq1"] = m_daq1;
91 m_daq_factory.ocm_mocks[
"daq2"] = m_daq2;
93 m_dpm_daq1 = std::make_shared<DaqControllerMock>();
94 m_dpm_daq2 = std::make_shared<DaqControllerMock>();
95 m_daq_factory.dpm_mocks[
"daq1"] = m_dpm_daq1;
96 m_daq_factory.dpm_mocks[
"daq2"] = m_dpm_daq2;
98 EXPECT_CALL(*m_daq1, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_1));
99 EXPECT_CALL(*m_daq2, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_2));
100 EXPECT_CALL(*m_daq1, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq1_status));
101 EXPECT_CALL(*m_daq2, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq2_status));
102 EXPECT_CALL(Const(*m_daq1), GetStatus())
104 .WillRepeatedly(Return(m_daq1_status));
105 EXPECT_CALL(Const(*m_daq2), GetStatus())
107 .WillRepeatedly(Return(m_daq2_status));
108 EXPECT_CALL(Const(*m_daq1), GetContext())
110 .WillRepeatedly(ReturnRef(m_daq_ctx_1));
112 m_daq1_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_1,
"fileid1");
114 m_daq2_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_2,
"fileid2");
117 EXPECT_CALL(*m_dpm_daq1, GetId()).WillRepeatedly(ReturnRef(m_daq_id_1));
118 EXPECT_CALL(*m_dpm_daq1, GetStatus())
120 .WillRepeatedly(Return(m_daq1_status));
121 EXPECT_CALL(Const(*m_dpm_daq1), GetStatus())
123 .WillRepeatedly(Return(m_daq1_status));
124 EXPECT_CALL(Const(*m_dpm_daq1), GetContext())
126 .WillRepeatedly(ReturnRef(m_daq_ctx_1));
128 EXPECT_CALL(*m_dpm_daq2, GetId()).WillRepeatedly(ReturnRef(m_daq_id_2));
129 EXPECT_CALL(*m_dpm_daq2, GetStatus())
131 .WillRepeatedly(Return(m_daq2_dpm_status));
132 EXPECT_CALL(Const(*m_dpm_daq2), GetStatus())
134 .WillRepeatedly(Return(m_daq2_dpm_status));
135 EXPECT_CALL(Const(*m_dpm_daq2), GetContext())
137 .WillRepeatedly(ReturnRef(m_daq_ctx_2));
141 EXPECT_CALL(*m_daq1, StartAsync())
142 .WillOnce(Return(ByMove(boost::make_ready_future<State>(m_daq1_status->GetState()))));
143 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
144 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1"))).Times(AtLeast(1));
145 EXPECT_CALL(m_workspace, StoreContext(_));
146 return m_manager.StartDaqAsync(m_daq_ctx_1);
154 m_daq_factory.ocm_mocks.clear();
155 m_daq_factory.dpm_mocks.clear();
167 std::shared_ptr<DaqControllerMock>
m_daq1;
168 std::shared_ptr<DaqControllerMock>
m_daq2;
181 boost::asio::io_context io_ctx;
184 auto event_log = std::make_shared<ObservableEventLog>();
187 std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
195 log4cplus::Logger::getInstance(
"test"));
196 auto id = std::string(
"id");
197 auto status = std::make_shared<ObservableStatus>(
id,
"fileid");
200 auto daq = std::make_shared<DaqControllerMock>();
203 EXPECT_CALL(*
daq, StartAsync())
204 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
205 EXPECT_CALL(*
daq, GetId()).WillRepeatedly(ReturnRef(
id));
206 EXPECT_CALL(*
daq, GetStatus()).WillRepeatedly(Return(status));
207 EXPECT_CALL(Const(*
daq), GetStatus()).WillRepeatedly(Return(status));
208 EXPECT_CALL(Const(*
daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
210 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({
"id"})));
211 EXPECT_CALL(workspace, StoreStatus(Field(&
Status::id,
"id")));
212 EXPECT_CALL(workspace, StoreContext(daq_ctx));
215 EXPECT_CALL(o, CallOperator(_));
221 ExpectNoException(f);
226 boost::asio::io_context io_ctx;
229 auto event_log = std::make_shared<ObservableEventLog>();
232 std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
234 auto id = std::string(
"id");
237 auto status = std::make_shared<ObservableStatus>(
id,
"fileid");
238 auto daq = std::make_shared<DaqControllerMock>();
240 EXPECT_CALL(*
daq, StartAsync())
241 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
242 EXPECT_CALL(*
daq, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(
id));
243 EXPECT_CALL(*
daq, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
244 EXPECT_CALL(Const(*
daq), GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
245 EXPECT_CALL(Const(*
daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
247 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({
"id"})));
248 EXPECT_CALL(workspace, StoreStatus(Field(&
Status::id,
"id")));
249 EXPECT_CALL(workspace, StoreContext(daq_ctx));
251 boost::future<Result<Status>> res;
259 log4cplus::Logger::getInstance(
"test"));
264 ASSERT_FALSE(res.is_ready());
275 std::regex regex(
"INSTR\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
277 EXPECT_TRUE(std::regex_match(
id, regex))
278 <<
"Instrument ID should be truncated to 5 characters if too long";
281 std::regex regex(
"INS\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
282 auto id = m_manager.MakeDaqId();
283 EXPECT_TRUE(std::regex_match(
id, regex));
288 EXPECT_NE(id1, id2) <<
"Adding jitter should have made the ID different";
294 EXPECT_CALL(*m_daq1, StartAsync())
295 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
State::Starting))));
296 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
297 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
298 EXPECT_CALL(m_workspace, StoreContext(_));
300 m_manager.StartDaqAsync(m_daq_ctx_1);
303 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
306 EXPECT_THROW(f.get(), std::invalid_argument);
311 EXPECT_CALL(*m_daq1, StartAsync())
312 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
State::Starting))));
313 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
314 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
315 EXPECT_CALL(m_workspace, StoreContext(_));
318 auto fut = m_manager.StartDaqAsync(m_daq_ctx_1);
333 EXPECT_CALL(*m_daq1, StartAsync())
335 ByMove(boost::make_exceptional_future<State>(std::runtime_error(
"START FAILED")))));
336 boost::promise<daq::Status> abort_reply;
338 .WillOnce(Return(ByMove(abort_reply.get_future())));
339 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
340 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
341 EXPECT_CALL(m_workspace, StoreContext(_));
344 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
346 ASSERT_FALSE(f.is_ready());
349 abort_reply.set_exception(std::logic_error(
"ABORT ALSO FAILED"));
353 EXPECT_THROW(f.get(), std::runtime_error);
359 EXPECT_THROW(fut.get(), std::invalid_argument);
368 .WillOnce(Return(ByMove(
373 auto status = ExpectNoException(fut);
381 EXPECT_THROW(fut.get(), std::invalid_argument);
391 .WillOnce(Return(ByMove(boost::make_ready_future<Status>(reply_status))));
395 auto result = ExpectNoException(fut);
397 EXPECT_FALSE(result.error);
410 m_manager.UpdateKeywords(
"daq1"sv, keywords);
417 EXPECT_THROW(m_manager.UpdateKeywords(
"nonexistant-id"sv, keywords), std::invalid_argument);
425 auto status = m_manager.GetStatus(
"daq1"sv);
428 EXPECT_FALSE(status.error);
433 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"nonexistant")).WillOnce(Return(std::nullopt));
434 EXPECT_THROW(m_manager.GetStatus(
"nonexistant"sv), std::invalid_argument);
439 auto status =
Status(
"id",
"id");
440 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"archived")).WillOnce(Return(status));
441 EXPECT_EQ(m_manager.GetStatus(
"archived"sv), status);
446 auto status =
Status(
"id",
"id");
448 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"archived")).WillOnce(Return(status));
452 auto [timeout, result] = fut.get();
453 EXPECT_FALSE(timeout);
459 auto status =
Status(
"id",
"id");
461 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"archived")).WillOnce(Return(status));
465 EXPECT_THROW(fut.get(), std::invalid_argument);
475 auto [timeout, result] = fut.get();
476 EXPECT_FALSE(timeout);
477 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
484 ASSERT_FALSE(fut.is_ready());
489 auto [timeout, result] = fut.get();
490 EXPECT_FALSE(timeout);
491 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
498 ASSERT_FALSE(fut.is_ready());
501 ASSERT_TRUE(fut.is_ready()) <<
"Timer should have triggered to make the future ready";
502 auto [timeout, result] = fut.get();
503 EXPECT_TRUE(timeout);
504 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
515 EXPECT_CALL(*m_daq1, GetEventLog()).WillOnce(Return(m_event_log));
516 EXPECT_CALL(*m_daq1, GetContext()).WillOnce(ReturnRef(m_daq_ctx_1));
518 EXPECT_CALL(*m_dpm_daq1, ScheduleMergeAsync())
519 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
State::Scheduled))));
521 EXPECT_CALL(m_workspace, StoreList(_)).Times(AnyNumber());
522 EXPECT_CALL(m_workspace, StoreContext(_)).Times(AnyNumber());
531 auto daqs = m_manager.GetDaqControllers();
533 EXPECT_EQ(daqs[0].get(), m_dpm_daq1.get());
539 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq1_status->GetId())).Times(1);
546 EXPECT_CALL(*m_dpm_client, StopMonitorStatus(m_daq_ctx_1.id)).Times(1);
547 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id)).Times(1);
552 EXPECT_FALSE(m_manager.HaveDaq(m_daq_ctx_1.id));
562 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
563 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
567 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
569 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
571 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
572 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
573 .WillOnce(Return(m_daq1_status->GetStatus()));
574 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(0);
576 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
577 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
578 .WillOnce(Return(m_daq2_status->GetStatus()));
579 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id));
581 EXPECT_CALL(m_workspace, StoreList(to_load)).Times(1);
584 m_manager.RestoreFromWorkspace();
593 m_daq_ctx_1.creation_time = std::chrono::system_clock::now() - m_params.acquiring_stale_age;
594 m_daq_ctx_2.creation_time = std::chrono::system_clock::now() - m_params.merging_stale_age;
596 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
598 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
600 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
601 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
602 .WillOnce(Return(m_daq1_status->GetStatus()));
603 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
605 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
606 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
607 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
608 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_2.id));
610 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>())).Times(1);
613 m_manager.RestoreFromWorkspace();
622 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
623 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
625 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
626 std::vector<std::string> to_store = {m_daq_ctx_2.id};
628 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
630 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
631 .WillOnce(Throw(std::runtime_error(
"ouch")));
632 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
633 .Times(Between(0, 1))
634 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
635 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
636 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
638 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
639 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
640 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
641 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
643 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
646 m_manager.RestoreFromWorkspace();
655 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
656 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
657 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
658 std::vector<std::string> to_store = {m_daq_ctx_2.id};
660 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
662 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
663 .WillOnce(Throw(std::runtime_error(
"ouch")));
664 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
665 .Times(Between(0, 1))
666 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
667 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id))
668 .WillOnce(Throw(std::runtime_error(
"ouch")));
669 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
671 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
672 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
673 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
675 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
677 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
680 m_manager.RestoreFromWorkspace();
687 EXPECT_CALL(m_workspace, StoreContext(Field(&
DaqContext::id,
"daq1")));
690 m_daq1->signal(m_daq_ctx_1);
Started operation was aborted.
Combined mock and fake of interface to DPM server.
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
StatusSignal & GetStatusSignal() override
Stores data acquisition status and allows subscription to status changes.
boost::signals2::connection ConnectObserver(Observer o)
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
std::shared_ptr< DaqControllerMock > m_daq1
std::shared_ptr< DaqControllerMock > m_dpm_daq2
rad::IoExecutor m_executor
boost::asio::io_context m_io_ctx
std::shared_ptr< ObservableStatus > m_daq2_status
MockWorkspace m_workspace
auto StartDaq1() -> boost::future< State >
void SetUp() override
Creates manager and adds two data acquisitions.
std::shared_ptr< DaqControllerMock > m_dpm_daq1
DaqControllerFactoryFake m_daq_factory
std::shared_ptr< daq::ObservableEventLog > m_event_log
std::shared_ptr< ObservableStatus > m_daq2_dpm_status
std::shared_ptr< DaqControllerMock > m_daq2
std::shared_ptr< ObservableStatus > m_daq1_dpm_status
std::shared_ptr< ObservableStatus > m_daq1_status
std::shared_ptr< DpmClientMock > m_dpm_client
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Simple observer used for testing.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
BasicKeyword< ValueKeywordTraits > ValueKeyword
Standard FITS value keyword.
BasicKeyword< EsoKeywordTraits > EsoKeyword
ESO hiearchical keyword.
TEST_F(TestDpmClient, StartMonitoringSendsRequestAndReceivesReply)
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
@ Completed
Completed DAQ.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
Configurations parameters directly related to manager.
T ExpectNoException(boost::future< T > &f)
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::string file_id
Data Product FileId as specified by OLAS ICD.
std::string id
DAQ identfier, possibly provided by user.
Factory that creates mock versions.
std::map< std::string, std::shared_ptr< DaqControllerMock > > ocm_mocks
Non observable status object that keeps stores status of data acquisition.
std::chrono::time_point< std::chrono::system_clock > TimePoint
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.