ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
scheduler.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_dpm
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief daq::dpm::Scheduler implementation
9 */
10#include <daq/dpm/scheduler.hpp>
11
12#include <algorithm>
13
14#include <boost/asio/post.hpp>
15#include <boost/range/adaptor/indexed.hpp>
16#include <fmt/format.h>
17#include <fmt/std.h>
18#include <log4cplus/loggingmacros.h>
19
20#include <daq/dpm/config.hpp>
21#include <daq/dpm/workspace.hpp>
22#include <daq/error/report.hpp>
23#include <daq/json.hpp>
24#include <daq/log4cplus.hpp>
25
26namespace daq::dpm {
27
28namespace {
29void PrintArgs(log4cplus::Logger& logger, std::vector<std::string> const& args) {
30 std::stringstream ss;
31 ss << "{";
32 bool first = true;
33 for (auto const& token : args) {
34 if (!first) {
35 ss << ", ";
36 }
37 ss << token;
38 first = false;
39 }
40 ss << "}";
41 LOG4CPLUS_DEBUG(logger, "Executing merger with args: " << ss.str());
42}
43
44void CheckConsistency(Status const& status, json::DpSpec const& spec) {
45 if (status.id != spec.id) {
46 throw std::invalid_argument(
47 fmt::format("Scheduler: Could not queue DAQ for merging as "
48 "provided status does not match provided data product specification "
49 "w.r.t DAQ Id: {} vs {}",
50 status.id,
51 spec.id));
52 }
53 if (status.file_id != spec.target.file_id) {
54 throw std::invalid_argument(
55 fmt::format("Scheduler: Could not queue DAQ for merging as "
56 "provided status does not match provided data product specification "
57 "w.r.t DAQ File Id: {} vs {}",
58 status.file_id,
59 spec.target.file_id));
60 }
61}
62
63auto ParseDpSpec(log4cplus::Logger const& logger, std::string const& spec) -> json::DpSpec try {
64 return json::ParseDpSpec(nlohmann::json::parse(spec));
65} catch (json::DpSpecError const&) {
66 LOG4CPLUS_ERROR(logger, "Invalid Data Product Specification provided: \n" << spec);
67
68 std::throw_with_nested(
69 std::invalid_argument("Scheduler: Could not queue DAQ for merging because "
70 "Data Product Specification is invalid"));
71} catch (nlohmann::json::parse_error const&) {
72 LOG4CPLUS_ERROR(logger, "Invalid Data Product Specification provided: \n" << spec);
73 std::throw_with_nested(
74 std::invalid_argument("Scheduler: Could not queue DAQ for merging because provided "
75 "data product specification is invalid JSON"));
76}
77
78auto ParseStatus(log4cplus::Logger const& logger, std::string const& status)
79 -> std::optional<Status> try {
80 if (status.empty()) {
81 return std::nullopt;
82 }
83 auto json = nlohmann::json::parse(status);
84 return json.get<Status>();
85} catch (nlohmann::json::parse_error const&) {
86 std::throw_with_nested(
87 std::invalid_argument("Scheduler: Could not queue DAQ for merging because provided "
88 "DAQ status is invalid JSON"));
89}
90
91} // namespace
92
93std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
94 os << "DAQ{" << daq.GetStatus() << "}";
95 return os;
96}
97
                             Workspace& workspace,
                             DaqControllerFactory daq_controller_factory,
                             SchedulerOptions const& options)
    : m_executor(executor)
    , m_workspace(workspace)
    , m_daq_controller_factory(std::move(daq_controller_factory))
    , m_options(options)
    , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME_SCHEDULER))
    // m_liveness doubles as a lifetime sentinel: weak_ptr copies held by
    // deferred handlers detect destruction of this Scheduler.
    , m_liveness(std::make_shared<bool>(false)) {
    // Restore the persistent merge queue from the workspace.
    m_queue = m_workspace.LoadQueue();
    // Re-poll whenever a resource signals availability; the weak_ptr guards
    // against the slot firing after this Scheduler has been destroyed.
    auto slot = [liveness = std::weak_ptr<bool>(m_liveness), this]() {
        if (auto ptr = liveness.lock(); ptr) {
            // Scheduler still alive
            DeferredPoll();
        }
    };
    auto& conns = m_resources_connections;
    conns.daqs = m_resources.daqs.Connect(slot);
    conns.net_receive = m_resources.net_receive.Connect(slot);
    conns.merge = m_resources.merge.Connect(slot);
    conns.net_send = m_resources.net_send.Connect(slot);
}
121
123 m_stopped = false;
124 DeferredPoll();
125}
126
128 m_stopped = true;
129}
130
/**
 * Queues a new DAQ, described by @a dp_spec_serialized, for merging.
 *
 * Steps, each with rollback on failure of a later step:
 *  1. Parse specification and optional status; cross-check them.
 *  2. Reject ids already present in the queue.
 *  3. Initialize a per-DAQ workspace and store status + specification.
 *  4. Append id to the in-memory queue and persist the queue.
 *
 * @param dp_spec_serialized JSON data product specification.
 * @param status_serialized  Optional serialized Status ("" -> fresh status).
 * @return id of the queued DAQ.
 * @throws std::invalid_argument (nested) on invalid input or duplicate id.
 * @throws std::runtime_error (nested) on workspace/storage failures.
 */
