ifw-daq  1.0.0
IFW Data Acquisition modules
daqController.cpp
Go to the documentation of this file.
1 #include <daq/daqController.hpp>
2 
3 #include <iostream>
4 #include <stdexcept>
5 #include <type_traits>
6 
7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 
11 #include <daq/error.hpp>
12 #include <daq/op/abort.hpp>
13 #include <daq/op/initiate.hpp>
14 #include <daq/op/start.hpp>
15 #include <daq/op/stop.hpp>
16 #include <daq/op/awaitPrim.hpp>
17 
18 namespace daq {
19 namespace {
20 
/// Type-dependent constant that is `false` for every template argument.
///
/// Intended as the condition of a `static_assert` placed in the last `else` branch of an
/// `if constexpr` chain: because the value depends on the template argument, the assertion
/// only fires when that branch is actually instantiated.
template <typename>
inline constexpr bool always_false_v{false};  // NOLINT
23 
24 template <class Sources>
25 bool AllInState(Sources sources, State state) {
26  return std::all_of(
27  sources.begin(), sources.end(), [=](auto const& s) -> bool { return s.state == state; });
28 }
29 
30 } // namespace
31 
33  : start([](auto par) { return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
34  , abort([](ErrorPolicy policy, auto par) {
35  return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
36  })
37  , stop([](ErrorPolicy policy, auto par) {
38  return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
39  })
40  , await_prim([](auto par) {
41  return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
42  }) {
43 }
44 
45 bool AsyncOperations::IsValid() const noexcept {
46  return start && stop && abort && await_prim;
47 }
48 
49 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
50  os << "DaqController(id='" << daq.GetId() << "', state=" << daq.GetState() << ")";
51  return os;
52 }
53 
54 std::ostream& operator<<(std::ostream& os, State state) {
55  switch (state) {
56  case State::NotStarted:
57  os << "NotStarted";
58  return os;
59  case State::Starting:
60  os << "Starting";
61  return os;
62  case State::Acquiring:
63  os << "Acquiring";
64  return os;
65  case State::Stopping:
66  os << "Stopping";
67  return os;
68  case State::Stopped:
69  os << "Stopped";
70  return os;
71  case State::Aborting:
72  os << "Aborting";
73  return os;
74  case State::Aborted:
75  os << "Aborted";
76  return os;
77  // GCOVR_EXCL_START
78  default:
79  os << "Unknown";
80  return os;
81  // GCOVR_EXCL_STOP
82  }
83 }
84 
85 std::shared_ptr<DaqControllerImpl>
86 DaqControllerImpl::Create(boost::asio::io_context& io_context,
87  DaqProperties properties,
88  std::shared_ptr<ObservableStatus> status,
89  std::shared_ptr<ObservableEventLog> event_log,
90  AsyncOperations ops) {
91  // note: make_shared doesn't work since constructor is protected,
92  // to protect against non-shared ownership.
93  LOG4CPLUS_TRACE("daq", fmt::format("DaqControllerImpl::Create"));
94  auto fits_ctl = std::make_unique<FitsControllerImpl>(properties, event_log);
95  return std::shared_ptr<DaqControllerImpl>(new DaqControllerImpl(io_context,
96  std::move(properties),
97  std::move(fits_ctl),
98  std::move(status),
99  std::move(event_log),
100  std::move(ops)));
101 }
102 
103 DaqControllerImpl::DaqControllerImpl(boost::asio::io_context& io_context,
104  DaqProperties properties,
105  std::unique_ptr<FitsController> fits_ctl,
106  std::shared_ptr<ObservableStatus> status,
107  std::shared_ptr<ObservableEventLog> event_log,
108  AsyncOperations ops)
109  : m_state(MakeState(status->GetState()))
110  , m_io_ctx(io_context)
111  , m_executor(io_context)
112  , m_properties(std::move(properties))
113  , m_fits_ctl(std::move(fits_ctl))
114  , m_status(std::move(status))
115  , m_event_log(std::move(event_log))
116  , m_prim_sources(MakeSources<PrimSource>(m_properties.prim_sources))
117  , m_meta_sources(MakeSources<MetaSource>(m_properties.meta_sources))
118  , m_async_ops(std::move(ops))
119  , m_pending_replies(PendingReplies::Create())
120  , m_logger(log4cplus::Logger::getInstance("daq")) {
121  assert(m_status);
122  assert(m_event_log);
123 
124  if (m_properties.id != m_status->GetId()) {
125  throw std::invalid_argument(
126  "Data acquisition id mismatch between DaqProperties and ObservableStatus");
127  }
128 
129  if (m_prim_sources.empty() && m_meta_sources.empty()) {
130  throw std::invalid_argument("No data sources provided");
131  }
132 
133  if (!m_async_ops.IsValid()) {
134  throw std::invalid_argument("AsyncOperations is invalid");
135  }
136 }
137 
139  State s;
140  std::visit(
141  [&](auto const& var) {
142  using T = std::decay_t<decltype(var)>;
143  if constexpr (std::is_same_v<T, NotStarted>) {
144  s = State::NotStarted;
145  } else if constexpr (std::is_same_v<T, Starting>) {
146  s = State::Starting;
147  } else if constexpr (std::is_same_v<T, Acquiring>) {
148  s = State::Acquiring;
149  } else if constexpr (std::is_same_v<T, Stopping>) {
150  s = State::Stopping;
151  } else if constexpr (std::is_same_v<T, Stopped>) {
152  s = State::Stopped;
153  } else if constexpr (std::is_same_v<T, Aborting>) {
154  s = State::Aborting;
155  } else if constexpr (std::is_same_v<T, Aborted>) {
156  s = State::Aborted;
157  } else {
158  static_assert(always_false_v<T>, "non-exhaustive visitor!");
159  }
160  },
161  m_state);
162  return s;
163 }
164 
165 std::shared_ptr<ObservableStatus> DaqControllerImpl::GetStatus() DAQ_NOEXCEPT {
166  return m_status;
167 }
168 
169 std::shared_ptr<ObservableStatus const> DaqControllerImpl::GetStatus() const DAQ_NOEXCEPT {
170  return m_status;
171 }
172 
173 std::shared_ptr<ObservableEventLog> DaqControllerImpl::GetEventLog() DAQ_NOEXCEPT {
174  return m_event_log;
175 }
176 
177 std::string const& DaqControllerImpl::GetId() const DAQ_NOEXCEPT {
178  return m_status->GetId();
179 }
180 
182  return m_status->GetError();
183 }
184 
185 constexpr log4cplus::Logger const& DaqControllerImpl::GetLogger() const noexcept {
186  return m_logger;
187 }
188 
190  switch (s) {
191  case State::NotStarted:
192  return StateVariant(NotStarted());
193  case State::Starting:
194  return StateVariant(Starting());
195  case State::Acquiring:
196  return StateVariant(Acquiring());
197  case State::Stopping:
198  return StateVariant(Stopping());
199  case State::Stopped:
200  return StateVariant(Stopped());
201  case State::Aborting:
202  return StateVariant(Aborting());
203  case State::Aborted:
204  return StateVariant(Aborted());
205  };
206  LOG4CPLUS_FATAL(m_logger, fmt::format("Invalid state provided: '{}'", s));
207  std::terminate();
208 }
209 
210 void DaqControllerImpl::SetErrorFlag(bool error) noexcept {
211  m_status->SetError(error);
212 }
213 
215  m_state = s;
216  // Publish changes
217  m_status->SetState(GetState());
218 }
219 
221  return op::AsyncOpParams(*m_status.get(),
222  *m_event_log.get(),
223  m_executor,
224  m_logger,
225  GetId(),
226  *m_pending_replies.get(),
229 }
230 
233  *m_event_log.get(),
234  m_executor,
235  m_logger,
236  GetId(),
237  *m_pending_replies.get(),
241 }
242 
243 boost::future<State> DaqControllerImpl::StartAsync() {
244  m_event_log->AddEvent(
245  ActionEvent(GetId(), "DaqController::StartAsync()", m_status->GetStatus()));
246 
247  // Make sure we're not already started.
248  if (!std::holds_alternative<NotStarted>(m_state)) {
249  return boost::make_exceptional_future<State>(
250  std::runtime_error("Data acquisition is already started"));
251  }
252 
253  SetState(Starting{});
254 
255  // Special handling for the FITS file produced by OCM
256  try {
257  m_fits_ctl->Start();
258  } catch (std::exception const& e) {
259  LOG4CPLUS_ERROR(
260  GetLogger(),
261  fmt::format("{}: StartAsync: FitsController::Start failed (aborting start): {}",
262  *this,
263  e.what()));
264  SetErrorFlag(true);
265  return boost::make_exceptional_future<State>();
266  }
267 
268  return m_async_ops.start(MakeParams())
269  .then(m_executor,
270  [daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](
271  boost::future<void> f) {
272  LOG4CPLUS_INFO(daq->GetLogger(),
273  fmt::format("{}: StartAsync: Completed {}",
274  *daq,
275  f.has_value() ? "successfully" : "with error"));
276  // Re-raise any exception
277  if (f.has_exception()) {
278  daq->SetErrorFlag(true);
279  (void)f.get();
280  }
281  // Await completion if there are primary data sources
282  daq->InitiateAwaitPrimarySources();
283 
284  // No exception -> all done! update and return Acquiring state
285  daq->SetErrorFlag(false);
286  daq->SetState(Acquiring{});
287  return daq->GetState();
288  });
289 }
290 
291 boost::future<Status> DaqControllerImpl::StopAsync(ErrorPolicy policy) {
292  m_event_log->AddEvent(
293  ActionEvent(GetId(), "DaqController::StopAsync()", m_status->GetStatus()));
294 
295  if (std::holds_alternative<Stopped>(m_state)) {
296  return boost::make_exceptional_future<Status>(
297  std::runtime_error("Data acquisition already stopped"));
298  }
299 
300  if (std::holds_alternative<Aborted>(m_state)) {
301  return boost::make_exceptional_future<Status>(
302  std::runtime_error("Data acquisition already aborted"));
303  }
304 
305  if (!std::holds_alternative<Acquiring>(m_state)) {
306  return boost::make_exceptional_future<Status>(
307  std::runtime_error("Cannot stop a data acquisition that is not Acquiring"));
308  }
309 
310  // @todo If we're in Starting this will potentially mess up assumption that we're
311  // in Starting. Revisit to remove this assumption?
312  // @todo: Store produced files
313  // m_status.AddFiles(reply.getFiles());
314  SetState(Stopping{});
315 
316  return m_async_ops.stop(policy, MakeParams())
317  .then(m_executor,
318  [policy, daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](
319  boost::future<Result<DpParts>> f) -> Status {
320  LOG4CPLUS_INFO(daq->GetLogger(),
321  fmt::format("{}: StopAsync: Completed {}",
322  *daq,
323  f.has_value() ? "successfully" : "with error"));
324  if (!f.has_exception() || policy == ErrorPolicy::Tolerant) {
325  // Stop local FITS controller on success or if forced
326  try {
327  // Update keywords in FITS controller from current status, then save
328  // stop the controller to save the state.
329  daq->m_fits_ctl->UpdateKeywords(daq->m_status->GetKeywords());
330  auto res = daq->m_fits_ctl->Stop(policy);
331  if (res) {
332  daq->m_status->AddFiles({*res});
333  } else {
334  // If policy prevented an error to be thrown, but there is no
335  // resulting file we mark it as an error
336  daq->SetErrorFlag(true);
337  }
338  } catch (std::exception const& e) {
339  LOG4CPLUS_ERROR(daq->GetLogger(),
340  fmt::format("{}: StopAsync: Failed to create OCM FITS "
341  "file: {}",
342  *daq,
343  e.what()));
344  daq->SetErrorFlag(true);
345  throw;
346  } catch (...) {
347  daq->SetErrorFlag(true);
348  throw;
349  }
350  }
351 
352  if (f.has_exception()) {
353  daq->SetErrorFlag(true);
354  (void)f.get(); // Throw to propagate any error
355  }
356 
357  auto result = f.get();
358  daq->m_status->AddFiles(result.result);
359 
360  // If there were no exceptions we're all done.
361  // Update and return Stopped state
362  daq->SetErrorFlag(result.error);
363  daq->SetState(Stopped{});
364  return daq->GetStatus()->GetStatus();
365  });
366 }
367 
368 boost::future<Status> DaqControllerImpl::AbortAsync(ErrorPolicy policy) {
369  m_event_log->AddEvent(ActionEvent(
370  GetId(), fmt::format("DaqController::AbortAsync({})", policy), m_status->GetStatus()));
371 
372  if (std::holds_alternative<NotStarted>(m_state)) {
373  LOG4CPLUS_INFO(m_logger, fmt::format("{}: Aborting not started data acquisition", *this));
374  SetState(Aborted{});
375  return boost::make_ready_future<Status>(GetStatus()->GetStatus());
376  }
377 
378  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
379  return boost::make_exceptional_future<Status>(
380  std::runtime_error("Data acquisition already stopped or aborted"));
381  }
382 
383  SetState(Aborting{});
384 
385  return m_async_ops.abort(policy, MakeParams())
386  .then(m_executor,
387  [policy, daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](
388  boost::future<Result<void>> f) -> Status {
389  if (daq->GetState() == State::Aborted) {
390  // It can happen that a request to abort was superseded and finalized before
391  // reply was received.
392  LOG4CPLUS_INFO(
393  daq->GetLogger(),
394  fmt::format("{}: AbortAsync: Data acquisition already aborted. "
395  "Do nothing else (errors are ignored). ",
396  *daq));
397  // @todo: Should throw instead as the command was superseeded by other
398  // command.
399  return daq->GetStatus()->GetStatus();
400  }
401  LOG4CPLUS_DEBUG(daq->GetLogger(),
402  fmt::format("{}: AbortAsync: Completed, updating DAQ status and "
403  "set reply remaining. Has fatal error={}",
404  *daq,
405  f.has_exception()));
406 
407  if (!f.has_exception() || policy == ErrorPolicy::Tolerant) {
408  // Stop local FITS controller on success or if forced
409  try {
410  daq->m_fits_ctl->Abort(policy);
411  } catch (...) {
412  daq->SetErrorFlag(true);
413  if (policy != ErrorPolicy::Tolerant) {
414  throw;
415  }
416  }
417  }
418 
419  if (f.has_exception()) {
420  LOG4CPLUS_INFO(
421  daq->GetLogger(),
422  fmt::format("{}: AbortAsync: Completed with fatal error.", *daq));
423 
424  daq->SetErrorFlag(true);
425  f.get(); // throw to propagate
426  }
427  auto result = f.get();
428  daq->SetErrorFlag(result.error);
429 
430  // Success
431  daq->SetState(Aborted{});
432  LOG4CPLUS_INFO(daq->GetLogger(),
433  fmt::format("{}: AbortAsync: Completed successfully.", *daq));
434  return daq->GetStatus()->GetStatus();
435  });
436 }
437 
439  m_event_log->AddEvent(ActionEvent(
440  GetId(), fmt::format("DaqController::UpdateKeywords(<omitted>)"), m_status->GetStatus()));
441 
442  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
443  throw boost::enable_current_exception(
444  std::runtime_error("Data acquisition already stopped or aborted"));
445  }
446 
447  m_status->UpdateKeywords(keywords);
448 }
449 
// Waits until every awaited source has reached State::Aborted or State::Stopped.
//
// sources: ids of the sources to await; when empty, all primary sources are awaited.
// timeout: a timer is armed only when this is greater than zero.
//
// Returns a future that
//  - is exceptional with std::invalid_argument if an id is not found by FindSource(),
//  - is ready with GetState() if the condition already holds on entry,
//  - otherwise is completed by whichever happens first: a state listener observing the
//    condition (value GetState()), or the timer (DaqOperationTimeout on expiry,
//    DaqOperationAborted when the wait handler receives an error code).
450 boost::future<State>
451 DaqControllerImpl::AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) {
452  m_event_log->AddEvent(ActionEvent(
453  GetId(),
454  fmt::format("DaqController::AwaitAsync({}, {} ms)",
455  sources.empty() ? "all primary sources" : "a user defined list of sources",
456  timeout.count()),
457  m_status->GetStatus()));
458 
 // Non-owning pointers to the sources (primary or metadata) that take part in the wait.
459  std::vector<
460  std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
461  await_on;
462 
463  if (!sources.empty()) {
464  for (auto const& source_id : sources) {
465  auto source_var = FindSource(source_id);
466  if (source_var) {
467  await_on.emplace_back(*source_var);
468  } else {
469  return boost::make_exceptional_future<State>(
470  std::invalid_argument(fmt::format("Source with id='{}' not found", source_id)));
471  }
472  }
473  } else {
474  // Use all primary sources by default
475  for (auto& source : m_prim_sources) {
476  await_on.emplace_back(&source);
477  }
478  }
479 
480  // note that condition references sources in DaqControllerImpl and should not be invoked
481  // unless DaqControllerImpl is alive.
482  auto condition = [sources = await_on]() {
483  return std::all_of(sources.begin(), sources.end(), [](auto var) {
484  return std::visit(
485  [](auto v) {
486  return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
487  },
488  var);
489  });
490  };
491 
492  // Test if condition is already satified
493  if (condition()) {
494  return boost::make_ready_future<State>(GetState());
495  }
496 
497  // Wait is not already satisfied, attach state listeners to all sources.
498  // daq is captured with a weak ptr to avoid keeping DaqControllerImpl alive if no state changes
499  // occur on monitored components.
 // The promise is shared between the timer handler and the state listener; whichever sets it
 // first wins, and the later attempt is expected to throw and is ignored.
500  auto promise = std::make_shared<boost::promise<State>>();
501 
 // A timeout of zero or less means no timer is created, so only the listener can complete
 // the promise.
502  if (timeout > std::chrono::milliseconds(0)) {
503  auto timer = std::make_unique<boost::asio::steady_timer>(m_io_ctx, timeout);
504  timer->async_wait([promise,
505  timer_ptr = timer.get(),
506  daq_weak = std::weak_ptr<DaqControllerImpl>(
507  std::static_pointer_cast<DaqControllerImpl>(shared_from_this()))](
508  boost::system::error_code ec) {
509  // Promise might already be fulfilled, there's no way to check if it is though.
510  try {
511  auto daq = daq_weak.lock();
512  if (ec) {
513  if (daq) {
514  LOG4CPLUS_DEBUG(
515  daq->m_logger,
516  fmt::format("{}: AsyncWait: Operation abandoned before completing.",
517  *daq));
518  }
519  // Timer deleted or was cancelled, set exception in promise
520  promise->set_exception(DaqOperationAborted(""));
521  } else {
522  // Normal timeout
523  // For this case we also want to delete the timer itself
524  if (daq) {
525  LOG4CPLUS_DEBUG(
526  daq->m_logger,
527  fmt::format("{}: AsyncWait: Operation timed out before completing.",
528  *daq));
 // timer_ptr is only compared against the stored pointers to find this timer's entry in
 // m_timers; it is never dereferenced here.
529  daq->m_timers.erase(
530  std::remove_if(
531  daq->m_timers.begin(),
532  daq->m_timers.end(),
533  [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
534  return val.get() == timer_ptr;
535  }),
536  daq->m_timers.end());
537  }
538 
539  promise->set_exception(DaqOperationTimeout(""));
540  }
541  } catch (...) {
 // Deliberately ignored: setting an already fulfilled promise is expected to throw.
542  }
543  });
 // The controller owns the timer from here on.
544  m_timers.emplace_back(std::move(timer));
545  }
546 
 // NOTE(review): the listener below does not cancel or remove the timer when the condition is
 // fulfilled first; confirm that leaving it in m_timers until it expires is intended.
547  auto listener = [condition,
548  promise,
549  daq_weak = std::weak_ptr<DaqControllerImpl>(
550  std::static_pointer_cast<DaqControllerImpl>(shared_from_this()))](State,
551  bool) {
552  auto daq = daq_weak.lock();
553  if (!daq) {
554  LOG4CPLUS_WARN("daq", "DaqControllerImpl deleted before await condition was fulfulled");
555  // this async op was abandoned. Do nothing.
556  return;
557  }
558 
559  if (condition()) {
560  LOG4CPLUS_INFO(daq->m_logger,
561  fmt::format("{}: AwaitAsync: Await condition fulfilled", *daq));
562  try {
563  promise->set_value(daq->GetState());
564  } catch (...) {
565  // Exception might be thrown because promise is already fulfilled, which is
566  // expected to happen.
567  }
568  }
569  };
570 
571  // Connect listeners to sources that should be awaited.
572  for (auto& source : await_on) {
573  std::visit(
574  [daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this()),
575  listener](auto s) {
576  LOG4CPLUS_DEBUG(
577  daq->m_logger,
578  fmt::format("{}: AsyncWait: Attaching listener on source '{}'.", *daq, *s));
579  // Use automatic connection disconnect if daq is destroyed. (i.e. track_foreign).
580  using SlotType = typename std::remove_reference<decltype(
581  *s.get())>::type::StateSignal::slot_type;
 // `listener` is a by-copy capture of a non-mutable lambda and therefore const here, so the
 // std::move below results in a copy; every source gets its own copy of the listener.
582  s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(daq));
583  },
584  source);
585  }
586  return promise->get_future();
587 }
588 
589 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
590 DaqControllerImpl::FindSource(std::string_view source_id) {
591  {
592  auto it =
593  std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](auto& source) {
594  return source.GetSource().GetName() == source_id;
595  });
596  if (it != m_prim_sources.end()) {
597  return gsl::not_null<Source<PrimSource>*>(&(*it));
598  }
599  }
600  {
601  auto it =
602  std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](auto& source) {
603  return source.GetSource().GetName() == source_id;
604  });
605  if (it != m_meta_sources.end()) {
606  return gsl::not_null<Source<MetaSource>*>(&(*it));
607  }
608  }
609 
610  // Not found
611  return {};
612 }
613 
614 template <class SourceType>
615 std::vector<Source<SourceType>> DaqControllerImpl::MakeSources(std::vector<SourceType> sources) {
616  //@todo: Check for duplicates
617  std::vector<Source<SourceType>> dest;
618  dest.reserve(sources.size());
619  std::transform(
620  std::make_move_iterator(sources.begin()),
621  std::make_move_iterator(sources.end()),
622  std::back_inserter(dest),
623  [](SourceType&& s) -> Source<SourceType> { return Source<SourceType>{std::move(s)}; });
624  return dest;
625 }
626 
627 void DaqControllerImpl::InitiateAwaitPrimarySources() {
628  AddEvent<ActionEvent>(GetId(),
629  "DaqController::InitiateAwaitPrimarySources(): Initiating",
630  m_status->GetStatus());
631  if (m_prim_sources.empty()) {
632  LOG4CPLUS_DEBUG(
633  m_logger,
634  fmt::format("{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *this));
635  return;
636  }
637  LOG4CPLUS_DEBUG(m_logger,
638  fmt::format("{}: InitiateAwaitPrimarySources: Starting operation.", *this));
639  auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams());
640  m_abort_await_primary_sources = abort;
641  // Set up continuation that stops data acquisition automatically
642  future.then(
643  m_executor,
644  [daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](boost::future<Result<DpParts>> fut) {
645  // Add FITS files from the stopped sources
646  auto result = fut.get();
647  LOG4CPLUS_DEBUG(daq->m_logger,
648  fmt::format("{}: InitiateAwaitPrimarySources: Adding {} files from "
649  "primary sources", *daq,
650  result.result.size()));
651  daq->m_status->AddFiles({result.result});
652  // @todo: How to treat errors from await?
653 
654  // If daq is still acquiring we stop using strict error policy, otherwise do nothing
655  if (!std::holds_alternative<Acquiring>(daq->m_state)) {
656  LOG4CPLUS_DEBUG(
657  daq->m_logger,
658  fmt::format(
659  "{}: InitiateAwaitPrimarySources: "
660  "AwaitAsync completed but another operation has already transitioned "
661  "DAQ from Acquiring so automatic stop will not be performed.",
662  *daq));
663  return;
664  }
665  daq->AddEvent<ActionEvent>(
666  daq->GetId(),
667  "DaqController::InitiateAwaitPrimarySources(): "
668  "Primary sources completed. Performing automatic stop of metadata sources",
669  daq->m_status->GetStatus());
670  // No continuation is necessary. If an error occurs that information is published
671  // and user needs to intervene.
672  // If a separate topic is created for errors only it may be published with a
673  // continuation attached to the result of StopAsync (or more likely implemented inside
674  // StopAsync).
675  daq->StopAsync(ErrorPolicy::Strict);
676  });
677 }
678 
679 } // namespace daq
daq::DaqControllerImpl::GetLogger
constexpr log4cplus::Logger const & GetLogger() const noexcept
Definition: daqController.cpp:185
DAQ_NOEXCEPT
#define DAQ_NOEXCEPT
Definition: config.hpp:16
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
abort.hpp
Contains declaration for the AbortAsync operation.
initiate.hpp
Contains declarations for the helper functions to initiate operations.
daq::DaqControllerImpl::m_logger
log4cplus::Logger m_logger
Definition: daqController.hpp:387
daq::DaqControllerImpl::m_fits_ctl
std::unique_ptr< FitsController > m_fits_ctl
Definition: daqController.hpp:369
daq::MetaSource
Keeps relevant state to be able to communicate with a metadata source.
Definition: source.hpp:139
daq::DaqControllerImpl::SetState
void SetState(StateVariant &&s) noexcept
Definition: daqController.cpp:214
daq::PendingReplies
Simple class that allows you to keep track of how many replies are pending.
Definition: pendingReplies.hpp:58
daq::DaqControllerImpl::m_event_log
std::shared_ptr< ObservableEventLog > m_event_log
Definition: daqController.hpp:371
start.hpp
Contains declaration for the StartAsync operation.
stop.hpp
Contains declaration for the StopAsync operation.
daq::DaqProperties
Structure carrying properties needed to start a DataAcquisition.
Definition: daqProperties.hpp:28
daq::DaqControllerImpl::Create
static std::shared_ptr< DaqControllerImpl > Create(boost::asio::io_context &io_context, DaqProperties properties, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, AsyncOperations operations)
Construct object.
Definition: daqController.cpp:86
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::DaqControllerImpl::m_prim_sources
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
Definition: daqController.hpp:373
daq::fits::UpdateKeywords
void UpdateKeywords(KeywordVector &to, KeywordVector const &from)
Updates `to` with keywords from `from`.
Definition: keyword.cpp:120
daq::DaqControllerImpl::m_properties
DaqProperties m_properties
Definition: daqController.hpp:367
daq::op::AwaitOpParams
Await-specific parameters that are not provided by AsyncOpParams.
Definition: asyncOpParams.hpp:64
daq::DaqControllerImpl::Aborting
Definition: daqController.hpp:305
daq
Definition: daqController.cpp:18
daq::DaqControllerImpl::StopAsync
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Definition: daqController.cpp:291
daq::DaqControllerImpl::GetEventLog
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
Definition: daqController.cpp:173
daq::DaqControllerImpl::GetState
State GetState() const DAQ_NOEXCEPT override
Definition: daqController.cpp:138
ocmif::MakeState
ocmif::DaqSubState MakeState(daq::State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:32
daq::AsyncOperations::IsValid
bool IsValid() const noexcept
Definition: daqController.cpp:45
daq::DaqControllerImpl::GetStatus
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
Definition: daqController.cpp:165
daq::DaqProperties::id
std::string id
Definition: daqProperties.hpp:35
daq::DaqControllerImpl::m_meta_sources
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
Definition: daqController.hpp:374
daq::DaqControllerImpl::SetErrorFlag
void SetErrorFlag(bool error) noexcept
Definition: daqController.cpp:210
daq::AsyncOperations::stop
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Definition: daqController.hpp:64
daq::DaqControllerImpl::MakeState
StateVariant MakeState(State s) const noexcept
Definition: daqController.cpp:189
daq::Result
Utility class that represents a result and an error.
Definition: utility.hpp:17
daqController.hpp
Contains declaration for DaqController.
daq::DaqControllerImpl::AbortAsync
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
Definition: daqController.cpp:368
daq::DaqControllerImpl::Aborted
Definition: daqController.hpp:306
daq::DaqControllerImpl::m_executor
rad::IoExecutor m_executor
Definition: daqController.hpp:366
daq::DaqControllerImpl::MakeParams
op::AsyncOpParams MakeParams()
Constructs the parameters used for asynchronous operations.
Definition: daqController.cpp:220
daq::DaqControllerImpl::MakeAwaitParams
op::AwaitOpParams MakeAwaitParams()
Definition: daqController.cpp:231
daq::DaqControllerImpl::StateVariant
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
Definition: daqController.hpp:314
daq::DaqControllerImpl::Stopping
Definition: daqController.hpp:303
daq::DaqControllerImpl::m_state
StateVariant m_state
Definition: daqController.hpp:364
daq::DaqControllerImpl::m_async_ops
AsyncOperations m_async_ops
Definition: daqController.hpp:376
daq::DaqControllerImpl::StartAsync
boost::future< State > StartAsync() override
Starts the data acquisition.
Definition: daqController.cpp:243
awaitPrim.hpp
Contains declaration for the AwaitPrimAsync operation.
daq::AsyncOperations::await_prim
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
Definition: daqController.hpp:65
daq::Result< void >
Definition: utility.hpp:23
daq::Status
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:32
daq::operator<<
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
Definition: daqController.cpp:49
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:138
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23
daq::DaqControllerImpl::m_status
std::shared_ptr< ObservableStatus > m_status
Definition: daqController.hpp:370
daq::DaqControllerImpl::GetId
std::string const & GetId() const DAQ_NOEXCEPT override
Definition: daqController.cpp:177
daq::DaqControllerImpl::Acquiring
Definition: daqController.hpp:302
daq::AsyncOperations::start
std::function< boost::future< void >(op::AsyncOpParams)> start
Definition: daqController.hpp:62
daq::State::NotStarted
@ NotStarted
Initial state of data acquisition.
daq::DaqControllerImpl::DaqControllerImpl
DaqControllerImpl(boost::asio::io_context &io_context, DaqProperties properties, std::unique_ptr< FitsController > fits_controller, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, AsyncOperations ops)
Definition: daqController.cpp:103
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::DaqControllerImpl::NotStarted
Definition: daqController.hpp:300
daq::AsyncOperations::abort
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
Definition: daqController.hpp:63
daq::DaqController
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Definition: daqController.hpp:163
daq::DaqControllerImpl::GetErrorFlag
bool GetErrorFlag() const DAQ_NOEXCEPT override
Definition: daqController.cpp:181
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
daq::ActionEvent
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
daq::DaqControllerImpl::Stopped
Definition: daqController.hpp:304
daq::DaqProperties::await_interval
std::chrono::milliseconds await_interval
Interval (and thus duration) of the requests sent to primary sources to await end of recording.
Definition: daqProperties.hpp:61
daq::AsyncOperations::AsyncOperations
AsyncOperations()
Default constructs object with standard async operations.
Definition: daqController.cpp:32
daq::AsyncOperations
Async operations.
Definition: daqController.hpp:46
daq::DaqControllerImpl::m_pending_replies
std::shared_ptr< PendingReplies > m_pending_replies
Definition: daqController.hpp:377
daq::DaqControllerImpl::Starting
Definition: daqController.hpp:301
error.hpp
Contains error related declarations for DAQ.
daq::PrimSource
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96