ifw-daq  3.0.1
IFW Data Acquisition modules
scheduler.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Scheduler implementation
9  */
10 #include <daq/dpm/scheduler.hpp>
11 
12 #include <algorithm>
13 
14 #include <boost/asio/post.hpp>
15 #include <boost/range/adaptor/indexed.hpp>
16 #include <fmt/format.h>
17 #include <fmt/ostream.h>
18 #include <log4cplus/loggingmacros.h>
19 
20 #include <daq/dpm/config.hpp>
21 #include <daq/dpm/workspace.hpp>
22 #include <daq/error/report.hpp>
23 #include <daq/log4cplus.hpp>
24 
25 namespace daq::dpm {
26 
27 namespace {
28 void PrintArgs(log4cplus::Logger& logger, std::vector<std::string> const& args) {
29  std::stringstream ss;
30  ss << "{";
31  bool first = true;
32  for (auto const& token : args) {
33  if (!first) {
34  ss << ", ";
35  }
36  ss << token;
37  first = false;
38  }
39  ss << "}";
40  LOG4CPLUS_DEBUG(logger, "Executing merger with args: " << ss.str());
41 }
42 } // namespace
43 
44 
45 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
46  os << "DAQ{" << daq.GetStatus() << "}";
47  return os;
48 }
49 
                             // NOTE(review): the first line of this constructor's signature is not
                             // visible in this extract; these are the remaining parameters.
                             Workspace& workspace,
                             DaqControllerFactory daq_controller_factory,
                             SchedulerOptions const& options)
    : m_executor(executor)
    , m_workspace(workspace)
    , m_daq_controller_factory(std::move(daq_controller_factory))
    , m_options(options)
    , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME_SCHEDULER))
    , m_liveness(std::make_shared<bool>(false)) {
    // Restore the persisted merge queue so scheduling survives process restarts.
    m_queue = m_workspace.LoadQueue();
    // Re-poll whenever a resource (DAQ slot, net receive/send, merge) is released.
    // The weak_ptr to m_liveness guards against the signal firing after this
    // Scheduler has been destroyed.
    auto slot = [liveness = std::weak_ptr<bool>(m_liveness), this]() {
        if (auto ptr = liveness.lock(); ptr) {
            // Scheduler still alive
            DeferredPoll();
        }
    };
    auto& conns = m_resources_connections;
    conns.daqs = m_resources.daqs.Connect(slot);
    conns.net_receive = m_resources.net_receive.Connect(slot);
    conns.merge = m_resources.merge.Connect(slot);
    conns.net_send = m_resources.net_send.Connect(slot);
}
73 
    // NOTE(review): the signature line of this function is not visible in this
    // extract (presumably SchedulerImpl::Start()). Body: clears the stopped
    // flag and schedules the first poll.
    m_stopped = false;
    DeferredPoll();
}
78 
    // NOTE(review): the signature line is not visible in this extract
    // (presumably SchedulerImpl::Stop()). Setting the flag makes any
    // already-posted poll bail out when it checks m_stopped.
    m_stopped = true;
}
82 
/**
 * Queues a new DAQ for merging from a serialized Data Product Specification.
 *
 * Parses the JSON specification, creates a persistent DAQ workspace holding an
 * initial Scheduled status, appends the id to the merge queue and persists the
 * queue. Each step is wrapped in its own try so that a failure rolls back the
 * previous step (pop from queue / remove workspace) before rethrowing nested.
 *
 * @param dp_spec_serialized Data Product Specification as a JSON string.
 * @return the DAQ id taken from the specification.
 * @throws std::invalid_argument if a DAQ with the same id is already queued,
 *         or the specification is invalid (bad JSON or bad DpSpec content).
 * @throws std::runtime_error (nested) if workspace/queue persistence fails.
 */
std::string SchedulerImpl::QueueDaq(std::string const& dp_spec_serialized) {
    LOG4CPLUS_TRACE(m_logger, "QueueDaq()");
    try {
        auto json = nlohmann::json::parse(dp_spec_serialized);
        json::DpSpec dp_spec = json::ParseDpSpec(json);
        std::string const& id = dp_spec.id;
        std::string const& file_id = dp_spec.target.file_id;
        if (IsQueued(id)) {
            LOG4CPLUS_ERROR(m_logger, "QueueDaq(): DAQ conflict detected -> aborting");
            throw std::invalid_argument(
                fmt::format("Scheduler: Could not queue DAQ for merging as "
                            "a Data Acquisition with same id has been queued before: '{}'",
                            id));
        }
        // New daq for this workspace -> Initialize persistent state.
        try {
            LOG4CPLUS_INFO(m_logger,
                           fmt::format("QueueDaq(): Initializing new workspace for DAQ {}", id));
            auto daq_ws = m_workspace.InitializeDaq(id);
            assert(daq_ws);
            try {
                assert(daq_ws);
                Status initial_status;
                initial_status.id = id;
                initial_status.file_id = file_id;
                // Initial state in DPM is Scheduled as that's where DPM takes over.
                initial_status.state = State::Scheduled;
                initial_status.error = false;
                initial_status.timestamp = Status::TimePoint::clock::now();

                daq_ws->StoreStatus(initial_status);
                daq_ws->StoreSpecification(dp_spec_serialized);

                try {
                    m_queue.push_back(id);
                    // Finally update the backlog
                    m_workspace.StoreQueue(m_queue);

                    DeferredPoll();
                    return id;
                } catch (...) {
                    // Undo push
                    m_queue.pop_back();
                    std::throw_with_nested(std::runtime_error("Failed to store DAQ queue"));
                }
            } catch (...) {
                std::throw_with_nested(
                    std::runtime_error("Failed to write status to DAQ workspace"));
            }
        } catch (...) {
            // Undo creation of DAQ workspace
            m_workspace.RemoveDaq(id);
            std::throw_with_nested(std::runtime_error(fmt::format(
                "Failed to initialize DAQ workspace in {}", m_workspace.GetPath().native())));
        }
    } catch (json::DpSpecError const&) {
        LOG4CPLUS_DEBUG(m_logger,
                        "Invalid Data Product Specification provided: \n"
                            << dp_spec_serialized);

        std::throw_with_nested(
            std::invalid_argument("Scheduler: Could not queue DAQ for merging because "
                                  "Data Product Specification is invalid"));
    } catch (nlohmann::json::parse_error const&) {
        std::throw_with_nested(
            std::invalid_argument("Scheduler: Could not queue DAQ for merging because provided "
                                  "data product specification is invalid JSON"));
    }
}
152 
153 bool SchedulerImpl::IsQueued(std::string const& id) const noexcept {
154  return std::find(m_queue.begin(), m_queue.end(), id) != m_queue.end();
155 }
156 
157 Status SchedulerImpl::GetDaqStatus(std::string const& id) const try {
158  LOG4CPLUS_TRACE(m_logger, fmt::format("GetDaqStatus({})", id));
159  if (!IsQueued(id)) {
160  // Try to find it in the archive.
161  auto maybe_status = m_workspace.LoadArchivedStatus(id);
162  if (maybe_status) {
163  return *maybe_status;
164  } else {
165  LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): No such ID", id));
166  throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
167  }
168  }
169  return m_workspace.LoadDaq(id)->LoadStatus();
170 } catch (...) {
171  std::throw_with_nested(std::runtime_error("Scheduler: GetDaqStatus() failed"));
172 }
173 
174 std::vector<std::string> SchedulerImpl::GetQueue() const noexcept {
175  return m_queue;
176 }
177 
/**
 * Aborts a queued (possibly active) DAQ: destroys any active controller
 * (terminating its processes), removes the DAQ from queue and filesystem and
 * publishes the final status.
 *
 * @throws std::runtime_error (nested) on any failure, including unknown id.
 */
