9 #include <fmt/ostream.h>
13 #include "mock/mockWorkspace.hpp"
16 #include <gtest/gtest.h>
17 #include <log4cplus/loggingmacros.h>
22 using namespace ::testing;
23 using namespace std::literals::string_view_literals;
24 using namespace std::chrono_literals;
30 ADD_FAILURE() <<
"Future is not ready";
31 throw std::runtime_error(
"test failure");
36 LOG4CPLUS_ERROR(
"test",
37 "Future contained exception\n"
63 , m_executor(m_io_ctx)
65 , m_manager(m_executor,
70 log4cplus::Logger::getInstance(
"test")) {
78 m_daq_ctx_1.id = m_daq_id_1;
79 m_daq_ctx_1.file_id =
"fileid1";
80 m_daq_ctx_2.id = m_daq_id_2;
81 m_daq_ctx_2.file_id =
"fileid2";
83 m_daq1_status = std::make_shared<ObservableStatus>(m_daq_id_1,
"fileid1");
84 m_daq2_status = std::make_shared<ObservableStatus>(m_daq_id_2,
"fileid2");
85 m_daq1 = std::make_shared<DaqControllerMock>();
86 m_daq2 = std::make_shared<DaqControllerMock>();
87 m_daq_factory.ocm_mocks[
"daq1"] = m_daq1;
88 m_daq_factory.ocm_mocks[
"daq2"] = m_daq2;
90 m_dpm_daq1 = std::make_shared<DaqControllerMock>();
91 m_dpm_daq2 = std::make_shared<DaqControllerMock>();
92 m_daq_factory.dpm_mocks[
"daq1"] = m_dpm_daq1;
93 m_daq_factory.dpm_mocks[
"daq2"] = m_dpm_daq2;
95 EXPECT_CALL(*m_daq1, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_1));
96 EXPECT_CALL(*m_daq2, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(m_daq_id_2));
97 EXPECT_CALL(*m_daq1, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq1_status));
98 EXPECT_CALL(*m_daq2, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(m_daq2_status));
99 EXPECT_CALL(Const(*m_daq1), GetStatus())
101 .WillRepeatedly(Return(m_daq1_status));
102 EXPECT_CALL(Const(*m_daq2), GetStatus())
104 .WillRepeatedly(Return(m_daq2_status));
105 EXPECT_CALL(Const(*m_daq1), GetContext())
107 .WillRepeatedly(ReturnRef(m_daq_ctx_1));
109 m_daq1_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_1,
"fileid1");
111 m_daq2_dpm_status = std::make_shared<ObservableStatus>(m_daq_id_2,
"fileid2");
114 EXPECT_CALL(*m_dpm_daq1, GetId()).WillRepeatedly(ReturnRef(m_daq_id_1));
115 EXPECT_CALL(*m_dpm_daq1, GetStatus())
117 .WillRepeatedly(Return(m_daq1_status));
118 EXPECT_CALL(Const(*m_dpm_daq1), GetStatus())
120 .WillRepeatedly(Return(m_daq1_status));
121 EXPECT_CALL(Const(*m_dpm_daq1), GetContext())
123 .WillRepeatedly(ReturnRef(m_daq_ctx_1));
125 EXPECT_CALL(*m_dpm_daq2, GetId()).WillRepeatedly(ReturnRef(m_daq_id_2));
126 EXPECT_CALL(*m_dpm_daq2, GetStatus())
128 .WillRepeatedly(Return(m_daq2_dpm_status));
129 EXPECT_CALL(Const(*m_dpm_daq2), GetStatus())
131 .WillRepeatedly(Return(m_daq2_dpm_status));
132 EXPECT_CALL(Const(*m_dpm_daq2), GetContext())
134 .WillRepeatedly(ReturnRef(m_daq_ctx_2));
138 EXPECT_CALL(*m_daq1, StartAsync())
139 .WillOnce(Return(ByMove(boost::make_ready_future<State>(m_daq1_status->GetState()))));
140 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
141 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1"))).Times(AtLeast(1));
142 EXPECT_CALL(m_workspace, StoreContext(_));
143 return m_manager.StartDaqAsync(m_daq_ctx_1);
151 m_daq_factory.ocm_mocks.clear();
152 m_daq_factory.dpm_mocks.clear();
163 std::shared_ptr<DaqControllerMock>
m_daq1;
164 std::shared_ptr<DaqControllerMock>
m_daq2;
177 boost::asio::io_context io_ctx;
180 auto event_log = std::make_shared<ObservableEventLog>();
185 executor, m_params, workspace, event_log, factory, log4cplus::Logger::getInstance(
"test"));
186 auto id = std::string(
"id");
187 auto status = std::make_shared<ObservableStatus>(
id,
"fileid");
190 auto daq = std::make_shared<DaqControllerMock>();
193 EXPECT_CALL(*
daq, StartAsync())
194 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
195 EXPECT_CALL(*
daq, GetId()).WillRepeatedly(ReturnRef(
id));
196 EXPECT_CALL(*
daq, GetStatus()).WillRepeatedly(Return(status));
197 EXPECT_CALL(Const(*
daq), GetStatus()).WillRepeatedly(Return(status));
198 EXPECT_CALL(Const(*
daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
200 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({
"id"})));
201 EXPECT_CALL(workspace, StoreStatus(Field(&
Status::id,
"id")));
202 EXPECT_CALL(workspace, StoreContext(daq_ctx));
205 EXPECT_CALL(o, CallOperator(_));
211 ExpectNoException(f);
216 boost::asio::io_context io_ctx;
219 auto event_log = std::make_shared<ObservableEventLog>();
223 auto id = std::string(
"id");
226 auto status = std::make_shared<ObservableStatus>(
id,
"fileid");
227 auto daq = std::make_shared<DaqControllerMock>();
229 EXPECT_CALL(*
daq, StartAsync())
230 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
231 EXPECT_CALL(*
daq, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(
id));
232 EXPECT_CALL(*
daq, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
233 EXPECT_CALL(Const(*
daq), GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
234 EXPECT_CALL(Const(*
daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
236 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({
"id"})));
237 EXPECT_CALL(workspace, StoreStatus(Field(&
Status::id,
"id")));
238 EXPECT_CALL(workspace, StoreContext(daq_ctx));
240 boost::future<Result<Status>> res;
247 log4cplus::Logger::getInstance(
"test"));
252 ASSERT_FALSE(res.is_ready());
257 ASSERT_TRUE(res.is_ready());
263 std::regex regex(
"INSTR\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
265 EXPECT_TRUE(std::regex_match(
id, regex))
266 <<
"Instrument ID should be truncated to 5 characters if too long";
269 std::regex regex(
"INS\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
270 auto id = m_manager.MakeDaqId();
271 EXPECT_TRUE(std::regex_match(
id, regex));
276 EXPECT_NE(id1, id2) <<
"Adding jitter should have made the ID different";
282 EXPECT_CALL(*m_daq1, StartAsync())
283 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
State::Starting))));
284 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
285 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
286 EXPECT_CALL(m_workspace, StoreContext(_));
288 m_manager.StartDaqAsync(m_daq_ctx_1);
291 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
293 ASSERT_TRUE(f.is_ready());
294 EXPECT_THROW(f.get(), std::invalid_argument);
299 EXPECT_CALL(*m_daq1, StartAsync())
300 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
State::Starting))));
301 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
302 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
303 EXPECT_CALL(m_workspace, StoreContext(_));
306 auto fut = m_manager.StartDaqAsync(m_daq_ctx_1);
308 ASSERT_TRUE(fut.is_ready());
321 EXPECT_CALL(*m_daq1, StartAsync())
323 ByMove(boost::make_exceptional_future<State>(std::runtime_error(
"START FAILED")))));
324 boost::promise<daq::Status> abort_reply;
326 .WillOnce(Return(ByMove(abort_reply.get_future())));
327 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
328 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
329 EXPECT_CALL(m_workspace, StoreContext(_));
332 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
334 ASSERT_FALSE(f.is_ready());
337 abort_reply.set_exception(std::logic_error(
"ABORT ALSO FAILED"));
340 ASSERT_TRUE(f.is_ready());
341 EXPECT_THROW(f.get(), std::runtime_error);
346 ASSERT_TRUE(fut.is_ready());
347 EXPECT_THROW(fut.get(), std::invalid_argument);
356 .WillOnce(Return(ByMove(
361 auto status = ExpectNoException(fut);
368 ASSERT_TRUE(fut.is_ready());
369 EXPECT_THROW(fut.get(), std::invalid_argument);
379 .WillOnce(Return(ByMove(boost::make_ready_future<Status>(reply_status))));
383 auto result = ExpectNoException(fut);
385 EXPECT_FALSE(result.error);
398 m_manager.UpdateKeywords(
"daq1"sv, keywords);
405 EXPECT_THROW(m_manager.UpdateKeywords(
"nonexistant-id"sv, keywords), std::invalid_argument);
413 auto status = m_manager.GetStatus(
"daq1"sv);
416 EXPECT_FALSE(status.error);
421 EXPECT_THROW(m_manager.GetStatus(
"nonexistant"sv), std::invalid_argument);
430 ASSERT_TRUE(fut.is_ready());
431 auto [timeout, result] = fut.get();
432 EXPECT_FALSE(timeout);
433 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
440 ASSERT_FALSE(fut.is_ready());
444 ASSERT_TRUE(fut.is_ready());
445 auto [timeout, result] = fut.get();
446 EXPECT_FALSE(timeout);
447 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
454 ASSERT_FALSE(fut.is_ready());
457 ASSERT_TRUE(fut.is_ready()) <<
"Timer should have triggered to make the future ready";
458 auto [timeout, result] = fut.get();
459 EXPECT_TRUE(timeout);
460 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
471 EXPECT_CALL(*m_daq1, GetEventLog()).WillOnce(Return(m_event_log));
472 EXPECT_CALL(*m_daq1, GetContext()).WillOnce(ReturnRef(m_daq_ctx_1));
474 EXPECT_CALL(*m_dpm_daq1, ScheduleMergeAsync())
475 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
State::Scheduled))));
477 EXPECT_CALL(m_workspace, StoreList(_)).Times(AnyNumber());
478 EXPECT_CALL(m_workspace, StoreContext(_)).Times(AnyNumber());
487 auto daqs = m_manager.GetDaqControllers();
489 EXPECT_EQ(daqs[0].get(), m_dpm_daq1.get());
498 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
499 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
503 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
505 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
507 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
508 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
509 .WillOnce(Return(m_daq1_status->GetStatus()));
511 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
512 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
513 .WillOnce(Return(m_daq2_status->GetStatus()));
515 EXPECT_CALL(m_workspace, StoreList(to_load)).Times(1);
518 m_manager.RestoreFromWorkspace();
527 m_daq_ctx_1.creation_time = std::chrono::system_clock::now() - m_params.acquiring_stale_age;
528 m_daq_ctx_2.creation_time = std::chrono::system_clock::now() - m_params.merging_stale_age;
530 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
532 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
534 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
535 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
536 .WillOnce(Return(m_daq1_status->GetStatus()));
537 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
539 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
540 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
541 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
542 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_2.id));
544 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>())).Times(1);
547 m_manager.RestoreFromWorkspace();
556 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
557 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
559 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
560 std::vector<std::string> to_store = {m_daq_ctx_2.id};
562 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
564 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
565 .WillOnce(Throw(std::runtime_error(
"ouch")));
566 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
567 .Times(Between(0, 1))
568 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
569 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
571 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
572 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
573 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
575 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
578 m_manager.RestoreFromWorkspace();
587 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
588 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
589 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
590 std::vector<std::string> to_store = {m_daq_ctx_2.id};
592 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
594 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
595 .WillOnce(Throw(std::runtime_error(
"ouch")));
596 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
597 .Times(Between(0, 1))
598 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
599 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id))
600 .WillOnce(Throw(std::runtime_error(
"ouch")));
602 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
603 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
604 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
606 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
609 m_manager.RestoreFromWorkspace();
616 EXPECT_CALL(m_workspace, StoreContext(Field(&
DaqContext::id,
"daq1")));
619 m_daq1->signal(m_daq_ctx_1);
Started operation was aborted.
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
StatusSignal & GetStatusSignal() override
Stores data acquisition status and allows subscription to status changes.
boost::signals2::connection ConnectObserver(Observer o)
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
std::shared_ptr< DaqControllerMock > m_daq1
std::shared_ptr< DaqControllerMock > m_dpm_daq2
rad::IoExecutor m_executor
boost::asio::io_context m_io_ctx
std::shared_ptr< ObservableStatus > m_daq2_status
MockWorkspace m_workspace
auto StartDaq1() -> boost::future< State >
void SetUp() override
Creates manager and adds two data acquisitions.
std::shared_ptr< DaqControllerMock > m_dpm_daq1
DaqControllerFactoryFake m_daq_factory
std::shared_ptr< daq::ObservableEventLog > m_event_log
std::shared_ptr< ObservableStatus > m_daq2_dpm_status
std::shared_ptr< DaqControllerMock > m_daq2
std::shared_ptr< ObservableStatus > m_daq1_dpm_status
std::shared_ptr< ObservableStatus > m_daq1_status
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Simple observer used for testing.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
BasicKeyword< ValueKeywordTraits > ValueKeyword
Standard FITS value keyword.
BasicKeyword< EsoKeywordTraits > EsoKeyword
ESO hiearchical keyword.
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds)
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Merging
DAQ is being merged.
@ Transferring
Input files are being transferred.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
Configurations parameters directly related to manager.
T ExpectNoException(boost::future< T > &f)
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::string file_id
Data Product FileId as specified by OLAS ICD.
std::string id
DAQ identfier, possibly provided by user.
Factory that creates mock versions.
std::map< std::string, std::shared_ptr< DaqControllerMock > > ocm_mocks
Non observable status object that keeps stores status of data acquisition.
std::chrono::time_point< std::chrono::steady_clock > TimePoint
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
Defines shared test utilities.