8#include <fmt/ostream.h>
9#include <log4cplus/loggingmacros.h>
10#include <mal/MalException.hpp>
11#include <mal/rr/qos/ReplyTime.hpp>
12#include <nlohmann/json.hpp>
28DaqSources MakeSources(mal::Mal&
mal, DaqContext
const& ctx) {
29 std::vector<PrimSource> psources;
30 std::vector<MetaSource> msources;
32 psources.reserve(ctx.prim_sources.size());
33 for (
auto const& raw : ctx.prim_sources) {
34 psources.emplace_back(
38 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
42 msources.reserve(ctx.meta_sources.size());
43 for (
auto const& raw : ctx.meta_sources) {
44 msources.emplace_back(
48 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
52 return DaqSources(std::move(psources), std::move(msources));
56inline constexpr bool always_false_v =
false;
58template <
class Sources>
59bool AllInState(Sources sources,
State state) {
61 sources.begin(), sources.end(), [=](
auto const& s) ->
bool { return s.state == state; });
68 , start([this](auto par) {
69 return daq::op::InitiateOperation<daq::op::StartAsync>(
executor, par);
72 return daq::op::InitiateOperation<op::AbortAsync>(
executor, policy, std::move(par));
75 return daq::op::InitiateOperation<op::StopAsync>(
executor, policy, std::move(par));
77 , await_prim([
this](
auto par) {
78 return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(
executor, std::move(par));
88 std::shared_ptr<DpmClient> dpm_client)
92 , m_async_ops(m_executor)
93 , m_dpm_client(std::move(dpm_client)) {
98 std::shared_ptr<ObservableStatus> status,
99 std::shared_ptr<ObservableEventLog> event_log,
101 -> std::shared_ptr<DaqController> {
102 DaqSources sources = MakeSources(m_mal, daq_ctx);
108 std::move(event_log),
113 std::shared_ptr<ObservableStatus> status,
114 std::shared_ptr<ObservableEventLog> event_log)
115 -> std::shared_ptr<DaqController> {
116 return std::make_shared<DpmDaqController>(
117 m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
121 os <<
"DaqController(id='" <<
daq.GetId() <<
"', state=" <<
daq.GetState() <<
")";
127 std::shared_ptr<ObservableStatus> status,
128 std::shared_ptr<ObservableEventLog> event_log)
129 : m_io_ctx(io_context)
130 , m_executor(m_io_ctx)
131 , m_context(std::move(context))
132 , m_status(std::move(status))
133 , m_event_log(std::move(event_log)) {
151 return m_status->GetId();
155 return m_status->HasError();
162boost::signals2::connection
164 return m_sig_context.connect(slot);
167std::shared_ptr<OcmDaqController>
172 std::shared_ptr<ObservableStatus> status,
173 std::shared_ptr<ObservableEventLog> event_log,
177 LOG4CPLUS_TRACE(
"daq", fmt::format(
"OcmDaqController::Create"));
183 std::move(event_log),
191 std::shared_ptr<ObservableStatus> status,
192 std::shared_ptr<ObservableEventLog> event_log,
194 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
195 , m_kw_formatter(kw_formatter)
196 , m_state(
MakeState(GetStatusRef().GetState()))
197 , m_prim_sources(MakeSources<
PrimSource>(sources.GetPrimarySources()))
198 , m_meta_sources(MakeSources<
MetaSource>(sources.GetMetadataSources()))
199 , m_async_ops(std::move(ops))
201 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm.controller")) {
203 throw boost::enable_current_exception(
204 std::invalid_argument(
"Data acquisition id mismatch between DaqContext "
205 "and ObservableStatus"));
209 throw boost::enable_current_exception(
210 std::invalid_argument(
"No data sources or data to merge is provided"));
214 throw boost::enable_current_exception(
215 std::invalid_argument(
"OcmAsyncOperations is invalid"));
222 [&](
auto const& var) {
223 using T = std::decay_t<
decltype(var)>;
224 if constexpr (std::is_same_v<T, NotStarted>) {
226 }
else if constexpr (std::is_same_v<T, Starting>) {
228 }
else if constexpr (std::is_same_v<T, Acquiring>) {
230 }
else if constexpr (std::is_same_v<T, Stopping>) {
232 }
else if constexpr (std::is_same_v<T, Stopped>) {
234 }
else if constexpr (std::is_same_v<T, Aborting>) {
236 }
else if constexpr (std::is_same_v<T, Aborted>) {
239 static_assert(always_false_v<T>,
"non-exhaustive visitor!");
269 LOG4CPLUS_FATAL(m_logger, fmt::format(
"Invalid state provided: '{}'", s));
276 GetStatusRef().SetState(GetState());
319 if (!std::holds_alternative<NotStarted>(
m_state)) {
320 return boost::make_exceptional_future<State>(
321 std::runtime_error(
"Data acquisition is already started"));
328 auto alerts = std::make_unique<op::AlertState>();
331 [alerts = std::move(alerts),
332 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
333 this](boost::future<void> f) {
334 LOG4CPLUS_INFO(
daq->GetLogger(),
335 fmt::format(
"{}: StartAsync: Completed {}",
337 f.has_value() ?
"successfully" :
"with error"));
346 if (f.has_exception()) {
355 }
catch (std::exception
const& e) {
357 fmt::format(
"{}: AsyncStartDaq failed: {}", *
this, e.what()));
358 alert.Set(fmt::format(
"Failed to start DAQ: {}", e.what()));
368 if (!(std::holds_alternative<Acquiring>(
m_state) ||
369 std::holds_alternative<Stopping>(
m_state))) {
370 return boost::make_exceptional_future<Status>(
371 std::runtime_error(fmt::format(
"Data acquisition can only be stopped while Acquiring "
372 "or being Stopped, current state: {}",
385 auto alerts = std::make_unique<op::AlertState>();
388 [alerts = std::move(alerts),
389 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
397 fmt::format(
"{}: StopAsync: Data acquisition modified by other commands. "
398 "Do nothing else (errors are ignored). ",
400 throw std::runtime_error(fmt::format(
401 "Stop command could not be completed because Data Acquisitions state was "
402 "modified in the meantime (current state: {})",
406 fmt::format(
"{}: StopAsync: Completed {}",
408 f.has_value() ?
"successfully" :
"with error"));
416 auto result = f.get();
418 alert.Set(
"Async operation Stop completed with error(s)");
429 }
catch (std::exception
const& e) {
431 fmt::format(
"{}: StopAsync failed: {}", *
this, e.what()));
432 alert.Set(fmt::format(
"Failed to stop DAQ: {}", e.what()));
442 if (std::holds_alternative<NotStarted>(
m_state)) {
443 LOG4CPLUS_INFO(
m_logger, fmt::format(
"{}: Aborting not started data acquisition", *
this));
448 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
449 return boost::make_exceptional_future<Status>(
450 std::runtime_error(
"Data acquisition already stopped or aborted"));
458 auto alerts = std::make_unique<op::AlertState>();
461 [alerts = std::move(alerts),
462 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
469 fmt::format(
"{}: AbortAsync: Data acquisition already aborted. "
470 "Do nothing else (errors are ignored). ",
477 fmt::format(
"{}: AbortAsync: Completed, updating DAQ status and "
478 "set reply remaining. Has fatal error={}",
488 auto result = f.get();
490 alert.Set(
"Async operation Abort completed with error(s)");
497 fmt::format(
"{}: AbortAsync: Completed successfully.", *
daq));
499 }
catch (std::exception
const& e) {
501 fmt::format(
"{}: AbortAsync failed: {}", *
this, e.what()));
502 alert.Set(fmt::format(
"Failed to abort DAQ: {}", e.what()));
509 return boost::make_exceptional_future<State>(std::runtime_error(
510 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
515 fmt::format(
"DaqController::UpdateKeywords(<omitted>)"),
518 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
519 throw boost::enable_current_exception(
520 std::runtime_error(
"Data acquisition already stopped or aborted"));
531 fmt::format(
"DaqController::AwaitAsync({}, {} ms)",
532 sources.empty() ?
"all primary sources" :
"a user defined list of sources",
538 if (!sources.empty()) {
539 for (
auto const& source_id : sources) {
542 await_on.emplace_back(*source_var);
544 return boost::make_exceptional_future<State>(
545 std::invalid_argument(fmt::format(
"Source with id='{}' not found", source_id)));
551 await_on.emplace_back(&source);
557 auto condition = [sources = await_on]() {
558 return std::all_of(sources.begin(), sources.end(), [](
auto var) {
561 return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
569 return boost::make_ready_future<State>(GetState());
575 auto promise = std::make_shared<boost::promise<State>>();
577 if (timeout > std::chrono::milliseconds(0)) {
578 auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
579 timer->async_wait([promise,
580 timer_ptr = timer.get(),
581 daq_weak = std::weak_ptr<OcmDaqController>(
582 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
583 boost::system::error_code ec) {
586 auto daq = daq_weak.lock();
591 fmt::format(
"{}: AsyncWait: Operation abandoned before completing.",
595 promise->set_exception(DaqOperationAborted(
""));
602 fmt::format(
"{}: AsyncWait: Operation timed out before completing.",
606 daq->m_timers.begin(),
608 [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
609 return val.get() == timer_ptr;
611 daq->m_timers.end());
614 promise->set_exception(DaqOperationTimeout(
""));
619 m_timers.emplace_back(std::move(timer));
622 auto listener = [condition,
624 daq_weak = std::weak_ptr<OcmDaqController>(
625 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
State,
627 auto daq = daq_weak.lock();
629 LOG4CPLUS_WARN(
"daq",
"OcmDaqController deleted before await condition was fulfulled");
635 LOG4CPLUS_INFO(
daq->m_logger,
636 fmt::format(
"{}: AwaitAsync: Await condition fulfilled", *
daq));
638 promise->set_value(
daq->GetState());
647 for (
auto& source : await_on) {
649 [
daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
653 fmt::format(
"{}: AsyncWait: Attaching listener on source '{}'.", *
daq, *s));
656 typename std::remove_reference<
decltype(*s)>::type::StateSignal::slot_type;
657 s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(
daq));
661 return promise->get_future();
664std::optional<std::variant<Source<PrimSource>*, Source<MetaSource>*>>
665OcmDaqController::FindSource(std::string_view source_id) {
668 std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](
auto& source) {
669 return source.GetSource().GetName() == source_id;
671 if (it != m_prim_sources.end()) {
677 std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](
auto& source) {
678 return source.GetSource().GetName() == source_id;
680 if (it != m_meta_sources.end()) {
689template <
class SourceType>
690std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
692 std::vector<Source<SourceType>> dest;
693 dest.reserve(sources.size());
695 std::make_move_iterator(sources.begin()),
696 std::make_move_iterator(sources.end()),
697 std::back_inserter(dest),
698 [](SourceType&& s) ->
Source<SourceType> { return Source<SourceType>{std::move(s)}; });
702void OcmDaqController::InitiateStopCondition() {
703 AddEvent<ActionEvent>(
704 GetId(),
"DaqController::InitiateStopCondition(): Initiating", GetStatusRef().GetStatus());
705 if (!HasStatefulSources()) {
708 fmt::format(
"{}: InitiateStopCondition: No stateful sources to wait for -> stopping",
713 boost::async(GetIoExecutor(),
714 [
daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())] {
715 daq->StopAsync(ErrorPolicy::Strict);
719 if (m_prim_sources.empty()) {
722 fmt::format(
"{}: InitiateStopCondition: No primary sources to monitor.", *
this));
725 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"{}: InitiateStopCondition: Starting operation.", *
this));
726 auto alerts = std::make_unique<op::AlertState>();
727 auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
728 m_abort_await_primary_sources = abort;
732 [alerts = std::move(alerts),
733 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
736 daq->m_abort_await_primary_sources =
nullptr;
739 auto result = fut.get();
740 LOG4CPLUS_DEBUG(
daq->m_logger,
741 fmt::format(
"{}: InitiateStopCondition: Adding {} files from "
744 result.result.size()));
749 daq->EmitContextSignal();
752 if (!std::holds_alternative<Acquiring>(
daq->m_state)) {
756 "{}: InitiateStopCondition: "
757 "AwaitAsync completed but another operation has already transitioned "
758 "DAQ from Acquiring so automatic stop will not be performed.",
764 "DaqController::InitiateStopCondition(): "
765 "Primary sources completed. Performing automatic stop of metadata sources",
766 daq->GetStatusRef().GetStatus());
770 op::MergeAlerts(
daq->GetStatusRef(), *alerts);
777 daq->StopAsync(ErrorPolicy::Strict);
781void OcmDaqController::CancelAwaitPrimarySources() {
782 if (m_abort_await_primary_sources) {
783 LOG4CPLUS_TRACE(m_logger,
"OcmDaqController::CancelAwaitPrimarySources()");
784 m_abort_await_primary_sources();
785 m_abort_await_primary_sources =
nullptr;
789DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
791 std::shared_ptr<ObservableStatus> status,
792 std::shared_ptr<ObservableEventLog> event_log,
793 std::shared_ptr<DpmClient> dpm_client)
794 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
795 , m_liveness(std::make_shared<bool>(true))
796 , m_dpm_client(std::move(dpm_client))
797 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm.controller")) {
798 assert(m_dpm_client);
799 UpdateStateContext();
808 m_status_connection = m_dpm_client->ConnectStatusSignal(
809 [
id =
GetId(), weak = std::weak_ptr<bool>(m_liveness),
this](
Status const& status) {
810 if (
id != status.id) {
814 auto lock = weak.lock();
817 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
822 if (current.timestamp > status.timestamp) {
825 "Ignoring DAQ status update: From DPM: " << status <<
", current: " << current);
829 LOG4CPLUS_TRACE(m_logger, *
this <<
": Assigning new DAQ status from DPM: " << status);
831 UpdateStateContext();
836 return boost::make_exceptional_future<State>(
837 std::runtime_error(fmt::format(
"StartAsync() is invalid in state: {}",
GetState())));
841 return boost::make_exceptional_future<Status>(
842 std::runtime_error(fmt::format(
"StopAsync() is invalid in state: {}",
GetState())));
847 return boost::make_exceptional_future<State>(std::runtime_error(
848 fmt::format(
"AwaitAsync() with sources is invalid in state: {}",
GetState())));
852 throw std::runtime_error(fmt::format(
"UpdateKeywords() is invalid in state: {}",
GetState()));
860 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::ScheduleMergeAsync");
866 return boost::make_exceptional_future<State>(std::runtime_error(
867 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
870 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
871 auto& ctx = std::get<NotScheduled>(m_state_ctx);
872 if (ctx.schedule_reply_pending) {
874 return boost::make_exceptional_future<State>(
875 std::logic_error(
"ScheduleMergeAsync() a request is already in flight"));
881 m_dp_spec = j.dump(2);
882 LOG4CPLUS_DEBUG(m_logger,
"Created DpSpec:\n" << *m_dp_spec);
884 auto status = std::string();
890 return m_dpm_client->ScheduleAsync(*m_dp_spec, status)
892 [starting_state, weak = weak_from_this(),
this](boost::future<State> f) ->
State {
894 auto shared = weak.lock();
896 auto state = f.get();
908 status.ClearAlert(alert_id);
909 LOG4CPLUS_INFO(m_logger,
910 fmt::format(
"{}: Scheduled DAQ successfully", *
this));
912 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
915 }
catch (elt::mal::TimeoutException
const&) {
918 LOG4CPLUS_INFO(m_logger,
919 fmt::format(
"{}: Schedule DAQ failed with timeout "
920 "(no error flag set for this condition)",
924 }
catch (std::exception
const& e) {
930 fmt::format(
"Failed to schedule DAQ for merging: {}", e.what())));
931 LOG4CPLUS_ERROR(m_logger, fmt::format(
"{}: Scheduled DAQ failed", *
this));
939 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::AbortAsync");
945 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
946 if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
948 return boost::make_ready_future<Status>(*
GetStatus());
955 return m_dpm_client->AbortAsync(
GetId()).then(
956 GetIoExecutor(), [policy, weak = weak_from_this(),
this](boost::future<State> f) ->
Status {
957 auto shared = weak.lock();
960 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
961 throw boost::enable_current_exception(std::runtime_error(
"Operation aborted"));
967 auto result = f.get();
972 }
catch (std::exception
const& e) {
976 LOG4CPLUS_ERROR(m_logger,
978 <<
": Request to abort DAQ to DPM returned with "
979 "error. ErrorPolicy::Tolerant is used so DAQ is marked "
980 "as aborted anyway");
982 fmt::format(
"Async operation Abort completed with error(s): {}", e.what()));
991void DpmDaqController::UpdateStateContext() {
998 m_state_ctx = std::monostate();
1002void DpmDaqController::SetState(
State state) {
1005 if (prev != state) {
1006 UpdateStateContext();
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableStatus & GetStatusRef() noexcept
DaqContext const & GetContext() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
bool GetErrorFlag() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
rad::IoExecutor & GetIoExecutor() noexcept
std::string const & GetId() const DAQ_NOEXCEPT override
DaqContext & GetContextMut() noexcept
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
ObservableEventLog & GetEventLogRef() noexcept
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, fits::KeywordFormatter const &kw_formatter) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Data acquisition sources.
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
void UpdateKeywords(fits::KeywordVector const &keywords) override
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
State GetState() const DAQ_NOEXCEPT override
boost::future< Status > StopAsync(ErrorPolicy policy) override
boost::future< State > StartAsync() override
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Provide more consise way to set/clear alerts.
Defer signal changes until later time.
State GetState() const noexcept
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void SetState(State s) noexcept
Set state of data acquisition.
Implements daq::DaqController for states responsible to be executed by OCM.
boost::future< State > StartAsync() override
Starts the data acquisition.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, fits::KeywordFormatter const &kw_formatter, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
constexpr log4cplus::Logger const & GetLogger() const noexcept
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
State GetState() const DAQ_NOEXCEPT override
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
StateVariant MakeState(State s) const noexcept
log4cplus::Logger m_logger
OcmDaqController(boost::asio::io_context &io_context, fits::KeywordFormatter const &kw_formatter, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
std::optional< std::variant< Source< PrimSource > *, Source< MetaSource > * > > FindSource(std::string_view source_id)
auto HasStatefulSources() noexcept -> bool
Queries if ctx has stateful sources (primary or metadata source).
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replace or add) list of keywords.
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
void CancelAwaitPrimarySources()
Cancels the awaiting of primary sources.
void InitiateStopCondition()
Initiate async operation that triggers DAQ stop automatically.
void AddInitialKeywords()
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
fits::KeywordFormatter const & m_kw_formatter
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
void SetState(StateVariant &&s) noexcept
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
recif::RecCmdsAsync RrClient
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Declares JSON support for serialization.
Contains data structure for FITS keywords.
Contains error related declarations for DAQ.
Contains declarations for the helper functions to initiate operations.
constexpr std::string_view REQUEST
Request.
constexpr std::string_view DAQ_CONTROLLER
Daq controller command failed which is normally when there was an exception from async operations.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
void UpdateKeywords(fits::KeywordVector &out, fits::KeywordVector const &in, fits::KeywordFormatter const &fmt)
Updates (adds or replaces) primary HDU keywords.
AlertId MakeAlertId(std::string_view category, std::string key)
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
ErrorPolicy
Error policy supported by certain operations.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
void to_json(nlohmann::json &j, Status const &p)
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Utility class that represents a result and an error.
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
Event related to an action being requested or performed.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
bool IsValid() const noexcept
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
std::reference_wrapper< rad::IoExecutor > executor
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
std::function< boost::future< void >(op::AsyncOpParams)> start
OcmAsyncOperations(rad::IoExecutor &executor)
Default constructs object with standard async operations.
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Non observable status object that keeps stores status of data acquisition.
Parameters required for each async operation.
Await specific parameters that is not provided with AsyncOpParams.