void SchedulerImpl::AbortDaq(std::string const& id) try {
    LOG4CPLUS_TRACE(m_logger, fmt::format("AbortDaq({})", id));
    // - If DAQ is either being released or is already in final state it cannot be aborted.
    // - If DAQ is active must first be made inactive by destroying DaqController which will also
    //   terminate started processes.
    // - If DAQ is not active aborting simply means to erase DAQ from filesystem and queue.
    if (!IsQueued(id)) {
        LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): No such ID", id));
        throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
    }

    // If DAQ is active/in-progress we must first delete it
    auto it = std::find_if(m_active.begin(), m_active.end(), [&id](Active const& active) {
        assert(active.daq);
        return id == active.daq->GetId();
    });

    // Status to update with final aborted state
    // NOTE(review): the `new_state` parameter is ignored - the lambda always
    // publishes State::AbortingMerging, so the later pub(State::Aborted) call
    // never actually publishes Aborted. Looks like a bug; presumably it should
    // be `status.state = new_state;` - confirm before changing.
    auto pub = [&, status = m_workspace.LoadDaq(id)->LoadStatus()](State new_state) mutable {
        status.state = State::AbortingMerging;
        m_status_signal(status);
    };

    if (it != m_active.end()) {
        auto state = it->daq->GetState();
        // NOTE(review): this condition looks inverted - "Cannot abort" is
        // logged when the state IS abortable, and no early return/throw
        // follows either way. Verify against IsAbortableState() semantics.
        if (IsAbortableState(state)) {
            LOG4CPLUS_DEBUG(m_logger,
                            fmt::format("AbortDaq({}): Cannot abort in state {}", id, state));
        }
        // Erase if active
        LOG4CPLUS_DEBUG(
            m_logger,
            fmt::format("AbortDaq({}): Erasing active DAQ currently in state {}", id, state));
        m_active.erase(it);
    }

    // Publish that we're aborting
    // NOTE(review): a statement appears to be missing from this extract here
    // (presumably `pub(State::AbortingMerging);`).

    // Erase id from queue
    LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ from merge queue", id));
    m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
    m_workspace.StoreQueue(m_queue);

    // Delete workspace
    LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ workspace", id));
    m_workspace.RemoveDaq(id);

    // Publish that DAQ is aborted
    pub(State::Aborted);

    // Queue has changed -> Poll
    DeferredPoll();
    LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Aborted and removed DAQ", id));
} catch (...) {
    std::throw_with_nested(std::runtime_error("Scheduler: AbortDaq() failed"));
}
235 
/// Subscribes `slot` to the status updates this scheduler forwards from its
/// DAQ controllers. The caller owns the returned connection.
boost::signals2::connection SchedulerImpl::ConnectStatus(StatusSignal::slot_type const& slot) {
    return m_status_signal.connect(slot);
}
239 
/// Single scheduling pass. Order matters: archiving completed DAQs frees
/// resource tokens that ActivateFromQueue() may then use to start new ones.
void SchedulerImpl::Poll() {
    LOG4CPLUS_TRACE(m_logger, "Poll()");
    ArchiveCompleted();
    ActivateFromQueue();
}
245 
246 void SchedulerImpl::ArchiveCompleted() {
247  // Archive completed DAQs
248  for (auto& active : m_active) { // auto it = remove_it; it != m_active.end(); ++it) {
249  try {
250  if (!IsFinalState(active.daq->GetState())) {
251  continue;
252  }
253  auto id = active.daq->GetId();
254 
255  auto archive_path = m_workspace.ArchiveDaq(id);
256  LOG4CPLUS_INFO(
257  m_logger,
258  fmt::format("DAQ {} is in final state {} -> moved workspace to archive: {}",
259  id,
260  active.daq->GetState(),
261  archive_path.native()));
262 
263  // Delete controller
264  active.daq.reset();
265 
266  // Remove ID from queue
267  m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
268  m_workspace.StoreQueue(m_queue);
269  } catch (...) {
270  // @todo: Decide what to do here...
271  LOG4CPLUS_ERROR(m_logger,
272  "Failed to archive DAQ workspace:\n"
273  << error::NestedExceptionReporter(std::current_exception()));
274  }
275  }
276 
277  // Erase removed daqs
278  auto remove_it = std::remove_if(
279  m_active.begin(), m_active.end(), [&](auto const& active) { return !active.daq; });
280  if (remove_it != m_active.end()) {
281  m_active.erase(remove_it, m_active.end());
282  // Since this freed resources we schedule poll() to be run again.
283  DeferredPoll();
284  }
285 }
286 
287 void SchedulerImpl::ActivateFromQueue() {
288  // Update active DAQs
289  // See if a new DAQ can be active.
290  if (m_queue.empty()) {
291  LOG4CPLUS_INFO(m_logger, "Merge queue empty - all done!");
292  return;
293  }
294 
295  auto candidates = GetCandidates();
296  if (candidates.empty()) {
297  LOG4CPLUS_INFO(m_logger, "All DAQ merge candidates are active/in-progress");
298  return;
299  }
300  // Schedule as many candidates as allowed by resource limits
301  try {
302  for (auto const& id : candidates) {
303  LOG4CPLUS_TRACE(m_logger, fmt::format("{}: Attempting to schedule DAQ", id));
304  auto maybe_token = m_resources.daqs.Acquire();
305  if (!maybe_token) {
306  LOG4CPLUS_INFO(m_logger,
307  fmt::format("Limit reached: Cannot schedule '{}' "
308  "Currently active DAQs at/exceed limit. "
309  "current: {}, limit: {}, queue size: {}",
310  id,
311  m_resources.daqs.GetUsed(),
312  m_resources.daqs.GetLimit(),
313  m_queue.size()));
314  return;
315  }
316 
317  try {
318  // @todo: Store token with DAQ?
319  auto ws = m_workspace.LoadDaq(id);
320  auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
321 
322  // Aggregate signal
323  // both DaqController and slot are owned by Scheduler so we don't need
324  // to manage connection lifetime.
325  daq_controller->GetStatus().ConnectStatus([&](ObservableStatus const& s) {
326  // Completed DAQs should be archived so we schedule a poll in that case.
327  if (IsFinalState(s.GetState())) {
328  DeferredPoll();
329  }
330  // Just forward status
331  m_status_signal(s.GetStatus());
332  });
333  // Tell DaqController to start
334  daq_controller->Start();
335 
336  m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
337 
338  LOG4CPLUS_DEBUG(m_logger, fmt::format("{}: DAQ scheduled for merging", id));
339  // Schedule new Poll() as queue has been modified.
340  DeferredPoll();
341  } catch (...) {
342  std::throw_with_nested(
343  std::runtime_error(fmt::format("{}: Failed to activate DAQ for merging.", id)));
344  }
345  }
346  } catch (...) {
347  std::throw_with_nested(std::runtime_error("Failed to load "));
348  }
349 }
350 
351 void SchedulerImpl::DeferredPoll() {
352  if (m_stopped || *m_liveness) {
353  // Stopped or already scheduled
354  return;
355  }
356  boost::asio::post(m_executor.get_io_context().get_executor(),
357  [liveness = std::weak_ptr<bool>(m_liveness), this]() {
358  if (auto ptr = liveness.lock(); ptr) {
359  // Scheduler still alive
360  // Clear flag before invoking Poll so that another
361  // poll can be scheduled.
362  *ptr = false;
363  Poll();
364  }
365  });
366 }
367 
368 std::vector<std::string> SchedulerImpl::GetCandidates() const {
369  // Return all DAQs in m_queue that is not in m_active.
370  std::vector<std::string> candidates;
371  for (auto const& id : m_queue) {
372  auto it = std::find_if(m_active.begin(), m_active.end(), [&](auto const& active) {
373  assert(active.daq);
374  return active.daq->GetId() == id;
375  });
376  if (it == m_active.end()) {
377  candidates.push_back(id);
378  }
379  }
380  return candidates;
381 }
382 
/**
 * Constructs a DaqController bound to a persistent DAQ workspace.
 *
 * Loads specification and status from the workspace, derives the result file
 * path (unless one is already recorded in the stored status), restores the
 * state-machine context from the persisted state, and connects a slot that
 * writes every observable status change back to the workspace.
 *
 * @throws std::invalid_argument if the merge or rsync binary name is empty.
 * @throws std::runtime_error if the persisted state cannot be resumed.
 */
