#include <boost/asio/post.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <log4cplus/loggingmacros.h>
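
// SchedulerImpl queues Data Acquisitions (DAQs) for merging, activates them
// subject to a concurrency limit and archives completed ones; DaqControllerImpl
// drives a single DAQ through Scheduled -> Transferring -> Merging -> Completed
// using rsync transfers and an external merge process. Both classes hand a
// weak_ptr to a shared "liveness" flag to asynchronous callbacks so that late
// completions can detect that their owner has been destroyed.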
void PrintArgs(log4cplus::Logger& logger, std::vector<std::string> const& args) {
    std::stringstream ss;
    for (auto const& token : args) {
        ss << token << ' ';
    }
    LOG4CPLUS_DEBUG(logger, "Executing merger with args: " << ss.str());
}
    if (m_limit > 0 && m_used < m_limit) {
    os << "DAQ{" << daq.GetStatus() << "}";
    : m_executor(executor)
    , m_workspace(workspace)
    , m_daq_controller_factory(std::move(daq_controller_factory))
    , m_logger(log4cplus::Logger::getInstance("dpm.scheduler"))
    , m_liveness(std::make_shared<bool>(false)) {
    auto slot = [liveness = std::weak_ptr<bool>(m_liveness), this]() {
        if (auto ptr = liveness.lock(); ptr) {
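
// QueueDaq(): validate the serialized Data Product Specification, reject ids
// that have already been queued, create a per-DAQ workspace with an initial
// Scheduled status and append the id to the merge queue.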
    LOG4CPLUS_TRACE(m_logger, "QueueDaq()");

    auto json = nlohmann::json::parse(dp_spec_serialized);
    std::string const& id = dp_spec.id;
    LOG4CPLUS_ERROR(m_logger, "QueueDaq(): DAQ conflict detected -> aborting");
    throw std::invalid_argument(
        fmt::format("Scheduler: Could not queue DAQ for merging as "
                    "a Data Acquisition with same id has been queued before: '{}'",
                    id));
    LOG4CPLUS_INFO(m_logger,
                   fmt::format("QueueDaq(): Initializing new workspace for DAQ {}", id));
    initial_status.id = id;
    initial_status.file_id = file_id;
    initial_status.state = State::Scheduled;
    initial_status.error = false;
    initial_status.timestamp = Status::TimePoint::clock::now();

    daq_ws->StoreStatus(initial_status);
    daq_ws->StoreSpecification(dp_spec_serialized);

    m_queue.push_back(id);
    std::throw_with_nested(std::runtime_error("Failed to store DAQ queue"));

    std::throw_with_nested(
        std::runtime_error("Failed to write status to DAQ workspace"));

    std::throw_with_nested(std::runtime_error(fmt::format(
        "Failed to initialize DAQ workspace in {}", m_workspace.GetPath().native())));
    LOG4CPLUS_DEBUG(m_logger,
                    "Invalid Data Product Specification provided: \n"
                        << dp_spec_serialized);

    std::throw_with_nested(
        std::invalid_argument("Scheduler: Could not queue DAQ for merging because "
                              "Data Product Specification is invalid"));
} catch (nlohmann::json::parse_error const&) {
    std::throw_with_nested(
        std::invalid_argument("Scheduler: Could not queue DAQ for merging because provided "
                              "data product specification is invalid JSON"));
    return std::find(m_queue.begin(), m_queue.end(), id) != m_queue.end();
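
// GetDaqStatus(): ids are looked up in the merge queue and the status is loaded
// from the persisted DAQ workspace; unknown ids raise std::invalid_argument.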
    LOG4CPLUS_TRACE(m_logger, fmt::format("GetDaqStatus({})", id));

    LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): No such ID", id));
    throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));

    return m_workspace.LoadDaq(id)->LoadStatus();

    std::throw_with_nested(std::runtime_error("Scheduler: GetDaqStatus() failed"));
    LOG4CPLUS_TRACE(m_logger, fmt::format("AbortDaq({})", id));

    LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): No such ID", id));
    throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
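
    // Remaining abort steps: publish an AbortingMerging status, erase the
    // active controller if its state allows it, then remove the DAQ from the
    // merge queue and delete its workspace.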
    auto it = std::find_if(m_active.begin(), m_active.end(), [&id](Active const& active) {
        return id == active.daq->GetId();
    });

    auto pub = [&, status = m_workspace.LoadDaq(id)->LoadStatus()](State new_state) mutable {
        status.state = State::AbortingMerging;
        m_status_signal(status);
    if (it != m_active.end()) {
        auto state = it->daq->GetState();

        LOG4CPLUS_DEBUG(m_logger,
                        fmt::format("AbortDaq({}): Cannot abort in state {}", id, state));

        LOG4CPLUS_INFO(m_logger,
                       fmt::format("AbortDaq({}): Erasing active DAQ currently in state {}",
                                   id, state));

        pub(State::AbortingMerging);
    LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ from merge queue", id));
    m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));

    LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ workspace", id));

    LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Aborted and removed DAQ", id));

    std::throw_with_nested(std::runtime_error("Scheduler: AbortDaq() failed"));
    return m_status_signal.connect(slot);
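
// Poll() is the scheduler heartbeat. ArchiveCompleted() below moves DAQs whose
// controller has reached a final state into the workspace archive and drops
// them from the merge queue and the active list; archive failures are logged.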
void SchedulerImpl::Poll() {
    LOG4CPLUS_TRACE(m_logger, "Poll()");
void SchedulerImpl::ArchiveCompleted() {
    for (auto& active : m_active) {

        auto id = active.daq->GetId();

        auto archive_path = m_workspace.ArchiveDaq(id);
            fmt::format("DAQ {} is in final state {} -> moved workspace to archive: {}",
                        id,
                        active.daq->GetState(),
                        archive_path.native()));

        m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));

        LOG4CPLUS_ERROR(m_logger,
                        "Failed to archive DAQ workspace:\n"
                            << error::NestedExceptionReporter(std::current_exception()));
    auto remove_it = std::remove_if(
        m_active.begin(), m_active.end(), [&](auto const& active) { return !active.daq; });
    if (remove_it != m_active.end()) {
        m_active.erase(remove_it, m_active.end());
    }
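
// ActivateFromQueue(): for each queued DAQ that is not yet active, acquire a
// resource token (bounded by the configured limit), create a DaqController via
// the factory, forward its status updates to m_status_signal and start it.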
void SchedulerImpl::ActivateFromQueue() {
    if (m_queue.empty()) {
        LOG4CPLUS_INFO(m_logger, "Merge queue empty - all done!");
        return;
    }

    auto candidates = GetCandidates();
    if (candidates.empty()) {
        LOG4CPLUS_INFO(m_logger, "All DAQ merge candidates are active/in-progress");
        return;
    }

    for (auto const& id : candidates) {
        LOG4CPLUS_TRACE(m_logger, fmt::format("{}: Attempting to schedule DAQ", id));
        LOG4CPLUS_INFO(m_logger,
                       fmt::format("Limit reached: Cannot schedule '{}'. "
                                   "Currently active DAQs at/exceed limit. "
                                   "current: {}, limit: {}, queue size: {}",
        auto ws = m_workspace.LoadDaq(id);
        auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);

        daq_controller->GetStatus().ConnectStatus([&](ObservableStatus const& s) {
            m_status_signal(s.GetStatus());
        });

        daq_controller->Start();
        m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
        LOG4CPLUS_DEBUG(m_logger, fmt::format("{}: DAQ scheduled for merging", id));

        std::throw_with_nested(
            std::runtime_error(fmt::format("{}: Failed to activate DAQ for merging.", id)));

    std::throw_with_nested(std::runtime_error("Failed to load "));
void SchedulerImpl::DeferredPoll() {
    if (m_stopped || *m_liveness) {

        [liveness = std::weak_ptr<bool>(m_liveness), this]() {
            if (auto ptr = liveness.lock(); ptr) {
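
// GetCandidates(): queued DAQ ids that do not yet have an active controller.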
std::vector<std::string> SchedulerImpl::GetCandidates() const {
    std::vector<std::string> candidates;
    for (auto const& id : m_queue) {
        auto it = std::find_if(m_active.begin(), m_active.end(), [&](auto const& active) {
            return active.daq->GetId() == id;
        });
        if (it == m_active.end()) {
            candidates.push_back(id);
        }
    }
    return candidates;
}
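
// DaqControllerImpl constructor: loads the specification and last persisted
// status from the workspace, derives the result file path, validates the
// configured merge and rsync binaries, restores the state machine from the
// stored state and persists every observed status change back to the workspace.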
                                     std::unique_ptr<DaqWorkspace> workspace,

    : m_executor(executor)
    , m_workspace(std::move(workspace))
    , m_resources(resources)
    , m_rsync_factory(std::move(rsync_factory))
    , m_proc_factory(std::move(proc_factory))
    , m_options(std::move(options))
    , m_dpspec(m_workspace->LoadSpecification())
    , m_state_ctx(Scheduled{})
    , m_status(m_workspace->LoadStatus())
    , m_status_connection()
    , m_liveness(std::make_shared<bool>(false))
    , m_logger(log4cplus::Logger::getInstance("dpm.daqcontroller")) {
    if (m_options.merge_bin.empty()) {
        throw std::invalid_argument("Specified merge application name is empty");
    }
    if (m_options.rsync_bin.empty()) {
        throw std::invalid_argument("Specified rsync application name is empty");
    }
    if (m_status.GetStatus().result.empty()) {
        m_result = m_workspace->GetResultPath() /
                   (m_dpspec.target.file_prefix + m_dpspec.target.file_id + ".fits");
    } else {
        m_result = m_status.GetStatus().result;
    }
    auto error = m_status.GetError();
    switch (m_status.GetState()) {
    case State::Scheduled:
        SetState(Scheduled{}, error);
        break;
    case State::Transferring:
        SetState(Transferring{SourceResolver(m_workspace->LoadSourceLookup())}, error);
        break;
    case State::Merging:
        SetState(Merging(), error);
        break;
    case State::Completed:
        SetState(Completed{}, error);
        break;
    default:
        throw std::runtime_error("Not implemented");
    }
    m_status_connection = m_status.ConnectStatus(
        [prev = m_status.GetStatus(), this](ObservableStatus const& status) mutable {
            if (prev == status.GetStatus()) {
                // No change -> avoid rewriting the workspace status file.
                return;
            }
            prev = status.GetStatus();
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("DaqController: Updating workspace status file: {}",
                                        prev));
            m_workspace->StoreStatus(prev);
        });
    return m_status.GetId();
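
// DeferredPoll(): posts a Poll() to the executor unless the controller is
// stopped or *m_liveness is already set; the weak_ptr capture lets the posted
// lambda detect that the controller has been destroyed in the meantime.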
void DaqControllerImpl::DeferredPoll() {
    if (m_stopped || *m_liveness) {

        [liveness = std::weak_ptr<bool>(m_liveness), this]() {
            if (auto ptr = liveness.lock(); ptr) {
    LOG4CPLUS_TRACE(m_logger, "Poll()");

            fmt::format("{}: Poll() - DaqController is stopped so nothing will be done",
                        *this));

    std::visit([this](auto& state) { Poll(state); }, m_state_ctx);
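
// Poll(Scheduled): collect the FITS sources from the Data Product
// Specification, assign each one a local file name under the workspace sources
// directory, persist the source lookup table and advance to Transferring.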
    LOG4CPLUS_TRACE(m_logger, "Poll(Scheduled)");

    std::vector<DpSpec::SourceFitsFile> sources;

    for (auto const& s : m_dpspec.sources) {
        if (!std::holds_alternative<DpSpec::SourceFitsFile>(s)) {
            continue;
        }
        sources.push_back(std::get<DpSpec::SourceFitsFile>(s));
    }
    auto sources_path = m_workspace->GetSourcesPath();

    SourceResolver resolver;

    for (auto const& s : sources) {

            fmt::format("{}_{}_{}", index, s.source_name, origin.path.filename().native());
        LOG4CPLUS_INFO(m_logger,
                       fmt::format("Poll(Scheduled): Source file '{}' from source \"{}\" "
                                   "on host \"{}\" will be stored in {}",
                                   !origin.host.empty() ? origin.host.c_str() : "<n/a>",

        resolver.Add({s.source_name, s.origin}, local_path);
    m_workspace->StoreSourceLookup(resolver.GetMapping());

    SetState(Transferring(std::move(resolver)));

    auto msg =
        fmt::format("{}: Failed to collect and store list of required sources", m_status);
    LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Scheduled): Failed to process DAQ: {}", msg));

    std::throw_with_nested(std::runtime_error(msg));
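
// Poll(Transferring): start an rsync transfer for every source that is neither
// already present in the workspace nor currently in flight, subject to the
// resource limit; once nothing is missing the state advances to Merging.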
    LOG4CPLUS_TRACE(m_logger, "Poll(Transferring)");

    auto const& sources = ctx.resolver.GetMapping();
    auto missing = sources.size();
    auto root = m_workspace->GetPath();
    for (auto const& source : sources) {
        if (ctx.HasTransfer(source.first)) {

        if (m_workspace->Exists(source.second)) {

        LOG4CPLUS_TRACE(m_logger,
                        "Poll(Transferring): Soft stop is enabled "
                        "-> won't start new transfer");
        using Proc = std::shared_ptr<RsyncAsyncProcessIf>;

            root / source.second,

        proc->ConnectStderr([logger = log4cplus::Logger::getInstance("dpm.transfer")](
                                pid_t pid, std::string const& line) {
            LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
        });
        proc->Initiate().then(

            [liveness = std::weak_ptr<bool>(m_liveness),
             source = source.first,
             dest = source.second,
             this](boost::future<int> f) {
                if (liveness.expired()) {
                    LOG4CPLUS_ERROR("dpm.daqcontroller",
                                    fmt::format("DaqController abandoned -> ignoring "
                                                "result from rsync for transfer of {}",

                TransferComplete(source, dest, std::move(f));
            });

        ctx.transfers.emplace_back(Transferring::Transfer{
            source.first, source.second, std::move(proc), std::move(*token)});
        LOG4CPLUS_TRACE(m_logger,
                        fmt::format("Poll(Transferring): Could not start transfer due to "
                                    "resource limit reached: {}",

    SetState(Merging(), false);

    auto msg = fmt::format("{}: Failed to transfer required sources", m_status);
    LOG4CPLUS_ERROR(m_logger,
                    fmt::format("Poll(Transferring): Failed to process DAQ: {}", msg));

    std::throw_with_nested(std::runtime_error(msg));
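
// TransferComplete(): completion handler for a single rsync transfer. A
// non-zero exit code raises an alert and throws; success clears the alert.
// Once the last pending transfer has finished, the DAQ is stopped (per the log
// message at the end of the function).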
void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile const& source,
                                         std::filesystem::path const& local_path,
                                         boost::future<int> result) noexcept {
    LOG4CPLUS_TRACE(m_logger,
                    fmt::format("{}: TransferComplete: {} -> {}", *this, source, local_path));
    auto* ctx = std::get_if<Transferring>(&m_state_ctx);

    ctx->EraseTransfer(source);

    int return_code = result.get();

    if (return_code != 0) {
        m_status.SetAlert(MakeAlert(
            alert_id, fmt::format("rsync file transfer failed for remote file {}", source)));
        LOG4CPLUS_ERROR(m_logger, fmt::format("rsync file transfer failed: {}", source));
        throw std::runtime_error("rsync failed to transfer file");
    m_status.ClearAlert(alert_id);
    LOG4CPLUS_INFO(m_logger,
                   fmt::format("{}: Poll(Transferring): Successfully transferred file: {} -> {}",

    LOG4CPLUS_ERROR(m_logger,
                    fmt::format("{}: Poll(Transferring): Failed to transfer file: {} -> {}",

    if (ctx && ctx->transfers.empty()) {

            fmt::format("{}: Poll(Transferring): All pending transfers have completed "
                        "(with or without error) so we stop DAQ",
bool DaqControllerImpl::Transferring::HasTransfer(
    SourceResolver::SourceFile const& source) const noexcept {
    auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
        return t.source == source;
    });
    return it != transfers.cend();
}
DaqControllerImpl::Transferring::Transfer*
DaqControllerImpl::Transferring::GetTransfer(SourceResolver::SourceFile const& source) noexcept {
    auto it = std::find_if(
        transfers.begin(), transfers.end(), [&](Transfer const& t) { return t.source == source; });
    if (it != transfers.end()) {
        return &*it;
    }
    return nullptr;
}
void DaqControllerImpl::Transferring::EraseTransfer(SourceResolver::SourceFile const& source) {
    auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
        return t.source == source;
    });
    if (it != transfers.end()) {
        transfers.erase(it);
    }
}
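
// The Transferring and Merging destructors abort a still-running transfer or
// merge process so that leaving the state does not leak child processes.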
DaqControllerImpl::Transferring::~Transferring() noexcept {
    for (auto& transfer : transfers) {
        if (transfer.proc && transfer.proc->IsRunning()) {
            LOG4CPLUS_DEBUG(
                "dpm",
                "DaqController::Transferring::~Transferring: Aborting running transfer process");
            transfer.proc->Abort();
        }
    }
}
DaqControllerImpl::Merging::~Merging() noexcept {
    if (merger && merger->IsRunning()) {
        LOG4CPLUS_DEBUG(
            "dpm",
            "DaqController::Merging::~Merging: Aborting running merge process");
    LOG4CPLUS_TRACE(m_logger, "Poll(Merging)");

    if (m_workspace->Exists(m_result)) {
        SetState(Completed{});
        LOG4CPLUS_TRACE(m_logger,
                        fmt::format("{}: Poll(Merging): "
                                    "Recovered from error automatically (manual merge)",

    LOG4CPLUS_TRACE(m_logger,
                    fmt::format("{}: Poll(Merging): Could not start merging due to "
                                "resource limit reached: {}",
    std::vector<std::string> args{m_options.merge_bin};
    args.emplace_back("--json");
    args.emplace_back("--root");
    args.emplace_back(m_workspace->GetPath());
    args.emplace_back("--resolver");
    args.emplace_back(m_workspace->GetSourceLookupPath());
    args.emplace_back("-o");
    args.emplace_back(m_result.native());

    args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());

    PrintArgs(m_logger, args);
    ctx.merger = m_proc_factory(m_executor.get_io_context(), std::move(args));

    ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance("dpm.merger"), this](
                                  pid_t pid, std::string const& line) {
        LOG4CPLUS_DEBUG(logger, pid << ": " << Trim(line));
        HandleMergeMessage(line);
    });

    ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance("dpm.merger")](
                                  pid_t pid, std::string const& line) {
        LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
    });
    ctx.merger->Initiate().then(

        [liveness = std::weak_ptr<bool>(m_liveness),
         id = m_status.GetId(),
         this](boost::future<int> f) {
            if (liveness.expired()) {
                LOG4CPLUS_ERROR("dpm.daqcontroller",
                                fmt::format("{}: DaqController abandoned -> ignoring "
                                            "result from merger",

            MergeComplete(std::move(f));
        });

    ctx.token = std::move(*token);
    auto msg = fmt::format("{}: Failed to initiate merging", *this);
    LOG4CPLUS_ERROR(m_logger, msg);
    std::throw_with_nested(std::runtime_error(msg));
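
// MergeComplete(): checks the merger exit code and the existence of the result
// file. Failures raise an alert and throw; on success the alert is cleared, a
// result symlink is created, the result path is published via the status and
// the state is set to Completed.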
void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
    LOG4CPLUS_TRACE(m_logger, "MergeComplete()");

    auto* ctx = std::get_if<Merging>(&m_state_ctx);
    auto exit_code = result.get();
    if (exit_code != 0) {
        auto msg = fmt::format("Merging failed with code {}", exit_code);
        m_status.SetAlert(MakeAlert(alert_id, msg));
        throw std::runtime_error(fmt::format("Merging failed with code {}", exit_code));
    }
    if (!m_workspace->Exists(m_result)) {
        auto abs_path = m_workspace->GetPath() / m_result;
        auto msg =
            fmt::format("Merging reported success but the file is not found: {}", abs_path);
        m_status.SetAlert(MakeAlert(alert_id, msg));
        LOG4CPLUS_ERROR(m_logger, msg);
        throw std::runtime_error(msg);
    }
    m_status.ClearAlert(alert_id);

    m_workspace->MakeResultSymlink(m_result);

    m_status.SetResult(m_result.native());

    SetState(Completed{});
    LOG4CPLUS_INFO(m_logger, fmt::format("{}: Completed successfully!", *this));

            "{}: PollMergeComplete: Failed to create data product: {}", *this, m_result));
    LOG4CPLUS_TRACE(m_logger, "Poll(Completed)");

void DaqControllerImpl::SetState(StateVariant s, bool error) {
    m_state_ctx = std::move(s);
    auto new_state = MakeState(m_state_ctx);
void DaqControllerImpl::SetError(bool error) {
State DaqControllerImpl::MakeState(StateVariant const& s) {
    if (std::holds_alternative<Scheduled>(s)) {
        return State::Scheduled;
    }
    if (std::holds_alternative<Transferring>(s)) {
        return State::Transferring;
    }
    if (std::holds_alternative<Merging>(s)) {
        return State::Merging;
    }
    if (std::holds_alternative<Completed>(s)) {
        return State::Completed;
    }
DaqControllerImpl::Transferring::Transferring(SourceResolver resolver_arg)
    : resolver(std::move(resolver_arg)) {
}

void DaqControllerImpl::Merging::Reset() {
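
// HandleMergeMessage(): the merger reports structured JSON on stdout; messages
// of type "alert" are converted into status alerts. Lines that fail to parse
// are only logged at debug level (the handler is noexcept).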
void DaqControllerImpl::HandleMergeMessage(std::string const& line) noexcept try {
    auto json = nlohmann::json::parse(line);
    auto const& type = json.at("type").get<std::string>();
    auto const& content = json.at("content");
    if (type == "alert") {

        content.at("id").get_to(id);
        content.at("message").get_to(message);

        m_status.SetAlert(MakeAlert(alert_id, message));

    LOG4CPLUS_DEBUG(m_logger, "Failed to parse JSON message from merger");