9#include <fmt/ostream.h>
16#include "mock/mockWorkspace.hpp"
19#include <gtest/gtest.h>
20#include <log4cplus/loggingmacros.h>
25using namespace ::testing;
26using namespace std::literals::string_view_literals;
27using namespace std::chrono_literals;
29MATCHER_P(KeywordNameEq, name,
"keyword name equals") {
31 *result_listener <<
"keyword is " << (arg);
32 return kw.name == name;
39 ADD_FAILURE() <<
"Future is not ready";
40 throw std::runtime_error(
"test failure");
45 LOG4CPLUS_ERROR(
"test",
46 "Future contained exception\n"
83 log4cplus::Logger::getInstance(
"test")) {
98 m_daq1 = std::make_shared<DaqControllerMock>();
99 m_daq2 = std::make_shared<DaqControllerMock>();
103 m_dpm_daq1 = std::make_shared<DaqControllerMock>();
104 m_dpm_daq2 = std::make_shared<DaqControllerMock>();
110 .WillByDefault(Invoke(
119 EXPECT_CALL(*
m_daq1, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(
m_daq_id_1));
120 EXPECT_CALL(*
m_daq2, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(
m_daq_id_2));
121 EXPECT_CALL(*
m_daq1, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(
m_daq1_status));
122 EXPECT_CALL(*
m_daq2, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(
m_daq2_status));
123 EXPECT_CALL(Const(*
m_daq1), GetStatus())
126 EXPECT_CALL(Const(*
m_daq2), GetStatus())
129 EXPECT_CALL(Const(*
m_daq1), GetContext())
162 EXPECT_CALL(*
m_daq1, StartAsync())
163 .WillOnce(Return(ByMove(boost::make_ready_future<State>(
m_daq1_status->GetState()))));
164 EXPECT_CALL(
m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
188 std::shared_ptr<DaqControllerMock>
m_daq1;
189 std::shared_ptr<DaqControllerMock>
m_daq2;
202 boost::asio::io_context io_ctx;
205 auto event_log = std::make_shared<ObservableEventLog>();
208 std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
217 log4cplus::Logger::getInstance(
"test"));
218 auto id = std::string(
"id");
219 auto status = std::make_shared<ObservableStatus>(
id,
"fileid");
222 auto daq = std::make_shared<DaqControllerMock>();
225 EXPECT_CALL(*
daq, StartAsync())
226 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
227 EXPECT_CALL(*
daq, GetId()).WillRepeatedly(ReturnRef(
id));
228 EXPECT_CALL(*
daq, GetStatus()).WillRepeatedly(Return(status));
229 EXPECT_CALL(Const(*
daq), GetStatus()).WillRepeatedly(Return(status));
230 EXPECT_CALL(Const(*
daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
232 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({
"id"})));
233 EXPECT_CALL(workspace, StoreStatus(Field(&
Status::id,
"id")));
234 EXPECT_CALL(workspace, StoreContext(daq_ctx));
237 EXPECT_CALL(o, CallOperator(_));
243 ExpectNoException(f);
248 boost::asio::io_context io_ctx;
251 auto event_log = std::make_shared<ObservableEventLog>();
254 std::shared_ptr<DpmClientMock> dpm_client = std::make_shared<DpmClientMock>();
256 auto id = std::string(
"id");
259 auto status = std::make_shared<ObservableStatus>(
id,
"fileid");
260 auto daq = std::make_shared<DaqControllerMock>();
262 EXPECT_CALL(*
daq, StartAsync())
263 .WillOnce(Return(ByMove(boost::make_ready_future<State>(status->GetState()))));
264 EXPECT_CALL(*
daq, GetId()).Times(AnyNumber()).WillRepeatedly(ReturnRef(
id));
265 EXPECT_CALL(*
daq, GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
266 EXPECT_CALL(Const(*
daq), GetStatus()).Times(AnyNumber()).WillRepeatedly(Return(status));
267 EXPECT_CALL(Const(*
daq), GetContext()).Times(AnyNumber()).WillRepeatedly(ReturnRef(daq_ctx));
269 EXPECT_CALL(workspace, StoreList(std::vector<std::string>({
"id"})));
270 EXPECT_CALL(workspace, StoreStatus(Field(&
Status::id,
"id")));
271 EXPECT_CALL(workspace, StoreContext(daq_ctx));
273 boost::future<Result<Status>> res;
282 log4cplus::Logger::getInstance(
"test"));
287 ASSERT_FALSE(res.is_ready());
298 std::regex regex(
"INSTR\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
300 EXPECT_TRUE(std::regex_match(
id, regex))
301 <<
"Instrument ID should be truncated to 5 characters if too long";
304 std::regex regex(
"INS\\.\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
305 auto id = m_manager.MakeDaqId();
306 EXPECT_TRUE(std::regex_match(
id, regex));
311 EXPECT_NE(id1, id2) <<
"Adding jitter should have made the ID different";
317 EXPECT_CALL(*m_daq1, StartAsync())
318 .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
319 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
320 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
321 EXPECT_CALL(m_workspace, StoreContext(_));
323 m_manager.StartDaqAsync(m_daq_ctx_1);
326 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
334 EXPECT_CALL(*m_daq1, StartAsync()).Times(0);
337 kws.emplace_back(std::in_place_type<fits::EsoKeyword>,
"OK",
"BAR");
338 kws.emplace_back(std::in_place_type<fits::EsoKeyword>,
"UNKNOWN",
"BAR");
340 m_daq_ctx_1.results.emplace_back(
"keywords", kws);
343 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
351 EXPECT_CALL(*m_daq1, StartAsync()).Times(0);
354 kws.emplace_back(std::in_place_type<fits::EsoKeyword>,
"OK",
"BAR");
355 kws.emplace_back(std::in_place_type<fits::EsoKeyword>,
"INVALID",
"BAR");
357 m_daq_ctx_1.results.emplace_back(
"keywords", kws);
360 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
368 EXPECT_CALL(*m_daq1, StartAsync())
369 .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Starting))));
370 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
371 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
372 EXPECT_CALL(m_workspace, StoreContext(_));
375 auto fut = m_manager.StartDaqAsync(m_daq_ctx_1);
390 EXPECT_CALL(*m_daq1, StartAsync())
392 ByMove(boost::make_exceptional_future<State>(std::runtime_error(
"START FAILED")))));
393 boost::promise<daq::Status> abort_reply;
394 EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Tolerant))
395 .WillOnce(Return(ByMove(abort_reply.get_future())));
396 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>({
"daq1"})));
397 EXPECT_CALL(m_workspace, StoreStatus(Field(&
Status::id,
"daq1")));
398 EXPECT_CALL(m_workspace, StoreContext(_));
401 auto f = m_manager.StartDaqAsync(m_daq_ctx_1);
403 ASSERT_FALSE(f.is_ready());
406 abort_reply.set_exception(std::logic_error(
"ABORT ALSO FAILED"));
410 EXPECT_THROW(f.get(), std::runtime_error);
416 EXPECT_THROW(fut.get(), std::invalid_argument);
425 .WillOnce(Return(ByMove(
426 boost::make_ready_future<Status>(
Status(
"daq1",
"fileid", State::Stopped, t)))));
430 auto status = ExpectNoException(fut);
436 auto fut = m_manager.AbortDaqAsync(
"nonexistant-id"sv, ErrorPolicy::Tolerant);
438 EXPECT_THROW(fut.get(), std::invalid_argument);
446 reply_status.state = State::AbortingAcquiring;
447 EXPECT_CALL(*m_daq1, AbortAsync(ErrorPolicy::Strict))
448 .WillOnce(Return(ByMove(boost::make_ready_future<Status>(reply_status))));
451 auto fut = m_manager.AbortDaqAsync(
"daq1"sv, ErrorPolicy::Strict);
452 auto result = ExpectNoException(fut);
453 EXPECT_EQ(result.state, State::AbortingAcquiring);
467 m_manager.UpdateKeywords(
"daq1"sv, keywords);
474 EXPECT_THROW(m_manager.UpdateKeywords(
"nonexistant-id"sv, keywords), std::invalid_argument);
482 auto status = m_manager.GetStatus(
"daq1"sv);
490 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"nonexistant")).WillOnce(Return(std::nullopt));
491 EXPECT_THROW(m_manager.GetStatus(
"nonexistant"sv), std::invalid_argument);
496 auto status =
Status(
"id",
"id");
497 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"archived")).WillOnce(Return(status));
498 EXPECT_EQ(m_manager.GetStatus(
"archived"sv), status);
503 auto status =
Status(
"id",
"id");
504 status.state = State::Completed;
505 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"archived")).WillOnce(Return(status));
509 auto [timeout, result] = fut.get();
510 EXPECT_FALSE(timeout);
516 auto status =
Status(
"id",
"id");
517 status.state = State::Collecting;
518 EXPECT_CALL(m_workspace, LoadArchivedStatus(
"archived")).WillOnce(Return(status));
522 EXPECT_THROW(fut.get(), std::invalid_argument);
532 auto [timeout, result] = fut.get();
533 EXPECT_FALSE(timeout);
534 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
541 ASSERT_FALSE(fut.is_ready());
546 auto [timeout, result] = fut.get();
547 EXPECT_FALSE(timeout);
548 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
555 ASSERT_FALSE(fut.is_ready());
558 ASSERT_TRUE(fut.is_ready()) <<
"Timer should have triggered to make the future ready";
559 auto [timeout, result] = fut.get();
560 EXPECT_TRUE(timeout);
561 EXPECT_EQ(result, m_daq1->GetStatus()->GetStatus());
570 m_daq1_status->SetState(State::Stopped);
572 EXPECT_CALL(*m_daq1, GetEventLog()).WillOnce(Return(m_event_log));
573 EXPECT_CALL(*m_daq1, GetContext()).WillOnce(ReturnRef(m_daq_ctx_1));
575 EXPECT_CALL(*m_dpm_daq1, ScheduleMergeAsync())
576 .WillOnce(Return(ByMove(boost::make_ready_future<State>(State::Scheduled))));
578 EXPECT_CALL(m_workspace, StoreList(_)).Times(AnyNumber());
579 EXPECT_CALL(m_workspace, StoreContext(_)).Times(AnyNumber());
586 EXPECT_EQ(m_daq1_status->GetState(), State::NotScheduled);
588 auto daqs = m_manager.GetDaqControllers();
590 EXPECT_EQ(daqs[0].get(), m_dpm_daq1.get());
596 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq1_status->GetId())).Times(1);
597 m_daq1_status->SetState(State::Merging);
603 EXPECT_CALL(*m_dpm_client, StopMonitorStatus(m_daq_ctx_1.id)).Times(1);
604 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id)).Times(1);
606 m_daq1_status->SetState(State::Completed);
609 EXPECT_FALSE(m_manager.HaveDaq(m_daq_ctx_1.id));
619 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
620 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
621 m_daq1_status->SetState(State::Acquiring);
622 m_daq2_status->SetState(State::Merging);
624 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
626 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
628 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
629 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
630 .WillOnce(Return(m_daq1_status->GetStatus()));
631 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(0);
633 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
634 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
635 .WillOnce(Return(m_daq2_status->GetStatus()));
636 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id));
638 EXPECT_CALL(m_workspace, StoreList(to_load)).Times(1);
641 m_manager.RestoreFromWorkspace();
650 m_daq_ctx_1.creation_time = std::chrono::system_clock::now() - m_params.acquiring_stale_age;
651 m_daq_ctx_2.creation_time = std::chrono::system_clock::now() - m_params.merging_stale_age;
653 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
655 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
657 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id)).WillOnce(Return(m_daq_ctx_1));
658 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
659 .WillOnce(Return(m_daq1_status->GetStatus()));
660 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
662 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
663 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
664 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
665 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_2.id));
667 EXPECT_CALL(m_workspace, StoreList(std::vector<std::string>())).Times(1);
670 m_manager.RestoreFromWorkspace();
679 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
680 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
682 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
683 std::vector<std::string> to_store = {m_daq_ctx_2.id};
685 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
687 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
688 .WillOnce(Throw(std::runtime_error(
"ouch")));
689 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
690 .Times(Between(0, 1))
691 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
692 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id));
693 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
695 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
696 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
697 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
698 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
700 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
703 m_manager.RestoreFromWorkspace();
712 m_daq_ctx_1.creation_time = std::chrono::system_clock::now();
713 m_daq_ctx_2.creation_time = std::chrono::system_clock::now();
714 std::vector<std::string> to_load = {m_daq_ctx_1.id, m_daq_ctx_2.id};
715 std::vector<std::string> to_store = {m_daq_ctx_2.id};
717 EXPECT_CALL(m_workspace, LoadList()).WillOnce(Return(to_load));
719 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_1.id))
720 .WillOnce(Throw(std::runtime_error(
"ouch")));
721 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_1.id))
722 .Times(Between(0, 1))
723 .WillRepeatedly(Return(m_daq1_status->GetStatus()));
724 EXPECT_CALL(m_workspace, ArchiveDaq(m_daq_ctx_1.id))
725 .WillOnce(Throw(std::runtime_error(
"ouch")));
726 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_1.id)).Times(0);
728 EXPECT_CALL(m_workspace, LoadContext(m_daq_ctx_2.id)).WillOnce(Return(m_daq_ctx_2));
729 EXPECT_CALL(m_workspace, LoadStatus(m_daq_ctx_2.id))
730 .WillOnce(Return(m_daq2_dpm_status->GetStatus()));
732 EXPECT_CALL(*m_dpm_client, StartMonitorStatus(m_daq_ctx_2.id)).Times(1);
734 EXPECT_CALL(m_workspace, StoreList(to_store)).Times(1);
737 m_manager.RestoreFromWorkspace();
744 EXPECT_CALL(m_workspace, StoreContext(Field(&
DaqContext::id,
"daq1")));
747 m_daq1->signal(m_daq_ctx_1);
Started operation was aborted.
Combined mock and fake of interface to DPM server.
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
StatusSignal & GetStatusSignal() override
Stores data acquisition status and allows subscription to status changes.
boost::signals2::connection ConnectObserver(Observer o)
Adapter object intended to be used in contexts without direct access to the output-stream object.
Indicates keyword is invalid for some reason.
Represents the literal 80-character FITS keyword record.
Indicates keyword is unknown and cannot be formatted.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
std::shared_ptr< DaqControllerMock > m_daq1
std::shared_ptr< DaqControllerMock > m_dpm_daq2
rad::IoExecutor m_executor
boost::asio::io_context m_io_ctx
std::shared_ptr< ObservableStatus > m_daq2_status
MockWorkspace m_workspace
auto StartDaq1() -> boost::future< State >
void SetUp() override
Creates manager and adds two data acquisitions.
std::shared_ptr< DaqControllerMock > m_dpm_daq1
DaqControllerFactoryFake m_daq_factory
std::shared_ptr< daq::ObservableEventLog > m_event_log
std::shared_ptr< ObservableStatus > m_daq2_dpm_status
std::shared_ptr< DaqControllerMock > m_daq2
std::shared_ptr< ObservableStatus > m_daq1_dpm_status
std::shared_ptr< ObservableStatus > m_daq1_status
std::shared_ptr< DpmClientMock > m_dpm_client
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progresses the test by executing pending jobs and optionally waits for a future to be r...
Simple observer used for testing.
Contains data structure for FITS keywords.
Declaration of daq::Manager
constexpr KeywordNameView GetKeywordName(EsoKeyword const &keyword) noexcept
Get keyword name from keyword.
LiteralKeyword Format(KeywordVariant const &keyword)
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
std::variant< ValueKeyword, EsoKeyword, LiteralKeyword > KeywordVariant
The different variants of keywords that are supported.
BasicKeyword< ValueKeywordTraits > ValueKeyword
Standard FITS value keyword.
BasicKeyword< EsoKeywordTraits > EsoKeyword
ESO hierarchical keyword.
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
void UpdateKeywords(fits::KeywordVector &out, fits::KeywordVector const &in, fits::KeywordFormatter const &fmt)
Updates (adds or replaces) primary HDU keywords.
bool HasError(Status const &status) noexcept
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Completed
Completed DAQ.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ NotStarted
Initial state of data acquisition.
Configuration parameters directly related to the manager.
T ExpectNoException(boost::future< T > &f)
NiceMock< KeywordFormatterMock > m_kw_formatter
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::string file_id
Data Product FileId as specified by OLAS ICD.
std::string id
DAQ identifier, possibly provided by user.
Factory that creates mock versions.
std::map< std::string, std::shared_ptr< DaqControllerMock > > dpm_mocks
std::map< std::string, std::shared_ptr< DaqControllerMock > > ocm_mocks
Exception indicating the DAQ id was invalid.
Non-observable status object that stores the status of a data acquisition.
std::chrono::time_point< std::chrono::system_clock > TimePoint
MATCHER_P(KeywordNameEq, name, "keyword name equals")
TEST_F(TestManagerImplLifecycle, AddDaqNotifiesObserver)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))
Defines shared test utilities.