DaqControllerImpl::DaqControllerImpl(rad::IoExecutor& executor,
                                     std::unique_ptr<DaqWorkspace> workspace,
                                     Resources& resources,
                                     RsyncFactory rsync_factory,
                                     ProcFactory proc_factory,
                                     DaqControllerOptions options)
    : m_executor(executor)
    , m_workspace(std::move(workspace))
    , m_resources(resources)
    , m_rsync_factory(std::move(rsync_factory))
    , m_proc_factory(std::move(proc_factory))
    , m_options(std::move(options))
    , m_dpspec(m_workspace->LoadSpecification())
    , m_result()
    , m_state_ctx(Scheduled{}) // m_state_ctx is updated in body with actual state.
    , m_status(m_workspace->LoadStatus())
    , m_status_connection()
    , m_liveness(std::make_shared<bool>(false))
    , m_stopped(true)
    , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME_CONTROLLER)) {
    if (m_options.merge_bin.empty()) {
        throw std::invalid_argument("Specified merge application name is empty");
    }
    if (m_options.rsync_bin.empty()) {
        throw std::invalid_argument("Specified rsync application name is empty");
    }

    // If result has already been created it is contained in the stored status.
    // Otherwise we set to-be-used absolute path
    if (m_status.GetStatus().result.empty()) {
        m_result = m_workspace->GetResultPath() /
                   (m_dpspec.target.file_prefix + m_dpspec.target.file_id + ".fits");
    } else {
        m_result = m_status.GetStatus().result;
    }

    // Update m_state from loaded status.
    auto error = m_status.GetError();
    switch (m_status.GetState()) {
    case State::Scheduled:
        SetState(Scheduled{}, error);
        break;
    case State::Collecting:
        // Collecting needs the persisted source -> local-file lookup table.
        SetState(Collecting{SourceResolver(m_workspace->LoadSourceLookup())}, error);
        break;
    case State::Merging:
        SetState(Merging(), error);
        break;
    case State::Releasing:
        SetState(Releasing{}, error);
        break;
    case State::Completed:
        SetState(Completed{}, error);
        break;
    default:
        // Any other persisted state is not resumable by this implementation.
        throw std::runtime_error("Not implemented");
    };

    // Persist observable status changes so the DAQ survives a restart.
    m_status_connection = m_status.ConnectStatus([this](ObservableStatus const& status) mutable {
        if (status.ChangesSinceLastSignal() == 0u) {
            // No changes -> don't write
            LOG4CPLUS_TRACE(
                m_logger,
                fmt::format(
                    "DaqController: No change -> skipping write of workspace status file: {}",
                    status));
            return;
        }
        LOG4CPLUS_TRACE(m_logger,
                        fmt::format("DaqController: Updating workspace status file: {}", status));
        m_workspace->StoreStatus(status);
    });
}
456 
    // NOTE(review): the signature line is not visible in this extract
    // (presumably DaqControllerImpl::Start()). Body: enables polling and
    // schedules the first poll.
    m_stopped = false;
    DeferredPoll();
}
461 
    // NOTE(review): the signature line is not visible in this extract
    // (presumably DaqControllerImpl::Stop()). Any posted poll observes
    // m_stopped and becomes a no-op.
    m_stopped = true;
}
465 
/// @return true if the controller has been stopped (polling disabled).
auto DaqControllerImpl::IsStopped() const noexcept -> bool {
    return m_stopped;
}

/// @return the identifier of the DAQ this controller processes.
auto DaqControllerImpl::GetId() const noexcept -> std::string const& {
    return m_status.GetId();
}

/// @return the sticky error flag from the observable status.
auto DaqControllerImpl::GetErrorFlag() const noexcept -> bool {
    return m_status.GetError();
}

/// @return the current DAQ state.
auto DaqControllerImpl::GetState() const noexcept -> State {
    return m_status.GetState();
}

// NOTE(review): the signature line of this accessor is not visible in this
// extract (presumably the non-const GetStatus() overload).
    return m_status;
}

