ifw-daq  3.0.1
IFW Data Acquisition modules
manager.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Definition of `daq::ManagerImpl` and related utilities.
7  */
8 #include <daq/manager.hpp>
9 
10 #include <algorithm>
11 #include <stdexcept>
12 #include <time.h>
13 
14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16 #include <log4cplus/loggingmacros.h>
17 #include <mal/Mal.hpp>
18 
19 #include <daq/conversion.hpp>
20 #include <daq/daqController.hpp>
21 #include <daq/error/report.hpp>
22 #include <daq/status.hpp>
23 #include <daq/workspace.hpp>
24 
25 #include <daq/op/awaitState.hpp>
26 #include <daq/op/initiate.hpp>
27 
28 namespace daq {
29 bool IsStale(ManagerParams const& params,
30  State state,
31  std::chrono::system_clock::time_point creation_time) {
32  auto now = std::chrono::system_clock::now();
33  if (IsFinalState(state)) {
34  return true;
35  }
36  auto full = MakeState(state);
37  if (full.state == daqif::StateAcquiring) {
38  return now > creation_time + params.acquiring_stale_age;
39  }
40  if (full.state == daqif::StateMerging) {
41  return now > creation_time + params.merging_stale_age;
42  }
43  return false;
44 }
45 
46 std::string MakeIdCandidate(char const* instrument_id,
47  unsigned jitter,
48  std::chrono::system_clock::time_point* tp) {
49  using time_point = std::chrono::system_clock::time_point;
50  using duration = std::chrono::system_clock::duration;
51  using seconds = std::chrono::seconds;
52  using microseconds = std::chrono::microseconds;
53  // 'KMOS.2017-06-02T16:45:55.701'
54  // olas-id must be <= 5 characters
55  // Format: <olas>-<strfmtime>.<frac>
56  // Size: 1-5 1 20 1 3 = 30
57  // chrono formatting is not provided until C++20
58  struct timeval tv;
59  struct timeval jitter_tv {
60  0, jitter * 1000
61  }; // 1 ms
62  if (gettimeofday(&tv, nullptr) != 0) {
63  // GCOVR_EXCL_START
64  throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
65  // GCOVR_EXCL_STOP
66  }
67  // Add jitter
68  struct timeval res;
69  timeradd(&tv, &jitter_tv, &res);
70  tv = res;
71 
72  struct tm tm_time;
73  time_t time = tv.tv_sec;
74  if (gmtime_r(&time, &tm_time) == nullptr) {
75  // GCOVR_EXCL_START
76  throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
77  // GCOVR_EXCL_STOP
78  }
79  char time_str[31] = {0};
80  int n = snprintf(&time_str[0], 7, "%.5s.", instrument_id);
81  // This part is always 20 characters long
82  strftime(&time_str[n], 20, "%Y-%m-%dT%H:%M:%S", &tm_time);
83  char frac_str[5];
84  snprintf(&frac_str[0], 5, ".%.3d", static_cast<int>(tv.tv_usec / 1000.0));
85  // Append the fractional part
86  strncpy(&time_str[n + 19], &frac_str[0], 4);
87  if (tp != nullptr) {
88  // Store resulting time in out optional out param
89  *tp = time_point(
90  std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
91  }
92  return std::string(time_str, n + 23);
93 }
94 
96  ManagerParams params,
97  Workspace& workspace,
98  std::shared_ptr<ObservableEventLog> event_log,
99  DaqControllerFactory& daq_factory,
100  std::shared_ptr<DpmClient> dpm_client,
101  log4cplus::Logger const& logger)
102  : m_alive_token(std::make_shared<bool>())
103  , m_executor(executor)
104  , m_params(std::move(params))
105  , m_workspace(workspace)
106  , m_event_log(std::move(event_log))
107  , m_daq_factory(daq_factory)
108  , m_dpm_client(std::move(dpm_client))
109  , m_logger(logger) {
110 }
111 
113  // Abort any ongoing operations
114  for (auto& op : m_abort_funcs) {
115  try {
116  op.Abort();
117  } catch (std::exception const& e) {
118  LOG4CPLUS_WARN(m_logger,
119  fmt::format("ManagerImpl::~ManagerImpl: Error when aborting "
120  "operation {}: {}",
121  op.GetId(),
122  e.what()));
123  }
124  }
125 }
126 
128  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Starting");
129  auto ids = m_workspace.LoadList();
130  // New list of pruned DAQs
131  decltype(ids) pruned;
132 
133  for (auto const& id : ids) {
134  try {
135  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Loading state for DAQ " << id);
136  auto context = m_workspace.LoadContext(id);
137  auto status = m_workspace.LoadStatus(id);
138 
139  // Ignore stale DAQs
140  if (IsStale(m_params, status.state, context.creation_time)) {
141  LOG4CPLUS_INFO(m_logger,
142  "RestoreFromWorkspace: DAQ " << status << " is stale -> archiving");
143  m_workspace.ArchiveDaq(id);
144  continue;
145  }
146 
147  auto full = MakeState(status.state);
148  // DAQ should be loaded.
149  if (full.state == daqif::StateAcquiring) {
150  LOG4CPLUS_INFO(m_logger,
151  "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
152  auto daq = m_daq_factory.MakeOcmPhase(
153  std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
154  assert(daq);
155  AddDaq(daq, Store::No);
156  // We keep this
157  pruned.push_back(id);
158  } else if (full.state == daqif::StateMerging) {
159  LOG4CPLUS_INFO(m_logger,
160  "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
161  auto daq = m_daq_factory.MakeDpmPhase(
162  std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
163  assert(daq);
164  AddDaq(daq, Store::No);
165 
166  // We keep this
167  pruned.push_back(id);
168  } else {
169  LOG4CPLUS_INFO(
170  m_logger,
171  "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
172  }
173  } catch (...) {
174  error::NestedExceptionReporter r(std::current_exception());
175  LOG4CPLUS_ERROR(m_logger,
176  "RestoreFromWorkspace: Loading state for DAQ "
177  << id << " failed (ignoring): " << r);
178  try {
179  m_workspace.ArchiveDaq(id);
180  } catch (...) {
181  error::NestedExceptionReporter r(std::current_exception());
182  LOG4CPLUS_ERROR(m_logger,
183  "RestoreFromWorkspace: Failed to archive DAQ " << id
184  << "(ignoring): \n"
185  << r);
186  }
187  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Skipping " << id);
188  }
189  }
190 
191  // Write back pruned list of DAQs
192  m_workspace.StoreList(pruned);
193 
194  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Successfully completed");
195 } catch (...) {
196  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Failed");
197  std::throw_with_nested(std::runtime_error("Failed to restore from workspace"));
198 }
199 
200 std::string ManagerImpl::MakeDaqId(std::chrono::system_clock::time_point* time) const {
201  for (unsigned jitter = 0;; ++jitter) {
202  auto id_candidate = daq::MakeIdCandidate(m_params.instrument_id.c_str(), jitter, time);
203  if (!HaveDaq(id_candidate, id_candidate)) {
204  return id_candidate;
205  }
206  }
207 }
208 
209 bool ManagerImpl::HaveDaq(std::string_view id, std::string_view file_id) const noexcept {
210  assert(!id.empty());
211  auto it =
212  std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](auto const& daq) {
213  // Return true if daq is equal
214  // Return true if file_id is non-empty and equal
215  return daq.id == id ||
216  (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
217  });
218  if (it != m_daq_controllers.end()) {
219  LOG4CPLUS_DEBUG(m_logger,
220  "Manager: Found conflicting DAQ: id="
221  << id << ", file_id=" << file_id << " with existing: id=" << it->id
222  << ", file_id=" << it->controller->GetContext().file_id);
223  return true;
224  }
225  return false;
226 }
227 
228 void ManagerImpl::AddInitialKeywords(DaqContext& ctx) {
229  // note: ARCFILE and ORIGFILE is added by daqDpmMerge
231  kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "ORIGIN", m_params.origin);
232  kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "INSTRUME", m_params.instrument_id);
233  // @todo:
234  // - TELESCOP
235  // - DATE?
236 
237  // Update so that OCM keywords are added first (DaqContext may already contain keywords from
238  // request).
240  ctx.keywords.swap(kws);
241 }
242 
243 void ManagerImpl::AddDaq(std::shared_ptr<DaqController> const& daq, Store store) {
244  assert(daq);
245  LOG4CPLUS_INFO(m_logger, "Manager: AddDaq: Attempting to add DAQ " << daq->GetId());
246  if (daq->GetId().empty()) {
247  throw boost::enable_current_exception(std::invalid_argument("DaqController has empty id!"));
248  }
249  if (daq->GetContext().file_id.empty()) {
250  throw boost::enable_current_exception(
251  std::invalid_argument("DaqController has empty file_id"));
252  }
253  if (HaveDaq(daq->GetId(), daq->GetContext().file_id)) {
254  throw boost::enable_current_exception(
255  std::invalid_argument("DaqController with same id already exists"));
256  }
257 
258  if (store == Store::Yes) {
259  // Requested to store DAQ (i.e. it was not loaded from workspace)
260  m_workspace.StoreContext(daq->GetContext());
261  m_workspace.StoreStatus(*daq->GetStatus());
262  }
263 
264  if (IsActiveDpmState(daq->GetState())) {
265  // Start monitoring to recover from e.g. state deviation due to OCM being offline
266  // and not receiving published status updates from DPM.
267  m_dpm_client->StartMonitorStatus(daq->GetId());
268  }
269 
270  m_daq_controllers.emplace_back(
271  daq->GetId(),
272  daq,
273  daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
274  prev_state = daq->GetState(),
275  this](ObservableStatus const& status) mutable {
276  if (alive.expired()) {
277  LOG4CPLUS_INFO("daq", "Manager has expired");
278  return;
279  }
280  if (IsFinalState(status.GetState())) {
281  if (!IsFinalState(prev_state)) {
282  // Transition to final state -> Archive DAQ
283  LOG4CPLUS_INFO(
284  m_logger,
285  fmt::format("DAQ transitioned to a final state -> archiving: {} (prev {})",
286  status,
287  prev_state));
288  m_workspace.StoreStatus(status);
289 
290  m_executor.submit([alive = alive, id = status.GetId(), this] {
291  if (alive.expired()) {
292  LOG4CPLUS_INFO("daq", "Manager has expired");
293  return;
294  }
295  ArchiveDaq(id);
296  });
297  }
298  } else { // Not any final state
299  m_workspace.StoreStatus(status);
300 
301  // Handle handover
302  if (prev_state != State::Stopped && status.GetState() == State::Stopped) {
303  // If DAQ is stopped we need to move it to the merging phase
304  // To be safe we defer the execution.
305  // Manager lives as long as executor so we don't have to
306  // check liveness.
307  m_executor.submit([alive = alive, id = status.GetId(), this] {
308  if (alive.expired()) {
309  LOG4CPLUS_INFO("daq", "Manager has expired");
310  return;
311  }
312  MoveToMergePhase(id);
313  });
314  } else if (prev_state == State::NotScheduled &&
315  IsActiveDpmState(status.GetState())) {
316  // Transition from NotScheduled -> Scheduled+ transfers responsibility to DPM.
317  // As such we want to monitor status in case status updates are not received.
318  m_dpm_client->StartMonitorStatus(status.GetId());
319  }
320  }
321 
322  // Signal other observers.
323  this->m_status_signal.Signal(status);
324 
325  prev_state = status.GetState();
326  }),
327  daq->ConnectContext(
328  [alive = std::weak_ptr<bool>(m_alive_token), this](DaqContext const& ctx) {
329  if (alive.expired()) {
330  LOG4CPLUS_INFO("daq", "Manager has expired");
331  return;
332  }
333  m_workspace.StoreContext(ctx);
334  }));
335 
336  if (store == Store::Yes) {
337  // Store daq list
338  StoreActiveDaqs();
339  }
340 
341  // Notify observers that DAQ was added.
342  m_status_signal.Signal(*daq->GetStatus());
343 
344  if (daq->GetState() == State::NotScheduled) {
345  ScheduleDaqsAsync();
346  }
347 }
348 
349 void ManagerImpl::RemoveDaq(std::string_view id) {
350  auto it = std::find_if(m_daq_controllers.begin(),
351  m_daq_controllers.end(),
352  [id](auto const& daq) { return daq.id == id; });
353  if (it == m_daq_controllers.end()) {
354  throw boost::enable_current_exception(
355  std::invalid_argument(fmt::format("Remove DAQ failed - no id found: {}", id)));
356  }
357  m_daq_controllers.erase(it);
358 }
359 
360 void ManagerImpl::ArchiveDaq(std::string const& id) {
361  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive: id={}", id));
362 
363  // Since DAQ has been completed we can stop monitoring for changes.
364  m_dpm_client->StopMonitorStatus(id);
365 
366  // Archive persistent storage
367  m_workspace.ArchiveDaq(id);
368 
369  // Remove daq controller
370  RemoveDaq(id);
371 
372  StoreActiveDaqs();
373  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive done: id={}", id));
374 }
375 
376 void ManagerImpl::StoreActiveDaqs() const {
377  LOG4CPLUS_INFO(m_logger, "StoreActiveDaqs()");
378  // And remove from
379  std::vector<std::string> daqs;
380  for (Daq const& daq : m_daq_controllers) {
381  assert(daq.controller);
382  if (!IsFinalState(daq.controller->GetState())) {
383  daqs.push_back(daq.id);
384  }
385  }
386  m_workspace.StoreList(daqs);
387 }
388 
389 Status ManagerImpl::GetStatus(std::string_view id) const {
390  auto* daq = FindDaq(id);
391  if (daq) {
392  return daq->GetStatus()->GetStatus();
393  }
394  // Try to find archived status
395  auto maybe_status = m_workspace.LoadArchivedStatus(std::string(id));
396  if (maybe_status) {
397  return *maybe_status;
398  } else {
399  throw boost::enable_current_exception(
400  std::invalid_argument(fmt::format("DaqController with id '{}' does not exist", id)));
401  }
402 }
403 
404 ManagerImpl::Daq::Daq(std::string id_arg,
405  std::shared_ptr<DaqController> controller_arg,
406  boost::signals2::connection conn_status_arg,
407  boost::signals2::connection conn_context_arg) noexcept
408  : id(std::move(id_arg))
409  , controller(std::move(controller_arg))
410  , conn_status(std::move(conn_status_arg))
411  , conn_context(std::move(conn_context_arg)) {
412 }
413 
414 DaqController const* ManagerImpl::FindDaq(std::string_view id) const noexcept {
415  return const_cast<ManagerImpl*>(this)->FindDaq(id);
416 }
417 
418 DaqController* ManagerImpl::FindDaq(std::string_view id) noexcept {
419  auto it = std::find_if(m_daq_controllers.begin(),
420  m_daq_controllers.end(),
421  [id](auto const& daq) { return daq.id == id; });
422  if (it != m_daq_controllers.end()) {
423  return it->controller.get();
424  }
425  return nullptr;
426 }
427 
428 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view id) {
429  auto daq_ptr = FindDaq(id);
430 
431  if (!daq_ptr) {
432  throw boost::enable_current_exception(std::invalid_argument(
433  fmt::format("DaqController with id '{}' does not exist", std::string(id))));
434  }
435  return *daq_ptr;
436 }
437 
438 DaqController const& ManagerImpl::FindDaqOrThrow(std::string_view id) const {
439  return const_cast<ManagerImpl*>(this)->FindDaqOrThrow(id);
440 }
441 
442 void ManagerImpl::MoveToMergePhase(std::string_view id) {
443  auto* daq = FindDaq(id);
444  if (!daq) {
445  LOG4CPLUS_WARN(m_logger, fmt::format("Daq requested to move does not exist: id={}", id));
446  return;
447  }
448  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to merge-phase: id={}", id));
449  // Copy state we want to keep.
450  auto ctx = daq->GetContext();
451  auto status = daq->GetStatus();
452  auto event_log = daq->GetEventLog();
453  // Delete old DAQ before creating new
454  // note: this invalidates "daq"
455  RemoveDaq(id);
456 
457  // Manually transition to first state in Merging
458  status->SetState(State::NotScheduled);
459 
460  auto new_daq =
461  m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
462  AddDaq(new_daq);
463 }
464 
465 boost::future<State> ManagerImpl::StartDaqAsync(DaqContext ctx) {
466  try {
467  AddInitialKeywords(ctx);
468 
469  auto id = ctx.id;
470  auto file_id = ctx.file_id;
471  auto daq = m_daq_factory.MakeOcmPhase(
472  std::move(ctx),
473  std::make_shared<ObservableStatus>(std::move(id), std::move(file_id)),
474  m_event_log);
475  assert(daq);
476  AddDaq(daq);
477  return daq->StartAsync()
478  .then(m_executor,
479  [&, daq](boost::future<State> f) -> boost::future<State> {
480  if (f.has_exception()) {
481  // Any error during start may lead to partially started acquisition that
482  // we need to abort
483  return daq->AbortAsync(ErrorPolicy::Tolerant)
484  .then(m_executor,
485  [f = std::move(f)](boost::future<Status>) mutable -> State {
486  // We ignore errors from AbortAsync as we can't do anything
487  // about it Then we return original error
488  f.get(); // throws
489  __builtin_unreachable();
490  });
491  } else {
492  return boost::make_ready_future<State>(f.get());
493  }
494  })
495  .unwrap();
496  } catch (...) {
497  return boost::make_exceptional_future<State>();
498  }
499 }
500 
501 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view id, ErrorPolicy policy) {
502  try {
503  return FindDaqOrThrow(id).StopAsync(policy);
504  } catch (...) {
505  return boost::make_exceptional_future<Status>();
506  }
507 }
508 
509 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view id, ErrorPolicy policy) {
510  try {
511  return FindDaqOrThrow(id).AbortAsync(policy);
512  } catch (...) {
513  return boost::make_exceptional_future<Status>();
514  }
515 }
516 
517 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view id,
518  State state,
519  std::chrono::milliseconds timeout) {
520  try {
521  auto* maybe_daq = FindDaq(id);
522  if (!maybe_daq) {
523  // It could be the DAQ just completed and await should be released because of that.
524  auto status = GetStatus(id);
525  if (!IsSubsequentState(state, status.state)) {
526  LOG4CPLUS_INFO(m_logger,
527  fmt::format("{}: Await condition already fulfilled (archived).", status));
528  // Condition already fulfilled.
529  return boost::make_ready_future<Result<Status>>({false, status});
530  }
531  // Since DAQ was archived we cannot wait anyway so we return exception
532  throw boost::enable_current_exception(std::invalid_argument(
533  fmt::format("DaqController with id '{}' is archived and cannot be awaited", id)));
534  }
535 
536  // DAQ is active
537  auto& daq = *maybe_daq;
538  auto status = daq.GetStatus();
539  if (!IsSubsequentState(state, daq.GetState())) {
540  LOG4CPLUS_INFO(m_logger,
541  fmt::format("{}: Await condition already fulfilled.", *status));
542  // Condition already fulfilled.
543  return boost::make_ready_future<Result<Status>>({false, *status});
544  } else {
545  // Create child logger for await state.
546  auto logger = log4cplus::Logger::getInstance(m_logger.getName() + ".awaitstate");
547  auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
548  m_executor.get_io_context(), status, state, timeout, logger);
549  // Store abort function so when Manager is deleted it will
550  auto& ref = m_abort_funcs.emplace_back(std::move(abort));
551  LOG4CPLUS_DEBUG("daq.manager",
552  fmt::format("op::AwaitStateAsync initiated. id={}", ref.GetId()));
553  return fut.then(
554  m_executor,
555  [this, id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](auto res) {
556  LOG4CPLUS_DEBUG("daq.manager",
557  fmt::format("op::AwaitStateAsync completed. id={}", id));
558  // Remove abort function since operation completed, but only if
559  // object is alive.
560  auto is_alive = !alive.expired();
561  if (is_alive) {
562  // Manager is still alive, so we remove abort function
563  RemoveAbortFunc(id);
564  }
565  return res.get();
566  });
567  }
568  } catch (...) {
569  return boost::make_exceptional_future<Result<Status>>();
570  }
571 }
572 
573 void ManagerImpl::UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) {
574  return FindDaqOrThrow(id).UpdateKeywords(keywords);
575 }
576 
577 StatusSignal& ManagerImpl::GetStatusSignal() {
578  return m_status_signal;
579 }
580 
581 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
582  std::vector<std::shared_ptr<DaqController const>> controllers;
583  controllers.reserve(m_daq_controllers.size());
584  std::transform(m_daq_controllers.begin(),
585  m_daq_controllers.end(),
586  std::back_inserter(controllers),
587  [](auto const& daq) { return daq.controller; });
588  return controllers;
589 }
590 
591 void ManagerImpl::RemoveAbortFunc(uint64_t id) noexcept {
592  try {
593  m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
594  m_abort_funcs.end(),
595  [id](auto const& obj) { return id == obj.GetId(); }),
596  m_abort_funcs.end());
597  } catch (...) {
598  }
599 }
600 
601 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
602  : m_id(NextId()), m_func(std::move(func)) {
603  assert(m_func);
604 }
605 
606 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
607  return m_id;
608 }
609 
610 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
611  return m_func();
612 }
613 
614 uint64_t ManagerImpl::OpAbortFunc::NextId() {
615  static uint64_t next_id = 0;
616  return next_id++;
617 }
618 
619 void ManagerImpl::ScheduleDaqsAsync() {
620  LOG4CPLUS_TRACE(m_logger, "ScheduleDaqAsync()");
621  // Regardless if caller was invoked from timer or manually we reset the deadline timer
622  // so that timer is restarted when ScheduleMergeAsync completes as necessary.
623  m_schedule_retry.reset();
624 
625  for (auto& daq : m_daq_controllers) {
626  if (daq.controller->GetState() != State::NotScheduled) {
627  continue;
628  }
629  daq.controller->ScheduleMergeAsync().then(
630  m_executor, [id = daq.id, this](boost::future<State> reply) {
631  if (!reply.has_exception()) {
632  LOG4CPLUS_INFO(
633  m_logger,
634  fmt::format("ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
635  // Success
636  return;
637  }
638  LOG4CPLUS_WARN(
639  m_logger,
640  fmt::format("ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
641  id));
642  // Some kind of error happened, at this point we don't care what it is
643  // but simply schedule a new attempt using deadline timer, unless already scheduled.
644  if (m_schedule_retry) {
645  // Already scheduled..
646  return;
647  }
648  m_schedule_retry.emplace(m_executor.get_io_context(),
649  boost::posix_time::seconds(60));
650  m_schedule_retry->async_wait([this](boost::system::error_code const& error) {
651  if (error) {
652  return;
653  }
654  ScheduleDaqsAsync();
655  });
656  });
657  }
658 }
659 
660 } // namespace daq
Contains declaration for the AwaitStateAsync operation.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:127
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:209
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, std::shared_ptr< DpmClient > dpm_client, log4cplus::Logger const &logger)
Definition: manager.cpp:95
~ManagerImpl() noexcept
Definition: manager.cpp:112
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:200
Observes any status.
Definition: manager.hpp:78
Interface to interact with DPM workspace.
Definition: workspace.hpp:32
virtual auto LoadList() const -> std::vector< std::string >=0
virtual void ArchiveDaq(std::string const &id)=0
Archives specified DAQ without deleting any files, typically by moving its files to a specific location.
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Get file name of the data product specification stored in StoreSpecification()
virtual void StoreList(std::vector< std::string > const &queue) const =0
virtual void StoreStatus(Status const &status) const =0
Loads last archived DAQ status if any.
virtual auto LoadStatus(std::string const &id) const -> Status=0
Loads last archived DAQ status if any.
virtual void StoreContext(DaqContext const &context) const =0
Get file name of the data product specification stored in StoreSpecification()
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
Contains declarations for the helper functions to initiate operations.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates to with keywords from from.
Definition: keyword.cpp:554
@ Skip
Skip keyword that conflicts.
std::string origin
Definition: manager.hpp:41
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
bool IsActiveDpmState(State state) noexcept
Query whether state is an active (non-final) state executed by DPM.
Definition: state.cpp:30
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
Definition: manager.cpp:29
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:29
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archived.
Definition: manager.hpp:53
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
Definition: state.cpp:43
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:40
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Stopped
All data sources have reported they have stopped acquiring data.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state, after which it is automatically considered abandoned and will be archived.
Definition: manager.hpp:47
Configurations parameters directly related to manager.
Definition: manager.hpp:36
Definition: main.cpp:23
Contains declaration for DaqController.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specification.
Definition: daqContext.hpp:44
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:87
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:60
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:153
auto const & transform