std::string SchedulerImpl::QueueDaq(std::string const& dp_spec_serialized,
                                    std::string const& status_serialized) {
    LOG4CPLUS_TRACE(m_logger, "QueueDaq()");
    try {
        json::DpSpec dp_spec = ParseDpSpec(m_logger, dp_spec_serialized);
        // No status provided -> start from a blank status for this id/file.
        Status initial_status = ParseStatus(m_logger, status_serialized)
                                    .value_or(Status(dp_spec.id, dp_spec.target.file_id));
        CheckConsistency(initial_status, dp_spec);
        std::string const& id = dp_spec.id;
        if (IsQueued(id)) {
            LOG4CPLUS_ERROR(m_logger, "QueueDaq(): DAQ conflict detected -> aborting");
            throw std::invalid_argument(
                fmt::format("Scheduler: Could not queue DAQ for merging as "
                            "a Data Acquisition with same id has been queued before: '{}'",
                            id));
        }
        // New daq for this workspace -> Initialize persistent state.
        try {
            LOG4CPLUS_INFO(m_logger,
                           fmt::format("QueueDaq(): Initializing new workspace for DAQ {}", id));
            auto daq_ws = m_workspace.InitializeDaq(id);
            assert(daq_ws);
            try {
                assert(daq_ws);
                // Initial state in DPM is Scheduled as that's where DPM takes over.
                initial_status.state = State::Scheduled;
                initial_status.timestamp = Status::TimePoint::clock::now();

                daq_ws->StoreStatus(initial_status);
                daq_ws->StoreSpecification(dp_spec_serialized);

                try {
                    m_queue.push_back(id);
                    // Finally update the backlog
                    m_workspace.StoreQueue(m_queue);

                    DeferredPoll();
                    return id;
                } catch (...) {
                    // Undo push
                    m_queue.pop_back();
                    std::throw_with_nested(std::runtime_error("Failed to store DAQ queue"));
                }
            } catch (...) {
                std::throw_with_nested(
                    std::runtime_error("Failed to write status to DAQ workspace"));
            }
        } catch (...) {
            // Undo creation of DAQ workspace
            m_workspace.RemoveDaq(id);
            std::throw_with_nested(std::runtime_error(fmt::format(
                "Failed to initialize DAQ workspace in {}", m_workspace.GetPath().native())));
        }
    } catch (std::invalid_argument const& e) {
        LOG4CPLUS_ERROR(m_logger, "Failed to queue DAQ: " << e.what());
        std::throw_with_nested(std::invalid_argument("Scheduler: Failed to queue DAQ"));
    } catch (std::exception const& e) {
        LOG4CPLUS_ERROR(m_logger, "Failed to queue DAQ: " << e.what());
        std::throw_with_nested(std::runtime_error("Scheduler: Failed to queue DAQ"));
    }
}
192
193bool SchedulerImpl::IsQueued(std::string const& id) const noexcept {
194 return std::find(m_queue.begin(), m_queue.end(), id) != m_queue.end();
195}
196
197Status SchedulerImpl::GetDaqStatus(std::string const& id) const try {
198 LOG4CPLUS_TRACE(m_logger, fmt::format("GetDaqStatus({})", id));
199 if (!IsQueued(id)) {
200 // Try to find it in the archive.
201 auto maybe_status = m_workspace.LoadArchivedStatus(id);
202 if (maybe_status) {
203 return *maybe_status;
204 } else {
205 LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): No such ID", id));
206 throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
207 }
208 }
209 return m_workspace.LoadDaq(id)->LoadStatus();
210} catch (...) {
211 std::throw_with_nested(std::runtime_error("Scheduler: GetDaqStatus() failed"));
212}
213
214std::vector<std::string> SchedulerImpl::GetQueue() const noexcept {
215 return m_queue;
216}
217
218void SchedulerImpl::AbortDaq(std::string const& id) try {
219 LOG4CPLUS_TRACE(m_logger, fmt::format("AbortDaq({})", id));
220 // - If DAQ is either being released or is already in final state it cannot be aborted.
221 // - If DAQ is active must first be made inactive by destroying DaqController which will also
222 // terminate started processes.
223 // - If DAQ is not active aborting simply means to erase DAQ from filesystem and queue.
224 if (!IsQueued(id)) {
225 LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): No such ID", id));
226 throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
227 }
228
229 // If DAQ is active/in-progress we must first delete it
230 auto it = std::find_if(m_active.begin(), m_active.end(), [&id](Active const& active) {
231 assert(active.daq);
232 return id == active.daq->GetId();
233 });
234
235 // Status to update with final aborted state
236 auto pub = [&, status = m_workspace.LoadDaq(id)->LoadStatus()](State new_state) mutable {
237 status.state = State::AbortingMerging;
238 m_status_signal(status);
239 };
240
241 if (it != m_active.end()) {
242 auto state = it->daq->GetState();
243 if (IsAbortableState(state)) {
244 LOG4CPLUS_DEBUG(m_logger,
245 fmt::format("AbortDaq({}): Cannot abort in state {}", id, state));
246 }
247 // Erase if active
248 LOG4CPLUS_DEBUG(
249 m_logger,
250 fmt::format("AbortDaq({}): Erasing active DAQ currently in state {}", id, state));
251 m_active.erase(it);
252 }
253
254 // Publish that we're aborting
256
257 // Erase id from queue
258 LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ from merge queue", id));
259 m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
260 m_workspace.StoreQueue(m_queue);
261
262 // Delete workspace
263 LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ workspace", id));
264 m_workspace.RemoveDaq(id);
265
266 // Publish that DAQ is aborted
267 pub(State::Aborted);
268
269 // Queue has changed -> Poll
270 DeferredPoll();
271 LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Aborted and removed DAQ", id));
272} catch (...) {
273 std::throw_with_nested(std::runtime_error("Scheduler: AbortDaq() failed"));
274}
275
/// Connects @a slot to status-update notifications of all managed DAQs.
boost::signals2::connection SchedulerImpl::ConnectStatus(StatusSignal::slot_type const& slot) {
    return m_status_signal.connect(slot);
}
279
/// One scheduler iteration: archive finished DAQs, then activate queued ones.
void SchedulerImpl::Poll() {
    LOG4CPLUS_TRACE(m_logger, "Poll()");
    ArchiveCompleted();
    ActivateFromQueue();
}
285
286void SchedulerImpl::ArchiveCompleted() {
287 // Archive completed DAQs
288 for (auto& active : m_active) { // auto it = remove_it; it != m_active.end(); ++it) {
289 try {
290 if (!IsFinalState(active.daq->GetState())) {
291 continue;
292 }
293 auto id = active.daq->GetId();
294
295 auto archive_path = m_workspace.ArchiveDaq(id);
296 LOG4CPLUS_INFO(
297 m_logger,
298 fmt::format("DAQ {} is in final state {} -> moved workspace to archive: {}",
299 id,
300 active.daq->GetState(),
301 archive_path.native()));
302
303 // Delete controller
304 active.daq.reset();
305
306 // Remove ID from queue
307 m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
308 m_workspace.StoreQueue(m_queue);
309 } catch (...) {
310 // @todo: Decide what to do here...
311 LOG4CPLUS_ERROR(m_logger,
312 "Failed to archive DAQ workspace:\n"
313 << error::NestedExceptionReporter(std::current_exception()));
314 }
315 }
316
317 // Erase removed daqs
318 auto remove_it = std::remove_if(
319 m_active.begin(), m_active.end(), [&](auto const& active) { return !active.daq; });
320 if (remove_it != m_active.end()) {
321 m_active.erase(remove_it, m_active.end());
322 // Since this freed resources we schedule poll() to be run again.
323 DeferredPoll();
324 }
325}
326
327void SchedulerImpl::ActivateFromQueue() {
328 // Update active DAQs
329 // See if a new DAQ can be active.
330 if (m_queue.empty()) {
331 LOG4CPLUS_INFO(m_logger, "Merge queue empty - all done!");
332 return;
333 }
334
335 auto candidates = GetCandidates();
336 if (candidates.empty()) {
337 LOG4CPLUS_INFO(m_logger, "All DAQ merge candidates are active/in-progress");
338 return;
339 }
340 // Schedule as many candidates as allowed by resource limits
341 try {
342 for (auto const& id : candidates) {
343 LOG4CPLUS_TRACE(m_logger, fmt::format("{}: Attempting to schedule DAQ", id));
344 auto maybe_token = m_resources.daqs.Acquire();
345 if (!maybe_token) {
346 LOG4CPLUS_INFO(m_logger,
347 fmt::format("Limit reached: Cannot schedule '{}' "
348 "Currently active DAQs at/exceed limit. "
349 "current: {}, limit: {}, queue size: {}",
350 id,
351 m_resources.daqs.GetUsed(),
352 m_resources.daqs.GetLimit(),
353 m_queue.size()));
354 return;
355 }
356
357 try {
358 // @todo: Store token with DAQ?
359 auto ws = m_workspace.LoadDaq(id);
360 auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
361
362 // Aggregate signal
363 // both DaqController and slot are owned by Scheduler so we don't need
364 // to manage connection lifetime.
365 daq_controller->GetStatus().ConnectStatus([&](ObservableStatus const& s) {
366 // Completed DAQs should be archived so we schedule a poll in that case.
367 if (IsFinalState(s.GetState())) {
368 DeferredPoll();
369 }
370 // Just forward status
371 m_status_signal(s.GetStatus());
372 });
373 // Tell DaqController to start
374 daq_controller->Start();
375
376 m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
377
378 LOG4CPLUS_DEBUG(m_logger, fmt::format("{}: DAQ scheduled for merging", id));
379 // Schedule new Poll() as queue has been modified.
380 DeferredPoll();
381 } catch (...) {
382 std::throw_with_nested(
383 std::runtime_error(fmt::format("{}: Failed to activate DAQ for merging.", id)));
384 }
385 }
386 } catch (...) {
387 std::throw_with_nested(std::runtime_error("Failed to load "));
388 }
389}
390
391void SchedulerImpl::DeferredPoll() {
392 if (m_stopped || *m_liveness) {
393 // Stopped or already scheduled
394 return;
395 }
396 boost::asio::post(m_executor.get_io_context().get_executor(),
397 [liveness = std::weak_ptr<bool>(m_liveness), this]() {
398 if (auto ptr = liveness.lock(); ptr) {
399 // Scheduler still alive
400 // Clear flag before invoking Poll so that another
401 // poll can be scheduled.
402 *ptr = false;
403 Poll();
404 }
405 });
406}
407
408std::vector<std::string> SchedulerImpl::GetCandidates() const {
409 // Return all DAQs in m_queue that is not in m_active.
410 std::vector<std::string> candidates;
411 for (auto const& id : m_queue) {
412 auto it = std::find_if(m_active.begin(), m_active.end(), [&](auto const& active) {
413 assert(active.daq);
414 return active.daq->GetId() == id;
415 });
416 if (it == m_active.end()) {
417 candidates.push_back(id);
418 }
419 }
420 return candidates;
421}
422
/**
 * Constructs a controller for one DAQ from its persisted workspace.
 *
 * Loads specification and status from @a workspace, derives the result
 * path, restores the state machine from the stored state and connects a
 * slot that writes status changes back to the workspace.
 *
 * @throws std::invalid_argument if merge or rsync binary names are empty.
 * @throws std::runtime_error for stored states without a mapping below.
 */