/// @return the observable status (const overload).
auto DaqControllerImpl::GetStatus() const noexcept -> ObservableStatus const& {
    return m_status;
}
489 
490 void DaqControllerImpl::DeferredPoll() {
491  if (m_stopped || *m_liveness) {
492  // Stopped or already scheduled
493  return;
494  }
495  boost::asio::post(m_executor.get_io_context().get_executor(),
496  [liveness = std::weak_ptr<bool>(m_liveness), this]() {
497  if (auto ptr = liveness.lock(); ptr) {
498  // Scheduler still alive
499  // Clear flag before invoking Poll so that another
500  // poll can be scheduled.
501  *ptr = false;
502  Poll();
503  }
504  });
505 }
506 
    // NOTE(review): the signature line is not visible in this extract
    // (presumably `void DaqControllerImpl::Poll() {`).
    LOG4CPLUS_TRACE(m_logger, "Poll()");
    if (m_stopped) {
        LOG4CPLUS_DEBUG(
            m_logger,
            fmt::format("{}: Poll() - DaqController is stopped so nothing will be done", *this));
        // Stopped or already scheduled
        return;
    }
    // Dispatch to the Poll(<StateCtx>&) overload matching the current state.
    std::visit([this](auto& state) { Poll(state); }, m_state_ctx);
}
518 
519 void DaqControllerImpl::Poll(DaqControllerImpl::Scheduled&) {
520  LOG4CPLUS_TRACE(m_logger, "Poll(Scheduled)");
521  try {
522  // Create list of remote sources and build mapping to local files.
523  std::vector<json::FitsFileSource> sources;
524  if (m_dpspec.target.source) {
525  sources.push_back(*m_dpspec.target.source);
526  }
527  for (auto const& s : m_dpspec.sources) {
528  if (!std::holds_alternative<json::FitsFileSource>(s)) {
529  continue;
530  }
531  sources.push_back(std::get<json::FitsFileSource>(s));
532  }
533 
534  auto sources_path = m_workspace->GetSourcesPath();
535 
536  SourceResolver resolver;
537  unsigned index = 0;
538  for (auto const& s : sources) {
539  json::Location location = json::ParseSourceLocation(s.location);
540  // Use <index>_<source>_<filename> as local filename.
541  auto local_path =
542  sources_path /
543  fmt::format("{}_{}_{}", index, s.source_name, location.path.filename().native());
544  LOG4CPLUS_INFO(m_logger,
545  fmt::format("Poll(Scheduled): Source file '{}' from source \"{}\" "
546  "on host \"{}\" will be stored in {}",
547  location.path,
548  s.source_name,
549  !location.host.empty() ? location.host.c_str() : "<n/a>",
550  local_path));
551 
552  resolver.Add({s.source_name, s.location}, local_path);
553  }
554 
555  // Store source resolution mapping.
556  m_workspace->StoreSourceLookup(resolver.GetMapping());
557 
558  SetState(Collecting(std::move(resolver)));
559  } catch (...) {
560  auto msg =
561  fmt::format("{}: Failed to collect and store list of required sources", m_status);
562  LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Scheduled): Failed to process DAQ: {}", msg));
563  // No point in retrying these kinds of errors.
564  Stop();
565  std::throw_with_nested(std::runtime_error(msg));
566  }
567 }
568 
/**
 * Poll handler for the Collecting state: starts missing source-file transfers
 * (bounded by the net_receive resource limit) and transitions to Merging once
 * every source file exists in the workspace.
 */
