12 #include <fmt/format.h>
13 #include <log4cplus/loggingmacros.h>
18 #include "mock/mockWorkspace.hpp"
20 #include <gmock/gmock.h>
21 #include <gtest/gtest.h>
25 using namespace ::testing;
29 std::unique_ptr<RsyncAsyncProcessIf>
operator()(boost::asio::io_context&,
34 auto rsync = std::make_unique<MockRsyncAsyncProcess>();
38 procs.push_back(rsync.get());
44 std::vector<MockRsyncAsyncProcess*>
procs;
51 std::unique_ptr<AsyncProcessIf>
operator()(boost::asio::io_context&, std::vector<std::string>) {
52 auto rsync = std::make_unique<MockRsyncAsyncProcess>();
56 procs.push_back(rsync.get());
62 std::vector<MockAsyncProcess*>
procs;
119 std::filesystem::path sources_root =
"sources";
126 if (std::holds_alternative<json::FitsFileSource>(s)) {
127 auto const& source = std::get<json::FitsFileSource>(s);
132 auto ws = std::make_unique<daq::dpm::MockDaqWorkspace>();
136 EXPECT_CALL(*ws, LoadSpecification()).WillRepeatedly(Return(
m_dpspec));
137 EXPECT_CALL(*ws, GetSourcesPath()).WillRepeatedly(Return(std::filesystem::path(
"sources")));
138 EXPECT_CALL(*ws, GetResultPath()).WillRepeatedly(Return(std::filesystem::path(
"")));
139 EXPECT_CALL(*ws, GetSourceLookupPath())
140 .WillRepeatedly(Return(std::filesystem::path(
"sources.json")));
141 EXPECT_CALL(*ws, GetPath())
143 .WillRepeatedly(Return(std::filesystem::path(
"")));
161 template <
class Pred>
166 if (
m_io_ctx.poll_one() == 0 || p()) {
169 }
catch (std::exception
const& ex) {
170 LOG4CPLUS_ERROR(
"test",
"Exception bubbled up through ASIO: " << ex.what());
182 std::unique_ptr<DaqControllerImpl>
m_daq;
189 using namespace ::testing;
194 EXPECT_CALL(*m_ws_mock_ptr, StoreSourceLookup(_));
205 m_dpspec.target.source = std::nullopt;
206 m_dpspec.sources.clear();
223 m_resources.net_receive.SetLimit(0);
226 EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
227 <<
"Setup should have added target and one extra source";
230 Exists(std::filesystem::path(
"sources/target.fits"), std::filesystem::file_type::regular))
232 .WillRepeatedly(Return(
false));
235 Exists(std::filesystem::path(
"sources/source.fits"), std::filesystem::file_type::regular))
237 .WillRepeatedly(Return(
false));
245 Exists(std::filesystem::path(
"sources/source.fits"), std::filesystem::file_type::regular))
246 .WillOnce(Return(
true));
249 Exists(std::filesystem::path(
"sources/target.fits"), std::filesystem::file_type::regular))
250 .WillOnce(Return(
true));
254 for (
auto& proc : m_rsync_factory.procs) {
255 proc->promise.set_value(0);
268 m_resources.net_receive.SetLimit(0);
271 EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
272 <<
"Setup should have added target and one extra source";
275 Exists(std::filesystem::path(
"sources/target.fits"), std::filesystem::file_type::regular))
276 .WillOnce(Return(
false));
279 Exists(std::filesystem::path(
"sources/source.fits"), std::filesystem::file_type::regular))
280 .WillOnce(Return(
false));
287 EXPECT_CALL(*m_ws_mock_ptr,
294 for (
auto& proc : m_rsync_factory.procs) {
295 proc->promise.set_value(
error++);
301 EXPECT_TRUE(m_daq->IsStopped());
313 m_initial_status.error =
true;
317 EXPECT_CALL(*m_ws_mock_ptr, Exists(_, _)).WillRepeatedly(Return(
true));
333 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
334 .WillRepeatedly(Return(
"specification.json"));
335 EXPECT_CALL(*m_ws_mock_ptr,
336 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
337 .WillOnce(Return(
false));
343 ASSERT_EQ(m_proc_factory.procs.size(), 1u);
344 EXPECT_CALL(*m_ws_mock_ptr,
345 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
346 .WillRepeatedly(Return(
true));
349 EXPECT_CALL(*m_ws_mock_ptr,
353 HasSubstr(
"Writing keywords required resizing")))),
359 EXPECT_CALL(*m_ws_mock_ptr, MakeResultSymlink(std::filesystem::path(
"TEST.ID.fits")));
362 std::string message =
363 R
"({"content":{"id":"primary_hdu_resize","message":"Writing keywords required resizing of primary HDU: Add space for at least 73 keywords to avoid resize"},"timestamp":1650964356522093759,"type":"alert"})";
364 m_proc_factory.procs[0]->stdout(8, message);
367 m_proc_factory.procs[0]->stdout(8,
"This is invalid JSON");
370 m_proc_factory.procs[0]->promise.set_value(0);
385 m_initial_status.error =
true;
388 EXPECT_CALL(*m_ws_mock_ptr,
389 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
390 .WillRepeatedly(Return(
true));
398 ASSERT_EQ(m_proc_factory.procs.size(), 0u);
406 m_initial_status.error =
false;
408 m_dpspec.receivers.clear();
422 m_initial_status.result =
"TEST.ID.fits";
423 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
425 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
426 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
436 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
437 .WillRepeatedly(Return(
"specification.json"));
440 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
441 .WillRepeatedly(Return(
false));
445 EXPECT_CALL(*m_ws_mock_ptr,
446 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"),
447 std::filesystem::path(m_arcfile_path)))
448 .WillOnce(Return(std::error_code()));
457 <<
"Expected first receiver, which is local, to have used hard links which completes "
467 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
468 m_rsync_factory.procs[0]->promise.set_value(0);
479 m_initial_status.result =
"TEST.ID.fits";
480 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
482 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
483 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
493 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
494 .WillRepeatedly(Return(
"specification.json"));
497 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
498 .WillRepeatedly(Return(
false));
502 EXPECT_CALL(*m_ws_mock_ptr,
503 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
504 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
505 EXPECT_CALL(*m_ws_mock_ptr,
506 MakeSymlink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
507 .WillOnce(Return(std::error_code()));
516 <<
"Expected first receiver, which is local, to have used hard links which completes "
526 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
527 m_rsync_factory.procs[0]->promise.set_value(0);
538 receiver.
path =
"/local/olas/";
539 receiver.options.allow_symlink =
false;
540 m_dpspec.receivers.clear();
541 m_dpspec.receivers.push_back(receiver);
544 m_initial_status.result =
"TEST.ID.fits";
545 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
547 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
548 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
558 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
559 .WillRepeatedly(Return(
"specification.json"));
562 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
563 .WillRepeatedly(Return(
false));
567 EXPECT_CALL(*m_ws_mock_ptr,
568 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
569 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
570 EXPECT_CALL(*m_ws_mock_ptr, MakeSymlink(_, _)).Times(0);
585 ASSERT_EQ(m_rsync_factory.procs.size(), 1u);
586 m_rsync_factory.procs[0]->promise.set_value(0);
597 m_initial_status.result =
"TEST.ID.fits";
598 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
600 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
601 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
611 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
612 .WillRepeatedly(Return(
"specification.json"));
615 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
616 .WillRepeatedly(Return(
false));
620 EXPECT_CALL(*m_ws_mock_ptr,
621 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"),
622 std::filesystem::path(m_arcfile_path)))
623 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
624 EXPECT_CALL(*m_ws_mock_ptr,
625 MakeSymlink(std::filesystem::path(
"TEST.ID.fits"),
626 std::filesystem::path(m_arcfile_path)))
627 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
644 ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
645 m_rsync_factory.procs[0]->promise.set_value(0);
646 m_rsync_factory.procs[1]->promise.set_value(0);
658 m_initial_status.result =
"TEST.ID.fits";
659 std::vector<std::string> rsync_dummy_args = {
"rsync",
"from",
"to"};
661 EXPECT_CALL(mock, GetArguments()).WillRepeatedly(ReturnRef(rsync_dummy_args));
662 EXPECT_CALL(mock, GetPid()).WillRepeatedly(Return(1234));
676 EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
677 .WillRepeatedly(Return(
"specification.json"));
680 Exists(std::filesystem::path(
"TEST.ID.fits"), std::filesystem::file_type::regular))
681 .WillRepeatedly(Return(
false));
685 EXPECT_CALL(*m_ws_mock_ptr,
686 MakeHardLink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
687 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
689 EXPECT_CALL(*m_ws_mock_ptr,
690 MakeSymlink(std::filesystem::path(
"TEST.ID.fits"), m_arcfile_path))
691 .WillOnce(Return(std::make_error_code(std::errc::not_supported)));
707 ASSERT_EQ(m_rsync_factory.procs.size(), 2u);
708 m_rsync_factory.procs[0]->promise.set_value(1);
709 m_rsync_factory.procs[1]->promise.set_value(0);
715 EXPECT_EQ(1, m_daq->GetStatus().GetAlerts().size()) <<
"Expected one alert from failed rsync";
Provides location of fits source file.
void Add(SourceFile const &source, std::filesystem::path const &path)
Adds path so it is resolved using source_name and location.
auto GetMapping() const noexcept -> Mapping const &
Get native representation of source mapping for serialization.
DaqControllerOptions m_options
rad::IoExecutor m_executor
boost::asio::io_context m_io_ctx
std::unique_ptr< DaqControllerImpl > m_daq
daq::dpm::MockDaqWorkspace * m_ws_mock_ptr
std::filesystem::path m_arcfile_path
void PollUntil(Pred &&p)
Poll one handler at a time until the predicate has been satisfied or io_context runs out of work.
SourceResolver m_resolver
FakeProcFactory m_proc_factory
void PostSetUp()
Specifically sets up expectations and other things based on current state.
FakeRsyncFactory m_rsync_factory
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Mocks for daq::RsyncAsyncProcessIf.
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
TEST_F(TestDaqController, ScheduledTransitionsToCollecting)
Options for DaqController.
std::filesystem::path path
Absolute path to the OLAS "incoming directory" where DPM will drop files.
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Target target
Describes the target which will become the data product.
std::optional< FitsFileSource > source
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
std::vector< SourceTypes > sources
List of sources to create data product from.
Close representation of the JSON structure but with stronger types.
Represents OlasReceiver JSON type used in StartDaqV2 and dpspec.
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Merging
DAQ is being merged.
Options controlling rsync invocation.
daq::dpm::Scheduler and related class declarations.
Non-observable status object that stores the status of a data acquisition.
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
std::vector< Alert > alerts
Active alerts.
TimePoint timestamp
Timestamp of last update.
std::vector< MockAsyncProcess * > procs
std::unique_ptr< AsyncProcessIf > operator()(boost::asio::io_context &, std::vector< std::string >)
std::function< void(MockAsyncProcess &)> Hook
std::unique_ptr< RsyncAsyncProcessIf > operator()(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)
std::function< void(MockRsyncAsyncProcess &)> Hook
std::vector< MockRsyncAsyncProcess * > procs
std::filesystem::path path
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
ASSERT_TRUE(std::holds_alternative< OlasReceiver >(spec.receivers[0]))