DaqControllerImpl::DaqControllerImpl(rad::IoExecutor& executor,
                                     std::unique_ptr<DaqWorkspace> workspace,
                                     Resources& resources,
                                     RsyncFactory rsync_factory,
                                     ProcFactory proc_factory,
                                     DaqControllerOptions options)
    : m_executor(executor)
    , m_workspace(std::move(workspace))
    , m_resources(resources)
    , m_rsync_factory(std::move(rsync_factory))
    , m_proc_factory(std::move(proc_factory))
    , m_options(std::move(options))
    , m_dpspec(m_workspace->LoadSpecification())
    , m_result()
    , m_state_ctx(Scheduled{}) // m_state_ctx is updated in body with actual state.
    , m_status(m_workspace->LoadStatus())
    , m_status_connection()
    , m_liveness(std::make_shared<bool>(false))
    , m_stopped(true)
    , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME_CONTROLLER)) {
    if (m_options.merge_bin.empty()) {
        throw std::invalid_argument("Specified merge application name is empty");
    }
    if (m_options.rsync_bin.empty()) {
        throw std::invalid_argument("Specified rsync application name is empty");
    }

    LOG4CPLUS_DEBUG(m_logger,
                    "Loaded status: " << m_status << "\nAlerts: " << m_status.GetAlerts());

    // If result has already been created it is contained in the stored status.
    // Otherwise we set to-be-used absolute path
    if (m_status.GetStatus().result.empty()) {
        m_result = m_workspace->GetResultPath() /
                   (m_dpspec.target.file_prefix + m_dpspec.target.file_id + ".fits");
    } else {
        m_result = m_status.GetStatus().result;
    }

    // Update m_state from loaded status.
    switch (m_status.GetState()) {
        case State::Scheduled:
            SetState(Scheduled{});
            break;
            // NOTE(review): a `case State::Collecting:` label appears lost in
            // extraction here -- without it the next statement is unreachable.
            // TODO confirm against upstream source.
            SetState(Collecting{SourceResolver(m_workspace->LoadSourceLookup())});
            break;
        case State::Merging:
            SetState(Merging());
            break;
        case State::Releasing:
            SetState(Releasing{});
            break;
        case State::Completed:
            SetState(Completed{});
            break;
        default:
            throw std::runtime_error("Not implemented");
    };

    // Persist every observable status change; skip writes when nothing changed.
    m_status_connection = m_status.ConnectStatus([this](ObservableStatus const& status) mutable {
        if (status.ChangesSinceLastSignal() == 0u) {
            // No changes -> don't write
            LOG4CPLUS_TRACE(
                m_logger,
                fmt::format(
                    "DaqController: No change -> skipping write of workspace status file: {}",
                    status));
            return;
        }
        LOG4CPLUS_TRACE(m_logger,
                        fmt::format("DaqController: Updating workspace status file: {}", status));
        m_workspace->StoreStatus(status);
    });
}
498
500 m_stopped = false;
501 DeferredPoll();
502}
503
505 m_stopped = true;
506}
507
508auto DaqControllerImpl::IsStopped() const noexcept -> bool {
509 return m_stopped;
510}
511
512auto DaqControllerImpl::GetId() const noexcept -> std::string const& {
513 return m_status.GetId();
514}
515
516auto DaqControllerImpl::GetErrorFlag() const noexcept -> bool {
517 return m_status.HasError();
518}
519
520auto DaqControllerImpl::GetState() const noexcept -> State {
521 return m_status.GetState();
522}
523
525 return m_status;
526}
527
528auto DaqControllerImpl::GetStatus() const noexcept -> ObservableStatus const& {
529 return m_status;
530}
531
532void DaqControllerImpl::DeferredPoll() {
533 if (m_stopped || *m_liveness) {
534 // Stopped or already scheduled
535 return;
536 }
537 boost::asio::post(m_executor.get_io_context().get_executor(),
538 [liveness = std::weak_ptr<bool>(m_liveness), this]() {
539 if (auto ptr = liveness.lock(); ptr) {
540 // Scheduler still alive
541 // Clear flag before invoking Poll so that another
542 // poll can be scheduled.
543 *ptr = false;
544 Poll();
545 }
546 });
547}
548
    // Poll(): dispatches to the Poll(...) overload for the current state.
    // NOTE(review): the signature line was lost in extraction -- TODO confirm.
    LOG4CPLUS_TRACE(m_logger, "Poll()");
    if (m_stopped) {
        LOG4CPLUS_DEBUG(
            m_logger,
            fmt::format("{}: Poll() - DaqController is stopped so nothing will be done", *this));
        // Stopped or already scheduled
        return;
    }
    // std::visit selects the overload matching the active state alternative.
    std::visit([this](auto& state) { Poll(state); }, m_state_ctx);
}
560
/**
 * Poll handler for State::Scheduled.
 *
 * Builds the list of FITS file sources from the specification, assigns each
 * a unique local filename under the workspace sources directory, persists
 * the source->local mapping and transitions to Collecting. Errors here are
 * treated as non-recoverable: the controller is stopped and the error rethrown.
 */