void DaqControllerImpl::Poll(Collecting& ctx) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Collecting)");
    try {
        // Check if any source is missing and start transfers if possible
        // If all transfers are done transition to State::Merging.

        auto const& sources = ctx.resolver.GetMapping();
        auto missing = sources.size();
        auto root = m_workspace->GetPath();
        for (auto const& source : sources) {
            if (ctx.HasTransfer(source.first)) {
                // Transfer already in progress
                continue;
            }
            // [recovery]
            if (m_workspace->Exists(source.second)) {
                missing--;
                // File already exist
                continue;
            }
            if (m_soft_stop) {
                // If there are errors we don't initiate new transfers
                LOG4CPLUS_TRACE(m_logger,
                                "Poll(Collecting): Soft stop is enabled"
                                "-> won't start new transfer");
                continue;
            }
            // We need to transfer the file, try to start.
            auto token = m_resources.net_receive.Acquire();
            if (token) {
                // We got the token, so we can start transfer
                json::Location location = json::ParseSourceLocation(source.first.location);
                using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
                RsyncOptions opts;
                opts.rsync = m_options.rsync_bin;
                Proc proc = m_rsync_factory(m_executor.get_io_context(),
                                            location.RsyncPath(),
                                            root / source.second,
                                            opts,
                // NOTE(review): the final argument(s) of this factory call and
                // its closing parenthesis are missing from this extract.

                auto last_lines = std::make_unique<LogCaptureLast>(
                    log4cplus::Logger::getInstance(LOGGER_NAME_TRANSFER), 10);
                // Connect signals (stdout is ignored for now as it merely contains the progress
                // which will be handled differently later on).
                proc->ConnectStderr(std::ref(*last_lines));
                // Start transfer and set up completion handler
                proc->Initiate().then(
                    m_executor,
                    [liveness = std::weak_ptr<bool>(m_liveness),
                     source = source.first,
                     dest = source.second,
                     proc = proc,
                     last_lines = std::move(last_lines),
                     this](boost::future<int> f) {
                        if (liveness.expired()) {
                            LOG4CPLUS_ERROR("dpm.daqcontroller",
                                            fmt::format("DaqController abandoned -> ignoring "
                                                        "result from rsync for transfer of {}",
                                                        source));
                            return;
                        }
                        TransferComplete(source, dest, std::move(f), *last_lines);
                    });
                ctx.transfers.emplace_back(Collecting::Transfer{
                    source.first, source.second, std::move(proc), std::move(*token)});
            } else {
                // Out of tokens -> bail out
                LOG4CPLUS_TRACE(m_logger,
                                fmt::format("Poll(Collecting): Could not start transfer due to "
                                            "resource limit reached: {}",
                                            source.first));
                return;
            }
        }

        if (missing == 0) {
            // If all transfers are complete we can transition to next state
            SetState(Merging(), false);
            return;
        }
    } catch (...) {
        SetError(true);
        // No point in retrying these kinds of errors.
        auto msg = fmt::format("{}: Failed to transfer required sources", m_status);
        LOG4CPLUS_ERROR(m_logger,
                        fmt::format("Poll(Collecting): Failed to process DAQ: {}", msg));
        Stop();
        std::throw_with_nested(std::runtime_error(msg));
    }
}
660 
661 void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile const& source,
662  std::filesystem::path const& local_path,
663  boost::future<int> result,
664  LogCaptureLast const& log) noexcept {
665  LOG4CPLUS_TRACE(m_logger,
666  fmt::format("{}: TransferComplete: {} -> {}", *this, source, local_path));
667  auto* ctx = std::get_if<Collecting>(&m_state_ctx);
668  // Multiple changes can be made here so we defer signalling until we've done them all.
669  auto defer = ObservableStatus::DeferSignal(&m_status);
670  try {
671  if (ctx) {
672  ctx->EraseTransfer(source);
673  }
674 
675  int return_code = result.get();
676  auto alert_id = MakeAlertId(alert::COLLECTING_RSYNC, local_path);
677  if (return_code != 0) {
678  m_status.SetAlert(MakeAlert(
679  alert_id,
680  fmt::format("rsync file transfer failed for remote file {}. Last lines: \n{}",
681  source,
682  log)));
683  LOG4CPLUS_ERROR(m_logger, fmt::format("rync file transfer failed: {}", source));
684  throw std::runtime_error("rsync failed to transfer file");
685  }
686  // Opportunistically clear alert in case it as set from previous attempt
687  m_status.ClearAlert(alert_id);
688  LOG4CPLUS_INFO(m_logger,
689  fmt::format("{}: Poll(Collecting): Successfully transfer file: {} -> {}",
690  *this,
691  source,
692  local_path));
693 
694  // Transfer ok
695  DeferredPoll();
696  return;
697  } catch (...) {
698  SetError(true);
699  LOG4CPLUS_ERROR(m_logger,
700  fmt::format("{}: Poll(Collecting): Failed to transfer file: {} -> {}",
701  *this,
702  source,
703  local_path));
704  // Stopping DAQ can be done when there's no more pending file transfers (we let them
705  // complete).
706  if (ctx && ctx->transfers.empty()) {
707  LOG4CPLUS_ERROR(
708  m_logger,
709  fmt::format("{}: Poll(Collecting): All pending transfers have completed "
710  "(with or without error) so we stop DAQ",
711  *this));
712  Stop();
713  }
714  }
715 }
716 
717 bool DaqControllerImpl::Collecting::HasTransfer(
718  SourceResolver::SourceFile const& source) const noexcept {
719  auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
720  return t.source == source;
721  });
722  return it != transfers.cend();
723 }
724 
725 DaqControllerImpl::Collecting::Transfer*
726 DaqControllerImpl::Collecting::GetTransfer(SourceResolver::SourceFile const& source) noexcept {
727  auto it = std::find_if(
728  transfers.begin(), transfers.end(), [&](Transfer const& t) { return t.source == source; });
729 
730  if (it != transfers.end()) {
731  return &(*it);
732  }
733  return nullptr;
734 }
735 
736 void DaqControllerImpl::Collecting::EraseTransfer(SourceResolver::SourceFile const& source) {
737  auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
738  return t.source == source;
739  });
740  if (it != transfers.end()) {
741  transfers.erase(it);
742  }
743 }
744 
745 DaqControllerImpl::Collecting::~Collecting() noexcept {
746  for (auto& transfer : transfers) {
747  if (transfer.proc && transfer.proc->IsRunning()) {
748  LOG4CPLUS_INFO(
749  "dpm",
750  "DaqController::Collecting::~Collecting: Terminating running transfer process");
751  transfer.proc->Signal(SIGTERM);
752  }
753  }
754 }
755 
756 DaqControllerImpl::Merging::~Merging() noexcept {
757  if (merger && merger->IsRunning()) {
758  LOG4CPLUS_DEBUG("dpm", "DaqController::Merging::~Merging: Aborting running merge process");
759  merger->Signal(SIGTERM);
760  }
761 }
762 
763 DaqControllerImpl::Releasing::~Releasing() noexcept {
764  for (auto& p : transfers) {
765  if (p.second.proc && p.second.proc->IsRunning()) {
766  LOG4CPLUS_INFO(
767  "dpm",
768  "DaqController::Releasing::~Releasing: Terminating running transfer process");
769  p.second.proc->Signal(SIGTERM);
770  }
771  }
772 }
773 
/**
 * Poll handler for the Merging state: launches the external merge application
 * once (bounded by the merge resource limit) and registers a completion
 * handler. If the result file already exists (e.g. created manually) the state
 * jumps straight to Completed as a recovery path.
 *
 * @throws std::runtime_error (nested) if the merge could not be initiated;
 *         the controller is stopped first.
 */
