12#include <fmt/format.h>
13#include <log4cplus/loggingmacros.h>
18#include "mock/mockWorkspace.hpp"
20#include <gmock/gmock.h>
21#include <gtest/gtest.h>
25using namespace ::testing;
29 std::unique_ptr<RsyncAsyncProcessIf>
operator()(boost::asio::io_context&,
34 auto rsync = std::make_unique<MockRsyncAsyncProcess>();
40 procs.push_back(rsync.get());
44 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(
rsync_dummy_args));
45 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
50 std::vector<MockRsyncAsyncProcess*>
procs;
58 std::unique_ptr<AsyncProcessIf>
operator()(boost::asio::io_context&, std::vector<std::string>) {
59 auto rsync = std::make_unique<MockRsyncAsyncProcess>();
63 procs.push_back(rsync.get());
69 std::vector<MockAsyncProcess*>
procs;
125 std::filesystem::path sources_root =
"sources";
132 if (std::holds_alternative<json::FitsFileSource>(s)) {
133 auto const& source = std::get<json::FitsFileSource>(s);
138 auto ws = std::make_unique<daq::dpm::MockDaqWorkspace>();
142 EXPECT_CALL(*ws, LoadSpecification()).WillRepeatedly(Return(
m_dpspec));
143 EXPECT_CALL(*ws, GetSourcesPath()).WillRepeatedly(Return(std::filesystem::path(
"sources")));
144 EXPECT_CALL(*ws, GetResultPath()).WillRepeatedly(Return(std::filesystem::path(
"")));
145 EXPECT_CALL(*ws, GetSourceLookupPath())
146 .WillRepeatedly(Return(std::filesystem::path(
"sources.json")));
147 EXPECT_CALL(*ws, GetPath())
149 .WillRepeatedly(Return(std::filesystem::path(
"")));
150 EXPECT_CALL(*ws, GetLogsPath())
152 .WillRepeatedly(Return(std::filesystem::path(
"logs")));
170 template <
class Pred>
175 if (
m_io_ctx.poll_one() == 0 || p()) {
178 }
catch (std::exception
const& ex) {
179 LOG4CPLUS_ERROR(
"test",
"Exception bubbled up through ASIO: " << ex.what());
191 std::unique_ptr<DaqControllerImpl>
m_daq;
198using namespace ::testing;
203 EXPECT_CALL(*m_ws_mock_ptr, StoreSourceLookup(_));
214 m_dpspec.target.source = std::nullopt;
215 m_dpspec.sources.clear();
232 m_resources.net_receive.SetLimit(0);
235 EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
236 <<
"Setup should have added target and one extra source";
239 Exists(std::filesystem::path(
"sources/target.fits"), std::filesystem::file_type::regular))
241 .WillRepeatedly(Return(
false));
244 Exists(std::filesystem::path(
"sources/source.fits"), std::filesystem::file_type::regular))
246 .WillRepeatedly(Return(
false));
254 Exists(std::filesystem::path(
"sources/source.fits"), std::filesystem::file_type::regular))
255 .WillOnce(Return(
true));
258 Exists(std::filesystem::path(
"sources/target.fits"), std::filesystem::file_type::regular))
259 .WillOnce(Return(
true));
263 for (
auto& proc : m_rsync_factory.procs) {
264 proc->promise.set_value(0);
277 m_resources.net_receive.SetLimit(0);
280 EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
281 <<
"Setup should have added target and one extra source";
284 Exists(std::filesystem::path(
"sources/target.fits"), std::filesystem::file_type::regular))
285 .WillOnce(Return(
false));
288 Exists(std::filesystem::path(
"sources/source.fits"), std::filesystem::file_type::regular))
289 .WillOnce(Return(
false));
303 for (
auto& proc : m_rsync_factory.procs) {
304 proc->promise.set_value(
error++);
310 EXPECT_TRUE(m_daq->IsStopped());
323 m_initial_status.alerts.push_back(
328 EXPECT_CALL(*m_ws_mock_ptr, Exists(_, _)).WillRepeatedly(Return(
true));
344 EXPECT_CALL(*m_ws_mock_ptr, GetLogsPath()).WillRepeatedly(Return(
"logs"));
345 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
346 .WillRepeatedly(Return(
"specification.json"));
347 EXPECT_CALL(*m_ws_mock_ptr,
348 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
349 .WillOnce(Return(
false));
355 ASSERT_EQ(m_proc_factory.procs.size(), 1u);
356 EXPECT_CALL(*m_ws_mock_ptr,
357 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
358 .WillRepeatedly(Return(
true));
361 EXPECT_CALL(*m_ws_mock_ptr,
365 HasSubstr(
"Writing keywords required resizing")))),
371 EXPECT_CALL(*m_ws_mock_ptr, MakeResultSymlink(std::filesystem::path(
"TEST.ID.fits")));
374 std::string message =
375 R
"({"content":{"id":"primary_hdu_resize","message":"Writing keywords required resizing of primary HDU: Add space for at least 73 keywords to avoid resize"},"timestamp":1650964356522093759,"type":"alert"})";
376 m_proc_factory.procs[0]->stdout(8, message);
379 m_proc_factory.procs[0]->stdout(8,
"This is invalid JSON");
382 m_proc_factory.procs[0]->promise.set_value(0);
400 EXPECT_CALL(*m_ws_mock_ptr,
401 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
402 .WillRepeatedly(Return(
true));
410 ASSERT_EQ(m_proc_factory.procs.size(), 0u);
419 m_dpspec.receivers.clear();
433 m_initial_status.result =
"TEST.ID.fits";
434 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
436 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
437 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
447 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
448 .WillRepeatedly(Return(
"specification.json"));
451 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
452 .WillRepeatedly(Return(
false));
456 EXPECT_CALL(*m_ws_mock_ptr,
457 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"),
458 std::filesystem::path(m_arcfile_path)))
459 .WillOnce(Return(std::error_code()));
468 <<
"Expected first receiver, which is local, to have used hard links which completes "
478 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
479 m_rsync_factory.procs[0]->promise.set_value(0);
490 m_initial_status.result =
"TEST.ID.fits";
491 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
493 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
494 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
504 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
505 .WillRepeatedly(Return(
"specification.json"));
508 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
509 .WillRepeatedly(Return(
false));
513 EXPECT_CALL(*m_ws_mock_ptr,
514 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
515 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
516 EXPECT_CALL(*m_ws_mock_ptr,
517 MakeSymlink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
518 .WillOnce(Return(std::error_code()));
527 <<
"Expected first receiver, which is local, to have used hard links which completes "
537 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
538 m_rsync_factory.procs[0]->promise.set_value(0);
549 receiver.
path =
"/local/olas/";
550 receiver.options.allow_symlink =
false;
551 m_dpspec.receivers.clear();
552 m_dpspec.receivers.push_back(receiver);
555 m_initial_status.result =
"TEST.ID.fits";
556 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
558 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
559 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
569 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
570 .WillRepeatedly(Return(
"specification.json"));
573 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
574 .WillRepeatedly(Return(
false));
578 EXPECT_CALL(*m_ws_mock_ptr,
579 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
580 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
581 EXPECT_CALL(*m_ws_mock_ptr, MakeSymlink(_, _)).Times(0);
596 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
597 m_rsync_factory.procs[0]->promise.set_value(0);
608 m_initial_status.result =
"TEST.ID.fits";
609 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
611 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
612 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
622 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
623 .WillRepeatedly(Return(
"specification.json"));
626 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
627 .WillRepeatedly(Return(
false));
631 EXPECT_CALL(*m_ws_mock_ptr,
632 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"),
633 std::filesystem::path(m_arcfile_path)))
634 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
635 EXPECT_CALL(*m_ws_mock_ptr,
636 MakeSymlink(std::filesystem::path(
"TEST.ID.fits"),
637 std::filesystem::path(m_arcfile_path)))
638 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
655 ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
656 m_rsync_factory.procs[0]->promise.set_value(0);
657 m_rsync_factory.procs[1]->promise.set_value(0);
669 m_initial_status.result =
"TEST.ID.fits";
670 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
672 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
673 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
687 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
688 .WillRepeatedly(Return(
"specification.json"));
691 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
692 .WillRepeatedly(Return(
false));
696 EXPECT_CALL(*m_ws_mock_ptr,
697 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
698 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
700 EXPECT_CALL(*m_ws_mock_ptr,
701 MakeSymlink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
702 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
718 ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
719 m_rsync_factory.procs[0]->promise.set_value(1);
720 m_rsync_factory.procs[1]->promise.set_value(0);
726 EXPECT_EQ(1, m_daq->GetStatus().GetAlerts().size()) <<
"Expected one alert from failed rsync";
Provides location of fits source file.
void Add(SourceFile const &source, std::filesystem::path const &path)
Adds path so it is resolved using source_name and location.
auto GetMapping() const noexcept -> Mapping const &
Get native representation of source mapping for serialization.
DaqControllerOptions m_options
rad::IoExecutor m_executor
boost::asio::io_context m_io_ctx
std::unique_ptr< DaqControllerImpl > m_daq
daq::dpm::MockDaqWorkspace * m_ws_mock_ptr
std::filesystem::path m_arcfile_path
void PollUntil(Pred &&p)
Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
SourceResolver m_resolver
FakeProcFactory m_proc_factory
void PostSetUp()
Specifically sets up expectations and other things based on current state.
FakeRsyncFactory m_rsync_factory
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Mocks for daq::RsyncAsyncProcessIf.
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
constexpr std::string_view MERGING_MERGE
Merging failed.
TEST_F(TestDaqController, ScheduledTransitionsToCollecting)
Options for DaqController.
std::filesystem::path path
Absolute path to the OLAS "incoming directory" where DPM will drop files.
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Target target
Describes target which will become the data produtc.
std::optional< FitsFileSource > source
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
std::vector< SourceTypes > sources
List of sources to create data product from.
Close representation of the JSON structure but with stronger types.
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
bool HasError(Status const &status) noexcept
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Options controlling rsync invocation.
daq::dpm::Scheduler and related class declarations.
Non observable status object that keeps stores status of data acquisition.
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
std::vector< Alert > alerts
Active alerts.
TimePoint timestamp
Timestamp of last update.
std::unique_ptr< AsyncProcessIf > operator()(boost::asio::io_context &, std::vector< std::string >)
std::vector< MockAsyncProcess * > procs
std::function< void(MockAsyncProcess &)> Hook
std::function< void(MockRsyncAsyncProcess &)> Hook
std::vector< std::string > rsync_dummy_args
std::unique_ptr< RsyncAsyncProcessIf > operator()(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)
void AddDefaultExpectations(MockRsyncAsyncProcess &mock) const
std::vector< MockRsyncAsyncProcess * > procs
std::filesystem::path path
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))