void DaqControllerImpl::Poll(DaqControllerImpl::Scheduled&) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Scheduled)");
    try {
        // Create list of remote sources and build mapping to local files.
        std::vector<json::FitsFileSource> sources;
        if (m_dpspec.target.source) {
            sources.push_back(*m_dpspec.target.source);
        }
        for (auto const& s : m_dpspec.sources) {
            // Only FitsFileSource alternatives are collected; other source
            // variants are skipped here.
            if (!std::holds_alternative<json::FitsFileSource>(s)) {
                continue;
            }
            sources.push_back(std::get<json::FitsFileSource>(s));
        }

        auto sources_path = m_workspace->GetSourcesPath();

        SourceResolver resolver;
        using boost::adaptors::indexed;
        for (auto const& it : sources | indexed(0u)) {
            auto index = it.index();
            auto const& s = it.value();
            json::Location location = json::ParseSourceLocation(s.location);
            // Use <index>_<source>_<filename> as local filename.
            auto local_path =
                sources_path /
                fmt::format("{}_{}_{}", index, s.source_name, location.path.filename().native());
            LOG4CPLUS_INFO(m_logger,
                           fmt::format("Poll(Scheduled): Source file '{}' from source \"{}\" "
                                       "on host \"{}\" will be stored in {}",
                                       location.path,
                                       s.source_name,
                                       !location.host.empty() ? location.host.c_str() : "<n/a>",
                                       local_path));

            resolver.Add({s.source_name, s.location}, local_path);
        }

        // Store source resolution mapping.
        m_workspace->StoreSourceLookup(resolver.GetMapping());

        SetState(Collecting(std::move(resolver)));
    } catch (...) {
        auto msg =
            fmt::format("{}: Failed to collect and store list of required sources", m_status);
        LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Scheduled): Failed to process DAQ: {}", msg));
        // No point in retrying these kinds of errors.
        Stop();
        std::throw_with_nested(std::runtime_error(msg));
    }
}
612
    // Poll handler for State::Collecting: starts rsync transfers for sources
    // that are not yet local (subject to the net_receive resource limit) and
    // transitions to Merging once every source file exists locally.
    // NOTE(review): the signature line was lost in extraction (presumably
    // `void DaqControllerImpl::Poll(Collecting& ctx)`) -- TODO confirm.
    LOG4CPLUS_TRACE(m_logger, "Poll(Collecting)");
    try {
        // Check if any source is missing and start transfers if possible
        // If all transfers are done transition to State::Merging.
        auto defer = ObservableStatus::DeferSignal(&m_status);

        auto const& sources = ctx.resolver.GetMapping();
        // Counts sources still lacking a local file; decremented as files are found.
        auto missing = sources.size();
        auto root = m_workspace->GetPath();
        for (auto const& source : sources) {
            auto dest = source.second;
            if (ctx.HasTransfer(source.first)) {
                // Transfer already in progress
                continue;
            }
            // [recovery]
            if (m_workspace->Exists(dest)) {
                LOG4CPLUS_DEBUG(
                    m_logger,
                    fmt::format("Poll(Collecting): File exist -> won't start new transfer: {}",
                                dest));
                // Clear alert if there was a previous issue.
                // NOTE(review): the alert-clearing statement appears lost in
                // extraction here -- TODO confirm.
                missing--;
                // File already exist
                continue;
            }
            if (m_soft_stop) {
                // If there are errors we don't initiate new transfers
                LOG4CPLUS_DEBUG(m_logger,
                                "Poll(Collecting): Soft stop is enabled"
                                "-> won't start new transfer");
                continue;
            }
            // We need to transfer the file, try to start.
            auto token = m_resources.net_receive.Acquire();
            if (token) {
                // We got the token, so we can start transfer
                json::Location location = json::ParseSourceLocation(source.first.location);
                using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
                RsyncOptions opts;
                opts.rsync = m_options.rsync_bin;
                // Per-source rsync log next to the workspace logs directory.
                auto log_file =
                    std::filesystem::path(dest).filename().replace_extension("rsync.log");
                opts.logfile = m_workspace->GetPath() / m_workspace->GetLogsPath() / log_file;
                // NOTE(review): the trailing factory argument(s)/closing paren
                // appear lost in extraction after `opts,` -- TODO confirm.
                Proc proc = m_rsync_factory(m_executor.get_io_context(),
                                            location.RsyncPath(),
                                            root / dest,
                                            opts,

                auto last_lines = std::make_unique<LogCaptureLast>(
                    log4cplus::Logger::getInstance(LOGGER_NAME_TRANSFER), 10);
                // Connect signals (stdout is ignored for now as it merely contains the progress
                // which will be handled differently later on).
                proc->ConnectStderr(std::ref(*last_lines));
                // Start transfer and set up completion handler
                proc->Initiate().then(
                    m_executor,
                    [liveness = std::weak_ptr<bool>(m_liveness),
                     source = source.first,
                     dest = dest,
                     proc = proc,
                     last_lines = std::move(last_lines),
                     this](boost::future<int> f) {
                        if (liveness.expired()) {
                            LOG4CPLUS_ERROR("dpm.daqcontroller",
                                            fmt::format("DaqController abandoned -> ignoring "
                                                        "result from rsync for transfer of {}",
                                                        source));
                            return;
                        }
                        TransferComplete(source, dest, std::move(f), *last_lines);
                    });
                LOG4CPLUS_DEBUG(
                    m_logger,
                    fmt::format(
                        "{}: Started transfer: {} -> {}: {}", *this, source.first, dest, *proc));
                ctx.transfers.emplace_back(Collecting::Transfer{
                    source.first, source.second, std::move(proc), std::move(*token)});
            } else {
                // Out of tokens -> bail out
                LOG4CPLUS_TRACE(m_logger,
                                fmt::format("Poll(Collecting): Could not start transfer due to "
                                            "resource limit reached: {}",
                                            source.first));
                return;
            }
        }

        if (missing == 0) {
            // If all transfers are complete we can transition to next state
            SetState(Merging());
            return;
        }
    } catch (...) {
        // No point in retrying these kinds of errors.
        auto msg = fmt::format("{}: Failed to transfer required sources", m_status);
        LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Collecting): Failed to process DAQ: {}", msg));
        Stop();
        std::throw_with_nested(std::runtime_error(msg));
    }
}
717
/**
 * Completion handler for one rsync source transfer (noexcept).
 *
 * Removes the transfer from the Collecting bookkeeping, raises/clears the
 * COLLECTING_RSYNC alert based on the rsync exit code and schedules a new
 * poll on success. When a transfer failed and no transfers remain pending,
 * the controller is stopped.
 *
 * @param source     remote source that was transferred.
 * @param local_path destination path within the workspace.
 * @param result     future carrying the rsync process exit code.
 * @param log        capture of the last stderr lines for diagnostics.
 */