void DaqControllerImpl::Poll(DaqControllerImpl::Merging& ctx) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Merging)");
    auto defer = ObservableStatus::DeferSignal(&m_status);
    try {
        if (ctx.merger) {
            // Merge is already in progress
            return;
        }
        // [recovery]
        if (m_workspace->Exists(m_result)) {
            SetState(Completed{});
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("{}: Poll(Merging): "
                                        "Recovered from error automatically (manual merge)",
                                        *this));
            return;
        }
        auto token = m_resources.merge.Acquire();
        if (!token) {
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("{}: Poll(Merging): Could not start merging due to "
                                        "resource limit reached: {}",
                                        *this,
                                        m_resources.merge.GetLimit()));

            return;
        }

        // Launch merge
        // note: daqDpmMerge will atomically move result into output so we don't have
        // to do that ourselves.
        std::vector<std::string> args{m_options.merge_bin};
        args.emplace_back("--json");
        args.emplace_back("--root");
        args.emplace_back(m_workspace->GetPath());
        args.emplace_back("--resolver");
        args.emplace_back(m_workspace->GetSourceLookupPath());
        args.emplace_back("-o");
        args.emplace_back(m_result.native());
        // Positional argument for the specification which is passed as absolute path.
        args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());

        PrintArgs(m_logger, args);

        ctx.merger = m_proc_factory(m_executor.get_io_context(), args);

        // Forward merger output to the dedicated logger; stdout lines also
        // carry structured messages consumed by HandleMergeMessage().
        ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance(LOGGER_NAME_MERGER),
                                   this](pid_t pid, std::string const& line) {
            LOG4CPLUS_DEBUG(logger, pid << ": " << Trim(line));
            HandleMergeMessage(line);
        });
        ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance(LOGGER_NAME_MERGER)](
                                      pid_t pid, std::string const& line) {
            LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
        });

        // Completion handler; the weak_ptr detects an abandoned controller so
        // the result is ignored instead of touching a dead object.
        ctx.merger->Initiate().then(
            m_executor,
            [liveness = std::weak_ptr<bool>(m_liveness),
             id = m_status.GetId(),
             proc = ctx.merger,
             this](boost::future<int> f) {
                if (liveness.expired()) {
                    LOG4CPLUS_ERROR(LOGGER_NAME_SCHEDULER,
                                    fmt::format("{}: DaqController abandoned -> ignoring "
                                                "result from merger",
                                                id));
                    return;
                }
                MergeComplete(std::move(f));
            });
        // Hold the merge resource token for the lifetime of the merge.
        ctx.token = std::move(*token);
    } catch (...) {
        Stop();
        auto msg = fmt::format("{}: Failed to initiate merging", *this);
        LOG4CPLUS_ERROR(m_logger, msg);
        std::throw_with_nested(std::runtime_error(msg));
    }
}
853 
854 void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
855  LOG4CPLUS_TRACE(m_logger, "MergeComplete()");
856  // Defer signal until all changes have been made
857  auto defer = ObservableStatus::DeferSignal(&m_status);
858  try {
859  auto alert_id = MakeAlertId(alert::MERGING_MERGE, "");
860  auto* ctx = std::get_if<Merging>(&m_state_ctx);
861  if (ctx) {
862  ctx->Reset();
863  }
864  auto exit_code = result.get();
865  if (exit_code != 0) {
866  auto msg = fmt::format("Merging failed with code {}", exit_code);
867  m_status.SetAlert(MakeAlert(alert_id, msg));
868  throw std::runtime_error(fmt::format("Merging failed with code {}", exit_code));
869  }
870  if (!m_workspace->Exists(m_result)) {
871  auto abs_path = m_workspace->GetPath() / m_result;
872  auto msg =
873  fmt::format("Merging reported with success but file is not found: {}", abs_path);
874  m_status.SetAlert(MakeAlert(alert_id, msg));
875  LOG4CPLUS_ERROR(m_logger, msg);
876  throw std::runtime_error(msg);
877  }
878  m_status.ClearAlert(alert_id);
879 
880  // Create symlink and set result
881  m_workspace->MakeResultSymlink(m_result);
882  // @todo: Include host?
883  m_status.SetResult(m_result.native());
884 
885  // All good -> transition to State::Completed
886  // @todo: Transition to Releasing rather than completed
887  LOG4CPLUS_INFO(m_logger, fmt::format("{}: Merge completed successfully!", *this));
888  SetState(Releasing{}, m_status.GetError());
889  } catch (...) {
890  SetError(true);
891  Stop();
892  LOG4CPLUS_ERROR(
893  m_logger,
894  fmt::format(
895  "{}: PollMergeComplete: Failed to create data product: {}", *this, m_result));
896  }
897 }
898 
899 bool DaqControllerImpl::TryStartRelease(Releasing& ctx,
900  json::ReceiverTypes const& receiver,
901  std::size_t index) {
902  // Start release transfer according to configuration
903 
904  if (auto* cfg = std::get_if<json::OlasReceiver>(&receiver); cfg) {
905  return TryStartRelease(ctx, *cfg, index);
906  } else {
907  LOG4CPLUS_ERROR(
908  m_logger,
909  fmt::format("Receiver[{}]: TryStartRelease: Unknown receiver type encountered.",
910  index));
911  }
912 
913  return true;
914 }
915 
/// Starts the release transfer of the merged data product to one OLAS receiver.
///
/// For a local receiver (empty host) it first attempts a hardlink, then a
/// symlink (if allowed); only if both fail does it fall back to rsync. Remote
/// receivers always go through rsync, limited by the net_send resource pool.
///
/// @param ctx      Releasing state context; started transfers are recorded in
///                 ctx.transfers together with their resource token.
/// @param receiver OLAS receiver configuration (host + destination path).
/// @param index    Receiver index used for status reporting and logging.
/// @return true if the transfer was started (or completed immediately via a
///         link); false if the net_send resource limit prevented starting it,
///         in which case a later poll should retry.
bool DaqControllerImpl::TryStartRelease(Releasing& ctx,
                                        json::OlasReceiver const& receiver,
                                        std::size_t index) {
    LOG4CPLUS_TRACE(
        m_logger,
        fmt::format("Receiver[{}]: TryStartRelease: Will try to start transfer to receiver",
                    index));
    namespace fs = std::filesystem;  // NOTE(review): alias appears unused here.
    if (receiver.host.empty()) {
        // Local host case
        //
        // Try first to create a hardlink, then symlink, which if it works we don't need to copy
        // the file (in the case of symlink OLAS is responsible for copying).
        //
        // Failure to create hardlink may include:
        // - Filesystem doesn't support it.
        // - Different filesystem between link and target.
        // - Link is in a directory that does not exist (rsync might create it but we don't)

        // note: We cannot simply use result filename as this may contain custom prefixes.
        auto link = receiver.path / GetArchiveFilename();
        if (std::error_code ec = m_workspace->MakeHardLink(m_result, link); !ec) {
            // Mark as completed
            m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Success});
            DeferredPoll();
            LOG4CPLUS_INFO(
                m_logger,
                fmt::format("Receiver[{}]: Hardlink created from {} -> {}", index, m_result, link));
            return true;
        } else if (receiver.options.allow_symlink && !m_workspace->MakeSymlink(m_result, link)) {
            // Mark as completed
            m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Success});
            DeferredPoll();
            LOG4CPLUS_INFO(
                m_logger,
                fmt::format("Receiver[{}]: Symlink created from {} -> {}", index, m_result, link));
            return true;
        } else {
            // Failure to create hardlink is not an error; fall through to rsync below.
            LOG4CPLUS_TRACE(
                m_logger,
                fmt::format("Receiver[{}]: Hard or symlink could not be created from {} -> {}",
                            index,
                            m_result,
                            link));
        }
    }

    // Rsync path (remote host, or local where linking failed). Transfers are
    // rate-limited via the net_send resource pool.
    auto maybe_token = m_resources.net_send.Acquire();
    if (!maybe_token) {
        // Out of tokens -> bail out
        LOG4CPLUS_TRACE(m_logger,
                        fmt::format("Receiver[{}]: Could not start rsync transfer due to "
                                    "resource limit reached.",
                                    index));
        return false;
    }
    using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
    RsyncOptions opts;
    opts.rsync = m_options.rsync_bin;

    auto const& path_from = m_result.native();
    auto path_to = receiver.path / GetArchiveFilename();
    auto location = receiver.host.empty() ? path_to.native()
                                          : fmt::format("{}:{}", receiver.host, path_to.native());
    // NOTE(review): one source line appears to be missing from this listing
    // after `opts,` (per the RsyncFactory signature, presumably the final
    // RsyncAsyncProcess::DryRun argument closing the call) -- confirm against
    // the original file.
    Proc proc = m_rsync_factory(m_executor.get_io_context(),
                                path_from,
                                location,
                                opts,
    // Keep the last stderr lines so failures can be diagnosed in the alert.
    auto last_lines =
        std::make_unique<LogCaptureLast>(log4cplus::Logger::getInstance(LOGGER_NAME_TRANSFER), 10);

    // Connect signals (stdout is ignored for now as it merely contains the progress
    // which will be handled differently later on).
    proc->ConnectStderr(std::ref(*last_lines));
    // Start transfer and set up completion handler
    proc->Initiate().then(
        m_executor,
        [this,
         proc,
         index,
         last_lines = std::move(last_lines),
         liveness = std::weak_ptr<bool>(m_liveness)](boost::future<int> f) {
            // Guard against the controller having been destroyed while the
            // asynchronous transfer was in flight.
            if (liveness.expired()) {
                // NOTE(review): LOG4CPLUS_ERROR is passed a string literal where
                // a log4cplus::Logger is normally expected -- verify this is
                // intended/compiles in the original tree.
                LOG4CPLUS_ERROR("dpm.daqcontroller",
                                fmt::format("Receiver[{}]: DaqController abandoned -> ignoring "
                                            "result from rsync for receiver transfer "
                                            "of {}",
                                            index,
                                            *proc));
                return;
            }
            ReleaseComplete(index, *proc, std::move(f), *last_lines);
        });
    // Update DAQ status
    m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Started});
    // Add transfer (token is held for the lifetime of the transfer)
    ctx.transfers.emplace(index, Releasing::Transfer{std::move(proc), std::move(*maybe_token)});
    return true;
}
1017 
1018 void DaqControllerImpl::Poll(Releasing& ctx) {
1019  LOG4CPLUS_TRACE(m_logger, "Poll(Releasing)");
1020 
1021  if (m_dpspec.receivers.empty()) {
1022  LOG4CPLUS_INFO(m_logger,
1023  "Poll(Releasing): No receivers configured -> transition to Completed");
1024  SetState(Completed{});
1025  return;
1026  }
1027 
1028  // Multiple changes can be made here so we defer signalling until we've done them all.
1029  auto defer = ObservableStatus::DeferSignal(&m_status);
1030  try {
1031  // For every non-started, non-completed receiver we initiate the process, limited by
1032  // available resources.
1033  // Once releasing is done (no more pending, transition to Completed.
1034 
1035  // Check status of each receiver as configured in dpspec
1036  // - Try to start non-started receiver transfer
1037  auto index = 0u;
1038  auto pending = 0;
1039  LOG4CPLUS_TRACE(m_logger,
1040  "Poll(Releasing): Number of receivers: " << m_dpspec.receivers.size());
1041 
1042  for (auto it = m_dpspec.receivers.begin(); it != m_dpspec.receivers.end(); ++it, ++index) {
1043  // Persistent status of receivers contain status of previously started transfers.
1044  auto receiver_status = m_status.GetReceiverStatus(index);
1045  if (receiver_status.state == ReceiverStatus::State::NotStarted) {
1046  LOG4CPLUS_TRACE(
1047  m_logger,
1048  "Poll(Releasing): Receiver[" << index << "] Will try to start receiver.");
1049  // No status for receiver -> Try to start it
1050  TryStartRelease(ctx, *it, index);
1051  pending++;
1052  } else if (!receiver_status.IsFinalState()) {
1053  LOG4CPLUS_TRACE(m_logger,
1054  "Poll(Releasing): Receiver["
1055  << index << "] already in progress: " << receiver_status.state);
1056  pending++;
1057  } else {
1058  LOG4CPLUS_TRACE(m_logger,
1059  "Poll(Releasing): Receiver[" << index << "] already completed with "
1060  << receiver_status.state);
1061  }
1062  }
1063 
1064  if (pending == 0 && ctx.transfers.empty()) {
1065  // Since we have no pending (to be started) and no active transfers we're done!
1066  LOG4CPLUS_INFO(
1067  m_logger,
1068  "Poll(Releasing): No release transfers remains -> transition to Completed");
1069  SetState(Completed{}, m_status.GetError());
1070  }
1071  } catch (...) {
1072  SetError(true);
1073  Stop();
1074 
1075  auto msg = fmt::format("{}: Failed to release ", m_status);
1076  LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Releasing): Failed to process DAQ: {}", msg));
1077  std::throw_with_nested(std::runtime_error(msg));
1078  }
1079 }
1080 
1081 void DaqControllerImpl::ReleaseComplete(std::size_t index,
1082  AsyncProcessIf const& proc,
1083  boost::future<int> result,
1084  LogCaptureLast const& lines) noexcept {
1085  auto defer = ObservableStatus::DeferSignal(&m_status);
1086  auto& ctx = std::get<Releasing>(m_state_ctx);
1087  // Remove transfer
1088  ctx.transfers.erase(index);
1089  try {
1090  auto code = result.get();
1091 
1092  LOG4CPLUS_DEBUG(
1093  m_logger,
1094  fmt::format(
1095  "{}: Receiver[{}]: Process completed with rc={}: {}", *this, index, code, proc));
1096 
1097  auto alert_id = MakeAlertId(alert::RELEASING_RSYNC, fmt::format("{}", index));
1098  if (code == 0) {
1099  m_status.ClearAlert(alert_id);
1100  m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Success});
1101  } else {
1102  SetError(true);
1103  m_status.SetReceiverStatus(index, ReceiverStatus{ReceiverStatus::State::Failure});
1104  auto msg = fmt::format(
1105  "Failed to release file using command: {}. Last lines:\n{}", proc, lines);
1106  m_status.SetAlert(MakeAlert(alert_id, msg));
1107  LOG4CPLUS_ERROR(
1108  m_logger,
1109  fmt::format("{}: Receiver[{}]: ReleaseComplete(): Failed to release file:\n{}",
1110  *this,
1111  index,
1112  msg));
1113  }
1114  } catch (std::exception const& e) {
1115  SetError(true);
1116  Stop();
1117  LOG4CPLUS_ERROR(
1118  m_logger,
1119  fmt::format("{}: Receiver[{}]: ReleaseComplete(): Failed to release file ({}): ",
1120  *this,
1121  index,
1122  proc,
1123  e.what()));
1124  }
1125 
1126  DeferredPoll();
1127 }
1128 
/// Poll handler for the terminal Completed state: halts further processing.
void DaqControllerImpl::Poll(Completed&) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Completed)");
    // We're done
    Stop();
}
1134 
1135 void DaqControllerImpl::SetState(StateVariant s, bool error) {
1136  m_state_ctx = std::move(s);
1137  auto new_state = MakeState(m_state_ctx);
1138  if (new_state == m_status.GetState() && error == m_status.GetError()) {
1139  // No change
1140  return;
1141  }
1142 
1143  m_status.SetState(new_state, error);
1144  DeferredPoll();
1145 }
1146 
1147 void DaqControllerImpl::SetError(bool error) {
1148  if (error == m_status.GetError()) {
1149  // No change
1150  return;
1151  }
1152 
1153  m_status.SetError(error);
1154  m_soft_stop = true;
1155  DeferredPoll();
1156 }
1157 
1158 State DaqControllerImpl::MakeState(StateVariant const& s) {
1159  if (std::holds_alternative<Scheduled>(s)) {
1160  return State::Scheduled;
1161  }
1162  if (std::holds_alternative<Collecting>(s)) {
1163  return State::Collecting;
1164  }
1165  if (std::holds_alternative<Merging>(s)) {
1166  return State::Merging;
1167  }
1168  if (std::holds_alternative<Releasing>(s)) {
1169  return State::Releasing;
1170  }
1171  if (std::holds_alternative<Completed>(s)) {
1172  return State::Completed;
1173  }
1174 
1175  assert(false); // NOLINT Not implemented yet
1176 }
1177 
// Collecting state context: takes ownership of the resolver used to locate
// the declared FITS sources.
DaqControllerImpl::Collecting::Collecting(SourceResolver resolver_arg)
    : resolver(std::move(resolver_arg)) {
}
1181 
// Drops the merger process handle and the held resource token (presumably
// returning the merge slot to the shared resource pool -- see Resources).
void DaqControllerImpl::Merging::Reset() {
    merger.reset();
    token.reset();
}
1186 
1187 void DaqControllerImpl::HandleMergeMessage(std::string const& line) noexcept try {
1188  // A valid JSON message on stdout has the basic form:
1189  // {"type": "<content-type>",
1190  // "timestamp": <int64 nanonseconds since epoch>,
1191  // "content": <content>}
1192  auto json = nlohmann::json::parse(line);
1193  auto const& type = json.at("type").get<std::string>();
1194  auto const& content = json.at("content");
1195  if (type == "alert") {
1196  // Content schema:
1197  // {"id", "ID",
1198  // "message", "MESSAGE"}
1199  std::string id;
1200  std::string message;
1201  content.at("id").get_to(id);
1202  content.at("message").get_to(message);
1203  auto alert_id = MakeAlertId(alert::MERGING_MERGE, id);
1204  m_status.SetAlert(MakeAlert(alert_id, message));
1205  }
1206 } catch (...) {
1207  LOG4CPLUS_DEBUG(m_logger, "Failed to parse JSON message from merger");
1208 }
1209 
1210 std::string DaqControllerImpl::GetArchiveFilename() const {
1211  if (m_status.GetFileId().empty()) {
1212  throw std::runtime_error(fmt::format("{}: DAQ status has no file_id!", m_status));
1213  }
1214  return fmt::format("{}.fits", m_status.GetFileId());
1215 }
1216 
1217 } // namespace daq::dpm
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
State GetState() const noexcept
Definition: status.cpp:270
void SetError(bool error) noexcept
Set error flag for data acquisition.
Definition: status.cpp:309
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Definition: status.cpp:291
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:357
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:278
std::string const & GetFileId() const noexcept
Definition: status.cpp:266
bool GetError() const noexcept
Definition: status.cpp:274
boost::signals2::connection ConnectStatus(SignalType::slot_type const &slot)
Connect observer that is invoked when state is modified.
Definition: status.hpp:386
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
Definition: status.cpp:301
friend class DeferSignal
Definition: status.hpp:408
unsigned ChangesSinceLastSignal() const noexcept
Query number of changes made since last signal.
Definition: status.hpp:397
std::string const & GetId() const noexcept
Definition: status.cpp:254
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
std::optional< ResourceToken > Acquire() noexcept
unsigned GetLimit() const noexcept
unsigned GetUsed() const noexcept
auto IsStopped() const noexcept -> bool override
Definition: scheduler.cpp:466
auto GetState() const noexcept -> State override
Definition: scheduler.cpp:478
auto GetId() const noexcept -> std::string const &override
Definition: scheduler.cpp:470
void Start() override
Start/stop operations.
Definition: scheduler.cpp:457
auto GetErrorFlag() const noexcept -> bool override
Definition: scheduler.cpp:474
auto GetStatus() noexcept -> ObservableStatus &override
Definition: scheduler.cpp:482
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:443
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:440
Controller for specific DAQ.
Definition: scheduler.hpp:60
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:296
std::string QueueDaq(std::string const &dp_spec) override
Queues DAQ for processing.
Definition: scheduler.cpp:83
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
Definition: scheduler.cpp:236
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
Definition: scheduler.cpp:50
void Stop() override
Definition: scheduler.cpp:79
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
Definition: scheduler.cpp:178
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
Definition: scheduler.cpp:174
void Start() override
Start/stop operations.
Definition: scheduler.cpp:74
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
Definition: scheduler.cpp:157
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Definition: scheduler.cpp:153
Provides location of fits source file.
Interface to interact with DPM workspace.
Definition: workspace.hpp:129
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location in th...
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
virtual auto GetPath() const -> std::filesystem::path=0
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
virtual auto LoadQueue() const -> std::vector< std::string >=0
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
DPM server config.
daq::dpm::Workspace interface and implementation declaration
Declaration of log4cplus helpers.
constexpr std::string_view RELEASING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:44
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:38
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:53
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
Definition: scheduler.cpp:45
const std::string LOGGER_NAME_CONTROLLER
Definition: config.hpp:24
const std::string LOGGER_NAME_SCHEDULER
Definition: config.hpp:23
boost::signals2::scoped_connection daqs
Definition: scheduler.hpp:179
const std::string LOGGER_NAME_MERGER
Definition: config.hpp:26
const std::string LOGGER_NAME_TRANSFER
Definition: config.hpp:25
Options for DaqController.
Definition: scheduler.hpp:163
Limited resources.
Definition: scheduler.hpp:171
Options controlling scheduler operations.
Definition: scheduler.hpp:133
std::variant< OlasReceiver > ReceiverTypes
std::string id
DAQ id.
Definition: dpSpec.hpp:45
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Definition: dpSpec.cpp:90
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Definition: dpSpec.cpp:47
Target target
Describes target which will become the data product.
Definition: dpSpec.hpp:49
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
ReceiverList receivers
Ordered container of receivers where to deliver the target data product.
Definition: dpSpec.hpp:59
std::vector< SourceTypes > sources
List of sources to create data product from.
Definition: dpSpec.hpp:54
std::string file_prefix
Optional user-chosen file prefix to make it easier to identify the produced file.
Definition: dpSpec.hpp:36
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Collecting
Input files are being collected.
@ Aborted
Data acquisition has been aborted by user.
@ Merging
DAQ is being merged.
@ AbortingMerging
Transitional state for aborting during merging.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
bool IsAbortableState(State state) noexcept
Query whether state is in an abortable state.
Definition: state.cpp:25
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Definition: main.cpp:23
daq::dpm::Scheduler and related class declarations.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:153
State state
Definition: status.hpp:176
std::string id
Definition: status.hpp:174
std::string result
Path to resulting data product.
Definition: status.hpp:193
std::string file_id
Definition: status.hpp:175
bool error
Definition: status.hpp:177
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:198