ifw-daq  3.0.1
IFW Data Acquisition modules
daqController.cpp
Go to the documentation of this file.
1 #include <daq/daqController.hpp>
2 
3 #include <iostream>
4 #include <stdexcept>
5 #include <type_traits>
6 
7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 #include <mal/MalException.hpp>
11 #include <mal/rr/qos/ReplyTime.hpp>
12 #include <nlohmann/json.hpp>
13 
14 #include <daq/dpmClient.hpp>
15 #include <daq/error.hpp>
16 #include <daq/fits/json.hpp>
17 #include <daq/op/abort.hpp>
18 #include <daq/op/awaitPrim.hpp>
19 #include <daq/op/initiate.hpp>
20 #include <daq/op/start.hpp>
21 #include <daq/op/stop.hpp>
22 
23 namespace daq {
24 namespace {
25 
26 DaqSources MakeSources(mal::Mal& mal, DaqContext const& ctx) {
27  std::vector<PrimSource> psources;
28  std::vector<MetaSource> msources;
29 
30  psources.reserve(ctx.prim_sources.size());
31  for (auto const& raw : ctx.prim_sources) {
32  psources.emplace_back(
33  raw.name,
34  mal.getClient<daq::PrimSource::RrClient>(
35  mal::Uri(raw.rr_uri),
36  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
37  {}));
38  }
39 
40  msources.reserve(ctx.meta_sources.size());
41  for (auto const& raw : ctx.meta_sources) {
42  msources.emplace_back(
43  raw.name,
44  mal.getClient<daq::MetaSource::RrClient>(
45  mal::Uri(raw.rr_uri),
46  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
47  {}));
48  }
49 
50  return DaqSources(std::move(psources), std::move(msources));
51 }
52 
/// Type-dependent constant that is `false` for every type. Being dependent on the template
/// argument lets it be used in a `static_assert` that must only fire when instantiated.
template <typename>
inline constexpr bool always_false_v{false}; // NOLINT
55 
56 template <class Sources>
57 bool AllInState(Sources sources, State state) {
58  return std::all_of(
59  sources.begin(), sources.end(), [=](auto const& s) -> bool { return s.state == state; });
60 }
61 
62 } // namespace
63 
65  : start([](auto par) { return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
66  , abort([](ErrorPolicy policy, auto par) {
67  return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
68  })
69  , stop([](ErrorPolicy policy, auto par) {
70  return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
71  })
72  , await_prim([](auto par) {
73  return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
74  }) {
75 }
76 
77 bool OcmAsyncOperations::IsValid() const noexcept {
78  return start && stop && abort && await_prim;
79 }
80 
82  elt::mal::Mal& mal,
83  std::shared_ptr<DpmClient> dpm_client)
84  : m_io_ctx(io_ctx), m_mal(mal), m_async_ops(), m_dpm_client(std::move(dpm_client)) {
85  assert(m_dpm_client);
86 }
87 
89  std::shared_ptr<ObservableStatus> status,
90  std::shared_ptr<ObservableEventLog> event_log)
91  -> std::shared_ptr<DaqController> {
92  DaqSources sources = MakeSources(m_mal, daq_ctx);
93  return OcmDaqController::Create(m_io_ctx,
94  std::move(daq_ctx),
95  sources,
96  std::move(status),
97  std::move(event_log),
98  m_async_ops);
99 }
100 
102  std::shared_ptr<ObservableStatus> status,
103  std::shared_ptr<ObservableEventLog> event_log)
104  -> std::shared_ptr<DaqController> {
105  return std::make_shared<DpmDaqController>(
106  m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
107 }
108 
109 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
110  os << "DaqController(id='" << daq.GetId() << "', state=" << daq.GetState() << ")";
111  return os;
112 }
113 
114 CommonDaqController::CommonDaqController(boost::asio::io_context& io_context,
115  DaqContext context,
116  std::shared_ptr<ObservableStatus> status,
117  std::shared_ptr<ObservableEventLog> event_log)
118  : m_io_ctx(io_context)
119  , m_executor(m_io_ctx)
120  , m_context(std::move(context))
121  , m_status(std::move(status))
122  , m_event_log(std::move(event_log)) {
123  assert(m_status);
124  assert(m_event_log);
125 }
126 
127 std::shared_ptr<ObservableStatus> CommonDaqController::GetStatus() DAQ_NOEXCEPT {
128  return m_status;
129 }
130 
131 std::shared_ptr<ObservableStatus const> CommonDaqController::GetStatus() const DAQ_NOEXCEPT {
132  return m_status;
133 }
134 
135 std::shared_ptr<ObservableEventLog> CommonDaqController::GetEventLog() DAQ_NOEXCEPT {
136  return m_event_log;
137 }
138 
139 std::string const& CommonDaqController::GetId() const DAQ_NOEXCEPT {
140  return m_status->GetId();
141 }
142 
144  return m_status->GetError();
145 }
146 
148  return m_context;
149 }
150 
151 boost::signals2::connection
152 CommonDaqController::ConnectContext(ContextSignal::slot_type const& slot) {
153  return m_sig_context.connect(slot);
154 }
155 
156 std::shared_ptr<OcmDaqController>
157 OcmDaqController::Create(boost::asio::io_context& io_context,
158  DaqContext context,
159  DaqSources const& sources,
160  std::shared_ptr<ObservableStatus> status,
161  std::shared_ptr<ObservableEventLog> event_log,
162  OcmAsyncOperations ops) {
163  // note: make_shared doesn't work since constructor is protected,
164  // to protect against non-shared ownership.
165  LOG4CPLUS_TRACE("daq", fmt::format("OcmDaqController::Create"));
166  return std::shared_ptr<OcmDaqController>(new OcmDaqController(io_context,
167  std::move(context),
168  sources,
169  std::move(status),
170  std::move(event_log),
171  std::move(ops)));
172 }
173 
174 OcmDaqController::OcmDaqController(boost::asio::io_context& io_context,
175  DaqContext context,
176  DaqSources const& sources,
177  std::shared_ptr<ObservableStatus> status,
178  std::shared_ptr<ObservableEventLog> event_log,
179  OcmAsyncOperations ops)
180  : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
181 
182  , m_state(MakeState(GetStatusRef().GetState()))
183  , m_prim_sources(MakeSources<PrimSource>(sources.GetPrimarySources()))
184  , m_meta_sources(MakeSources<MetaSource>(sources.GetMetadataSources()))
185  , m_async_ops(std::move(ops))
186  , m_pending_replies(PendingReplies::Create())
187  , m_logger(log4cplus::Logger::getInstance("daq.ocm.controller")) {
188  if (GetContext().id != GetStatusRef().GetId()) {
189  throw boost::enable_current_exception(
190  std::invalid_argument("Data acquisition id mismatch between DaqContext "
191  "and ObservableStatus"));
192  }
193 
194  if (m_prim_sources.empty() && m_meta_sources.empty()) {
195  throw boost::enable_current_exception(std::invalid_argument("No data sources provided"));
196  }
197 
198  if (!m_async_ops.IsValid()) {
199  throw boost::enable_current_exception(
200  std::invalid_argument("OcmAsyncOperations is invalid"));
201  }
202 }
203 
205  State s;
206  std::visit(
207  [&](auto const& var) {
208  using T = std::decay_t<decltype(var)>;
209  if constexpr (std::is_same_v<T, NotStarted>) {
210  s = State::NotStarted;
211  } else if constexpr (std::is_same_v<T, Starting>) {
212  s = State::Starting;
213  } else if constexpr (std::is_same_v<T, Acquiring>) {
214  s = State::Acquiring;
215  } else if constexpr (std::is_same_v<T, Stopping>) {
216  s = State::Stopping;
217  } else if constexpr (std::is_same_v<T, Stopped>) {
218  s = State::Stopped;
219  } else if constexpr (std::is_same_v<T, Aborting>) {
221  } else if constexpr (std::is_same_v<T, Aborted>) {
222  s = State::Aborted;
223  } else {
224  static_assert(always_false_v<T>, "non-exhaustive visitor!");
225  }
226  },
227  m_state);
228  return s;
229 }
230 
231 constexpr log4cplus::Logger const& OcmDaqController::GetLogger() const noexcept {
232  return m_logger;
233 }
234 
236  switch (s) {
237  case State::NotStarted:
238  return StateVariant(NotStarted());
239  case State::Starting:
240  return StateVariant(Starting());
241  case State::Acquiring:
242  return StateVariant(Acquiring());
243  case State::Stopping:
244  return StateVariant(Stopping());
245  case State::Stopped:
246  return StateVariant(Stopped());
248  return StateVariant(Aborting());
249  case State::Aborted:
250  return StateVariant(Aborted());
251  default:
252  break;
253  };
254  LOG4CPLUS_FATAL(m_logger, fmt::format("Invalid state provided: '{}'", s));
255  std::terminate();
256 }
257 
259  GetStatusRef().SetError(error);
260 }
261 
263  m_state = s;
264  // Publish changes
265  GetStatusRef().SetState(GetState());
266 }
267 
270  GetEventLogRef(),
271  alerts,
272  GetIoExecutor(),
273  m_logger,
274  GetId(),
275  *m_pending_replies.get(),
278 }
279 
282  GetEventLogRef(),
283  alerts,
284  GetIoExecutor(),
285  m_logger,
286  GetId(),
287  *m_pending_replies.get(),
290  GetContext().await_interval);
291 }
292 
294  // Nothing yet.
295 }
296 
297 boost::future<State> OcmDaqController::StartAsync() {
299  ActionEvent(GetId(), "DaqController::StartAsync()", GetStatusRef().GetStatus()));
300 
301  // Make sure we're not already started.
302  if (!std::holds_alternative<NotStarted>(m_state)) {
303  return boost::make_exceptional_future<State>(
304  std::runtime_error("Data acquisition is already started"));
305  }
306 
307  SetState(Starting{});
308 
310 
311  auto alerts = std::make_unique<op::AlertState>();
312  return m_async_ops.start(MakeParams(*alerts.get()))
313  .then(GetIoExecutor(),
314  [alerts = std::move(alerts),
315  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
316  boost::future<void> f) {
317  LOG4CPLUS_INFO(daq->GetLogger(),
318  fmt::format("{}: StartAsync: Completed {}",
319  *daq,
320  f.has_value() ? "successfully" : "with error"));
321  // Merge alerts from async op
322  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
323  op::MergeAlerts(daq->GetStatusRef(), *alerts);
324 
325  // Re-raise any exception
326  if (f.has_exception()) {
327  daq->SetErrorFlag(true);
328  (void)f.get();
329  }
330  // Await completion if there are primary data sources
331  daq->InitiateAwaitPrimarySources();
332 
333  // No exception -> all done! update and return Acquiring state
334  daq->SetErrorFlag(false);
335  daq->SetState(Acquiring{});
336  return daq->GetState();
337  });
338 }
339 
340 boost::future<Status> OcmDaqController::StopAsync(ErrorPolicy policy) {
342  ActionEvent(GetId(), "DaqController::StopAsync()", GetStatusRef().GetStatus()));
343 
344  if (std::holds_alternative<Stopped>(m_state)) {
345  return boost::make_exceptional_future<Status>(
346  std::runtime_error("Data acquisition already stopped"));
347  }
348 
349  if (std::holds_alternative<Aborted>(m_state)) {
350  return boost::make_exceptional_future<Status>(
351  std::runtime_error("Data acquisition already aborted"));
352  }
353 
354  if (!std::holds_alternative<Acquiring>(m_state)) {
355  return boost::make_exceptional_future<Status>(
356  std::runtime_error("Cannot stop a data acquisition that is not Acquiring"));
357  }
358 
359  // @todo If we're in Starting this will potentially mess up assumption that we're
360  // in Starting. Revisit to remove this assumption?
361  // @todo: Store produced files
362  // m_status.AddFiles(reply.getFiles());
363  SetState(Stopping{});
364 
365  // As we're asked to stop manually we stop waiting for primary sources
367 
368  auto alerts = std::make_unique<op::AlertState>();
369  return m_async_ops.stop(policy, MakeParams(*alerts.get()))
370  .then(GetIoExecutor(),
371  [alerts = std::move(alerts),
372  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
373  boost::future<Result<DpParts>> f) mutable -> Status {
374  if (daq->GetState() != State::Stopping) {
375  // It can happen that a request to stop was superseded and completed before
376  // reply was received. In this case we treat it as a failure as we cannot
377  // determine if post conditions are met.
378  LOG4CPLUS_WARN(
379  daq->GetLogger(),
380  fmt::format("{}: StopAsync: Data acquisition modified by other commands. "
381  "Do nothing else (errors are ignored). ",
382  *daq));
383  throw std::runtime_error(fmt::format(
384  "Stop command could not be completed because Data Acquisitions state was "
385  "modified in the meantime (current state: {})",
386  daq->GetState()));
387  }
388  LOG4CPLUS_INFO(daq->GetLogger(),
389  fmt::format("{}: StopAsync: Completed {}",
390  *daq,
391  f.has_value() ? "successfully" : "with error"));
392  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
393  op::MergeAlerts(daq->GetStatusRef(), *alerts);
394 
395  if (f.has_exception()) {
396  daq->SetErrorFlag(true);
397  (void)f.get(); // Throw to propagate any error
398  }
399 
400  auto result = f.get();
401  AddDpParts(daq->GetContextMut(), result.result);
402  daq->EmitContextSignal();
403 
404  // If there were no exceptions we're all done.
405  // Update and return Stopped state
406  daq->SetErrorFlag(result.error);
407  daq->SetState(Stopped{});
408  return daq->GetStatus()->GetStatus();
409  });
410 }
411 
412 boost::future<Status> OcmDaqController::AbortAsync(ErrorPolicy policy) {
414  GetId(), fmt::format("DaqController::AbortAsync({})", policy), GetStatusRef().GetStatus()));
415 
416  if (std::holds_alternative<NotStarted>(m_state)) {
417  LOG4CPLUS_INFO(m_logger, fmt::format("{}: Aborting not started data acquisition", *this));
418  SetState(Aborted{});
419  return boost::make_ready_future<Status>(GetStatus()->GetStatus());
420  }
421 
422  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
423  return boost::make_exceptional_future<Status>(
424  std::runtime_error("Data acquisition already stopped or aborted"));
425  }
426 
427  SetState(Aborting{});
428 
429  // As we're asked to stop manually we stop waiting for primary sources
431 
432  auto alerts = std::make_unique<op::AlertState>();
433  return m_async_ops.abort(policy, MakeParams(*alerts.get()))
434  .then(GetIoExecutor(),
435  [alerts = std::move(alerts),
436  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
437  boost::future<Result<void>> f) -> Status {
438  if (daq->GetState() == State::Aborted) {
439  // It can happen that a request to abort was superseded and
440  // finalized before reply was received.
441  LOG4CPLUS_INFO(
442  daq->GetLogger(),
443  fmt::format("{}: AbortAsync: Data acquisition already aborted. "
444  "Do nothing else (errors are ignored). ",
445  *daq));
446 // @todo: Should throw instead as the command was superseded by
447  // other command.
448  return daq->GetStatus()->GetStatus();
449  }
450  LOG4CPLUS_DEBUG(daq->GetLogger(),
451  fmt::format("{}: AbortAsync: Completed, updating DAQ status and "
452  "set reply remaining. Has fatal error={}",
453  *daq,
454  f.has_exception()));
455  //
456 // Merge alerts from async op
457  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
458  op::MergeAlerts(daq->GetStatusRef(), *alerts);
459 
460  if (f.has_exception()) {
461  LOG4CPLUS_ERROR(
462  daq->GetLogger(),
463  fmt::format("{}: AbortAsync: Completed with fatal error.", *daq));
464 
465  daq->SetErrorFlag(true);
466  f.get(); // throw to propagate
467  }
468  auto result = f.get();
469  daq->SetErrorFlag(result.error);
470 
471  // Success
472  daq->SetState(Aborted{});
473  LOG4CPLUS_INFO(daq->GetLogger(),
474  fmt::format("{}: AbortAsync: Completed successfully.", *daq));
475  return daq->GetStatus()->GetStatus();
476  });
477 }
478 
479 boost::future<State> OcmDaqController::ScheduleMergeAsync() {
480  return boost::make_exceptional_future<State>(std::runtime_error(
481  fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
482 }
483 
486  fmt::format("DaqController::UpdateKeywords(<omitted>)"),
487  GetStatusRef().GetStatus()));
488 
489  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
490  throw boost::enable_current_exception(
491  std::runtime_error("Data acquisition already stopped or aborted"));
492  }
493 
494  daq::UpdateKeywords(GetContextMut(), keywords);
496 }
497 
498 boost::future<State>
499 OcmDaqController::AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) {
501  GetId(),
502  fmt::format("DaqController::AwaitAsync({}, {} ms)",
503  sources.empty() ? "all primary sources" : "a user defined list of sources",
504  timeout.count()),
505  GetStatusRef().GetStatus()));
506 
507  std::vector<
508  std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
509  await_on;
510 
511  if (!sources.empty()) {
512  for (auto const& source_id : sources) {
513  auto source_var = FindSource(source_id);
514  if (source_var) {
515  await_on.emplace_back(*source_var);
516  } else {
517  return boost::make_exceptional_future<State>(
518  std::invalid_argument(fmt::format("Source with id='{}' not found", source_id)));
519  }
520  }
521  } else {
522  // Use all primary sources by default
523  for (auto& source : m_prim_sources) {
524  await_on.emplace_back(&source);
525  }
526  }
527 
528  // note that condition references sources in OcmDaqController and should not be invoked
529  // unless OcmDaqController is alive.
530  auto condition = [sources = await_on]() {
531  return std::all_of(sources.begin(), sources.end(), [](auto var) {
532  return std::visit(
533  [](auto v) {
534  return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
535  },
536  var);
537  });
538  };
539 
540 // Test if condition is already satisfied
541  if (condition()) {
542  return boost::make_ready_future<State>(GetState());
543  }
544 
545  // Wait is not already satisfied, attach state listeners to all sources.
546  // daq is captured with a weak ptr to avoid keeping OcmDaqController alive if no state changes
547  // occur on monitored components.
548  auto promise = std::make_shared<boost::promise<State>>();
549 
550  if (timeout > std::chrono::milliseconds(0)) {
551  auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
552  timer->async_wait([promise,
553  timer_ptr = timer.get(),
554  daq_weak = std::weak_ptr<OcmDaqController>(
555  std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
556  boost::system::error_code ec) {
557  // Promise might already be fulfilled, there's no way to check if it is though.
558  try {
559  auto daq = daq_weak.lock();
560  if (ec) {
561  if (daq) {
562  LOG4CPLUS_DEBUG(
563  daq->m_logger,
564  fmt::format("{}: AsyncWait: Operation abandoned before completing.",
565  *daq));
566  }
567  // Timer deleted or was cancelled, set exception in promise
568  promise->set_exception(DaqOperationAborted(""));
569  } else {
570  // Normal timeout
571  // For this case we also want to delete the timer itself
572  if (daq) {
573  LOG4CPLUS_DEBUG(
574  daq->m_logger,
575  fmt::format("{}: AsyncWait: Operation timed out before completing.",
576  *daq));
577  daq->m_timers.erase(
578  std::remove_if(
579  daq->m_timers.begin(),
580  daq->m_timers.end(),
581  [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
582  return val.get() == timer_ptr;
583  }),
584  daq->m_timers.end());
585  }
586 
587  promise->set_exception(DaqOperationTimeout(""));
588  }
589  } catch (...) {
590  }
591  });
592  m_timers.emplace_back(std::move(timer));
593  }
594 
595  auto listener = [condition,
596  promise,
597  daq_weak = std::weak_ptr<OcmDaqController>(
598  std::static_pointer_cast<OcmDaqController>(shared_from_this()))](State,
599  bool) {
600  auto daq = daq_weak.lock();
601  if (!daq) {
602  LOG4CPLUS_WARN("daq", "OcmDaqController deleted before await condition was fulfulled");
603  // this async op was abandoned. Do nothing.
604  return;
605  }
606 
607  if (condition()) {
608  LOG4CPLUS_INFO(daq->m_logger,
609  fmt::format("{}: AwaitAsync: Await condition fulfilled", *daq));
610  try {
611  promise->set_value(daq->GetState());
612  } catch (...) {
613  // Exception might be thrown because promise is already fulfilled, which is
614  // expected to happen.
615  }
616  }
617  };
618 
619  // Connect listeners to sources that should be awaited.
620  for (auto& source : await_on) {
621  std::visit(
622  [daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
623  listener](auto s) {
624  LOG4CPLUS_DEBUG(
625  daq->m_logger,
626  fmt::format("{}: AsyncWait: Attaching listener on source '{}'.", *daq, *s));
627  // Use automatic connection disconnect if daq is destroyed. (i.e. track_foreign).
628  using SlotType = typename std::remove_reference<
629  decltype(*s.get())>::type::StateSignal::slot_type;
630  s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(daq));
631  },
632  source);
633  }
634  return promise->get_future();
635 }
636 
637 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
638 OcmDaqController::FindSource(std::string_view source_id) {
639  {
640  auto it =
641  std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](auto& source) {
642  return source.GetSource().GetName() == source_id;
643  });
644  if (it != m_prim_sources.end()) {
645  return gsl::not_null<Source<PrimSource>*>(&(*it));
646  }
647  }
648  {
649  auto it =
650  std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](auto& source) {
651  return source.GetSource().GetName() == source_id;
652  });
653  if (it != m_meta_sources.end()) {
654  return gsl::not_null<Source<MetaSource>*>(&(*it));
655  }
656  }
657 
658  // Not found
659  return {};
660 }
661 
662 template <class SourceType>
663 std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
664  //@todo: Check for duplicates
665  std::vector<Source<SourceType>> dest;
666  dest.reserve(sources.size());
668  std::make_move_iterator(sources.begin()),
669  std::make_move_iterator(sources.end()),
670  std::back_inserter(dest),
671  [](SourceType&& s) -> Source<SourceType> { return Source<SourceType>{std::move(s)}; });
672  return dest;
673 }
674 
675 void OcmDaqController::InitiateAwaitPrimarySources() {
676  AddEvent<ActionEvent>(GetId(),
677  "DaqController::InitiateAwaitPrimarySources(): Initiating",
678  GetStatusRef().GetStatus());
679  if (m_prim_sources.empty()) {
680  LOG4CPLUS_DEBUG(
681  m_logger,
682  fmt::format("{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *this));
683  return;
684  }
685  LOG4CPLUS_DEBUG(m_logger,
686  fmt::format("{}: InitiateAwaitPrimarySources: Starting operation.", *this));
687  auto alerts = std::make_unique<op::AlertState>();
688  auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
689  m_abort_await_primary_sources = abort;
690  // Set up continuation that stops data acquisition automatically
691  future.then(
692  GetIoExecutor(),
693  [alerts = std::move(alerts),
694  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
695  boost::future<Result<DpParts>> fut) {
696  // Remove abort callback as async op has completed
697  daq->m_abort_await_primary_sources = nullptr;
698 
699  // Add FITS files from the stopped sources
700  auto result = fut.get();
701  LOG4CPLUS_DEBUG(daq->m_logger,
702  fmt::format("{}: InitiateAwaitPrimarySources: Adding {} files from "
703  "primary sources",
704  *daq,
705  result.result.size()));
706  // @todo Confirm if we want to add parts even though state has transitioned past
707  // Acquiring (which may mean manual stop and that parts have already been added)
708  // @todo: How to treat errors from await?
709  daq::AddDpParts(daq->GetContextMut(), result.result);
710  daq->EmitContextSignal();
711 
712  // If daq is still acquiring we stop using strict error policy, otherwise do nothing
713  if (!std::holds_alternative<Acquiring>(daq->m_state)) {
714  LOG4CPLUS_DEBUG(
715  daq->m_logger,
716  fmt::format(
717  "{}: InitiateAwaitPrimarySources: "
718  "AwaitAsync completed but another operation has already transitioned "
719  "DAQ from Acquiring so automatic stop will not be performed.",
720  *daq));
721  return;
722  }
723  daq->AddEvent<ActionEvent>(
724  daq->GetId(),
725  "DaqController::InitiateAwaitPrimarySources(): "
726  "Primary sources completed. Performing automatic stop of metadata sources",
727  daq->GetStatusRef().GetStatus());
728 
729  // Merge alerts from async op
730  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
731  op::MergeAlerts(daq->GetStatusRef(), *alerts);
732 
733  // No continuation is necessary. If an error occurs that information is published
734  // and user needs to intervene.
735  // If a separate topic is created for errors only it may be published with a
736  // continuation attached to the result of StopAsync (or more likely implemented inside
737  // StopAsync).
738  daq->StopAsync(ErrorPolicy::Strict);
739  });
740 }
741 
742 void OcmDaqController::CancelAwaitPrimarySources() {
743  if (m_abort_await_primary_sources) {
744  LOG4CPLUS_TRACE(m_logger, "OcmDaqController::CancelAwaitPrimarySources()");
745  m_abort_await_primary_sources();
746  m_abort_await_primary_sources = nullptr;
747  }
748 }
749 
750 DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
751  DaqContext context,
752  std::shared_ptr<ObservableStatus> status,
753  std::shared_ptr<ObservableEventLog> event_log,
754  std::shared_ptr<DpmClient> dpm_client)
755  : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
756  , m_liveness(std::make_shared<bool>(true))
757  , m_dpm_client(std::move(dpm_client))
758  , m_logger(log4cplus::Logger::getInstance("daq.ocm.controller")) {
759  assert(m_dpm_client);
760  UpdateStateContext();
761 
762  // Connect slot to status signal which mirrors DAQ status of *this* from DPM
763  //
764  // Note: weak_from_this cannot be used in constructor as the reference count is not yet
765  // incremented by the holding shared_ptr. For this reason the m_liveness is used instead.
766  //
767  // Additionally the status provided may from a past update, so we only update if received status
768  // is newer than current.
769  m_status_connection = m_dpm_client->ConnectStatusSignal(
770  [id = GetId(), weak = std::weak_ptr<bool>(m_liveness), this](Status const& status) {
771  if (id != status.id) {
772  // Update for another DAQ
773  return;
774  }
775  auto lock = weak.lock();
776  if (!lock) {
777  // Abandoned
778  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
779  return;
780  }
781 
782  auto const& current = GetStatusRef().GetStatus();
783  if (current.timestamp > status.timestamp) {
784  LOG4CPLUS_TRACE(
785  m_logger,
786  "Ignoring DAQ status update: From DPM: " << status << ", current: " << current);
787  return;
788  }
789  // Assign status of DAQ from DPM.
790  LOG4CPLUS_TRACE(m_logger, *this << ": Assigning new DAQ status from DPM: " << status);
791  GetStatusRef() = status;
792  UpdateStateContext();
793  });
794 }
795 
796 boost::future<State> DpmDaqController::StartAsync() {
797  return boost::make_exceptional_future<State>(
798  std::runtime_error(fmt::format("StartAsync() is invalid in state: {}", GetState())));
799 }
800 
801 boost::future<Status> DpmDaqController::StopAsync(ErrorPolicy) {
802  return boost::make_exceptional_future<Status>(
803  std::runtime_error(fmt::format("StopAsync() is invalid in state: {}", GetState())));
804 }
805 
806 boost::future<State>
807 DpmDaqController::AwaitAsync(std::vector<std::string>, std::chrono::milliseconds) {
808  return boost::make_exceptional_future<State>(std::runtime_error(
809  fmt::format("AwaitAsync() with sources is invalid in state: {}", GetState())));
810 }
811 
813  throw std::runtime_error(fmt::format("UpdateKeywords() is invalid in state: {}", GetState()));
814 }
815 
817  return GetStatusRef().GetState();
818 }
819 
820 boost::future<State> DpmDaqController::ScheduleMergeAsync() {
821  LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::ScheduleMergeAsync");
822 
823  // If we are in NotScheduled and no request in-flight already: Send request to DPM to merge.
824  // otherwise fail.
825  auto starting_state = GetState();
826  if (starting_state != State::NotScheduled) {
827  return boost::make_exceptional_future<State>(std::runtime_error(
828  fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
829  }
830 
831  assert(std::holds_alternative<NotScheduled>(m_state_ctx));
832  auto& ctx = std::get<NotScheduled>(m_state_ctx);
833  if (ctx.schedule_reply_pending) {
834  // A request has already been sent
835  return boost::make_exceptional_future<State>(
836  std::logic_error("ScheduleMergeAsync() a request is already in flight"));
837  }
838  if (!m_dp_spec) {
839  auto dp_spec = MakeDataProductSpecification(GetContext(), m_logger);
840  nlohmann::json j;
841  to_json(j, dp_spec);
842  m_dp_spec = j.dump(2); // indent 2
843  LOG4CPLUS_DEBUG(m_logger, "Created DpSpec:\n" << *m_dp_spec);
844  }
845 
846  return m_dpm_client->ScheduleAsync(*m_dp_spec)
847  .then(GetIoExecutor(),
848  [starting_state, weak = weak_from_this(), this](boost::future<State> f) -> State {
849  auto alert_id = MakeAlertId(alert::REQUEST, "dpm.QueueDaq()");
850  auto shared = weak.lock();
851  try {
852  auto state = f.get();
853  if (shared) {
854  auto& status = GetStatusRef();
856  // Update state only if DAQ has not changed state from starting_state.
857  //
858  // This can e.g. happen if OCM receive published state change data sample
859  // from DPM before receiving command reply.
860  // @todo: Use timestamp instead?
861  if (GetState() == starting_state) {
862  SetState(state);
863  }
864  status.ClearAlert(alert_id);
865  LOG4CPLUS_INFO(m_logger,
866  fmt::format("{}: Scheduled DAQ successfully", *this));
867  } else {
868  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
869  }
870  return state;
871  } catch (elt::mal::TimeoutException const&) {
872 // Timeout errors do not result in the error flag being set.
873  if (shared) {
874  LOG4CPLUS_INFO(m_logger,
875  fmt::format("{}: Schedule DAQ failed with timeout "
876  "(no error flag set for this condition)",
877  *this));
878  }
879  throw;
880  } catch (std::exception const& e) {
881  if (shared) {
882  auto& status = GetStatusRef();
884  status.SetAlert(MakeAlert(
885  alert_id,
886  fmt::format("Failed to schedule DAQ for merging: {}", e.what())));
887  status.SetError(true);
888  LOG4CPLUS_ERROR(m_logger, fmt::format("{}: Scheduled DAQ failed", *this));
889  }
890  throw;
891  }
892  });
893 }
894 
895 boost::future<Status> DpmDaqController::AbortAsync(ErrorPolicy policy) {
896  LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::AbortAsync");
897 
898  // 1. If we are sure DAQ has not been scheduled we can abort immediately.
899  // @todo: Handle case where request to schedule has been sent, but no acknowledgement received.
900  auto state = GetState();
901  if (state == State::NotScheduled) {
902  assert(std::holds_alternative<NotScheduled>(m_state_ctx));
903  if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
904  SetState(State::Aborted);
905  return boost::make_ready_future<Status>(*GetStatus());
906  }
907  }
908 
909  // 2. Otherwise we must delegate to DPM to abort (this is also why we are not setting state
910  // AbortingMerging as we cannot know until we communiate with DPM).
911  return m_dpm_client->AbortAsync(GetId()).then(
912  GetIoExecutor(), [policy, weak = weak_from_this(), this](boost::future<State> f) -> Status {
913  auto shared = weak.lock();
914  if (!shared) {
915  // Abandonded
916  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
917  throw boost::enable_current_exception(std::runtime_error("Operation aborted"));
918  }
919  try {
920  auto result = f.get();
921  SetState(result);
922 
923  return *GetStatus();
924  } catch (...) {
925  // Even if DPM failed we mark it as aborted if forced
926  if (policy == ErrorPolicy::Tolerant) {
927  LOG4CPLUS_ERROR(m_logger,
928  *this
929  << ": Request to abort DAQ to DPM returned with "
930  "error. ErrorPolicy::Tolerant is used so DAQ is marked "
931  "as aborted anyway");
932  SetState(State::Aborted, true);
933  return *GetStatus();
934  }
935  throw;
936  }
937  });
938 }
939 
940 void DpmDaqController::UpdateStateContext() {
941  auto state = GetState();
942  switch (state) {
943  case State::NotScheduled:
944  m_state_ctx = NotScheduled{false};
945  return;
946  default:
947  m_state_ctx = std::monostate();
948  };
949 }
950 
951 void DpmDaqController::SetState(State state, std::optional<bool> error) {
952  auto prev = GetState();
953  GetStatusRef().SetState(state, error);
954  if (prev != state) {
955  UpdateStateContext();
956  }
957 }
958 
959 } // namespace daq
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableEventLog & GetEventLogRef() noexcept
DaqContext const & GetContext() const DAQ_NOEXCEPT override
ObservableStatus & GetStatusRef() noexcept
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
bool GetErrorFlag() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
DaqContext & GetContextMut() noexcept
std::string const & GetId() const DAQ_NOEXCEPT override
rad::IoExecutor & GetIoExecutor() noexcept
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Controls the execution of a single data acquisition that ultimately results in a set of FITS keywords an...
Data acquisition sources.
Definition: source.hpp:184
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
void UpdateKeywords(fits::KeywordVector const &keywords) override
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
State GetState() const DAQ_NOEXCEPT override
boost::future< Status > StopAsync(ErrorPolicy policy) override
boost::future< State > StartAsync() override
Keeps relevant state to be able to communicate with a metadata source.
Definition: source.hpp:140
metadaqif::MetaDaqAsync RrClient
Definition: source.hpp:142
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:56
Defer signal changes until later time.
Definition: status.hpp:222
State GetState() const noexcept
Definition: status.cpp:270
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:357
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:278
friend class DeferSignal
Definition: status.hpp:408
boost::future< State > StartAsync() override
Starts the data acquisition.
constexpr log4cplus::Logger const & GetLogger() const noexcept
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
State GetState() const DAQ_NOEXCEPT override
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
StateVariant MakeState(State s) const noexcept
std::optional< std::variant< gsl::not_null< Source< PrimSource > * >, gsl::not_null< Source< MetaSource > * > > > FindSource(std::string_view source_id)
log4cplus::Logger m_logger
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replace or add) list of keywords.
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
OcmDaqController(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
void CancelAwaitPrimarySources()
Cancels the awaiting of primary sources.
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
void SetErrorFlag(bool error) noexcept
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
void SetState(StateVariant &&s) noexcept
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96
recif::RecCmdsAsync RrClient
Definition: source.hpp:98
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Contains data structure for FITS keywords.
daq::DpmClient
Contains error related declarations for DAQ.
Contains declarations for the helper functions to initiate operations.
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:29
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
Definition: daqContext.cpp:36
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and are reported as ...
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
void to_json(nlohmann::json &j, Status const &p)
Definition: json.cpp:37
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: makeDpSpec.cpp:281
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Utility class that represents a result and an error.
Definition: utility.hpp:17
Definition: main.cpp:23
Contains declaration for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
OCM Async operations.
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
bool IsValid() const noexcept
std::function< boost::future< void >(op::AsyncOpParams)> start
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
OcmAsyncOperations()
Default constructs object with standard async operations.
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Definition: source.hpp:29
Non-observable status object that stores the status of the data acquisition.
Definition: status.hpp:153
Parameters required for each async operation.
Await-specific parameters that are not provided with AsyncOpParams.
auto const & transform