void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile const& source,
                                         std::filesystem::path const& local_path,
                                         boost::future<int> result,
                                         LogCaptureLast const& log) noexcept {
    LOG4CPLUS_TRACE(m_logger,
                    fmt::format("{}: TransferComplete: {} -> {}", *this, source, local_path));
    // May be nullptr if the state changed while the transfer was in flight.
    auto* ctx = std::get_if<Collecting>(&m_state_ctx);
    try {
        if (ctx) {
            ctx->EraseTransfer(source);
        }

        // result.get() rethrows if the future holds an exception.
        int return_code = result.get();
        auto alert =
            ObservableStatus::AlertActivator(&m_status, alert::COLLECTING_RSYNC, local_path);
        if (return_code != 0) {
            auto msg = fmt::format(
                "rsync file transfer failed for remote file {}. Last lines: \n{}", source, log);
            alert.Set(msg);
            LOG4CPLUS_ERROR(m_logger, msg);
            throw std::runtime_error(msg);
        }
        // Opportunistically clear alert in case it as set from previous attempt
        alert.Clear();
        LOG4CPLUS_INFO(m_logger,
                       fmt::format("{}: Poll(Collecting): Successfully transferred file: {} -> {}",
                                   *this,
                                   source,
                                   local_path));

        // Transfer ok
        DeferredPoll();
        return;
    } catch (...) {
        LOG4CPLUS_ERROR(m_logger,
                        fmt::format("{}: Poll(Collecting): Failed to transfer file: {} -> {}",
                                    *this,
                                    source,
                                    local_path));
        // Stopping DAQ can be done when there's no more pending file transfers (we let them
        // complete).
        if (ctx && ctx->transfers.empty()) {
            LOG4CPLUS_ERROR(
                m_logger,
                fmt::format("{}: Poll(Collecting): All pending transfers have completed "
                            "(with or without error) so we stop DAQ",
                            *this));
            Stop();
        }
    }
}
769
770bool DaqControllerImpl::Collecting::HasTransfer(
771 SourceResolver::SourceFile const& source) const noexcept {
772 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
773 return t.source == source;
774 });
775 return it != transfers.cend();
776}
777
778DaqControllerImpl::Collecting::Transfer*
779DaqControllerImpl::Collecting::GetTransfer(SourceResolver::SourceFile const& source) noexcept {
780 auto it = std::find_if(
781 transfers.begin(), transfers.end(), [&](Transfer const& t) { return t.source == source; });
782
783 if (it != transfers.end()) {
784 return &(*it);
785 }
786 return nullptr;
787}
788
789void DaqControllerImpl::Collecting::EraseTransfer(SourceResolver::SourceFile const& source) {
790 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
791 return t.source == source;
792 });
793 if (it != transfers.end()) {
794 transfers.erase(it);
795 }
796}
797
798DaqControllerImpl::Collecting::~Collecting() noexcept {
799 for (auto& transfer : transfers) {
800 if (transfer.proc && transfer.proc->IsRunning()) {
801 LOG4CPLUS_INFO(
802 "dpm",
803 "DaqController::Collecting::~Collecting: Terminating running transfer process");
804 transfer.proc->Signal(SIGTERM);
805 }
806 }
807}
808
809DaqControllerImpl::Merging::~Merging() noexcept {
810 if (merger && merger->IsRunning()) {
811 LOG4CPLUS_DEBUG("dpm", "DaqController::Merging::~Merging: Aborting running merge process");
812 merger->Signal(SIGTERM);
813 }
814}
815
816DaqControllerImpl::Releasing::~Releasing() noexcept {
817 for (auto& p : transfers) {
818 if (p.second.proc && p.second.proc->IsRunning()) {
819 LOG4CPLUS_INFO(
820 "dpm",
821 "DaqController::Releasing::~Releasing: Terminating running transfer process");
822 p.second.proc->Signal(SIGTERM);
823 }
824 }
825}
826
/**
 * Poll handler for State::Merging.
 *
 * At most one merge process runs at a time (subject to the merge resource
 * limit). Recovery: if the result file already exists, the merge is assumed
 * done and the state advances without launching a process. Otherwise the
 * external merger (m_options.merge_bin) is launched with the workspace
 * paths as arguments; stdout is parsed by HandleMergeMessage() and
 * completion is handled by MergeComplete().
 */
