ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
scheduler.cpp
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Scheduler implementation
9  */
10 #include <daq/dpm/scheduler.hpp>
11 
12 #include <algorithm>
13 
14 #include <boost/asio/post.hpp>
15 #include <fmt/format.h>
16 #include <fmt/ostream.h>
17 #include <log4cplus/loggingmacros.h>
18 
19 #include <daq/dpm/workspace.hpp>
20 #include <daq/error/report.hpp>
21 #include <daq/log4cplus.hpp>
22 
23 namespace daq::dpm {
24 
25 namespace {
26 void PrintArgs(log4cplus::Logger& logger, std::vector<std::string> const& args) {
27  std::stringstream ss;
28  ss << "{";
29  bool first = true;
30  for (auto const& token : args) {
31  if (!first) {
32  ss << ", ";
33  }
34  ss << token;
35  first = false;
36  }
37  ss << "}";
38  LOG4CPLUS_DEBUG(logger, "Executing merger with args: " << ss.str());
39 }
40 } // namespace
41 
42 ResourceToken::ResourceToken(Resource* resource) noexcept : m_resource(resource) {
43 }
44 
45 ResourceToken::~ResourceToken() noexcept {
46  m_resource->Release();
47 }
48 
49 void Resource::Release() noexcept {
50  m_used--;
51  // If limit is 0 (infinite resources) then there's no point in signalling as nothing will ever be
52  // blocked by missing resource.
53  if (m_limit > 0 && m_used < m_limit) {
54  m_signal();
55  }
56 }
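// Editorial note (not in the original source): Resource and ResourceToken form a small
// counting-semaphore RAII pair. Resource::Acquire() returns std::optional<ResourceToken>;
// an empty optional means the limit is reached, and destroying a token calls Release(),
// which (for a finite limit) signals the slots registered via Connect() so blocked work
// can be retried. Minimal usage sketch ('resources' is a hypothetical Resources instance):
//
//   if (std::optional<ResourceToken> token = resources.merge.Acquire()) {
//       // ... do the rate-limited work while the token is held ...
//   }   // token destroyed here -> Release() -> connected slots are signalled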
57 
58 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
59  os << "DAQ{" << daq.GetStatus() << "}";
60  return os;
61 }
62 
63 SchedulerImpl::SchedulerImpl(rad::IoExecutor& executor,
64  Workspace& workspace,
65  DaqControllerFactory daq_controller_factory,
66  SchedulerOptions const& options)
67  : m_executor(executor)
68  , m_workspace(workspace)
69  , m_daq_controller_factory(std::move(daq_controller_factory))
70  , m_options(options)
71  , m_logger(log4cplus::Logger::getInstance("dpm.scheduler"))
72  , m_liveness(std::make_shared<bool>(false)) {
73  m_queue = m_workspace.LoadQueue();
74  auto slot = [liveness = std::weak_ptr<bool>(m_liveness), this]() {
75  if (auto ptr = liveness.lock(); ptr) {
76  // Scheduler still alive
77  DeferredPoll();
78  }
79  };
80  m_resources.daqs.Connect(slot);
81  m_resources.net_receive.Connect(slot);
82  m_resources.merge.Connect(slot);
83  m_resources.net_send.Connect(slot);
84 }
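// Editorial sketch (not in the original source): how the pieces defined in this file are
// typically driven. Variable names (executor, workspace, factory, options, dp_spec_json)
// are placeholders, and the slot signature is inferred from how m_status_signal is
// invoked below.
//
//   SchedulerImpl scheduler{executor, workspace, factory, options};
//   scheduler.ConnectStatus([](Status const& s) { /* observe status updates */ });
//   scheduler.Start();                           // enables polling
//   auto id = scheduler.QueueDaq(dp_spec_json);  // persists spec + status, triggers a poll
//   ...
//   scheduler.Stop();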
85 
86 void SchedulerImpl::Start() {
87  m_stopped = false;
88  DeferredPoll();
89 }
90 
91 void SchedulerImpl::Stop() {
92  m_stopped = true;
93 }
94 
95 std::string SchedulerImpl::QueueDaq(std::string const& dp_spec_serialized) {
96  LOG4CPLUS_TRACE(m_logger, "QueueDaq()");
97  try {
98  auto json = nlohmann::json::parse(dp_spec_serialized);
99  DpSpec dp_spec = ParseDpSpec(json);
100  std::string const& id = dp_spec.id;
101  std::string const& file_id = dp_spec.target.file_id;
102  if (IsQueued(id)) {
103  LOG4CPLUS_ERROR(m_logger, "QueueDaq(): DAQ conflict detected -> aborting");
104  throw std::invalid_argument(
105  fmt::format("Scheduler: Could not queue DAQ for merging as "
106  "a Data Acquisition with same id has been queued before: '{}'",
107  id));
108  }
109  // New daq for this workspace -> Initialize persistent state.
110  try {
111  LOG4CPLUS_INFO(m_logger,
112  fmt::format("QueueDaq(): Initializing new workspace for DAQ {}", id));
113  auto daq_ws = m_workspace.InitializeDaq(id);
114  assert(daq_ws);
115  try {
116  assert(daq_ws);
117  Status initial_status;
118  initial_status.id = id;
119  initial_status.file_id = file_id;
120  // Initial state in DPM is Scheduled as that's where DPM takes over.
121  initial_status.state = State::Scheduled;
122  initial_status.error = false;
123  initial_status.timestamp = Status::TimePoint::clock::now();
124 
125  daq_ws->StoreStatus(initial_status);
126  daq_ws->StoreSpecification(dp_spec_serialized);
127 
128  try {
129  m_queue.push_back(id);
130  // Finally update the backlog
131  m_workspace.StoreQueue(m_queue);
132 
133  DeferredPoll();
134  return id;
135  } catch (...) {
136  // Undo push
137  m_queue.pop_back();
138  std::throw_with_nested(std::runtime_error("Failed to store DAQ queue"));
139  }
140  } catch (...) {
141  std::throw_with_nested(
142  std::runtime_error("Failed to write status to DAQ workspace"));
143  }
144  } catch (...) {
145  // Undo creation of DAQ workspace
146  m_workspace.RemoveDaq(id);
147  std::throw_with_nested(std::runtime_error(fmt::format(
148  "Failed to initialize DAQ workspace in {}", m_workspace.GetPath().native())));
149  }
150  } catch (DpSpecError const&) {
151  LOG4CPLUS_DEBUG(m_logger,
152  "Invalid Data Product Specification provided: \n"
153  << dp_spec_serialized);
154 
155  std::throw_with_nested(
156  std::invalid_argument("Scheduler: Could not queue DAQ for merging because "
157  "Data Product Specification is invalid"));
158  } catch (nlohmann::json::parse_error const&) {
159  std::throw_with_nested(
160  std::invalid_argument("Scheduler: Could not queue DAQ for merging because provided "
161  "data product specification is invalid JSON"));
162  }
163 }
164 
165 bool SchedulerImpl::IsQueued(std::string const& id) const noexcept {
166  return std::find(m_queue.begin(), m_queue.end(), id) != m_queue.end();
167 }
168 
169 Status SchedulerImpl::GetDaqStatus(std::string const& id) const try {
170  LOG4CPLUS_TRACE(m_logger, fmt::format("GetDaqStatus({})", id));
171  if (!IsQueued(id)) {
172  LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): No such ID", id));
173  throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
174  }
175  return m_workspace.LoadDaq(id)->LoadStatus();
176 } catch (...) {
177  std::throw_with_nested(std::runtime_error("Scheduler: GetDaqStatus() failed"));
178 }
179 
180 std::vector<std::string> SchedulerImpl::GetQueue() const noexcept {
181  return m_queue;
182 }
183 
184 void SchedulerImpl::AbortDaq(std::string const& id) try {
185  LOG4CPLUS_TRACE(m_logger, fmt::format("AbortDaq({})", id));
186  // - If DAQ is either being released or is already in final state it cannot be aborted.
187  // - If DAQ is active it must first be made inactive by destroying the DaqController, which will also
188  // terminate started processes.
189  // - If DAQ is not active aborting simply means to erase DAQ from filesystem and queue.
190  if (!IsQueued(id)) {
191  LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): No such ID", id));
192  throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
193  }
194 
195  // If DAQ is active/in-progress we must first delete it
196  auto it = std::find_if(m_active.begin(), m_active.end(), [&id](Active const& active) {
197  assert(active.daq);
198  return id == active.daq->GetId();
199  });
200 
201  // Status to update with final aborted state
202  auto pub = [&, status = m_workspace.LoadDaq(id)->LoadStatus()](State new_state) mutable {
203  status.state = new_state;
204  m_status_signal(status);
205  };
206 
207  if (it != m_active.end()) {
208  auto state = it->daq->GetState();
209  if (IsFinalState(state) || state == State::Releasing) {
210  LOG4CPLUS_DEBUG(m_logger,
211  fmt::format("AbortDaq({}): Cannot abort in state {}", id, state));
 throw std::invalid_argument(
 fmt::format("Cannot abort DAQ '{}' in state {}", id, state));
212  }
213  // Erase if active
214  LOG4CPLUS_DEBUG(
215  m_logger,
216  fmt::format("AbortDaq({}): Erasing active DAQ currently in state {}", id, state));
217  m_active.erase(it);
218  }
219 
220  // Publish that we're aborting
221  pub(State::AbortingMerging);
222 
223  // Erase id from queue
224  LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ from merge queue", id));
225  m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
226  m_workspace.StoreQueue(m_queue);
227 
228  // Delete workspace
229  LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ workspace", id));
230  m_workspace.RemoveDaq(id);
231 
232  // Publish that DAQ is aborted
233  pub(State::Aborted);
234 
235  // Queue has changed -> Poll
236  DeferredPoll();
237  LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Aborted and removed DAQ", id));
238 } catch (...) {
239  std::throw_with_nested(std::runtime_error("Scheduler: AbortDaq() failed"));
240 }
241 
242 boost::signals2::connection SchedulerImpl::ConnectStatus(StatusSignal::slot_type const& slot) {
243  return m_status_signal.connect(slot);
244 }
245 
246 void SchedulerImpl::Poll() {
247  LOG4CPLUS_TRACE(m_logger, "Poll()");
248  ArchiveCompleted();
249  ActivateFromQueue();
250 }
251 
252 void SchedulerImpl::ArchiveCompleted() {
253  // Archive completed DAQs
254  for (auto& active : m_active) {
255  try {
256  if (!IsFinalState(active.daq->GetState())) {
257  continue;
258  }
259  auto id = active.daq->GetId();
260 
261  auto archive_path = m_workspace.ArchiveDaq(id);
262  LOG4CPLUS_INFO(
263  m_logger,
264  fmt::format("DAQ {} is in final state {} -> moved workspace to archive: {}",
265  id,
266  active.daq->GetState(),
267  archive_path.native()));
268 
269  // Delete controller
270  active.daq.reset();
271 
272  // Remove ID from queue
273  m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
274  m_workspace.StoreQueue(m_queue);
275  } catch (...) {
276  // @todo: Decide what to do here...
277  LOG4CPLUS_ERROR(m_logger,
278  "Failed to archive DAQ workspace:\n"
279  << error::NestedExceptionReporter(std::current_exception()));
280  }
281  }
282 
283  // Erase removed daqs
284  auto remove_it = std::remove_if(
285  m_active.begin(), m_active.end(), [&](auto const& active) { return !active.daq; });
286  if (remove_it != m_active.end()) {
287  m_active.erase(remove_it, m_active.end());
288  // Since this freed resources we schedule poll() to be run again.
289  DeferredPoll();
290  }
291 }
292 
293 void SchedulerImpl::ActivateFromQueue() {
294  // Update active DAQs
295  // See if a new DAQ can be active.
296  if (m_queue.empty()) {
297  LOG4CPLUS_INFO(m_logger, "Merge queue empty - all done!");
298  return;
299  }
300 
301  auto candidates = GetCandidates();
302  if (candidates.empty()) {
303  LOG4CPLUS_INFO(m_logger, "All DAQ merge candidates are active/in-progress");
304  return;
305  }
306  // Schedule as many candidates as allowed by resource limits
307  try {
308  for (auto const& id : candidates) {
309  LOG4CPLUS_TRACE(m_logger, fmt::format("{}: Attempting to schedule DAQ", id));
310  auto maybe_token = m_resources.daqs.Acquire();
311  if (!maybe_token) {
312  LOG4CPLUS_INFO(m_logger,
313  fmt::format("Limit reached: Cannot schedule '{}'. "
314  "Currently active DAQs at/exceed limit. "
315  "current: {}, limit: {}, queue size: {}",
316  id,
317  m_resources.daqs.GetUsed(),
318  m_resources.daqs.GetLimit(),
319  m_queue.size()));
320  return;
321  }
322 
323  try {
324  // @todo: Store token with DAQ?
325  auto ws = m_workspace.LoadDaq(id);
326  auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
327 
328  // Aggregate signal
329  // both DaqController and slot are owned by Scheduler so we don't need
330  // to manage connection lifetime.
331  daq_controller->GetStatus().ConnectStatus([&](ObservableStatus const& s) {
332  // Completed DAQs should be archived so we schedule a poll in that case.
333  if (IsFinalState(s.GetState())) {
334  DeferredPoll();
335  }
336  // Just forward status
337  m_status_signal(s.GetStatus());
338  });
339  // Tell DaqController to start
340  daq_controller->Start();
341 
342  m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
343 
344  LOG4CPLUS_DEBUG(m_logger, fmt::format("{}: DAQ scheduled for merging", id));
345  // Schedule new Poll() as queue has been modified.
346  DeferredPoll();
347  } catch (...) {
348  std::throw_with_nested(
349  std::runtime_error(fmt::format("{}: Failed to activate DAQ for merging.", id)));
350  }
351  }
352  } catch (...) {
353  std::throw_with_nested(std::runtime_error("Failed to activate queued DAQs"));
354  }
355 }
356 
357 void SchedulerImpl::DeferredPoll() {
358  if (m_stopped || *m_liveness) {
359  // Stopped or already scheduled
360  return;
361  }
 *m_liveness = true; // Mark that a poll is scheduled; cleared again in the posted handler.
362  boost::asio::post(m_executor.get_io_context().get_executor(),
363  [liveness = std::weak_ptr<bool>(m_liveness), this]() {
364  if (auto ptr = liveness.lock(); ptr) {
365  // Scheduler still alive
366  // Clear flag before invoking Poll so that another
367  // poll can be scheduled.
368  *ptr = false;
369  Poll();
370  }
371  });
372 }
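// Editorial note (not in the original source): DeferredPoll() never runs Poll() inline; it
// posts it to the executor. The shared bool behind m_liveness serves two purposes: its
// value acts as a "poll already queued" flag, and the weak_ptr captured by the handler is
// a liveness guard so a poll posted by an already-destroyed Scheduler is silently dropped.
// DaqControllerImpl::DeferredPoll() below follows the same pattern.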
373 
374 std::vector<std::string> SchedulerImpl::GetCandidates() const {
375  // Return all DAQs in m_queue that are not in m_active.
376  std::vector<std::string> candidates;
377  for (auto const& id : m_queue) {
378  auto it = std::find_if(m_active.begin(), m_active.end(), [&](auto const& active) {
379  assert(active.daq);
380  return active.daq->GetId() == id;
381  });
382  if (it == m_active.end()) {
383  candidates.push_back(id);
384  }
385  }
386  return candidates;
387 }
388 
389 DaqControllerImpl::DaqControllerImpl(rad::IoExecutor& executor,
390  std::unique_ptr<DaqWorkspace> workspace,
391  Resources& resources,
392  RsyncFactory rsync_factory,
393  ProcFactory proc_factory,
394  DaqControllerOptions options)
395  : m_executor(executor)
396  , m_workspace(std::move(workspace))
397  , m_resources(resources)
398  , m_rsync_factory(std::move(rsync_factory))
399  , m_proc_factory(std::move(proc_factory))
400  , m_options(std::move(options))
401  , m_dpspec(m_workspace->LoadSpecification())
402  , m_result()
403  , m_state_ctx(Scheduled{}) // m_state_ctx is updated in body with actual state.
404  , m_status(m_workspace->LoadStatus())
405  , m_status_connection()
406  , m_liveness(std::make_shared<bool>(false))
407  , m_stopped(true)
408  , m_logger(log4cplus::Logger::getInstance("dpm.daqcontroller")) {
409  if (m_options.merge_bin.empty()) {
410  throw std::invalid_argument("Specified merge application name is empty");
411  }
412  if (m_options.rsync_bin.empty()) {
413  throw std::invalid_argument("Specified rsync application name is empty");
414  }
415 
416  // If result has already been created it is contained in the stored status.
417  // Otherwise we set to-be-used absolute path
418  if (m_status.GetStatus().result.empty()) {
419  m_result = m_workspace->GetResultPath() /
420  (m_dpspec.target.file_prefix + m_dpspec.target.file_id + ".fits");
421  } else {
422  m_result = m_status.GetStatus().result;
423  }
424 
425  // Update m_state from loaded status.
426  auto error = m_status.GetError();
427  switch (m_status.GetState()) {
428  case State::Scheduled:
429  SetState(Scheduled{}, error);
430  break;
431  case State::Transferring:
432  SetState(Transferring{SourceResolver(m_workspace->LoadSourceLookup())}, error);
433  break;
434  case State::Merging:
435  SetState(Merging(), error);
436  break;
437  case State::Completed:
438  SetState(Completed{}, error);
439  break;
440  default:
441  throw std::runtime_error("Not implemented");
442  };
443 
444  m_status_connection = m_status.ConnectStatus(
445  [prev = m_status.GetStatus(), this](ObservableStatus const& status) mutable {
446  if (prev == status.GetStatus()) {
447  // No changes -> don't write
448  return;
449  }
450  prev = status.GetStatus();
451  LOG4CPLUS_TRACE(m_logger,
452  fmt::format("DaqController: Updating workspace status file: {}", prev));
453  m_workspace->StoreStatus(prev);
454  });
455 }
456 
457 void DaqControllerImpl::Start() {
458  m_stopped = false;
459  DeferredPoll();
460 }
461 
462 void DaqControllerImpl::Stop() {
463  m_stopped = true;
464 }
465 
466 auto DaqControllerImpl::IsStopped() const noexcept -> bool {
467  return m_stopped;
468 }
469 
470 auto DaqControllerImpl::GetId() const noexcept -> std::string const& {
471  return m_status.GetId();
472 }
473 
474 auto DaqControllerImpl::GetErrorFlag() const noexcept -> bool {
475  return m_status.GetError();
476 }
477 
478 auto DaqControllerImpl::GetState() const noexcept -> State {
479  return m_status.GetState();
480 }
481 
481 
482 auto DaqControllerImpl::GetStatus() noexcept -> ObservableStatus& {
483  return m_status;
484 }
485 
486 auto DaqControllerImpl::GetStatus() const noexcept -> ObservableStatus const& {
487  return m_status;
488 }
489 
490 void DaqControllerImpl::DeferredPoll() {
491  if (m_stopped || *m_liveness) {
492  // Stopped or already scheduled
493  return;
494  }
 *m_liveness = true; // Mark that a poll is scheduled; cleared again in the posted handler.
495  boost::asio::post(m_executor.get_io_context().get_executor(),
496  [liveness = std::weak_ptr<bool>(m_liveness), this]() {
497  if (auto ptr = liveness.lock(); ptr) {
498  // DaqController still alive
499  // Clear flag before invoking Poll so that another
500  // poll can be scheduled.
501  *ptr = false;
502  Poll();
503  }
504  });
505 }
506 
507 void DaqControllerImpl::Poll() {
508  LOG4CPLUS_TRACE(m_logger, "Poll()");
509  if (m_stopped) {
510  LOG4CPLUS_DEBUG(
511  m_logger,
512  fmt::format("{}: Poll() - DaqController is stopped so nothing will be done", *this));
513  // Stopped or already scheduled
514  return;
515  }
516  std::visit([this](auto& state) { Poll(state); }, m_state_ctx);
517 }
518 
519 void DaqControllerImpl::Poll(DaqControllerImpl::Scheduled&) {
520  LOG4CPLUS_TRACE(m_logger, "Poll(Scheduled)");
521  try {
522  // Create list of remote sources and build mapping to local files.
523  std::vector<DpSpec::SourceFitsFile> sources;
524  if (m_dpspec.target.source) {
525  sources.push_back(*m_dpspec.target.source);
526  }
527  for (auto const& s : m_dpspec.sources) {
528  if (!std::holds_alternative<DpSpec::SourceFitsFile>(s)) {
529  continue;
530  }
531  sources.push_back(std::get<DpSpec::SourceFitsFile>(s));
532  }
533 
534  auto sources_path = m_workspace->GetSourcesPath();
535 
536  SourceResolver resolver;
537  unsigned index = 0;
538  for (auto const& s : sources) {
539  Origin origin = ParseSourceOrigin(s.origin);
540  // Use <index>_<source>_<filename> as local filename.
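 // Illustrative example (hypothetical names): index 0, source_name "fcf1" and an
 // origin of "node1:/data/exp_0001.fits" yield the local file
 // "<sources_path>/0_fcf1_exp_0001.fits".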
541  auto local_path =
542  sources_path /
543  fmt::format("{}_{}_{}", index, s.source_name, origin.path.filename().native());
544  LOG4CPLUS_INFO(m_logger,
545  fmt::format("Poll(Scheduled): Source file '{}' from source \"{}\" "
546  "on host \"{}\" will be stored in {}",
547  origin.path,
548  s.source_name,
549  !origin.host.empty() ? origin.host.c_str() : "<n/a>",
550  local_path));
551 
552  resolver.Add({s.source_name, s.origin}, local_path);
553  }
554 
555  // Store source resolution mapping.
556  m_workspace->StoreSourceLookup(resolver.GetMapping());
557 
558  SetState(Transferring(std::move(resolver)));
559  } catch (...) {
560  auto msg =
561  fmt::format("{}: Failed to collect and store list of required sources", m_status);
562  LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Scheduled): Failed to process DAQ: {}", msg));
563  // No point in retrying these kinds of errors.
564  Stop();
565  std::throw_with_nested(std::runtime_error(msg));
566  }
567 }
568 
569 void DaqControllerImpl::Poll(DaqControllerImpl::Transferring& ctx) {
570  LOG4CPLUS_TRACE(m_logger, "Poll(Transferring)");
571  try {
572  // Check if any source is missing and start transfers if possible
573  // If all transfers are done transition to State::Merging.
574 
575  auto const& sources = ctx.resolver.GetMapping();
576  auto missing = sources.size();
577  auto root = m_workspace->GetPath();
578  for (auto const& source : sources) {
579  if (ctx.HasTransfer(source.first)) {
580  // Transfer already in progress
581  continue;
582  }
583  // [recovery]
584  if (m_workspace->Exists(source.second)) {
585  missing--;
586  // File already exists
587  continue;
588  }
589  if (m_soft_stop) {
590  // If there are errors we don't initiate new transfers
591  LOG4CPLUS_TRACE(m_logger,
592  "Poll(Transferring): Soft stop is enabled "
593  "-> won't start new transfer");
594  continue;
595  }
596  // We need to transfer the file, try to start.
597  auto token = m_resources.net_receive.Acquire();
598  if (token) {
599  // We got the token, so we can start transfer
600  Origin origin = ParseSourceOrigin(source.first.origin);
601  using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
602  RsyncOptions opts;
603  opts.rsync = m_options.rsync_bin;
604  Proc proc = m_rsync_factory(m_executor.get_io_context(),
605  origin.RsyncPath(),
606  root / source.second,
607  opts,
608  RsyncAsyncProcess::DryRun::Disabled);
609 
610  // Connect signals (stdout is ignored for now as it merely contains the progress
611  // which will be handled differently later on).
612  proc->ConnectStderr([logger = log4cplus::Logger::getInstance("dpm.transfer")](
613  pid_t pid, std::string const& line) {
614  LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
615  });
616  // Start transfer and set up completion handler
617  proc->Initiate().then(
618  m_executor,
619  [liveness = std::weak_ptr<bool>(m_liveness),
620  source = source.first,
621  dest = source.second,
622  proc = proc,
623  this](boost::future<int> f) {
624  if (liveness.expired()) {
625  LOG4CPLUS_ERROR(log4cplus::Logger::getInstance("dpm.daqcontroller"),
626  fmt::format("DaqController abandoned -> ignoring "
627  "result from rsync for transfer of {}",
628  source));
629  return;
630  }
631  TransferComplete(source, dest, std::move(f));
632  });
633  ctx.transfers.emplace_back(Transferring::Transfer{
634  source.first, source.second, std::move(proc), std::move(*token)});
635  } else {
636  // Out of tokens -> bail out
637  LOG4CPLUS_TRACE(m_logger,
638  fmt::format("Poll(Transferring): Could not start transfer due to "
639  "resource limit reached: {}",
640  source.first));
641  return;
642  }
643  }
644 
645  if (missing == 0) {
646  // If all transfers are complete we can transition to next state
647  SetState(Merging(), false);
648  return;
649  }
650  } catch (...) {
651  SetError(true);
652  // No point in retrying these kinds of errors.
653  auto msg = fmt::format("{}: Failed to transfer required sources", m_status);
654  LOG4CPLUS_ERROR(m_logger,
655  fmt::format("Poll(Transferring): Failed to process DAQ: {}", msg));
656  Stop();
657  std::throw_with_nested(std::runtime_error(msg));
658  }
659 }
660 
661 void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile const& source,
662  std::filesystem::path const& local_path,
663  boost::future<int> result) noexcept {
664  LOG4CPLUS_TRACE(m_logger,
665  fmt::format("{}: TransferComplete: {} -> {}", *this, source, local_path));
666  auto* ctx = std::get_if<Transferring>(&m_state_ctx);
667  // Multiple changes can be made here so we defer signalling until we've done them all.
668  auto defer = ObservableStatus::DeferSignal(&m_status);
669  try {
670  if (ctx) {
671  ctx->EraseTransfer(source);
672  }
673 
674  int return_code = result.get();
675  auto alert_id = MakeAlertId(alert::TRANSFERRING_RSYNC, local_path);
676  if (return_code != 0) {
677  m_status.SetAlert(MakeAlert(
678  alert_id, fmt::format("rsync file transfer failed for remote file {}", source)));
679  LOG4CPLUS_ERROR(m_logger, fmt::format("rsync file transfer failed: {}", source));
680  throw std::runtime_error("rsync failed to transfer file");
681  }
682  // Opportunistically clear alert in case it was set from a previous attempt
683  m_status.ClearAlert(alert_id);
684  LOG4CPLUS_INFO(m_logger,
685  fmt::format("{}: Poll(Transferring): Successfully transferred file: {} -> {}",
686  *this,
687  source,
688  local_path));
689 
690  // Transfer ok
691  DeferredPoll();
692  return;
693  } catch (...) {
694  SetError(true);
695  LOG4CPLUS_ERROR(m_logger,
696  fmt::format("{}: Poll(Transferring): Failed to transfer file: {} -> {}",
697  *this,
698  source,
699  local_path));
700  // Stopping DAQ can be done when there are no more pending file transfers (we let them
701  // complete).
702  if (ctx && ctx->transfers.empty()) {
703  LOG4CPLUS_ERROR(
704  m_logger,
705  fmt::format("{}: Poll(Transferring): All pending transfers have completed "
706  "(with or without error) so we stop DAQ",
707  *this));
708  Stop();
709  }
710  }
711 }
712 
713 bool DaqControllerImpl::Transferring::HasTransfer(
714  SourceResolver::SourceFile const& source) const noexcept {
715  auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
716  return t.source == source;
717  });
718  return it != transfers.cend();
719 }
720 
721 DaqControllerImpl::Transferring::Transfer*
722 DaqControllerImpl::Transferring::GetTransfer(SourceResolver::SourceFile const& source) noexcept {
723  auto it = std::find_if(
724  transfers.begin(), transfers.end(), [&](Transfer const& t) { return t.source == source; });
725 
726  if (it != transfers.end()) {
727  return &(*it);
728  }
729  return nullptr;
730 }
731 
732 void DaqControllerImpl::Transferring::EraseTransfer(SourceResolver::SourceFile const& source) {
733  auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
734  return t.source == source;
735  });
736  if (it != transfers.end()) {
737  transfers.erase(it);
738  }
739 }
740 
741 DaqControllerImpl::Transferring::~Transferring() noexcept {
742  for (auto& transfer : transfers) {
743  if (transfer.proc && transfer.proc->IsRunning()) {
744  LOG4CPLUS_DEBUG(
745  log4cplus::Logger::getInstance("dpm"),
746  "DaqController::Transferring::~Transferring: Aborting running transfer process");
747  transfer.proc->Abort();
748  }
749  }
750 }
751 
752 DaqControllerImpl::Merging::~Merging() noexcept {
753  if (merger && merger->IsRunning()) {
754  LOG4CPLUS_DEBUG(log4cplus::Logger::getInstance("dpm"), "DaqController::Merging::~Merging: Aborting running merge process");
755  merger->Abort();
756  }
757 }
758 
759 void DaqControllerImpl::Poll(DaqControllerImpl::Merging& ctx) {
760  LOG4CPLUS_TRACE(m_logger, "Poll(Merging)");
761  auto defer = ObservableStatus::DeferSignal(&m_status);
762  try {
763  if (ctx.merger) {
764  // Merge is already in progress
765  return;
766  }
767  // [recovery]
768  if (m_workspace->Exists(m_result)) {
769  SetState(Completed{});
770  LOG4CPLUS_TRACE(m_logger,
771  fmt::format("{}: Poll(Merging): "
772  "Recovered from error automatically (manual merge)",
773  *this));
774  return;
775  }
776  auto token = m_resources.merge.Acquire();
777  if (!token) {
778  LOG4CPLUS_TRACE(m_logger,
779  fmt::format("{}: Poll(Merging): Could not start merging due to "
780  "resource limit reached: {}",
781  *this,
782  m_resources.merge.GetLimit()));
783 
784  return;
785  }
786 
787  // Launch merge
788  // note: daqDpmMerge will atomically move result into output so we don't have
789  // to do that ourselves.
790  std::vector<std::string> args{m_options.merge_bin};
791  args.emplace_back("--json");
792  args.emplace_back("--root");
793  args.emplace_back(m_workspace->GetPath());
794  args.emplace_back("--resolver");
795  args.emplace_back(m_workspace->GetSourceLookupPath());
796  args.emplace_back("-o");
797  args.emplace_back(m_result.native());
798  // Positional argument for the specification which is passed as absolute path.
799  args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());
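 // Illustrative command line assembled above (paths are hypothetical placeholders;
 // m_options.merge_bin typically refers to daqDpmMerge, as noted above):
 //   daqDpmMerge --json --root <workspace path> --resolver <source lookup path> \
 //       -o <result path>/<file_prefix><file_id>.fits <specification path>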
800 
801  PrintArgs(m_logger, args);
802 
803  ctx.merger = m_proc_factory(m_executor.get_io_context(), std::move(args));
804 
805  ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance("dpm.merger"), this](
806  pid_t pid, std::string const& line) {
807  LOG4CPLUS_DEBUG(logger, pid << ": " << Trim(line));
808  HandleMergeMessage(line);
809  });
810  ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance("dpm.merger")](
811  pid_t pid, std::string const& line) {
812  LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
813  });
814 
815  ctx.merger->Initiate().then(
816  m_executor,
817  [liveness = std::weak_ptr<bool>(m_liveness),
818  id = m_status.GetId(),
819  proc = ctx.merger,
820  this](boost::future<int> f) {
821  if (liveness.expired()) {
822  LOG4CPLUS_ERROR(log4cplus::Logger::getInstance("dpm.daqcontroller"),
823  fmt::format("{}: DaqController abandoned -> ignoring "
824  "result from merger",
825  id));
826  return;
827  }
828  MergeComplete(std::move(f));
829  });
830  ctx.token = std::move(*token);
831  } catch (...) {
832  Stop();
833  auto msg = fmt::format("{}: Failed to initiate merging", *this);
834  LOG4CPLUS_ERROR(m_logger, msg);
835  std::throw_with_nested(std::runtime_error(msg));
836  }
837 }
838 
839 void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
840  LOG4CPLUS_TRACE(m_logger, "MergeComplete()");
841  // Defer signal until all changes have been made
842  auto defer = ObservableStatus::DeferSignal(&m_status);
843  try {
844  auto alert_id = MakeAlertId(alert::MERGING_MERGE, "");
845  auto* ctx = std::get_if<Merging>(&m_state_ctx);
846  if (ctx) {
847  ctx->Reset();
848  }
849  auto exit_code = result.get();
850  if (exit_code != 0) {
851  auto msg = fmt::format("Merging failed with code {}", exit_code);
852  m_status.SetAlert(MakeAlert(alert_id, msg));
853  throw std::runtime_error(fmt::format("Merging failed with code {}", exit_code));
854  }
855  if (!m_workspace->Exists(m_result)) {
856  auto abs_path = m_workspace->GetPath() / m_result;
857  auto msg =
858  fmt::format("Merging reported with success but file is not found: {}", abs_path);
859  m_status.SetAlert(MakeAlert(alert_id, msg));
860  LOG4CPLUS_ERROR(m_logger, msg);
861  throw std::runtime_error(msg);
862  }
863  m_status.ClearAlert(alert_id);
864 
865  // Create symlink and set result
866  m_workspace->MakeResultSymlink(m_result);
867  // @todo: Include host
868  m_status.SetResult(m_result.native());
869 
870  // All good -> transition to State::Completed
871  // @todo: Transition to Releasing rather than completed
872  SetState(Completed{});
873  LOG4CPLUS_INFO(m_logger, fmt::format("{}: Completed successfully!", *this));
874  } catch (...) {
875  SetError(true);
876  Stop();
877  LOG4CPLUS_ERROR(
878  m_logger,
879  fmt::format(
880  "{}: MergeComplete: Failed to create data product: {}", *this, m_result));
881  }
882 }
883 
884 void DaqControllerImpl::Poll(Completed&) {
885  LOG4CPLUS_TRACE(m_logger, "Poll(Completed)");
886  // We're done
887  Stop();
888 }
889 
890 void DaqControllerImpl::SetState(StateVariant s, bool error) {
891  m_state_ctx = std::move(s);
892  auto new_state = MakeState(m_state_ctx);
893  if (new_state == m_status.GetState() && error == m_status.GetError()) {
894  // No change
895  return;
896  }
897 
898  m_status.SetState(new_state, error);
899  DeferredPoll();
900 }
901 
902 void DaqControllerImpl::SetError(bool error) {
903  if (error == m_status.GetError()) {
904  // No change
905  return;
906  }
907 
908  m_status.SetError(error);
909  m_soft_stop = true;
910  DeferredPoll();
911 }
912 
913 State DaqControllerImpl::MakeState(StateVariant const& s) {
914  if (std::holds_alternative<Scheduled>(s)) {
915  return State::Scheduled;
916  }
917  if (std::holds_alternative<Transferring>(s)) {
918  return State::Transferring;
919  }
920  if (std::holds_alternative<Merging>(s)) {
921  return State::Merging;
922  }
923  if (std::holds_alternative<Completed>(s)) {
924  return State::Completed;
925  }
926 
927  assert(false); // NOLINT Not implemented yet
928 }
929 
930 DaqControllerImpl::Transferring::Transferring(SourceResolver resolver_arg)
931  : resolver(std::move(resolver_arg)) {
932 }
933 
934 void DaqControllerImpl::Merging::Reset() {
935  merger.reset();
936  token.reset();
937 }
938 
939 void DaqControllerImpl::HandleMergeMessage(std::string const& line) noexcept try {
940  // A valid JSON message on stdout has the basic form:
941  // {"type": "<content-type>",
942  // "timestamp": <int64 nanoseconds since epoch>,
943  // "content": <content>}
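 // Illustrative alert message (values are hypothetical; the structure follows the
 // schema described above and handled below):
 //   {"type": "alert",
 //    "timestamp": 1650000000000000000,
 //    "content": {"id": "missing-keyword", "message": "keyword XYZ not found"}}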
944  auto json = nlohmann::json::parse(line);
945  auto const& type = json.at("type").get<std::string>();
946  auto const& content = json.at("content");
947  if (type == "alert") {
948  // Content schema:
949  // {"id", "ID",
950  // "message", "MESSAGE"}
951  std::string id;
952  std::string message;
953  content.at("id").get_to(id);
954  content.at("message").get_to(message);
955  auto alert_id = MakeAlertId(alert::MERGING_MERGE, id);
956  m_status.SetAlert(MakeAlert(alert_id, message));
957  }
958 } catch (...) {
959  LOG4CPLUS_DEBUG(m_logger, "Failed to parse JSON message from merger");
960 }
961 
962 } // namespace daq::dpm
daq::dpm::Resource::Acquire
std::optional< ResourceToken > Acquire() noexcept
Definition: scheduler.hpp:193
rad::IoExecutor::get_io_context
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::dpm::Resource::GetLimit
unsigned GetLimit() const noexcept
Definition: scheduler.hpp:204
daq::dpm::Resource::Release
void Release() noexcept
Definition: scheduler.cpp:49
daq::dpm::Workspace::RemoveDaq
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
daq::Status::error
bool error
Definition: status.hpp:139
daq::dpm::SchedulerImpl::ConnectStatus
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
Definition: scheduler.cpp:242
daq::dpm::SchedulerImpl::GetQueue
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
Definition: scheduler.cpp:180
daq::MakeAlertId
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:39
daq::dpm::Workspace
Interface to interact with DPM workspace.
Definition: workspace.hpp:98
daq::DpSpec
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:28
daq::dpm::SchedulerImpl::IsQueued
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Definition: scheduler.cpp:165
daq::DpSpecError
Definition: dpSpec.hpp:21
daq::DpSpec::Target::file_id
std::string file_id
Definition: dpSpec.hpp:51
daq::dpm::DaqControllerImpl::Start
void Start() override
Start/stop operations.
Definition: scheduler.cpp:457
daq::dpm::Workspace::StoreQueue
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
daq::dpm::Resource::GetUsed
unsigned GetUsed() const noexcept
Definition: scheduler.hpp:207
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::alert::TRANSFERRING_RSYNC
constexpr std::string_view TRANSFERRING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:37
daq::ObservableStatus::GetState
State GetState() const noexcept
Definition: status.cpp:184
daq::dpm::DaqControllerImpl::GetStatus
auto GetStatus() noexcept -> ObservableStatus &override
Definition: scheduler.cpp:482
daq::dpm::ResourceToken::ResourceToken
ResourceToken(Resource *) noexcept
Definition: scheduler.cpp:42
daq::dpm::DaqControllerImpl::GetState
auto GetState() const noexcept -> State override
Definition: scheduler.cpp:478
daq::ObservableStatus::GetError
bool GetError() const noexcept
Definition: status.cpp:188
daq::State::Transferring
@ Transferring
Input files are being transferred.
daq::Status::timestamp
TimePoint timestamp
Definition: status.hpp:149
daq::dpm::DaqControllerImpl::Poll
void Poll() override
Definition: scheduler.cpp:507
daq::dpm::Workspace::GetPath
virtual auto GetPath() const -> std::filesystem::path=0
daq::dpm::Resources::daqs
Resource daqs
Definition: scheduler.hpp:232
daq::DpSpec::sources
std::vector< SourceTypes > sources
Definition: dpSpec.hpp:63
daq::dpm::SchedulerImpl::QueueDaq
std::string QueueDaq(std::string const &dp_spec) override
Queues DAQ for processing.
Definition: scheduler.cpp:95
daq::dpm::Resources::net_receive
Resource net_receive
Definition: scheduler.hpp:234
daq::ParseDpSpec
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Definition: dpSpec.cpp:211
report.hpp
scheduler.hpp
daq::dpm::Scheduler and related class declarations.
daq::dpm::DaqController
Controller for specific DAQ.
Definition: scheduler.hpp:59
daq
Definition: asyncProcess.cpp:15
daq::dpm::DaqControllerOptions::rsync_bin
std::string rsync_bin
Definition: scheduler.hpp:164
daq::dpm::Workspace::LoadQueue
virtual auto LoadQueue() const -> std::vector< std::string >=0
daq::dpm::Resources::net_send
Resource net_send
Definition: scheduler.hpp:233
daq::dpm::DaqControllerImpl::IsStopped
auto IsStopped() const noexcept -> bool override
Definition: scheduler.cpp:466
daq::dpm::Workspace::InitializeDaq
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
daq::dpm::DaqControllerImpl::GetErrorFlag
auto GetErrorFlag() const noexcept -> bool override
Definition: scheduler.cpp:474
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:161
daq::dpm::DaqControllerOptions::merge_bin
std::string merge_bin
Definition: scheduler.hpp:163
log4cplus.hpp
Declaration of log4cplus helpers.
daq::ObservableStatus::SetError
void SetError(bool error) noexcept
Set error flag for data acquisition.
Definition: status.cpp:205
daq::dpm::Workspace::ArchiveDaq
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location in th...
daq::ObservableStatus::GetId
std::string const & GetId() const noexcept
Definition: status.cpp:172
daq::dpm::SchedulerImpl::Start
void Start() override
Start/stop operations.
Definition: scheduler.cpp:86
daq::State::Merging
@ Merging
DAQ is being merged.
daq::dpm::Resource::Connect
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
Definition: scheduler.hpp:214
daq::alert::MERGING_MERGE
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:46
workspace.hpp
daq::dpm::Workspace interface and implementation declaration
daq::dpm::SchedulerOptions
Options controlling scheduler operations.
Definition: scheduler.hpp:132
daq::DpSpec::Target::source
std::optional< SourceFitsFile > source
Definition: dpSpec.hpp:56
daq::dpm::DaqControllerImpl::Stop
void Stop() override
Definition: scheduler.cpp:462
daq::dpm::SchedulerImpl::Stop
void Stop() override
Definition: scheduler.cpp:91
daq::RsyncAsyncProcessIf::DryRun::Disabled
@ Disabled
daq::dpm::Workspace::LoadDaq
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
daq::dpm::Resources::merge
Resource merge
Definition: scheduler.hpp:235
daq::DpSpec::id
std::string id
Definition: dpSpec.hpp:61
daq::ObservableStatus::DeferSignal
friend class DeferSignal
Definition: status.hpp:320
daq::dpm
Definition: testDpSpec.cpp:16
daq::dpm::DaqControllerImpl::RsyncFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:487
daq::dpm::ResourceToken::~ResourceToken
~ResourceToken() noexcept
Definition: scheduler.cpp:45
daq::MakeAlert
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:29
daq::IsFinalState
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
daq::Status::id
std::string id
Definition: status.hpp:136
daq::dpm::DaqControllerImpl::GetId
auto GetId() const noexcept -> std::string const &override
Definition: scheduler.cpp:470
daq::dpm::DaqControllerImpl::ProcFactory
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string >)> ProcFactory
Definition: scheduler.hpp:490
daq::config::Origin
Origin
Configuration origins in descending priority.
Definition: manager.hpp:30
daq::Status
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:120
daq::Status::state
State state
Definition: status.hpp:138
daq::dpm::DaqControllerOptions
Options for DaqController.
Definition: scheduler.hpp:162
daq::DpSpec::target
Target target
Definition: dpSpec.hpp:62
daq::dpm::Resource
Definition: scheduler.hpp:182
daq::dpm::SchedulerImpl::GetDaqStatus
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
Definition: scheduler.cpp:169
daq::dpm::Resources
Limited resources.
Definition: scheduler.hpp:231
daq::ObservableStatus::SetState
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:192
daq::dpm::SchedulerImpl::DaqControllerFactory
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:349
daq::dpm::SchedulerImpl::SchedulerImpl
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
Definition: scheduler.cpp:63
error
Definition: main.cpp:23
daq::dpm::operator<<
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
Definition: scheduler.cpp:58
daq::Status::file_id
std::string file_id
Definition: status.hpp:137
daq::State::Scheduled
@ Scheduled
DAQ states handled by OCM.
Initial state of data acquisition.
daq::ParseSourceOrigin
Origin ParseSourceOrigin(std::string const &origin_str)
Parse origin string from DpSpec into component parts.
Definition: dpSpec.cpp:251
daq::json
nlohmann::json json
Definition: json.cpp:20
daq::dpm::SchedulerImpl::AbortDaq
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
Definition: scheduler.cpp:184