14#include <boost/asio/post.hpp>
15#include <boost/range/adaptor/indexed.hpp>
16#include <fmt/format.h>
18#include <log4cplus/loggingmacros.h>
29void PrintArgs(log4cplus::Logger& logger, std::vector<std::string>
const& args) {
33 for (
auto const& token : args) {
41 LOG4CPLUS_DEBUG(logger,
"Executing merger with args: " << ss.str());
44void CheckConsistency(Status
const& status, json::DpSpec
const& spec) {
45 if (status.id != spec.id) {
46 throw std::invalid_argument(
47 fmt::format(
"Scheduler: Could not queue DAQ for merging as "
48 "provided status does not match provided data product specification "
49 "w.r.t DAQ Id: {} vs {}",
53 if (status.file_id != spec.target.file_id) {
54 throw std::invalid_argument(
55 fmt::format(
"Scheduler: Could not queue DAQ for merging as "
56 "provided status does not match provided data product specification "
57 "w.r.t DAQ File Id: {} vs {}",
59 spec.target.file_id));
63auto ParseDpSpec(log4cplus::Logger
const& logger, std::string
const& spec) -> json::DpSpec
try {
65}
catch (json::DpSpecError
const&) {
66 LOG4CPLUS_ERROR(logger,
"Invalid Data Product Specification provided: \n" << spec);
68 std::throw_with_nested(
69 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because "
70 "Data Product Specification is invalid"));
71}
catch (nlohmann::json::parse_error
const&) {
72 LOG4CPLUS_ERROR(logger,
"Invalid Data Product Specification provided: \n" << spec);
73 std::throw_with_nested(
74 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because provided "
75 "data product specification is invalid JSON"));
78auto ParseStatus(log4cplus::Logger
const& logger, std::string
const& status)
79 -> std::optional<Status>
try {
83 auto json = nlohmann::json::parse(status);
84 return json.get<Status>();
85}
catch (nlohmann::json::parse_error
const&) {
86 std::throw_with_nested(
87 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because provided "
88 "DAQ status is invalid JSON"));
94 os <<
"DAQ{" <<
daq.GetStatus() <<
"}";
102 : m_executor(executor)
103 , m_workspace(workspace)
104 , m_daq_controller_factory(std::move(daq_controller_factory))
107 , m_liveness(std::make_shared<bool>(false)) {
109 auto slot = [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
110 if (
auto ptr = liveness.lock(); ptr) {
115 auto& conns = m_resources_connections;
132 std::string
const& status_serialized) {
133 LOG4CPLUS_TRACE(m_logger,
"QueueDaq()");
135 json::DpSpec dp_spec = ParseDpSpec(m_logger, dp_spec_serialized);
136 Status initial_status = ParseStatus(m_logger, status_serialized)
138 CheckConsistency(initial_status, dp_spec);
139 std::string
const&
id = dp_spec.
id;
141 LOG4CPLUS_ERROR(m_logger,
"QueueDaq(): DAQ conflict detected -> aborting");
142 throw std::invalid_argument(
143 fmt::format(
"Scheduler: Could not queue DAQ for merging as "
144 "a Data Acquisition with same id has been queued before: '{}'",
149 LOG4CPLUS_INFO(m_logger,
150 fmt::format(
"QueueDaq(): Initializing new workspace for DAQ {}",
id));
157 initial_status.
timestamp = Status::TimePoint::clock::now();
159 daq_ws->StoreStatus(initial_status);
160 daq_ws->StoreSpecification(dp_spec_serialized);
163 m_queue.push_back(
id);
172 std::throw_with_nested(std::runtime_error(
"Failed to store DAQ queue"));
175 std::throw_with_nested(
176 std::runtime_error(
"Failed to write status to DAQ workspace"));
181 std::throw_with_nested(std::runtime_error(fmt::format(
182 "Failed to initialize DAQ workspace in {}", m_workspace.
GetPath().native())));
184 }
catch (std::invalid_argument
const& e) {
185 LOG4CPLUS_ERROR(m_logger,
"Failed to queue DAQ: " << e.what());
186 std::throw_with_nested(std::invalid_argument(
"Scheduler: Failed to queue DAQ"));
187 }
catch (std::exception
const& e) {
188 LOG4CPLUS_ERROR(m_logger,
"Failed to queue DAQ: " << e.what());
189 std::throw_with_nested(std::runtime_error(
"Scheduler: Failed to queue DAQ"));
194 return std::find(m_queue.begin(), m_queue.end(),
id) != m_queue.end();
198 LOG4CPLUS_TRACE(m_logger, fmt::format(
"GetDaqStatus({})",
id));
201 auto maybe_status = m_workspace.LoadArchivedStatus(
id);
203 return *maybe_status;
205 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatus({}): No such ID",
id));
206 throw std::invalid_argument(fmt::format(
"DAQ with id='{}' not found",
id));
209 return m_workspace.LoadDaq(
id)->LoadStatus();
211 std::throw_with_nested(std::runtime_error(
"Scheduler: GetDaqStatus() failed"));
219 LOG4CPLUS_TRACE(m_logger, fmt::format(
"AbortDaq({})",
id));
225 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): No such ID",
id));
226 throw std::invalid_argument(fmt::format(
"DAQ with id='{}' not found",
id));
230 auto it = std::find_if(m_active.begin(), m_active.end(), [&
id](Active
const& active) {
232 return id == active.daq->GetId();
236 auto pub = [&, status = m_workspace.
LoadDaq(
id)->LoadStatus()](
State new_state)
mutable {
238 m_status_signal(status);
241 if (it != m_active.end()) {
242 auto state = it->daq->GetState();
244 LOG4CPLUS_DEBUG(m_logger,
245 fmt::format(
"AbortDaq({}): Cannot abort in state {}",
id, state));
250 fmt::format(
"AbortDaq({}): Erasing active DAQ currently in state {}",
id, state));
258 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"AbortDaq({}): Removing DAQ from merge queue",
id));
259 m_queue.erase(std::find(m_queue.begin(), m_queue.end(),
id));
263 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"AbortDaq({}): Removing DAQ workspace",
id));
271 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): Aborted and removed DAQ",
id));
273 std::throw_with_nested(std::runtime_error(
"Scheduler: AbortDaq() failed"));
277 return m_status_signal.connect(slot);
280void SchedulerImpl::Poll() {
281 LOG4CPLUS_TRACE(m_logger,
"Poll()");
286void SchedulerImpl::ArchiveCompleted() {
288 for (
auto& active : m_active) {
293 auto id = active.daq->GetId();
295 auto archive_path = m_workspace.
ArchiveDaq(
id);
298 fmt::format(
"DAQ {} is in final state {} -> moved workspace to archive: {}",
300 active.daq->GetState(),
301 archive_path.native()));
307 m_queue.erase(std::find(m_queue.begin(), m_queue.end(),
id));
311 LOG4CPLUS_ERROR(m_logger,
312 "Failed to archive DAQ workspace:\n"
313 << error::NestedExceptionReporter(std::current_exception()));
318 auto remove_it = std::remove_if(
319 m_active.begin(), m_active.end(), [&](
auto const& active) { return !active.daq; });
320 if (remove_it != m_active.end()) {
321 m_active.erase(remove_it, m_active.end());
327void SchedulerImpl::ActivateFromQueue() {
330 if (m_queue.empty()) {
331 LOG4CPLUS_INFO(m_logger,
"Merge queue empty - all done!");
335 auto candidates = GetCandidates();
336 if (candidates.empty()) {
337 LOG4CPLUS_INFO(m_logger,
"All DAQ merge candidates are active/in-progress");
342 for (
auto const&
id : candidates) {
343 LOG4CPLUS_TRACE(m_logger, fmt::format(
"{}: Attempting to schedule DAQ",
id));
346 LOG4CPLUS_INFO(m_logger,
347 fmt::format(
"Limit reached: Cannot schedule '{}' "
348 "Currently active DAQs at/exceed limit. "
349 "current: {}, limit: {}, queue size: {}",
359 auto ws = m_workspace.
LoadDaq(
id);
360 auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
365 daq_controller->GetStatus().ConnectStatus([&](ObservableStatus
const& s) {
371 m_status_signal(s.GetStatus());
374 daq_controller->Start();
376 m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
378 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"{}: DAQ scheduled for merging",
id));
382 std::throw_with_nested(
383 std::runtime_error(fmt::format(
"{}: Failed to activate DAQ for merging.",
id)));
387 std::throw_with_nested(std::runtime_error(
"Failed to load "));
391void SchedulerImpl::DeferredPoll() {
392 if (m_stopped || *m_liveness) {
397 [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
398 if (auto ptr = liveness.lock(); ptr) {
408std::vector<std::string> SchedulerImpl::GetCandidates()
const {
410 std::vector<std::string> candidates;
411 for (
auto const&
id : m_queue) {
412 auto it = std::find_if(m_active.begin(), m_active.end(), [&](
auto const& active) {
414 return active.daq->GetId() == id;
416 if (it == m_active.end()) {
417 candidates.push_back(
id);
424 std::unique_ptr<DaqWorkspace> workspace,
429 : m_executor(executor)
430 , m_workspace(std::move(workspace))
431 , m_resources(resources)
432 , m_rsync_factory(std::move(rsync_factory))
433 , m_proc_factory(std::move(proc_factory))
434 , m_options(std::move(options))
435 , m_dpspec(m_workspace->LoadSpecification())
438 , m_status(m_workspace->LoadStatus())
439 , m_status_connection()
440 , m_liveness(std::make_shared<bool>(false))
444 throw std::invalid_argument(
"Specified merge application name is empty");
447 throw std::invalid_argument(
"Specified rsync application name is empty");
450 LOG4CPLUS_DEBUG(m_logger,
451 "Loaded status: " << m_status <<
"\nAlerts: " << m_status.
GetAlerts());
456 m_result = m_workspace->GetResultPath() /
480 throw std::runtime_error(
"Not implemented");
489 "DaqController: No change -> skipping write of workspace status file: {}",
493 LOG4CPLUS_TRACE(m_logger,
494 fmt::format(
"DaqController: Updating workspace status file: {}", status));
495 m_workspace->StoreStatus(status);
513 return m_status.
GetId();
532void DaqControllerImpl::DeferredPoll() {
533 if (m_stopped || *m_liveness) {
538 [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
539 if (auto ptr = liveness.lock(); ptr) {
550 LOG4CPLUS_TRACE(m_logger,
"Poll()");
554 fmt::format(
"{}: Poll() - DaqController is stopped so nothing will be done", *
this));
558 std::visit([
this](
auto& state) {
Poll(state); }, m_state_ctx);
562 LOG4CPLUS_TRACE(m_logger,
"Poll(Scheduled)");
565 std::vector<json::FitsFileSource> sources;
569 for (
auto const& s : m_dpspec.
sources) {
570 if (!std::holds_alternative<json::FitsFileSource>(s)) {
573 sources.push_back(std::get<json::FitsFileSource>(s));
576 auto sources_path = m_workspace->GetSourcesPath();
578 SourceResolver resolver;
579 using boost::adaptors::indexed;
580 for (
auto const& it : sources | indexed(0u)) {
581 auto index = it.index();
582 auto const& s = it.value();
587 fmt::format(
"{}_{}_{}", index, s.source_name, location.path.filename().native());
588 LOG4CPLUS_INFO(m_logger,
589 fmt::format(
"Poll(Scheduled): Source file '{}' from source \"{}\" "
590 "on host \"{}\" will be stored in {}",
593 !location.host.empty() ? location.host.c_str() :
"<n/a>",
596 resolver.Add({s.source_name, s.location}, local_path);
600 m_workspace->StoreSourceLookup(resolver.GetMapping());
605 fmt::format(
"{}: Failed to collect and store list of required sources", m_status);
606 LOG4CPLUS_ERROR(m_logger, fmt::format(
"Poll(Scheduled): Failed to process DAQ: {}", msg));
609 std::throw_with_nested(std::runtime_error(msg));
614 LOG4CPLUS_TRACE(m_logger,
"Poll(Collecting)");
618 auto defer = ObservableStatus::DeferSignal(&m_status);
620 auto const& sources = ctx.resolver.GetMapping();
621 auto missing = sources.size();
622 auto root = m_workspace->GetPath();
623 for (
auto const& source : sources) {
624 auto dest = source.second;
625 if (ctx.HasTransfer(source.first)) {
630 if (m_workspace->Exists(dest)) {
633 fmt::format(
"Poll(Collecting): File exist -> won't start new transfer: {}",
643 LOG4CPLUS_DEBUG(m_logger,
644 "Poll(Collecting): Soft stop is enabled"
645 "-> won't start new transfer");
653 using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
657 std::filesystem::path(dest).filename().replace_extension(
"rsync.log");
658 opts.logfile = m_workspace->GetPath() / m_workspace->GetLogsPath() / log_file;
660 location.RsyncPath(),
665 auto last_lines = std::make_unique<LogCaptureLast>(
669 proc->ConnectStderr(std::ref(*last_lines));
671 proc->Initiate().then(
673 [liveness = std::weak_ptr<bool>(m_liveness),
674 source = source.first,
677 last_lines = std::move(last_lines),
678 this](boost::future<int> f) {
679 if (liveness.expired()) {
680 LOG4CPLUS_ERROR(
"dpm.daqcontroller",
681 fmt::format(
"DaqController abandoned -> ignoring "
682 "result from rsync for transfer of {}",
686 TransferComplete(source, dest, std::move(f), *last_lines);
691 "{}: Started transfer: {} -> {}: {}", *
this, source.first, dest, *proc));
692 ctx.transfers.emplace_back(Collecting::Transfer{
693 source.first, source.second, std::move(proc), std::move(*token)});
696 LOG4CPLUS_TRACE(m_logger,
697 fmt::format(
"Poll(Collecting): Could not start transfer due to "
698 "resource limit reached: {}",
711 auto msg = fmt::format(
"{}: Failed to transfer required sources", m_status);
712 LOG4CPLUS_ERROR(m_logger, fmt::format(
"Poll(Collecting): Failed to process DAQ: {}", msg));
714 std::throw_with_nested(std::runtime_error(msg));
718void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile
const& source,
719 std::filesystem::path
const& local_path,
720 boost::future<int> result,
721 LogCaptureLast
const& log)
noexcept {
722 LOG4CPLUS_TRACE(m_logger,
723 fmt::format(
"{}: TransferComplete: {} -> {}", *
this, source, local_path));
724 auto* ctx = std::get_if<Collecting>(&m_state_ctx);
727 ctx->EraseTransfer(source);
730 int return_code = result.get();
733 if (return_code != 0) {
734 auto msg = fmt::format(
735 "rsync file transfer failed for remote file {}. Last lines: \n{}", source, log);
737 LOG4CPLUS_ERROR(m_logger, msg);
738 throw std::runtime_error(msg);
742 LOG4CPLUS_INFO(m_logger,
743 fmt::format(
"{}: Poll(Collecting): Successfully transferred file: {} -> {}",
752 LOG4CPLUS_ERROR(m_logger,
753 fmt::format(
"{}: Poll(Collecting): Failed to transfer file: {} -> {}",
759 if (ctx && ctx->transfers.empty()) {
762 fmt::format(
"{}: Poll(Collecting): All pending transfers have completed "
763 "(with or without error) so we stop DAQ",
770bool DaqControllerImpl::Collecting::HasTransfer(
771 SourceResolver::SourceFile
const& source)
const noexcept {
772 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer
const& t) {
773 return t.source == source;
775 return it != transfers.cend();
778DaqControllerImpl::Collecting::Transfer*
779DaqControllerImpl::Collecting::GetTransfer(SourceResolver::SourceFile
const& source)
noexcept {
780 auto it = std::find_if(
781 transfers.begin(), transfers.end(), [&](Transfer
const& t) { return t.source == source; });
783 if (it != transfers.end()) {
789void DaqControllerImpl::Collecting::EraseTransfer(SourceResolver::SourceFile
const& source) {
790 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer
const& t) {
791 return t.source == source;
793 if (it != transfers.end()) {
798DaqControllerImpl::Collecting::~Collecting() noexcept {
799 for (
auto& transfer : transfers) {
800 if (transfer.proc && transfer.proc->IsRunning()) {
803 "DaqController::Collecting::~Collecting: Terminating running transfer process");
804 transfer.proc->Signal(SIGTERM);
809DaqControllerImpl::Merging::~Merging() noexcept {
810 if (merger && merger->IsRunning()) {
811 LOG4CPLUS_DEBUG(
"dpm",
"DaqController::Merging::~Merging: Aborting running merge process");
812 merger->Signal(SIGTERM);
816DaqControllerImpl::Releasing::~Releasing() noexcept {
817 for (
auto& p : transfers) {
818 if (p.second.proc && p.second.proc->IsRunning()) {
821 "DaqController::Releasing::~Releasing: Terminating running transfer process");
822 p.second.proc->Signal(SIGTERM);
828 LOG4CPLUS_TRACE(m_logger,
"Poll(Merging)");
829 auto defer = ObservableStatus::DeferSignal(&m_status);
836 if (m_workspace->Exists(m_result)) {
838 SetState(Completed{});
839 LOG4CPLUS_TRACE(m_logger,
840 fmt::format(
"{}: Poll(Merging): "
841 "Recovered from error automatically (manual merge)",
847 LOG4CPLUS_TRACE(m_logger,
848 fmt::format(
"{}: Poll(Merging): Could not start merging due to "
849 "resource limit reached: {}",
859 std::vector<std::string> args{m_options.
merge_bin};
860 args.emplace_back(
"--json");
861 args.emplace_back(
"--root");
862 args.emplace_back(m_workspace->GetPath());
863 args.emplace_back(
"--logfile");
864 args.emplace_back(m_workspace->GetLogsPath() /
"merge.log");
865 args.emplace_back(
"--resolver");
866 args.emplace_back(m_workspace->GetSourceLookupPath());
867 args.emplace_back(
"-o");
868 args.emplace_back(m_result.native());
870 args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());
872 PrintArgs(m_logger, args);
879 ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance(
LOGGER_NAME_MERGER),
880 this](pid_t pid, std::string
const& line) {
881 LOG4CPLUS_DEBUG(logger, pid <<
": " << Trim(line));
882 HandleMergeMessage(line);
884 ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance(
LOGGER_NAME_MERGER)](
885 pid_t pid, std::string
const& line) {
886 LOG4CPLUS_INFO(logger, pid <<
": " << Trim(line));
889 ctx.merger->Initiate().then(
891 [liveness = std::weak_ptr<bool>(m_liveness),
892 id = m_status.
GetId(),
894 this](boost::future<int> f) {
895 if (liveness.expired()) {
897 fmt::format(
"{}: DaqController abandoned -> ignoring "
898 "result from merger",
902 MergeComplete(std::move(f));
904 ctx.token = std::move(*token);
907 auto msg = fmt::format(
"{}: Failed to initiate merging", *
this);
908 LOG4CPLUS_ERROR(m_logger, msg);
909 std::throw_with_nested(std::runtime_error(msg));
913void DaqControllerImpl::MergeComplete(boost::future<int> result)
noexcept {
914 LOG4CPLUS_TRACE(m_logger,
"MergeComplete()");
917 auto* ctx = std::get_if<Merging>(&m_state_ctx);
921 auto exit_code = result.get();
922 if (exit_code != 0) {
923 auto msg = fmt::format(
"Merging failed with code {}", exit_code);
925 throw std::runtime_error(fmt::format(
"Merging failed with code {}", exit_code));
927 if (!m_workspace->Exists(m_result)) {
928 auto abs_path = m_workspace->GetPath() / m_result;
930 fmt::format(
"Merging reported with success but file is not found: {}", abs_path);
932 LOG4CPLUS_ERROR(m_logger, msg);
933 throw std::runtime_error(msg);
938 m_workspace->MakeResultSymlink(m_result);
940 m_status.SetResult(m_result.native());
944 LOG4CPLUS_INFO(m_logger, fmt::format(
"{}: Merge completed successfully!", *
this));
951 "{}: PollMergeComplete: Failed to create data product: {}", *
this, m_result));
955bool DaqControllerImpl::TryStartRelease(
Releasing& ctx,
960 if (
auto* cfg = std::get_if<json::OlasReceiver>(&receiver); cfg) {
961 return TryStartRelease(ctx, *cfg, index);
965 fmt::format(
"Receiver[{}]: TryStartRelease: Unknown receiver type encountered.",
972bool DaqControllerImpl::TryStartRelease(
Releasing& ctx,
973 json::OlasReceiver
const& receiver,
977 fmt::format(
"Receiver[{}]: TryStartRelease: Will try to start transfer to receiver",
979 namespace fs = std::filesystem;
980 if (receiver.host.empty()) {
992 auto link = receiver.path / GetArchiveFilename();
993 if (std::error_code ec = m_workspace->MakeHardLink(m_result, link); !ec) {
999 fmt::format(
"Receiver[{}]: Hardlink created from {} -> {}", index, m_result, link));
1001 }
else if (receiver.options.allow_symlink && !m_workspace->MakeSymlink(m_result, link)) {
1007 fmt::format(
"Receiver[{}]: Symlink created from {} -> {}", index, m_result, link));
1013 fmt::format(
"Receiver[{}]: Hard or symlink could not be created from {} -> {}",
1023 LOG4CPLUS_TRACE(m_logger,
1024 fmt::format(
"Receiver[{}]: Could not start rsync transfer due to "
1025 "resource limit reached.",
1029 using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
1033 auto const& path_from = m_result.native();
1034 auto path_to = receiver.path / GetArchiveFilename();
1035 auto location = receiver.host.empty() ? path_to.native()
1036 : fmt::format(
"{}:{}", receiver.host, path_to.native());
1037 opts.logfile = m_workspace->GetPath() / m_workspace->GetLogsPath() /
1038 fmt::format(
"{}_receiver.rsync.log", index);
1049 proc->ConnectStderr(std::ref(*last_lines));
1051 proc->Initiate().then(
1056 last_lines = std::move(last_lines),
1057 liveness = std::weak_ptr<bool>(m_liveness)](boost::future<int> f) {
1058 if (liveness.expired()) {
1059 LOG4CPLUS_ERROR(
"dpm.daqcontroller",
1060 fmt::format(
"Receiver[{}]: DaqController abandoned -> ignoring "
1061 "result from rsync for receiver transfer "
1067 ReleaseComplete(index, *proc, std::move(f), *last_lines);
1069 LOG4CPLUS_DEBUG(m_logger,
1070 fmt::format(
"{}: Receiver[{}] Started release: {}", *
this, index, *proc));
1074 ctx.transfers.emplace(index, Releasing::Transfer{std::move(proc), std::move(*maybe_token)});
1079 LOG4CPLUS_TRACE(m_logger,
"Poll(Releasing)");
1082 LOG4CPLUS_INFO(m_logger,
1083 "Poll(Releasing): No receivers configured -> transition to Completed");
1084 SetState(Completed{});
1089 auto defer = ObservableStatus::DeferSignal(&m_status);
1099 LOG4CPLUS_TRACE(m_logger,
1100 "Poll(Releasing): Number of receivers: " << m_dpspec.
receivers.size());
1102 for (
auto it = m_dpspec.
receivers.begin(); it != m_dpspec.
receivers.end(); ++it, ++index) {
1108 "Poll(Releasing): Receiver[" << index <<
"] Will try to start receiver.");
1110 TryStartRelease(ctx, *it, index);
1112 }
else if (!receiver_status.IsFinalState()) {
1113 LOG4CPLUS_TRACE(m_logger,
1114 "Poll(Releasing): Receiver["
1115 << index <<
"] already in progress: " << receiver_status.state);
1118 LOG4CPLUS_TRACE(m_logger,
1119 "Poll(Releasing): Receiver[" << index <<
"] already completed with "
1120 << receiver_status.state);
1124 if (pending == 0 && ctx.transfers.empty()) {
1128 "Poll(Releasing): No release transfers remains -> transition to Completed");
1129 SetState(Completed{});
1135 auto msg = fmt::format(
"{}: Failed to release ", m_status);
1136 LOG4CPLUS_ERROR(m_logger, fmt::format(
"Poll(Releasing): Failed to process DAQ: {}", msg));
1137 std::throw_with_nested(std::runtime_error(msg));
1141void DaqControllerImpl::ReleaseComplete(std::size_t index,
1142 AsyncProcessIf
const& proc,
1143 boost::future<int> result,
1144 LogCaptureLast
const& lines)
noexcept {
1145 auto& ctx = std::get<Releasing>(m_state_ctx);
1147 ctx.transfers.erase(index);
1148 auto alert = ObservableStatus::AlertActivator(
1151 auto code = result.get();
1156 "{}: Receiver[{}]: Process completed with rc={}: {}", *
this, index, code, proc));
1163 auto msg = fmt::format(
1164 "Receiver[{}]: Failed to release file using command: {}. Last lines:\n{}",
1169 LOG4CPLUS_ERROR(m_logger, *
this <<
' ' << msg);
1171 }
catch (std::exception
const& e) {
1172 auto msg = fmt::format(
1173 "Receiver[{}]: Failed to release file using command: {}. Last "
1174 "lines:\n{}\nException:\n{}",
1179 LOG4CPLUS_ERROR(m_logger, *
this <<
' ' << msg);
1188 LOG4CPLUS_TRACE(m_logger,
"Poll(Completed)");
1193void DaqControllerImpl::SetState(StateVariant s) {
1194 m_state_ctx = std::move(s);
1195 auto new_state = MakeState(m_state_ctx);
1196 if (new_state == m_status.
GetState()) {
1200 LOG4CPLUS_DEBUG(m_logger,
"New state: " << new_state <<
", Alerts: " << m_status.
GetAlerts());
1206State DaqControllerImpl::MakeState(StateVariant
const& s) {
1207 if (std::holds_alternative<Scheduled>(s)) {
1210 if (std::holds_alternative<Collecting>(s)) {
1213 if (std::holds_alternative<Merging>(s)) {
1216 if (std::holds_alternative<Releasing>(s)) {
1219 if (std::holds_alternative<Completed>(s)) {
1226DaqControllerImpl::Collecting::Collecting(SourceResolver resolver_arg)
1227 : resolver(std::move(resolver_arg)) {
1230void DaqControllerImpl::Merging::Reset() {
1235void DaqControllerImpl::HandleMergeMessage(std::string
const& line)
noexcept try {
1240 auto json = nlohmann::json::parse(line);
1241 auto const& type = json.at(
"type").get<std::string>();
1242 auto const& content = json.at(
"content");
1243 if (type ==
"alert") {
1248 std::string message;
1249 content.at(
"id").get_to(
id);
1250 content.at(
"message").get_to(message);
1252 m_status.SetAlert(
MakeAlert(alert_id, message));
1255 LOG4CPLUS_DEBUG(m_logger,
"Failed to parse JSON message from merger");
1258std::string DaqControllerImpl::GetArchiveFilename()
const {
1260 throw std::runtime_error(fmt::format(
"{}: DAQ status has no file_id!", m_status));
1262 return fmt::format(
"{}.fits", m_status.
GetFileId());
Stores data acquisition status and allows subscription to status changes.
State GetState() const noexcept
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void ClearAlert(AlertId const &alert)
Clear alert.
void SetState(State s) noexcept
Set state of data acquisition.
std::string const & GetFileId() const noexcept
boost::signals2::connection ConnectStatus(SignalType::slot_type const &slot)
Connect observer that is invoked when state is modified.
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
Alerts const & GetAlerts() const noexcept
bool HasError() const noexcept
unsigned ChangesSinceLastSignal() const noexcept
Query number of changes made since last signal.
std::string const & GetId() const noexcept
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource becomes available.
std::optional< ResourceToken > Acquire() noexcept
unsigned GetLimit() const noexcept
unsigned GetUsed() const noexcept
auto IsStopped() const noexcept -> bool override
auto GetState() const noexcept -> State override
auto GetId() const noexcept -> std::string const &override
void Start() override
Start/stop operations.
auto GetErrorFlag() const noexcept -> bool override
auto GetStatus() noexcept -> ObservableStatus &override
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Controller for specific DAQ.
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
std::string QueueDaq(std::string const &dp_spec, std::string const &status) override
Queues DAQ for processing.
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
void Start() override
Start/stop operations.
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Provides location of fits source file.
Interface to interact with DPM workspace.
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location in th...
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
virtual auto GetPath() const -> std::filesystem::path=0
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
virtual auto LoadQueue() const -> std::vector< std::string >=0
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Declares JSON support for serialization.
daq::dpm::Workspace interface and implementation declaration
Declaration of log4cplus helpers.
constexpr std::string_view RELEASING_RSYNC
Failure during rsync source copy.
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
constexpr std::string_view MERGING_MERGE
Merging failed.
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
const std::string LOGGER_NAME_CONTROLLER
const std::string LOGGER_NAME_SCHEDULER
boost::signals2::scoped_connection daqs
const std::string LOGGER_NAME_MERGER
const std::string LOGGER_NAME_TRANSFER
Options for DaqController.
Options controlling scheduler operations.
std::variant< OlasReceiver > ReceiverTypes
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Target target
Describes the target which will become the data product.
std::optional< FitsFileSource > source
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
std::vector< SourceTypes > sources
List of sources to create data product from.
std::string file_prefix
Optional user-chosen file prefix to make it easier to identify the produced file.
Close representation of the JSON structure but with stronger types.
AlertId MakeAlertId(std::string_view category, std::string key)
State
Observable states of the data acquisition process.
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Aborted
Data acquisition has been aborted by user.
@ Merging
DAQ is being merged.
@ AbortingMerging
Transitional state for aborting during merging.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
bool IsAbortableState(State state) noexcept
Query whether state is in an abortable state.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
daq::dpm::Scheduler and related class declarations.
Non-observable status object that stores the status of data acquisition.
std::string result
Path to resulting data product.
TimePoint timestamp
Timestamp of last update.