void DaqControllerImpl::Poll(DaqControllerImpl::Merging& ctx) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Merging)");
    auto defer = ObservableStatus::DeferSignal(&m_status);
    try {
        if (ctx.merger) {
            // Merge is already in progress
            return;
        }
        // [recovery]
        if (m_workspace->Exists(m_result)) {
            // NOTE(review): one statement appears lost in extraction here
            // (doxygen line 837) -- TODO confirm against upstream source.
            SetState(Completed{});
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("{}: Poll(Merging): "
                                        "Recovered from error automatically (manual merge)",
                                        *this));
            return;
        }
        auto token = m_resources.merge.Acquire();
        if (!token) {
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("{}: Poll(Merging): Could not start merging due to "
                                        "resource limit reached: {}",
                                        *this,
                                        m_resources.merge.GetLimit()));

            return;
        }

        // Launch merge
        // note: daqDpmMerge will atomically move result into output so we don't have
        // to do that ourselves.
        std::vector<std::string> args{m_options.merge_bin};
        args.emplace_back("--json");
        args.emplace_back("--root");
        args.emplace_back(m_workspace->GetPath());
        args.emplace_back("--logfile");
        args.emplace_back(m_workspace->GetLogsPath() / "merge.log");
        args.emplace_back("--resolver");
        args.emplace_back(m_workspace->GetSourceLookupPath());
        args.emplace_back("-o");
        args.emplace_back(m_result.native());
        // Positional argument for the specification which is passed as absolute path.
        args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());

        PrintArgs(m_logger, args);

        // Clear all merger alerts before HandleMergeMessage() sets them again.
        // NOTE(review): the clearing statement appears lost in extraction
        // here (doxygen line 875) -- TODO confirm.

        ctx.merger = m_proc_factory(m_executor.get_io_context(), args);

        ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance(LOGGER_NAME_MERGER),
                                   this](pid_t pid, std::string const& line) {
            LOG4CPLUS_DEBUG(logger, pid << ": " << Trim(line));
            HandleMergeMessage(line);
        });
        ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance(LOGGER_NAME_MERGER)](
                                      pid_t pid, std::string const& line) {
            LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
        });

        ctx.merger->Initiate().then(
            m_executor,
            [liveness = std::weak_ptr<bool>(m_liveness),
             id = m_status.GetId(),
             proc = ctx.merger,
             this](boost::future<int> f) {
                if (liveness.expired()) {
                    LOG4CPLUS_ERROR(LOGGER_NAME_SCHEDULER,
                                    fmt::format("{}: DaqController abandoned -> ignoring "
                                                "result from merger",
                                                id));
                    return;
                }
                MergeComplete(std::move(f));
            });
        ctx.token = std::move(*token);
    } catch (...) {
        Stop();
        auto msg = fmt::format("{}: Failed to initiate merging", *this);
        LOG4CPLUS_ERROR(m_logger, msg);
        std::throw_with_nested(std::runtime_error(msg));
    }
}
912
913void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
914 LOG4CPLUS_TRACE(m_logger, "MergeComplete()");
915 try {
916 auto alert = ObservableStatus::AlertActivator(&m_status, alert::MERGING_MERGE, "");
917 auto* ctx = std::get_if<Merging>(&m_state_ctx);
918 if (ctx) {
919 ctx->Reset();
920 }
921 auto exit_code = result.get();
922 if (exit_code != 0) {
923 auto msg = fmt::format("Merging failed with code {}", exit_code);
924 alert.Set(msg);
925 throw std::runtime_error(fmt::format("Merging failed with code {}", exit_code));
926 }
927 if (!m_workspace->Exists(m_result)) {
928 auto abs_path = m_workspace->GetPath() / m_result;
929 auto msg =
930 fmt::format("Merging reported with success but file is not found: {}", abs_path);
931 alert.Set(msg);
932 LOG4CPLUS_ERROR(m_logger, msg);
933 throw std::runtime_error(msg);
934 }
935 alert.Clear();
936
937 // Create symlink and set result
938 m_workspace->MakeResultSymlink(m_result);
939 // @todo: Include host?
940 m_status.SetResult(m_result.native());
941
942 // All good -> transition to State::Completed
943 // @todo: Transition to Releasing rather than completed
944 LOG4CPLUS_INFO(m_logger, fmt::format("{}: Merge completed successfully!", *this));
945 SetState(Releasing{});
946 } catch (...) {
947 Stop();
948 LOG4CPLUS_ERROR(
949 m_logger,
950 fmt::format(
951 "{}: PollMergeComplete: Failed to create data product: {}", *this, m_result));
952 }
953}
954
955bool DaqControllerImpl::TryStartRelease(Releasing& ctx,
956 json::ReceiverTypes const& receiver,
957 std::size_t index) {
958 // Start release transfer according to configuration
959
960 if (auto* cfg = std::get_if<json::OlasReceiver>(&receiver); cfg) {
961 return TryStartRelease(ctx, *cfg, index);
962 } else {
963 LOG4CPLUS_ERROR(
964 m_logger,
965 fmt::format("Receiver[{}]: TryStartRelease: Unknown receiver type encountered.",
966 index));
967 }
968
969 return true;
970}
971
// Attempts to start the release of the merged data product to an OLAS receiver.
//
// For a receiver on the local host it first tries to hand the file over via a
// hardlink, then (if allowed) a symlink; only when both fail (or the receiver
// is remote) does it fall back to an rsync transfer, which is bounded by the
// net_send resource pool.
//
// @param ctx      Releasing state context; successful rsync starts are
//                 recorded in ctx.transfers.
// @param receiver OLAS receiver configuration (host, path, options).
// @param index    Receiver index used for status updates and log messages.
// @return true if the transfer was started or completed (or failed in a way
//         that should not be retried); false if it could not be started now
//         because the resource limit was reached.
bool DaqControllerImpl::TryStartRelease(Releasing& ctx,
                                        json::OlasReceiver const& receiver,
                                        std::size_t index) {
    LOG4CPLUS_TRACE(
        m_logger,
        fmt::format("Receiver[{}]: TryStartRelease: Will try to start transfer to receiver",
                    index));
    namespace fs = std::filesystem;
    if (receiver.host.empty()) {
        // Local host case
        //
        // Try first to create a hardlink, then symlink, which if it works we don't need to copy
        // the file (in the case of symlink OLAS is responsible for copying).
        //
        // Failure to create hardlink may include:
        // - Filesystem doesn't support it.
        // - Different filesystem between link and target.
        // - Link is in a directory that does not exist (rsync might create it but we don't)

        // note: We cannot simply use result filename as this may contain custom prefixes.
        auto link = receiver.path / GetArchiveFilename();
        if (std::error_code ec = m_workspace->MakeHardLink(m_result, link); !ec) {
            // Hardlink succeeded -> mark receiver as completed; no copy needed.
            m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Success});
            DeferredPoll();
            LOG4CPLUS_INFO(
                m_logger,
                fmt::format("Receiver[{}]: Hardlink created from {} -> {}", index, m_result, link));
            return true;
        } else if (receiver.options.allow_symlink && !m_workspace->MakeSymlink(m_result, link)) {
            // Symlink succeeded (MakeSymlink returns a falsy error code on
            // success) -> mark as completed; OLAS copies the file itself.
            m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Success});
            DeferredPoll();
            LOG4CPLUS_INFO(
                m_logger,
                fmt::format("Receiver[{}]: Symlink created from {} -> {}", index, m_result, link));
            return true;
        } else {
            // Failure to create hardlink is not an error.
            LOG4CPLUS_TRACE(
                m_logger,
                fmt::format("Receiver[{}]: Hard or symlink could not be created from {} -> {}",
                            index,
                            m_result,
                            link));
        }
    }

    // Fall back to rsync. Transfers are limited by the shared net_send pool.
    auto maybe_token = m_resources.net_send.Acquire();
    if (!maybe_token) {
        // Out of tokens -> bail out; the poll loop retries when tokens free up.
        LOG4CPLUS_TRACE(m_logger,
                        fmt::format("Receiver[{}]: Could not start rsync transfer due to "
                                    "resource limit reached.",
                                    index));
        return false;
    }
    using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
    RsyncOptions opts;
    opts.rsync = m_options.rsync_bin;

    auto const& path_from = m_result.native();
    auto path_to = receiver.path / GetArchiveFilename();
    // Remote receivers get the rsync "host:path" form; local ones a plain path.
    auto location = receiver.host.empty() ? path_to.native()
                                          : fmt::format("{}:{}", receiver.host, path_to.native());
    opts.logfile = m_workspace->GetPath() / m_workspace->GetLogsPath() /
                   fmt::format("{}_receiver.rsync.log", index);
    Proc proc = m_rsync_factory(m_executor.get_io_context(),
                                path_from,
                                location,
                                opts,
    // NOTE(review): one source line appears to be missing here (lost in
    // extraction): the final RsyncAsyncProcess::DryRun argument and the
    // closing `);` of the m_rsync_factory call -- verify against VCS.
    auto last_lines =
        std::make_unique<LogCaptureLast>(log4cplus::Logger::getInstance(LOGGER_NAME_TRANSFER), 10);

    // Connect signals (stdout is ignored for now as it merely contains the progress
    // which will be handled differently later on).
    proc->ConnectStderr(std::ref(*last_lines));
    // Start transfer and set up completion handler
    proc->Initiate().then(
        m_executor,
        [this,
         proc,
         index,
         last_lines = std::move(last_lines),
         // Weak liveness flag: lets the continuation detect that this
         // DaqController was destroyed before the transfer completed.
         liveness = std::weak_ptr<bool>(m_liveness)](boost::future<int> f) {
            if (liveness.expired()) {
                LOG4CPLUS_ERROR("dpm.daqcontroller",
                                fmt::format("Receiver[{}]: DaqController abandoned -> ignoring "
                                            "result from rsync for receiver transfer "
                                            "of {}",
                                            index,
                                            *proc));
                return;
            }
            ReleaseComplete(index, *proc, std::move(f), *last_lines);
        });
    LOG4CPLUS_DEBUG(m_logger,
                    fmt::format("{}: Receiver[{}] Started release: {}", *this, index, *proc));
    // Update DAQ status
    m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Started});
    // Add transfer; the Transfer keeps the process and the resource token
    // alive until ReleaseComplete erases it.
    ctx.transfers.emplace(index, Releasing::Transfer{std::move(proc), std::move(*maybe_token)});
    return true;
}
1077
1079 LOG4CPLUS_TRACE(m_logger, "Poll(Releasing)");
1080
1081 if (m_dpspec.receivers.empty()) {
1082 LOG4CPLUS_INFO(m_logger,
1083 "Poll(Releasing): No receivers configured -> transition to Completed");
1084 SetState(Completed{});
1085 return;
1086 }
1087
1088 // Multiple changes can be made here so we defer signalling until we've done them all.
1089 auto defer = ObservableStatus::DeferSignal(&m_status);
1090 try {
1091 // For every non-started, non-completed receiver we initiate the process, limited by
1092 // available resources.
1093 // Once releasing is done (no more pending, transition to Completed.
1094
1095 // Check status of each receiver as configured in dpspec
1096 // - Try to start non-started receiver transfer
1097 auto index = 0u;
1098 auto pending = 0;
1099 LOG4CPLUS_TRACE(m_logger,
1100 "Poll(Releasing): Number of receivers: " << m_dpspec.receivers.size());
1101
1102 for (auto it = m_dpspec.receivers.begin(); it != m_dpspec.receivers.end(); ++it, ++index) {
1103 // Persistent status of receivers contain status of previously started transfers.
1104 auto receiver_status = m_status.GetReceiverStatus(index);
1105 if (receiver_status.state == ReceiverStatus::State::NotStarted) {
1106 LOG4CPLUS_TRACE(
1107 m_logger,
1108 "Poll(Releasing): Receiver[" << index << "] Will try to start receiver.");
1109 // No status for receiver -> Try to start it
1110 TryStartRelease(ctx, *it, index);
1111 pending++;
1112 } else if (!receiver_status.IsFinalState()) {
1113 LOG4CPLUS_TRACE(m_logger,
1114 "Poll(Releasing): Receiver["
1115 << index << "] already in progress: " << receiver_status.state);
1116 pending++;
1117 } else {
1118 LOG4CPLUS_TRACE(m_logger,
1119 "Poll(Releasing): Receiver[" << index << "] already completed with "
1120 << receiver_status.state);
1121 }
1122 }
1123
1124 if (pending == 0 && ctx.transfers.empty()) {
1125 // Since we have no pending (to be started) and no active transfers we're done!
1126 LOG4CPLUS_INFO(
1127 m_logger,
1128 "Poll(Releasing): No release transfers remains -> transition to Completed");
1129 SetState(Completed{});
1130 }
1131 } catch (...) {
1132 // Alerts should already be set.
1133 Stop();
1134
1135 auto msg = fmt::format("{}: Failed to release ", m_status);
1136 LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Releasing): Failed to process DAQ: {}", msg));
1137 std::throw_with_nested(std::runtime_error(msg));
1138 }
1139}
1140
// Completion handler for a single receiver rsync transfer.
//
// Erases the finished transfer (which also releases its net_send resource
// token held by Releasing::Transfer), records Success/Failure in the receiver
// status and raises/clears the RELEASING_RSYNC alert accordingly, then
// schedules another poll.
//
// @param index  Receiver index the transfer belonged to.
// @param proc   The finished process (used for log/alert message formatting).
// @param result Future with the process exit code; .get() rethrows failures.
// @param lines  Last captured stderr lines, included in failure messages.
//
// NOTE(review): std::get<Releasing> below throws std::bad_variant_access if
// the state has already changed; since this function is noexcept and the
// access is outside the try-block that would call std::terminate. Presumably
// the state is guaranteed when completions are dispatched -- verify.
void DaqControllerImpl::ReleaseComplete(std::size_t index,
                                        AsyncProcessIf const& proc,
                                        boost::future<int> result,
                                        LogCaptureLast const& lines) noexcept {
    auto& ctx = std::get<Releasing>(m_state_ctx);
    // Remove transfer
    ctx.transfers.erase(index);
    // Alert is keyed per receiver index; Set/Clear below controls it.
    auto alert = ObservableStatus::AlertActivator(
        &m_status, alert::RELEASING_RSYNC, fmt::format("{}", index));
    try {
        auto code = result.get();

        LOG4CPLUS_DEBUG(
            m_logger,
            fmt::format(
                "{}: Receiver[{}]: Process completed with rc={}: {}", *this, index, code, proc));

        if (code == 0) {
            alert.Clear();
            m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Success});
        } else {
            m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Failure});
            auto msg = fmt::format(
                "Receiver[{}]: Failed to release file using command: {}. Last lines:\n{}",
                index,
                proc,
                lines);
            alert.Set(msg);
            LOG4CPLUS_ERROR(m_logger, *this << ' ' << msg);
        }
    } catch (std::exception const& e) {
        // result.get() rethrew: the transfer itself failed -> alert and stop.
        auto msg = fmt::format(
            "Receiver[{}]: Failed to release file using command: {}. Last "
            "lines:\n{}\nException:\n{}",
            index,
            proc,
            lines,
            e.what());
        LOG4CPLUS_ERROR(m_logger, *this << ' ' << msg);
        alert.Set(msg);
        Stop();
    }

    // Re-poll so remaining receivers can start or the DAQ can complete.
    DeferredPoll();
}
1186
1188 LOG4CPLUS_TRACE(m_logger, "Poll(Completed)");
1189 // We're done
1190 Stop();
1191}
1192
1193void DaqControllerImpl::SetState(StateVariant s) {
1194 m_state_ctx = std::move(s);
1195 auto new_state = MakeState(m_state_ctx);
1196 if (new_state == m_status.GetState()) {
1197 // No change
1198 return;
1199 }
1200 LOG4CPLUS_DEBUG(m_logger, "New state: " << new_state << ", Alerts: " << m_status.GetAlerts());
1201
1202 m_status.SetState(new_state);
1203 DeferredPoll();
1204}
1205
1206State DaqControllerImpl::MakeState(StateVariant const& s) {
1207 if (std::holds_alternative<Scheduled>(s)) {
1208 return State::Scheduled;
1209 }
1210 if (std::holds_alternative<Collecting>(s)) {
1211 return State::Collecting;
1212 }
1213 if (std::holds_alternative<Merging>(s)) {
1214 return State::Merging;
1215 }
1216 if (std::holds_alternative<Releasing>(s)) {
1217 return State::Releasing;
1218 }
1219 if (std::holds_alternative<Completed>(s)) {
1220 return State::Completed;
1221 }
1222
1223 assert(false); // NOLINT Not implemented yet
1224}
1225
// Collecting state context constructor: takes ownership of the resolver that
// maps data-product sources to file locations during collection.
DaqControllerImpl::Collecting::Collecting(SourceResolver resolver_arg)
    : resolver(std::move(resolver_arg)) {
}
1229
// Resets the Merging state context: drops the merger process handle first and
// then the resource token, making the merge slot available again.
void DaqControllerImpl::Merging::Reset() {
    merger.reset();
    token.reset();
}
1234
1235void DaqControllerImpl::HandleMergeMessage(std::string const& line) noexcept try {
1236 // A valid JSON message on stdout has the basic form:
1237 // {"type": "<content-type>",
1238 // "timestamp": <int64 nanonseconds since epoch>,
1239 // "content": <content>}
1240 auto json = nlohmann::json::parse(line);
1241 auto const& type = json.at("type").get<std::string>();
1242 auto const& content = json.at("content");
1243 if (type == "alert") {
1244 // Content schema:
1245 // {"id", "ID",
1246 // "message", "MESSAGE"}
1247 std::string id;
1248 std::string message;
1249 content.at("id").get_to(id);
1250 content.at("message").get_to(message);
1251 auto alert_id = MakeAlertId(alert::MERGING_MERGE, id);
1252 m_status.SetAlert(MakeAlert(alert_id, message));
1253 }
1254} catch (...) {
1255 LOG4CPLUS_DEBUG(m_logger, "Failed to parse JSON message from merger");
1256}
1257
1258std::string DaqControllerImpl::GetArchiveFilename() const {
1259 if (m_status.GetFileId().empty()) {
1260 throw std::runtime_error(fmt::format("{}: DAQ status has no file_id!", m_status));
1261 }
1262 return fmt::format("{}.fits", m_status.GetFileId());
1263}
1264
1265} // namespace daq::dpm
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
State GetState() const noexcept
Definition: status.cpp:297
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Definition: status.cpp:313
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:371
void ClearAlert(AlertId const &alert)
Clear alert.
Definition: status.cpp:341
void SetState(State s) noexcept
Set state of data acquisition.
Definition: status.cpp:305
std::string const & GetFileId() const noexcept
Definition: status.cpp:293
boost::signals2::connection ConnectStatus(SignalType::slot_type const &slot)
Connect observer that is invoked when state is modified.
Definition: status.hpp:409
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
Definition: status.cpp:323
Alerts const & GetAlerts() const noexcept
Definition: status.cpp:285
bool HasError() const noexcept
Definition: status.cpp:301
unsigned ChangesSinceLastSignal() const noexcept
Query number of changes made since last signal.
Definition: status.hpp:420
std::string const & GetId() const noexcept
Definition: status.cpp:281
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
std::optional< ResourceToken > Acquire() noexcept
unsigned GetLimit() const noexcept
unsigned GetUsed() const noexcept
auto IsStopped() const noexcept -> bool override
Definition: scheduler.cpp:508
auto GetState() const noexcept -> State override
Definition: scheduler.cpp:520
auto GetId() const noexcept -> std::string const &override
Definition: scheduler.cpp:512
void Start() override
Start/stop operations.
Definition: scheduler.cpp:499
auto GetErrorFlag() const noexcept -> bool override
Definition: scheduler.cpp:516
auto GetStatus() noexcept -> ObservableStatus &override
Definition: scheduler.cpp:524
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:442
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:439
Controller for specific DAQ.
Definition: scheduler.hpp:61
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:298
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
Definition: scheduler.cpp:276
std::string QueueDaq(std::string const &dp_spec, std::string const &status) override
Queues DAQ for processing.
Definition: scheduler.cpp:131
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
Definition: scheduler.cpp:98
void Stop() override
Definition: scheduler.cpp:127
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
Definition: scheduler.cpp:218
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
Definition: scheduler.cpp:214
void Start() override
Start/stop operations.
Definition: scheduler.cpp:122
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
Definition: scheduler.cpp:197
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Definition: scheduler.cpp:193
Provides location of fits source file.
Interface to interact with DPM workspace.
Definition: workspace.hpp:133
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ witout deleting any files, typically by moving it to a specific location in th...
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
virtual auto GetPath() const -> std::filesystem::path=0
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
virtual auto LoadQueue() const -> std::vector< std::string >=0
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
Declares JSON support for serialization.
DPM server config.
daq::dpm::Workspace interface and implementation declaration
Declaration of log4cplus helpers.
constexpr std::string_view RELEASING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:44
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:38
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:53
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
Definition: scheduler.cpp:93
const std::string LOGGER_NAME_CONTROLLER
Definition: config.hpp:24
const std::string LOGGER_NAME_SCHEDULER
Definition: config.hpp:23
boost::signals2::scoped_connection daqs
Definition: scheduler.hpp:180
const std::string LOGGER_NAME_MERGER
Definition: config.hpp:26
const std::string LOGGER_NAME_TRANSFER
Definition: config.hpp:25
Options for DaqController.
Definition: scheduler.hpp:164
Limited resources.
Definition: scheduler.hpp:172
Options controlling scheduler operations.
Definition: scheduler.hpp:134
std::variant< OlasReceiver > ReceiverTypes
std::string id
DAQ id.
Definition: dpSpec.hpp:45
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Definition: dpSpec.cpp:90
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Definition: dpSpec.cpp:47
Target target
Describes target which will become the data produtc.
Definition: dpSpec.hpp:49
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
Definition: dpSpec.hpp:59
std::vector< SourceTypes > sources
List of sources to create data product from.
Definition: dpSpec.hpp:54
std::string file_prefix
Optioal user chosen file prefix to make it easier to identify the produced file.
Definition: dpSpec.hpp:36
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:55
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Aborted
Data acquisition has been aborted by user.
@ Merging
DAQ is being merged.
@ AbortingMerging
Transitional state for aborting during merging.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
bool IsAbortableState(State state) noexcept
Query whether state is in an abortable state.
Definition: state.cpp:25
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
daq::dpm::Scheduler and related class declarations.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:164
State state
Definition: status.hpp:186
std::string result
Path to resulting data product.
Definition: status.hpp:202
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:207