14 #include <boost/asio/post.hpp>
15 #include <boost/range/adaptor/indexed.hpp>
16 #include <fmt/format.h>
17 #include <fmt/ostream.h>
18 #include <log4cplus/loggingmacros.h>
28 void PrintArgs(log4cplus::Logger& logger, std::vector<std::string>
const& args) {
32 for (
auto const& token : args) {
40 LOG4CPLUS_DEBUG(logger,
"Executing merger with args: " << ss.str());
46 os <<
"DAQ{" <<
daq.GetStatus() <<
"}";
54 : m_executor(executor)
55 , m_workspace(workspace)
56 , m_daq_controller_factory(std::move(daq_controller_factory))
59 , m_liveness(std::make_shared<bool>(false)) {
61 auto slot = [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
62 if (
auto ptr = liveness.lock(); ptr) {
67 auto& conns = m_resources_connections;
84 LOG4CPLUS_TRACE(m_logger,
"QueueDaq()");
86 auto json = nlohmann::json::parse(dp_spec_serialized);
88 std::string
const&
id = dp_spec.
id;
91 LOG4CPLUS_ERROR(m_logger,
"QueueDaq(): DAQ conflict detected -> aborting");
92 throw std::invalid_argument(
93 fmt::format(
"Scheduler: Could not queue DAQ for merging as "
94 "a Data Acquisition with same id has been queued before: '{}'",
99 LOG4CPLUS_INFO(m_logger,
100 fmt::format(
"QueueDaq(): Initializing new workspace for DAQ {}",
id));
106 initial_status.
id = id;
107 initial_status.
file_id = file_id;
110 initial_status.
error =
false;
111 initial_status.
timestamp = Status::TimePoint::clock::now();
113 daq_ws->StoreStatus(initial_status);
114 daq_ws->StoreSpecification(dp_spec_serialized);
117 m_queue.push_back(
id);
126 std::throw_with_nested(std::runtime_error(
"Failed to store DAQ queue"));
129 std::throw_with_nested(
130 std::runtime_error(
"Failed to write status to DAQ workspace"));
135 std::throw_with_nested(std::runtime_error(fmt::format(
136 "Failed to initialize DAQ workspace in {}", m_workspace.
GetPath().native())));
139 LOG4CPLUS_DEBUG(m_logger,
140 "Invalid Data Product Specification provided: \n"
141 << dp_spec_serialized);
143 std::throw_with_nested(
144 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because "
145 "Data Product Specification is invalid"));
146 }
catch (nlohmann::json::parse_error
const&) {
147 std::throw_with_nested(
148 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because provided "
149 "data product specification is invalid JSON"));
154 return std::find(m_queue.begin(), m_queue.end(),
id) != m_queue.end();
158 LOG4CPLUS_TRACE(m_logger, fmt::format(
"GetDaqStatus({})",
id));
161 auto maybe_status = m_workspace.LoadArchivedStatus(
id);
163 return *maybe_status;
165 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatus({}): No such ID",
id));
166 throw std::invalid_argument(fmt::format(
"DAQ with id='{}' not found",
id));
169 return m_workspace.LoadDaq(
id)->LoadStatus();
171 std::throw_with_nested(std::runtime_error(
"Scheduler: GetDaqStatus() failed"));
179 LOG4CPLUS_TRACE(m_logger, fmt::format(
"AbortDaq({})",
id));
185 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): No such ID",
id));
186 throw std::invalid_argument(fmt::format(
"DAQ with id='{}' not found",
id));
190 auto it = std::find_if(m_active.begin(), m_active.end(), [&
id](Active
const& active) {
192 return id == active.daq->GetId();
196 auto pub = [&, status = m_workspace.
LoadDaq(
id)->LoadStatus()](
State new_state)
mutable {
198 m_status_signal(status);
201 if (it != m_active.end()) {
202 auto state = it->daq->GetState();
204 LOG4CPLUS_DEBUG(m_logger,
205 fmt::format(
"AbortDaq({}): Cannot abort in state {}",
id, state));
210 fmt::format(
"AbortDaq({}): Erasing active DAQ currently in state {}",
id, state));
218 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"AbortDaq({}): Removing DAQ from merge queue",
id));
219 m_queue.erase(std::find(m_queue.begin(), m_queue.end(),
id));
223 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"AbortDaq({}): Removing DAQ workspace",
id));
231 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): Aborted and removed DAQ",
id));
233 std::throw_with_nested(std::runtime_error(
"Scheduler: AbortDaq() failed"));
237 return m_status_signal.connect(slot);
240 void SchedulerImpl::Poll() {
241 LOG4CPLUS_TRACE(m_logger,
"Poll()");
246 void SchedulerImpl::ArchiveCompleted() {
248 for (
auto& active : m_active) {
253 auto id = active.daq->GetId();
255 auto archive_path = m_workspace.
ArchiveDaq(
id);
258 fmt::format(
"DAQ {} is in final state {} -> moved workspace to archive: {}",
260 active.daq->GetState(),
261 archive_path.native()));
267 m_queue.erase(std::find(m_queue.begin(), m_queue.end(),
id));
271 LOG4CPLUS_ERROR(m_logger,
272 "Failed to archive DAQ workspace:\n"
273 << error::NestedExceptionReporter(std::current_exception()));
278 auto remove_it = std::remove_if(
279 m_active.begin(), m_active.end(), [&](
auto const& active) { return !active.daq; });
280 if (remove_it != m_active.end()) {
281 m_active.erase(remove_it, m_active.end());
287 void SchedulerImpl::ActivateFromQueue() {
290 if (m_queue.empty()) {
291 LOG4CPLUS_INFO(m_logger,
"Merge queue empty - all done!");
295 auto candidates = GetCandidates();
296 if (candidates.empty()) {
297 LOG4CPLUS_INFO(m_logger,
"All DAQ merge candidates are active/in-progress");
302 for (
auto const&
id : candidates) {
303 LOG4CPLUS_TRACE(m_logger, fmt::format(
"{}: Attempting to schedule DAQ",
id));
306 LOG4CPLUS_INFO(m_logger,
307 fmt::format(
"Limit reached: Cannot schedule '{}' "
308 "Currently active DAQs at/exceed limit. "
309 "current: {}, limit: {}, queue size: {}",
319 auto ws = m_workspace.
LoadDaq(
id);
320 auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
325 daq_controller->GetStatus().ConnectStatus([&](ObservableStatus
const& s) {
331 m_status_signal(s.GetStatus());
334 daq_controller->Start();
336 m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
338 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"{}: DAQ scheduled for merging",
id));
342 std::throw_with_nested(
343 std::runtime_error(fmt::format(
"{}: Failed to activate DAQ for merging.",
id)));
347 std::throw_with_nested(std::runtime_error(
"Failed to load "));
351 void SchedulerImpl::DeferredPoll() {
352 if (m_stopped || *m_liveness) {
357 [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
358 if (auto ptr = liveness.lock(); ptr) {
368 std::vector<std::string> SchedulerImpl::GetCandidates()
const {
370 std::vector<std::string> candidates;
371 for (
auto const&
id : m_queue) {
372 auto it = std::find_if(m_active.begin(), m_active.end(), [&](
auto const& active) {
374 return active.daq->GetId() == id;
376 if (it == m_active.end()) {
377 candidates.push_back(
id);
384 std::unique_ptr<DaqWorkspace> workspace,
389 : m_executor(executor)
390 , m_workspace(std::move(workspace))
391 , m_resources(resources)
392 , m_rsync_factory(std::move(rsync_factory))
393 , m_proc_factory(std::move(proc_factory))
394 , m_options(std::move(options))
395 , m_dpspec(m_workspace->LoadSpecification())
398 , m_status(m_workspace->LoadStatus())
399 , m_status_connection()
400 , m_liveness(std::make_shared<bool>(false))
404 throw std::invalid_argument(
"Specified merge application name is empty");
407 throw std::invalid_argument(
"Specified rsync application name is empty");
413 m_result = m_workspace->GetResultPath() /
438 throw std::runtime_error(
"Not implemented");
447 "DaqController: No change -> skipping write of workspace status file: {}",
451 LOG4CPLUS_TRACE(m_logger,
452 fmt::format(
"DaqController: Updating workspace status file: {}", status));
453 m_workspace->StoreStatus(status);
471 return m_status.
GetId();
490 void DaqControllerImpl::DeferredPoll() {
491 if (m_stopped || *m_liveness) {
496 [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
497 if (auto ptr = liveness.lock(); ptr) {
508 LOG4CPLUS_TRACE(m_logger,
"Poll()");
512 fmt::format(
"{}: Poll() - DaqController is stopped so nothing will be done", *
this));
516 std::visit([
this](
auto& state) {
Poll(state); }, m_state_ctx);
520 LOG4CPLUS_TRACE(m_logger,
"Poll(Scheduled)");
523 std::vector<json::FitsFileSource> sources;
527 for (
auto const& s : m_dpspec.
sources) {
528 if (!std::holds_alternative<json::FitsFileSource>(s)) {
531 sources.push_back(std::get<json::FitsFileSource>(s));
534 auto sources_path = m_workspace->GetSourcesPath();
536 SourceResolver resolver;
538 for (
auto const& s : sources) {
543 fmt::format(
"{}_{}_{}", index, s.source_name, location.path.filename().native());
544 LOG4CPLUS_INFO(m_logger,
545 fmt::format(
"Poll(Scheduled): Source file '{}' from source \"{}\" "
546 "on host \"{}\" will be stored in {}",
549 !location.host.empty() ? location.host.c_str() :
"<n/a>",
552 resolver.Add({s.source_name, s.location}, local_path);
556 m_workspace->StoreSourceLookup(resolver.GetMapping());
561 fmt::format(
"{}: Failed to collect and store list of required sources", m_status);
562 LOG4CPLUS_ERROR(m_logger, fmt::format(
"Poll(Scheduled): Failed to process DAQ: {}", msg));
565 std::throw_with_nested(std::runtime_error(msg));
570 LOG4CPLUS_TRACE(m_logger,
"Poll(Collecting)");
575 auto const& sources = ctx.resolver.GetMapping();
576 auto missing = sources.size();
577 auto root = m_workspace->GetPath();
578 for (
auto const& source : sources) {
579 if (ctx.HasTransfer(source.first)) {
584 if (m_workspace->Exists(source.second)) {
591 LOG4CPLUS_TRACE(m_logger,
592 "Poll(Collecting): Soft stop is enabled"
593 "-> won't start new transfer");
601 using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
605 location.RsyncPath(),
606 root / source.second,
610 auto last_lines = std::make_unique<LogCaptureLast>(
614 proc->ConnectStderr(std::ref(*last_lines));
616 proc->Initiate().then(
618 [liveness = std::weak_ptr<bool>(m_liveness),
619 source = source.first,
620 dest = source.second,
622 last_lines = std::move(last_lines),
623 this](boost::future<int> f) {
624 if (liveness.expired()) {
625 LOG4CPLUS_ERROR(
"dpm.daqcontroller",
626 fmt::format(
"DaqController abandoned -> ignoring "
627 "result from rsync for transfer of {}",
631 TransferComplete(source, dest, std::move(f), *last_lines);
633 ctx.transfers.emplace_back(Collecting::Transfer{
634 source.first, source.second, std::move(proc), std::move(*token)});
637 LOG4CPLUS_TRACE(m_logger,
638 fmt::format(
"Poll(Collecting): Could not start transfer due to "
639 "resource limit reached: {}",
653 auto msg = fmt::format(
"{}: Failed to transfer required sources", m_status);
654 LOG4CPLUS_ERROR(m_logger,
655 fmt::format(
"Poll(Collecting): Failed to process DAQ: {}", msg));
657 std::throw_with_nested(std::runtime_error(msg));
661 void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile
const& source,
662 std::filesystem::path
const& local_path,
663 boost::future<int> result,
664 LogCaptureLast
const& log) noexcept {
665 LOG4CPLUS_TRACE(m_logger,
666 fmt::format(
"{}: TransferComplete: {} -> {}", *
this, source, local_path));
667 auto* ctx = std::get_if<Collecting>(&m_state_ctx);
672 ctx->EraseTransfer(source);
675 int return_code = result.get();
677 if (return_code != 0) {
680 fmt::format(
"rsync file transfer failed for remote file {}. Last lines: \n{}",
683 LOG4CPLUS_ERROR(m_logger, fmt::format(
"rync file transfer failed: {}", source));
684 throw std::runtime_error(
"rsync failed to transfer file");
687 m_status.ClearAlert(alert_id);
688 LOG4CPLUS_INFO(m_logger,
689 fmt::format(
"{}: Poll(Collecting): Successfully transfer file: {} -> {}",
699 LOG4CPLUS_ERROR(m_logger,
700 fmt::format(
"{}: Poll(Collecting): Failed to transfer file: {} -> {}",
706 if (ctx && ctx->transfers.empty()) {
709 fmt::format(
"{}: Poll(Collecting): All pending transfers have completed "
710 "(with or without error) so we stop DAQ",
717 bool DaqControllerImpl::Collecting::HasTransfer(
718 SourceResolver::SourceFile
const& source)
const noexcept {
719 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer
const& t) {
720 return t.source == source;
722 return it != transfers.cend();
725 DaqControllerImpl::Collecting::Transfer*
726 DaqControllerImpl::Collecting::GetTransfer(SourceResolver::SourceFile
const& source) noexcept {
727 auto it = std::find_if(
728 transfers.begin(), transfers.end(), [&](Transfer
const& t) { return t.source == source; });
730 if (it != transfers.end()) {
736 void DaqControllerImpl::Collecting::EraseTransfer(SourceResolver::SourceFile
const& source) {
737 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer
const& t) {
738 return t.source == source;
740 if (it != transfers.end()) {
745 DaqControllerImpl::Collecting::~Collecting() noexcept {
746 for (
auto& transfer : transfers) {
747 if (transfer.proc && transfer.proc->IsRunning()) {
750 "DaqController::Collecting::~Collecting: Terminating running transfer process");
751 transfer.proc->Signal(SIGTERM);
756 DaqControllerImpl::Merging::~Merging() noexcept {
757 if (merger && merger->IsRunning()) {
758 LOG4CPLUS_DEBUG(
"dpm",
"DaqController::Merging::~Merging: Aborting running merge process");
759 merger->Signal(SIGTERM);
763 DaqControllerImpl::Releasing::~Releasing() noexcept {
764 for (
auto& p : transfers) {
765 if (p.second.proc && p.second.proc->IsRunning()) {
768 "DaqController::Releasing::~Releasing: Terminating running transfer process");
769 p.second.proc->Signal(SIGTERM);
775 LOG4CPLUS_TRACE(m_logger,
"Poll(Merging)");
783 if (m_workspace->Exists(m_result)) {
784 SetState(Completed{});
785 LOG4CPLUS_TRACE(m_logger,
786 fmt::format(
"{}: Poll(Merging): "
787 "Recovered from error automatically (manual merge)",
793 LOG4CPLUS_TRACE(m_logger,
794 fmt::format(
"{}: Poll(Merging): Could not start merging due to "
795 "resource limit reached: {}",
805 std::vector<std::string> args{m_options.
merge_bin};
806 args.emplace_back(
"--json");
807 args.emplace_back(
"--root");
808 args.emplace_back(m_workspace->GetPath());
809 args.emplace_back(
"--resolver");
810 args.emplace_back(m_workspace->GetSourceLookupPath());
811 args.emplace_back(
"-o");
812 args.emplace_back(m_result.native());
814 args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());
816 PrintArgs(m_logger, args);
820 ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance(
LOGGER_NAME_MERGER),
821 this](pid_t pid, std::string
const& line) {
822 LOG4CPLUS_DEBUG(logger, pid <<
": " << Trim(line));
823 HandleMergeMessage(line);
825 ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance(
LOGGER_NAME_MERGER)](
826 pid_t pid, std::string
const& line) {
827 LOG4CPLUS_INFO(logger, pid <<
": " << Trim(line));
830 ctx.merger->Initiate().then(
832 [liveness = std::weak_ptr<bool>(m_liveness),
833 id = m_status.
GetId(),
835 this](boost::future<int> f) {
836 if (liveness.expired()) {
838 fmt::format(
"{}: DaqController abandoned -> ignoring "
839 "result from merger",
843 MergeComplete(std::move(f));
845 ctx.token = std::move(*token);
848 auto msg = fmt::format(
"{}: Failed to initiate merging", *
this);
849 LOG4CPLUS_ERROR(m_logger, msg);
850 std::throw_with_nested(std::runtime_error(msg));
854 void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
855 LOG4CPLUS_TRACE(m_logger,
"MergeComplete()");
860 auto* ctx = std::get_if<Merging>(&m_state_ctx);
864 auto exit_code = result.get();
865 if (exit_code != 0) {
866 auto msg = fmt::format(
"Merging failed with code {}", exit_code);
867 m_status.SetAlert(
MakeAlert(alert_id, msg));
868 throw std::runtime_error(fmt::format(
"Merging failed with code {}", exit_code));
870 if (!m_workspace->Exists(m_result)) {
871 auto abs_path = m_workspace->GetPath() / m_result;
873 fmt::format(
"Merging reported with success but file is not found: {}", abs_path);
874 m_status.SetAlert(
MakeAlert(alert_id, msg));
875 LOG4CPLUS_ERROR(m_logger, msg);
876 throw std::runtime_error(msg);
878 m_status.ClearAlert(alert_id);
881 m_workspace->MakeResultSymlink(m_result);
883 m_status.SetResult(m_result.native());
887 LOG4CPLUS_INFO(m_logger, fmt::format(
"{}: Merge completed successfully!", *
this));
888 SetState(Releasing{}, m_status.GetError());
895 "{}: PollMergeComplete: Failed to create data product: {}", *
this, m_result));
899 bool DaqControllerImpl::TryStartRelease(Releasing& ctx,
904 if (
auto* cfg = std::get_if<json::OlasReceiver>(&receiver); cfg) {
905 return TryStartRelease(ctx, *cfg, index);
909 fmt::format(
"Receiver[{}]: TryStartRelease: Unknown receiver type encountered.",
916 bool DaqControllerImpl::TryStartRelease(Releasing& ctx,
917 json::OlasReceiver
const& receiver,
921 fmt::format(
"Receiver[{}]: TryStartRelease: Will try to start transfer to receiver",
923 namespace fs = std::filesystem;
924 if (receiver.host.empty()) {
936 auto link = receiver.path / GetArchiveFilename();
937 if (std::error_code ec = m_workspace->MakeHardLink(m_result, link); !ec) {
943 fmt::format(
"Receiver[{}]: Hardlink created from {} -> {}", index, m_result, link));
945 }
else if (receiver.options.allow_symlink && !m_workspace->MakeSymlink(m_result, link)) {
951 fmt::format(
"Receiver[{}]: Symlink created from {} -> {}", index, m_result, link));
957 fmt::format(
"Receiver[{}]: Hard or symlink could not be created from {} -> {}",
967 LOG4CPLUS_TRACE(m_logger,
968 fmt::format(
"Receiver[{}]: Could not start rsync transfer due to "
969 "resource limit reached.",
973 using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
977 auto const& path_from = m_result.native();
978 auto path_to = receiver.path / GetArchiveFilename();
979 auto location = receiver.host.empty() ? path_to.native()
980 : fmt::format(
"{}:{}", receiver.host, path_to.native());
991 proc->ConnectStderr(std::ref(*last_lines));
993 proc->Initiate().then(
998 last_lines = std::move(last_lines),
999 liveness = std::weak_ptr<bool>(m_liveness)](boost::future<int> f) {
1000 if (liveness.expired()) {
1001 LOG4CPLUS_ERROR(
"dpm.daqcontroller",
1002 fmt::format(
"Receiver[{}]: DaqController abandoned -> ignoring "
1003 "result from rsync for receiver transfer "
1009 ReleaseComplete(index, *proc, std::move(f), *last_lines);
1014 ctx.transfers.emplace(index, Releasing::Transfer{std::move(proc), std::move(*maybe_token)});
1019 LOG4CPLUS_TRACE(m_logger,
"Poll(Releasing)");
1022 LOG4CPLUS_INFO(m_logger,
1023 "Poll(Releasing): No receivers configured -> transition to Completed");
1024 SetState(Completed{});
1039 LOG4CPLUS_TRACE(m_logger,
1040 "Poll(Releasing): Number of receivers: " << m_dpspec.
receivers.size());
1042 for (
auto it = m_dpspec.
receivers.begin(); it != m_dpspec.
receivers.end(); ++it, ++index) {
1048 "Poll(Releasing): Receiver[" << index <<
"] Will try to start receiver.");
1050 TryStartRelease(ctx, *it, index);
1052 }
else if (!receiver_status.IsFinalState()) {
1053 LOG4CPLUS_TRACE(m_logger,
1054 "Poll(Releasing): Receiver["
1055 << index <<
"] already in progress: " << receiver_status.state);
1058 LOG4CPLUS_TRACE(m_logger,
1059 "Poll(Releasing): Receiver[" << index <<
"] already completed with "
1060 << receiver_status.state);
1064 if (pending == 0 && ctx.transfers.empty()) {
1068 "Poll(Releasing): No release transfers remains -> transition to Completed");
1069 SetState(Completed{}, m_status.
GetError());
1075 auto msg = fmt::format(
"{}: Failed to release ", m_status);
1076 LOG4CPLUS_ERROR(m_logger, fmt::format(
"Poll(Releasing): Failed to process DAQ: {}", msg));
1077 std::throw_with_nested(std::runtime_error(msg));
1081 void DaqControllerImpl::ReleaseComplete(std::size_t index,
1082 AsyncProcessIf
const& proc,
1083 boost::future<int> result,
1084 LogCaptureLast
const& lines) noexcept {
1086 auto& ctx = std::get<Releasing>(m_state_ctx);
1088 ctx.transfers.erase(index);
1090 auto code = result.get();
1095 "{}: Receiver[{}]: Process completed with rc={}: {}", *
this, index, code, proc));
1099 m_status.ClearAlert(alert_id);
1104 auto msg = fmt::format(
1105 "Failed to release file using command: {}. Last lines:\n{}", proc, lines);
1106 m_status.SetAlert(
MakeAlert(alert_id, msg));
1109 fmt::format(
"{}: Receiver[{}]: ReleaseComplete(): Failed to release file:\n{}",
1114 }
catch (std::exception
const& e) {
1119 fmt::format(
"{}: Receiver[{}]: ReleaseComplete(): Failed to release file ({}): ",
1130 LOG4CPLUS_TRACE(m_logger,
"Poll(Completed)");
1135 void DaqControllerImpl::SetState(StateVariant s,
bool error) {
1136 m_state_ctx = std::move(s);
1137 auto new_state =
MakeState(m_state_ctx);
1147 void DaqControllerImpl::SetError(
bool error) {
1159 if (std::holds_alternative<Scheduled>(s)) {
1162 if (std::holds_alternative<Collecting>(s)) {
1165 if (std::holds_alternative<Merging>(s)) {
1168 if (std::holds_alternative<Releasing>(s)) {
1171 if (std::holds_alternative<Completed>(s)) {
1178 DaqControllerImpl::Collecting::Collecting(SourceResolver resolver_arg)
1179 : resolver(std::move(resolver_arg)) {
1182 void DaqControllerImpl::Merging::Reset() {
1187 void DaqControllerImpl::HandleMergeMessage(std::string
const& line) noexcept
try {
1192 auto json = nlohmann::json::parse(line);
1193 auto const& type = json.at(
"type").get<std::string>();
1194 auto const& content = json.at(
"content");
1195 if (type ==
"alert") {
1200 std::string message;
1201 content.at(
"id").get_to(
id);
1202 content.at(
"message").get_to(message);
1204 m_status.SetAlert(
MakeAlert(alert_id, message));
1207 LOG4CPLUS_DEBUG(m_logger,
"Failed to parse JSON message from merger");
1210 std::string DaqControllerImpl::GetArchiveFilename()
const {
1212 throw std::runtime_error(fmt::format(
"{}: DAQ status has no file_id!", m_status));
1214 return fmt::format(
"{}.fits", m_status.
GetFileId());
Stores data acquisition status and allows subscription to status changes.
State GetState() const noexcept
void SetError(bool error) noexcept
Set error flag for data acquisition.
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
std::string const & GetFileId() const noexcept
bool GetError() const noexcept
boost::signals2::connection ConnectStatus(SignalType::slot_type const &slot)
Connect observer that is invoked when state is modified.
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
unsigned ChangesSinceLastSignal() const noexcept
Query number of changes made since last signal.
std::string const & GetId() const noexcept
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
std::optional< ResourceToken > Acquire() noexcept
unsigned GetLimit() const noexcept
unsigned GetUsed() const noexcept
auto IsStopped() const noexcept -> bool override
auto GetState() const noexcept -> State override
auto GetId() const noexcept -> std::string const &override
void Start() override
Start/stop operations.
auto GetErrorFlag() const noexcept -> bool override
auto GetStatus() noexcept -> ObservableStatus &override
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Controller for specific DAQ.
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
std::string QueueDaq(std::string const &dp_spec) override
Queues DAQ for processing.
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
void Start() override
Start/stop operations.
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Provides location of fits source file.
Interface to interact with DPM workspace.
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location in th...
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
virtual auto GetPath() const -> std::filesystem::path=0
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
virtual auto LoadQueue() const -> std::vector< std::string >=0
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
daq::dpm::Workspace interface and implementation declaration
Declaration of log4cplus helpers.
constexpr std::string_view RELEASING_RSYNC
Failure during rsync source copy.
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
constexpr std::string_view MERGING_MERGE
Merging failed.
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
const std::string LOGGER_NAME_CONTROLLER
const std::string LOGGER_NAME_SCHEDULER
boost::signals2::scoped_connection daqs
const std::string LOGGER_NAME_MERGER
const std::string LOGGER_NAME_TRANSFER
Options for DaqController.
Options controlling scheduler operations.
std::variant< OlasReceiver > ReceiverTypes
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Target target
Describes target which will become the data product.
std::optional< FitsFileSource > source
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
std::vector< SourceTypes > sources
List of sources to create data product from.
std::string file_prefix
Optional user-chosen file prefix to make it easier to identify the produced file.
Close representation of the JSON structure but with stronger types.
AlertId MakeAlertId(std::string_view category, std::string key)
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
State
Observable states of the data acquisition process.
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Aborted
Data acquisition has been aborted by user.
@ Merging
DAQ is being merged.
@ AbortingMerging
Transitional state for aborting during merging.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
bool IsAbortableState(State state) noexcept
Query whether state is in an abortable state.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
daq::dpm::Scheduler and related class declarations.
Non-observable status object that stores the status of data acquisition.
std::string result
Path to resulting data product.
TimePoint timestamp
Timestamp of last update.