7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
// Compile-time constant that is always `false`. It is used further down as
// `always_false_v<T>` inside a `static_assert`, so it is presumably a variable
// template whose `template <...>` header sits on a line not visible in this
// view -- TODO confirm.
22 inline constexpr
bool always_false_v =
false;
// Helper template over a collection of sources. The visible lambda compares each
// element's `state` member with the requested `state`; the algorithm call that
// receives the iterator range and the lambda is on a line not shown in this view
// (the name suggests an "all elements match" test -- TODO confirm).
// NOTE(review): `sources` is passed by value, so the whole collection is copied
// on every call.
24 template <
class Sources>
25 bool AllInState(Sources sources,
State state) {
// `[=]` copies `state` into the closure.
27 sources.begin(), sources.end(), [=](
auto const& s) ->
bool { return s.state == state; });
// Member-initializer fragment that binds asynchronous operation slots to lambdas
// forwarding to the matching `daq::op` initiator. Only `start` and `await_prim`
// are named in the visible lines; the slots that receive the AbortAsync and
// StopAsync lambdas are on lines not shown.
// NOTE(review): `start` forwards `par` as an lvalue (copy) while the other
// visible lambdas use `std::move(par)` -- confirm whether that is intentional.
33 : start([](auto par) {
return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
// The abort and stop initiators additionally receive an error `policy`.
35 return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
38 return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
// Unlike the others, awaiting primary sources goes through
// `InitiateAbortableOperation`.
40 , await_prim([](
auto par) {
41 return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
// Streams a one-line description of the controller in the form
// DaqController(id='<id>', state=<state>).
50 os <<
"DaqController(id='" <<
daq.GetId() <<
"', state=" <<
daq.GetState() <<
")";
56 case State::NotStarted:
62 case State::Acquiring:
// Factory fragment (the trace message below names it DaqControllerImpl::Create).
// The visible return type is a shared_ptr to DaqControllerImpl; the status and
// event log are handed in as shared_ptr.
85 std::shared_ptr<DaqControllerImpl>
88 std::shared_ptr<ObservableStatus> status,
89 std::shared_ptr<ObservableEventLog> event_log,
// NOTE(review): `fmt::format` is given a plain literal and no arguments, so the
// formatting step adds nothing here.
93 LOG4CPLUS_TRACE(
"daq", fmt::format(
"DaqControllerImpl::Create"));
// The FITS controller is created from `properties` before `properties` is moved
// from on the line below.
94 auto fits_ctl = std::make_unique<FitsControllerImpl>(properties, event_log);
96 std::move(properties),
// Constructor fragment: stores the io_context, takes over the injected
// collaborators by move, builds the per-source wrappers and validates the
// arguments.
105 std::unique_ptr<FitsController> fits_ctl,
106 std::shared_ptr<ObservableStatus> status,
107 std::shared_ptr<ObservableEventLog> event_log,
110 , m_io_ctx(io_context)
111 , m_executor(io_context)
112 , m_properties(std::move(properties))
113 , m_fits_ctl(std::move(fits_ctl))
114 , m_status(std::move(status))
115 , m_event_log(std::move(event_log))
// The source wrappers are built from the member `m_properties`, not from the
// moved-from `properties` parameter. This presumably relies on `m_properties`
// being declared before these members in the class -- TODO confirm.
116 , m_prim_sources(MakeSources<
PrimSource>(m_properties.prim_sources))
117 , m_meta_sources(MakeSources<
MetaSource>(m_properties.meta_sources))
118 , m_async_ops(std::move(ops))
120 , m_logger(log4cplus::Logger::getInstance(
"daq")) {
// Argument validation: every failure is reported as std::invalid_argument. The
// conditions guarding the throws are on lines not visible in this view.
125 throw std::invalid_argument(
126 "Data acquisition id mismatch between DaqProperties and ObservableStatus");
130 throw std::invalid_argument(
"No data sources provided");
134 throw std::invalid_argument(
"AsyncOperations is invalid");
// Generic visitor lambda: `T` is the decayed type of the visited alternative and
// an `if constexpr` chain picks the branch at compile time. Only the assignments
// of State::NotStarted and State::Acquiring to `s` are visible here; the bodies
// of the remaining branches are on lines not shown.
// NOTE(review): the enclosing function is not visible; the block name GetState
// is a guess -- TODO confirm.
141 [&](
auto const& var) {
142 using T = std::decay_t<decltype(var)>;
143 if constexpr (std::is_same_v<T, NotStarted>) {
144 s = State::NotStarted;
145 }
else if constexpr (std::is_same_v<T, Starting>) {
147 }
else if constexpr (std::is_same_v<T, Acquiring>) {
148 s = State::Acquiring;
149 }
else if constexpr (std::is_same_v<T, Stopping>) {
151 }
else if constexpr (std::is_same_v<T, Stopped>) {
153 }
else if constexpr (std::is_same_v<T, Aborting>) {
155 }
else if constexpr (std::is_same_v<T, Aborted>) {
// Compile-time guard with the message "non-exhaustive visitor!". It is
// presumably placed in a final `else` (not visible) so that an unhandled
// alternative breaks the build -- TODO confirm.
158 static_assert(always_false_v<T>,
"non-exhaustive visitor!");
// Case labels of a switch over a State value; the per-case bodies are on lines
// not shown. The visible fatal log reports the value `s` as invalid (presumably
// the fallback path -- TODO confirm).
// NOTE(review): the enclosing function is not visible; the block name SetState
// is a guess -- TODO confirm.
191 case State::NotStarted:
193 case State::Starting:
195 case State::Acquiring:
197 case State::Stopping:
201 case State::Aborting:
206 LOG4CPLUS_FATAL(m_logger, fmt::format(
"Invalid state provided: '{}'", s));
211 m_status->SetError(error);
217 m_status->SetState(GetState());
// StartAsync fragment (name taken from the log messages below).
// Starting is only accepted while the state holds NotStarted; otherwise the
// caller receives a failed future carrying std::runtime_error instead of a
// thrown exception.
248 if (!std::holds_alternative<NotStarted>(
m_state)) {
249 return boost::make_exceptional_future<State>(
250 std::runtime_error(
"Data acquisition is already started"));
// Failure path for the FITS controller start (see the log text below).
258 }
catch (std::exception
const& e) {
261 fmt::format(
"{}: StartAsync: FitsController::Start failed (aborting start): {}",
// No exception argument is passed here. NOTE(review): presumably this overload
// captures the exception currently being handled by the catch block -- confirm
// against the Boost.Thread documentation.
265 return boost::make_exceptional_future<State>();
// Continuation invoked with the finished start operation. The closure holds a
// shared_ptr to the controller.
270 [
daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](
271 boost::future<void> f) {
272 LOG4CPLUS_INFO(
daq->GetLogger(),
273 fmt::format(
"{}: StartAsync: Completed {}",
275 f.has_value() ?
"successfully" :
"with error"));
// A failed start operation raises the error flag.
277 if (f.has_exception()) {
278 daq->SetErrorFlag(
true);
// Kick off monitoring of the primary sources; the control flow between this and
// the neighbouring statements is on lines not shown.
282 daq->InitiateAwaitPrimarySources();
285 daq->SetErrorFlag(
false);
// The continuation resolves to the controller's state.
287 return daq->GetState();
// StopAsync fragment (name taken from the log messages below).
// Precondition checks: each rejected state yields a failed future carrying
// std::runtime_error rather than a thrown exception.
295 if (std::holds_alternative<Stopped>(
m_state)) {
296 return boost::make_exceptional_future<Status>(
297 std::runtime_error(
"Data acquisition already stopped"));
300 if (std::holds_alternative<Aborted>(
m_state)) {
301 return boost::make_exceptional_future<Status>(
302 std::runtime_error(
"Data acquisition already aborted"));
// Any state other than Acquiring is rejected as well.
305 if (!std::holds_alternative<Acquiring>(
m_state)) {
306 return boost::make_exceptional_future<Status>(
307 std::runtime_error(
"Cannot stop a data acquisition that is not Acquiring"));
// Continuation invoked with the finished stop operation; it captures the error
// policy and a shared_ptr to the controller.
318 [policy,
daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](
320 LOG4CPLUS_INFO(
daq->GetLogger(),
321 fmt::format(
"{}: StopAsync: Completed {}",
323 f.has_value() ?
"successfully" :
"with error"));
// Hand the keywords held by the status object to the FITS controller, then stop
// the FITS controller with the caller's policy.
329 daq->m_fits_ctl->UpdateKeywords(
daq->m_status->GetKeywords());
330 auto res =
daq->m_fits_ctl->Stop(policy);
// `res` is dereferenced, so it is presumably an optional-like value; the check
// guarding this line is not visible -- TODO confirm.
332 daq->m_status->AddFiles({*res});
336 daq->SetErrorFlag(
true);
// Exceptions from the FITS handling are logged and raise the error flag.
338 }
catch (std::exception
const& e) {
339 LOG4CPLUS_ERROR(
daq->GetLogger(),
340 fmt::format(
"{}: StopAsync: Failed to create OCM FITS "
344 daq->SetErrorFlag(
true);
347 daq->SetErrorFlag(
true);
// A failed stop operation raises the error flag.
352 if (f.has_exception()) {
353 daq->SetErrorFlag(
true);
// Record the files produced by the operation and mirror its error indication in
// the error flag.
357 auto result = f.get();
358 daq->m_status->AddFiles(result.result);
362 daq->SetErrorFlag(result.error);
// The continuation resolves to a snapshot of the status.
364 return daq->GetStatus()->GetStatus();
// AbortAsync fragment (name taken from the event and log texts below).
// Arguments of an event record describing the call, including the policy and
// the current status.
370 GetId(), fmt::format(
"DaqController::AbortAsync({})", policy),
m_status->GetStatus()));
// Aborting a data acquisition that was never started is logged; what happens
// next in this branch is on lines not shown.
372 if (std::holds_alternative<NotStarted>(
m_state)) {
373 LOG4CPLUS_INFO(
m_logger, fmt::format(
"{}: Aborting not started data acquisition", *
this));
// Already finished acquisitions are rejected with a failed future.
378 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
379 return boost::make_exceptional_future<Status>(
380 std::runtime_error(
"Data acquisition already stopped or aborted"));
// Continuation for the abort operation; it captures the error policy and a
// shared_ptr to the controller.
387 [policy,
daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](
394 fmt::format(
"{}: AbortAsync: Data acquisition already aborted. "
395 "Do nothing else (errors are ignored). ",
// Early exit of the continuation with the current status.
399 return daq->GetStatus()->GetStatus();
401 LOG4CPLUS_DEBUG(
daq->GetLogger(),
402 fmt::format(
"{}: AbortAsync: Completed, updating DAQ status and "
403 "set reply remaining. Has fatal error={}",
// Abort the FITS controller with the caller's policy; the surrounding error
// handling is on lines not shown.
410 daq->m_fits_ctl->Abort(policy);
412 daq->SetErrorFlag(
true);
// A failed abort operation is logged and raises the error flag.
419 if (f.has_exception()) {
422 fmt::format(
"{}: AbortAsync: Completed with fatal error.", *daq));
424 daq->SetErrorFlag(true);
// Otherwise the error flag mirrors the error indication of the result.
427 auto result = f.get();
428 daq->SetErrorFlag(result.error);
432 LOG4CPLUS_INFO(
daq->GetLogger(),
433 fmt::format(
"{}: AbortAsync: Completed successfully.", *
daq));
// The continuation resolves to a snapshot of the status.
434 return daq->GetStatus()->GetStatus();
// UpdateKeywords fragment (name taken from the event text below).
// Arguments of an event record describing the call; the keyword values are
// deliberately left out of the text ("<omitted>").
// NOTE(review): `fmt::format` is given a plain literal and no arguments, so the
// formatting step adds nothing here.
440 GetId(), fmt::format(
"DaqController::UpdateKeywords(<omitted>)"), m_status->GetStatus()));
// Keywords cannot be changed once the acquisition is Stopped or Aborted. Unlike
// the asynchronous operations in this file, this reports the error by throwing
// (std::runtime_error wrapped with boost::enable_current_exception).
442 if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
443 throw boost::enable_current_exception(
444 std::runtime_error(
"Data acquisition already stopped or aborted"));
// Otherwise the keywords are forwarded to the status object.
447 m_status->UpdateKeywords(keywords);
// Returns a future tied to a promise that is completed once the awaited sources
// have finished (the visible predicate treats Aborted and Stopped as finished),
// or that fails on timeout/abandonment.
// `sources`: ids of the sources to wait for; an empty list selects all primary
// sources.
// `timeout`: a timer is armed only when this is greater than zero.
// NOTE(review): several log messages in this function use the prefix
// "AsyncWait" while others use "AwaitAsync" -- confirm which is intended.
451 DaqControllerImpl::AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) {
454 fmt::format(
"DaqController::AwaitAsync({}, {} ms)",
455 sources.empty() ?
"all primary sources" :
"a user defined list of sources",
457 m_status->GetStatus()));
// Element type of the list of awaited sources: a non-null pointer to either a
// primary or a metadata source wrapper.
460 std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
// Resolve every requested id; an unknown id fails the call with
// std::invalid_argument delivered through the returned future.
463 if (!sources.empty()) {
464 for (
auto const& source_id : sources) {
465 auto source_var = FindSource(source_id);
467 await_on.emplace_back(*source_var);
469 return boost::make_exceptional_future<State>(
470 std::invalid_argument(fmt::format(
"Source with id='{}' not found", source_id)));
// Every primary source is added to the awaited list (presumably the branch taken
// when no explicit list was given -- the `else` is not visible).
475 for (
auto& source : m_prim_sources) {
476 await_on.emplace_back(&source);
// The predicate owns its own copy of the awaited list (`sources = await_on`).
482 auto condition = [sources = await_on]() {
483 return std::all_of(sources.begin(), sources.end(), [](
auto var) {
486 return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
// Immediate completion with the current state; the guard for this return is on
// a line not shown (presumably the predicate already holds -- TODO confirm).
494 return boost::make_ready_future<State>(GetState());
// Held by shared_ptr and captured by the timer handler below.
500 auto promise = std::make_shared<boost::promise<State>>();
502 if (timeout > std::chrono::milliseconds(0)) {
503 auto timer = std::make_unique<boost::asio::steady_timer>(m_io_ctx, timeout);
// The handler keeps only a weak reference to the controller, plus the raw timer
// address that is compared against the entries of `m_timers` further down.
504 timer->async_wait([promise,
505 timer_ptr = timer.get(),
506 daq_weak = std::weak_ptr<DaqControllerImpl>(
507 std::static_pointer_cast<DaqControllerImpl>(shared_from_this()))](
508 boost::system::error_code ec) {
511 auto daq = daq_weak.lock();
// Abandoned wait: the promise fails with DaqOperationAborted.
516 fmt::format(
"{}: AsyncWait: Operation abandoned before completing.",
520 promise->set_exception(DaqOperationAborted(
""));
// Timed-out wait: the timer's entry in `m_timers` is matched by address and the
// promise fails with DaqOperationTimeout.
527 fmt::format(
"{}: AsyncWait: Operation timed out before completing.",
531 daq->m_timers.begin(),
533 [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
534 return val.get() == timer_ptr;
536 daq->m_timers.end());
539 promise->set_exception(DaqOperationTimeout(
""));
// The controller takes ownership of the timer.
544 m_timers.emplace_back(std::move(timer));
// State listener: holds a copy of the predicate and a weak reference to the
// controller.
547 auto listener = [condition,
549 daq_weak = std::weak_ptr<DaqControllerImpl>(
550 std::static_pointer_cast<DaqControllerImpl>(shared_from_this()))](
State,
552 auto daq = daq_weak.lock();
// NOTE(review): "fulfulled" in the message below is a typo for "fulfilled"; it is
// a runtime string and is left untouched here.
554 LOG4CPLUS_WARN(
"daq",
"DaqControllerImpl deleted before await condition was fulfulled");
560 LOG4CPLUS_INFO(
daq->m_logger,
561 fmt::format(
"{}: AwaitAsync: Await condition fulfilled", *
daq));
// Complete the promise with the controller's state.
563 promise->set_value(
daq->GetState());
// Attach the listener to every awaited source.
572 for (
auto& source : await_on) {
574 [
daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this()),
578 fmt::format(
"{}: AsyncWait: Attaching listener on source '{}'.", *
daq, *s));
// The slot type is derived from the concrete source wrapper behind `s`.
580 using SlotType =
typename std::remove_reference<decltype(
581 *s.get())>::type::StateSignal::slot_type;
// `track_foreign(daq)` presumably ties the slot's lifetime to the controller
// (Boost.Signals2 slot tracking) -- TODO confirm.
// NOTE(review): `listener` is moved from inside a per-source loop; confirm that
// each iteration operates on its own copy.
582 s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(
daq));
586 return promise->get_future();
// Looks up a source wrapper by name. Primary sources and metadata sources are
// each searched with `std::find_if`, comparing `GetSource().GetName()` with
// `source_id`. A match is returned as a non-null pointer inside the variant; the
// return for the not-found case is on a line not shown (the return type is an
// optional, so presumably an empty optional -- TODO confirm).
589 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
590 DaqControllerImpl::FindSource(std::string_view source_id) {
// Search the primary sources.
593 std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](
auto& source) {
594 return source.GetSource().GetName() == source_id;
596 if (it != m_prim_sources.end()) {
597 return gsl::not_null<Source<PrimSource>*>(&(*it));
// Search the metadata sources.
602 std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](
auto& source) {
603 return source.GetSource().GetName() == source_id;
605 if (it != m_meta_sources.end()) {
606 return gsl::not_null<Source<MetaSource>*>(&(*it));
// Wraps each element of `sources` in a `Source<SourceType>`.
// `sources` is taken by value and its elements are handed on through move
// iterators, so each wrapper is constructed from a moved element. The algorithm
// call that receives these arguments and the final return are on lines not
// shown.
614 template <
class SourceType>
615 std::vector<Source<SourceType>> DaqControllerImpl::MakeSources(std::vector<SourceType> sources) {
617 std::vector<Source<SourceType>> dest;
// Reserve room for one wrapper per input element up front.
618 dest.reserve(sources.size());
620 std::make_move_iterator(sources.begin()),
621 std::make_move_iterator(sources.end()),
622 std::back_inserter(dest),
623 [](SourceType&& s) ->
Source<SourceType> { return Source<SourceType>{std::move(s)}; });
627 void DaqControllerImpl::InitiateAwaitPrimarySources() {
628 AddEvent<ActionEvent>(GetId(),
629 "DaqController::InitiateAwaitPrimarySources(): Initiating",
630 m_status->GetStatus());
631 if (m_prim_sources.empty()) {
634 fmt::format(
"{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *
this));
637 LOG4CPLUS_DEBUG(m_logger,
638 fmt::format(
"{}: InitiateAwaitPrimarySources: Starting operation.", *
this));
639 auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams());
640 m_abort_await_primary_sources = abort;
644 [
daq = std::static_pointer_cast<DaqControllerImpl>(shared_from_this())](boost::future<
Result<DpParts>> fut) {
646 auto result = fut.get();
647 LOG4CPLUS_DEBUG(
daq->m_logger,
648 fmt::format(
"{}: InitiateAwaitPrimarySources: Adding {} files from "
649 "primary sources", *
daq,
650 result.result.size()));
651 daq->m_status->AddFiles({result.result});
655 if (!std::holds_alternative<Acquiring>(
daq->m_state)) {
659 "{}: InitiateAwaitPrimarySources: "
660 "AwaitAsync completed but another operation has already transitioned "
661 "DAQ from Acquiring so automatic stop will not be performed.",
667 "DaqController::InitiateAwaitPrimarySources(): "
668 "Primary sources completed. Performing automatic stop of metadata sources",
669 daq->m_status->GetStatus());
675 daq->StopAsync(ErrorPolicy::Strict);