7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 #include <mal/MalException.hpp>
11 #include <mal/rr/qos/ReplyTime.hpp>
12 #include <nlohmann/json.hpp>
26 DaqSources MakeSources(mal::Mal&
mal, DaqContext
const& ctx) {
27 std::vector<PrimSource> psources;
28 std::vector<MetaSource> msources;
30 psources.reserve(ctx.prim_sources.size());
31 for (
auto const& raw : ctx.prim_sources) {
32 psources.emplace_back(
36 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
40 msources.reserve(ctx.meta_sources.size());
41 for (
auto const& raw : ctx.meta_sources) {
42 msources.emplace_back(
46 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
50 return DaqSources(std::move(psources), std::move(msources));
54 inline constexpr
bool always_false_v =
false;
56 template <
class Sources>
57 bool AllInState(Sources sources,
State state) {
59 sources.begin(), sources.end(), [=](
auto const& s) ->
bool { return s.state == state; });
65 : start([](auto par) {
return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
67 return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
70 return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
72 , await_prim([](
auto par) {
73 return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
83 std::shared_ptr<DpmClient> dpm_client)
84 : m_io_ctx(io_ctx), m_mal(
mal), m_async_ops(), m_dpm_client(std::move(dpm_client)) {
89 std::shared_ptr<ObservableStatus> status,
90 std::shared_ptr<ObservableEventLog> event_log)
91 -> std::shared_ptr<DaqController> {
92 DaqSources sources = MakeSources(m_mal, daq_ctx);
102 std::shared_ptr<ObservableStatus> status,
103 std::shared_ptr<ObservableEventLog> event_log)
104 -> std::shared_ptr<DaqController> {
105 return std::make_shared<DpmDaqController>(
106 m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
110 os <<
"DaqController(id='" <<
daq.GetId() <<
"', state=" <<
daq.GetState() <<
")";
116 std::shared_ptr<ObservableStatus> status,
117 std::shared_ptr<ObservableEventLog> event_log)
118 : m_io_ctx(io_context)
119 , m_executor(m_io_ctx)
120 , m_context(std::move(context))
121 , m_status(std::move(status))
122 , m_event_log(std::move(event_log)) {
140 return m_status->GetId();
144 return m_status->GetError();
151 boost::signals2::connection
153 return m_sig_context.connect(slot);
156 std::shared_ptr<OcmDaqController>
160 std::shared_ptr<ObservableStatus> status,
161 std::shared_ptr<ObservableEventLog> event_log,
165 LOG4CPLUS_TRACE(
"daq", fmt::format(
"OcmDaqController::Create"));
170 std::move(event_log),
177 std::shared_ptr<ObservableStatus> status,
178 std::shared_ptr<ObservableEventLog> event_log,
180 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
182 , m_state(
MakeState(GetStatusRef().GetState()))
183 , m_prim_sources(MakeSources<
PrimSource>(sources.GetPrimarySources()))
184 , m_meta_sources(MakeSources<
MetaSource>(sources.GetMetadataSources()))
185 , m_async_ops(std::move(ops))
187 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm.controller")) {
189 throw boost::enable_current_exception(
190 std::invalid_argument(
"Data acquisition id mismatch between DaqContext "
191 "and ObservableStatus"));
195 throw boost::enable_current_exception(std::invalid_argument(
"No data sources provided"));
199 throw boost::enable_current_exception(
200 std::invalid_argument(
"OcmAsyncOperations is invalid"));
207 [&](
auto const& var) {
208 using T = std::decay_t<decltype(var)>;
209 if constexpr (std::is_same_v<T, NotStarted>) {
211 }
else if constexpr (std::is_same_v<T, Starting>) {
213 }
else if constexpr (std::is_same_v<T, Acquiring>) {
215 }
else if constexpr (std::is_same_v<T, Stopping>) {
217 }
else if constexpr (std::is_same_v<T, Stopped>) {
219 }
else if constexpr (std::is_same_v<T, Aborting>) {
221 }
else if constexpr (std::is_same_v<T, Aborted>) {
224 static_assert(always_false_v<T>,
"non-exhaustive visitor!");
254 LOG4CPLUS_FATAL(m_logger, fmt::format(
"Invalid state provided: '{}'", s));
259 GetStatusRef().SetError(
error);
265 GetStatusRef().SetState(GetState());
302 if (!std::holds_alternative<NotStarted>(
m_state)) {
303 return boost::make_exceptional_future<State>(
304 std::runtime_error(
"Data acquisition is already started"));
311 auto alerts = std::make_unique<op::AlertState>();
314 [alerts = std::move(alerts),
315 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
316 boost::future<void> f) {
317 LOG4CPLUS_INFO(
daq->GetLogger(),
318 fmt::format(
"{}: StartAsync: Completed {}",
320 f.has_value() ?
"successfully" :
"with error"));
326 if (f.has_exception()) {
327 daq->SetErrorFlag(
true);
331 daq->InitiateAwaitPrimarySources();
334 daq->SetErrorFlag(
false);
336 return daq->GetState();
344 if (std::holds_alternative<Stopped>(
m_state)) {
345 return boost::make_exceptional_future<Status>(
346 std::runtime_error(
"Data acquisition already stopped"));
349 if (std::holds_alternative<Aborted>(
m_state)) {
350 return boost::make_exceptional_future<Status>(
351 std::runtime_error(
"Data acquisition already aborted"));
354 if (!std::holds_alternative<Acquiring>(
m_state)) {
355 return boost::make_exceptional_future<Status>(
356 std::runtime_error(
"Cannot stop a data acquisition that is not Acquiring"));
368 auto alerts = std::make_unique<op::AlertState>();
371 [alerts = std::move(alerts),
372 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
380 fmt::format(
"{}: StopAsync: Data acquisition modified by other commands. "
381 "Do nothing else (errors are ignored). ",
383 throw std::runtime_error(fmt::format(
384 "Stop command could not be completed because Data Acquisitions state was "
385 "modified in the meantime (current state: {})",
388 LOG4CPLUS_INFO(
daq->GetLogger(),
389 fmt::format(
"{}: StopAsync: Completed {}",
391 f.has_value() ?
"successfully" :
"with error"));
395 if (f.has_exception()) {
396 daq->SetErrorFlag(true);
400 auto result = f.get();
402 daq->EmitContextSignal();
406 daq->SetErrorFlag(result.error);
408 return daq->GetStatus()->GetStatus();
416 if (std::holds_alternative<NotStarted>(
m_state)) {
417 LOG4CPLUS_INFO(
m_logger, fmt::format(
"{}: Aborting not started data acquisition", *
this));
422 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
423 return boost::make_exceptional_future<Status>(
424 std::runtime_error(
"Data acquisition already stopped or aborted"));
432 auto alerts = std::make_unique<op::AlertState>();
435 [alerts = std::move(alerts),
436 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
443 fmt::format(
"{}: AbortAsync: Data acquisition already aborted. "
444 "Do nothing else (errors are ignored). ",
448 return daq->GetStatus()->GetStatus();
450 LOG4CPLUS_DEBUG(
daq->GetLogger(),
451 fmt::format(
"{}: AbortAsync: Completed, updating DAQ status and "
452 "set reply remaining. Has fatal error={}",
460 if (f.has_exception()) {
463 fmt::format(
"{}: AbortAsync: Completed with fatal error.", *daq));
465 daq->SetErrorFlag(true);
468 auto result = f.get();
469 daq->SetErrorFlag(result.error);
473 LOG4CPLUS_INFO(
daq->GetLogger(),
474 fmt::format(
"{}: AbortAsync: Completed successfully.", *
daq));
475 return daq->GetStatus()->GetStatus();
480 return boost::make_exceptional_future<State>(std::runtime_error(
481 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
486 fmt::format(
"DaqController::UpdateKeywords(<omitted>)"),
489 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
490 throw boost::enable_current_exception(
491 std::runtime_error(
"Data acquisition already stopped or aborted"));
502 fmt::format(
"DaqController::AwaitAsync({}, {} ms)",
503 sources.empty() ?
"all primary sources" :
"a user defined list of sources",
508 std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
511 if (!sources.empty()) {
512 for (
auto const& source_id : sources) {
515 await_on.emplace_back(*source_var);
517 return boost::make_exceptional_future<State>(
518 std::invalid_argument(fmt::format(
"Source with id='{}' not found", source_id)));
524 await_on.emplace_back(&source);
530 auto condition = [sources = await_on]() {
531 return std::all_of(sources.begin(), sources.end(), [](
auto var) {
534 return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
542 return boost::make_ready_future<State>(GetState());
548 auto promise = std::make_shared<boost::promise<State>>();
550 if (timeout > std::chrono::milliseconds(0)) {
551 auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
552 timer->async_wait([promise,
553 timer_ptr = timer.get(),
554 daq_weak = std::weak_ptr<OcmDaqController>(
555 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
556 boost::system::error_code ec) {
559 auto daq = daq_weak.lock();
564 fmt::format(
"{}: AsyncWait: Operation abandoned before completing.",
568 promise->set_exception(DaqOperationAborted(
""));
575 fmt::format(
"{}: AsyncWait: Operation timed out before completing.",
579 daq->m_timers.begin(),
581 [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
582 return val.get() == timer_ptr;
584 daq->m_timers.end());
587 promise->set_exception(DaqOperationTimeout(
""));
592 m_timers.emplace_back(std::move(timer));
595 auto listener = [condition,
597 daq_weak = std::weak_ptr<OcmDaqController>(
598 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
State,
600 auto daq = daq_weak.lock();
602 LOG4CPLUS_WARN(
"daq",
"OcmDaqController deleted before await condition was fulfulled");
608 LOG4CPLUS_INFO(
daq->m_logger,
609 fmt::format(
"{}: AwaitAsync: Await condition fulfilled", *
daq));
611 promise->set_value(
daq->GetState());
620 for (
auto& source : await_on) {
622 [
daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
626 fmt::format(
"{}: AsyncWait: Attaching listener on source '{}'.", *
daq, *s));
628 using SlotType =
typename std::remove_reference<
629 decltype(*s.get())>::type::StateSignal::slot_type;
630 s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(
daq));
634 return promise->get_future();
637 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
638 OcmDaqController::FindSource(std::string_view source_id) {
641 std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](
auto& source) {
642 return source.GetSource().GetName() == source_id;
644 if (it != m_prim_sources.end()) {
645 return gsl::not_null<Source<PrimSource>*>(&(*it));
650 std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](
auto& source) {
651 return source.GetSource().GetName() == source_id;
653 if (it != m_meta_sources.end()) {
654 return gsl::not_null<Source<MetaSource>*>(&(*it));
662 template <
class SourceType>
663 std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
665 std::vector<Source<SourceType>> dest;
666 dest.reserve(sources.size());
668 std::make_move_iterator(sources.begin()),
669 std::make_move_iterator(sources.end()),
670 std::back_inserter(dest),
671 [](SourceType&& s) ->
Source<SourceType> { return Source<SourceType>{std::move(s)}; });
675 void OcmDaqController::InitiateAwaitPrimarySources() {
676 AddEvent<ActionEvent>(GetId(),
677 "DaqController::InitiateAwaitPrimarySources(): Initiating",
678 GetStatusRef().GetStatus());
679 if (m_prim_sources.empty()) {
682 fmt::format(
"{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *
this));
685 LOG4CPLUS_DEBUG(m_logger,
686 fmt::format(
"{}: InitiateAwaitPrimarySources: Starting operation.", *
this));
687 auto alerts = std::make_unique<op::AlertState>();
688 auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
689 m_abort_await_primary_sources = abort;
693 [alerts = std::move(alerts),
694 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
697 daq->m_abort_await_primary_sources =
nullptr;
700 auto result = fut.get();
701 LOG4CPLUS_DEBUG(
daq->m_logger,
702 fmt::format(
"{}: InitiateAwaitPrimarySources: Adding {} files from "
705 result.result.size()));
710 daq->EmitContextSignal();
713 if (!std::holds_alternative<Acquiring>(
daq->m_state)) {
717 "{}: InitiateAwaitPrimarySources: "
718 "AwaitAsync completed but another operation has already transitioned "
719 "DAQ from Acquiring so automatic stop will not be performed.",
725 "DaqController::InitiateAwaitPrimarySources(): "
726 "Primary sources completed. Performing automatic stop of metadata sources",
727 daq->GetStatusRef().GetStatus());
738 daq->StopAsync(ErrorPolicy::Strict);
742 void OcmDaqController::CancelAwaitPrimarySources() {
743 if (m_abort_await_primary_sources) {
744 LOG4CPLUS_TRACE(m_logger,
"OcmDaqController::CancelAwaitPrimarySources()");
745 m_abort_await_primary_sources();
746 m_abort_await_primary_sources =
nullptr;
750 DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
752 std::shared_ptr<ObservableStatus> status,
753 std::shared_ptr<ObservableEventLog> event_log,
754 std::shared_ptr<DpmClient> dpm_client)
755 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
756 , m_liveness(std::make_shared<bool>(true))
757 , m_dpm_client(std::move(dpm_client))
758 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm.controller")) {
759 assert(m_dpm_client);
760 UpdateStateContext();
769 m_status_connection = m_dpm_client->ConnectStatusSignal(
770 [
id =
GetId(), weak = std::weak_ptr<bool>(m_liveness),
this](
Status const& status) {
771 if (
id != status.id) {
775 auto lock = weak.lock();
778 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
783 if (current.timestamp > status.timestamp) {
786 "Ignoring DAQ status update: From DPM: " << status <<
", current: " << current);
790 LOG4CPLUS_TRACE(m_logger, *
this <<
": Assigning new DAQ status from DPM: " << status);
792 UpdateStateContext();
797 return boost::make_exceptional_future<State>(
798 std::runtime_error(fmt::format(
"StartAsync() is invalid in state: {}",
GetState())));
802 return boost::make_exceptional_future<Status>(
803 std::runtime_error(fmt::format(
"StopAsync() is invalid in state: {}",
GetState())));
808 return boost::make_exceptional_future<State>(std::runtime_error(
809 fmt::format(
"AwaitAsync() with sources is invalid in state: {}",
GetState())));
813 throw std::runtime_error(fmt::format(
"UpdateKeywords() is invalid in state: {}",
GetState()));
821 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::ScheduleMergeAsync");
827 return boost::make_exceptional_future<State>(std::runtime_error(
828 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
831 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
832 auto& ctx = std::get<NotScheduled>(m_state_ctx);
833 if (ctx.schedule_reply_pending) {
835 return boost::make_exceptional_future<State>(
836 std::logic_error(
"ScheduleMergeAsync() a request is already in flight"));
842 m_dp_spec = j.dump(2);
843 LOG4CPLUS_DEBUG(m_logger,
"Created DpSpec:\n" << *m_dp_spec);
846 return m_dpm_client->ScheduleAsync(*m_dp_spec)
848 [starting_state, weak = weak_from_this(),
this](boost::future<State> f) ->
State {
850 auto shared = weak.lock();
852 auto state = f.get();
864 status.ClearAlert(alert_id);
865 LOG4CPLUS_INFO(m_logger,
866 fmt::format(
"{}: Scheduled DAQ successfully", *
this));
868 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
871 }
catch (elt::mal::TimeoutException
const&) {
874 LOG4CPLUS_INFO(m_logger,
875 fmt::format(
"{}: Schedule DAQ failed with timeout "
876 "(no error flag set for this condition)",
880 }
catch (std::exception
const& e) {
886 fmt::format(
"Failed to schedule DAQ for merging: {}", e.what())));
887 status.SetError(
true);
888 LOG4CPLUS_ERROR(m_logger, fmt::format(
"{}: Scheduled DAQ failed", *
this));
896 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::AbortAsync");
902 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
903 if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
905 return boost::make_ready_future<Status>(*
GetStatus());
911 return m_dpm_client->AbortAsync(
GetId()).then(
912 GetIoExecutor(), [policy, weak = weak_from_this(),
this](boost::future<State> f) ->
Status {
913 auto shared = weak.lock();
916 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
917 throw boost::enable_current_exception(std::runtime_error(
"Operation aborted"));
920 auto result = f.get();
927 LOG4CPLUS_ERROR(m_logger,
929 <<
": Request to abort DAQ to DPM returned with "
930 "error. ErrorPolicy::Tolerant is used so DAQ is marked "
931 "as aborted anyway");
940 void DpmDaqController::UpdateStateContext() {
947 m_state_ctx = std::monostate();
951 void DpmDaqController::SetState(
State state, std::optional<bool>
error) {
955 UpdateStateContext();
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableEventLog & GetEventLogRef() noexcept
DaqContext const & GetContext() const DAQ_NOEXCEPT override
ObservableStatus & GetStatusRef() noexcept
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
bool GetErrorFlag() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
DaqContext & GetContextMut() noexcept
std::string const & GetId() const DAQ_NOEXCEPT override
rad::IoExecutor & GetIoExecutor() noexcept
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Controls the execution of a single data acquisition that ultimately results in a set of FITS keywords an...
Data acquisition sources.
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
void UpdateKeywords(fits::KeywordVector const &keywords) override
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
State GetState() const DAQ_NOEXCEPT override
boost::future< Status > StopAsync(ErrorPolicy policy) override
boost::future< State > StartAsync() override
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Defer signal changes until a later time.
State GetState() const noexcept
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
boost::future< State > StartAsync() override
Starts the data acquisition.
constexpr log4cplus::Logger const & GetLogger() const noexcept
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
State GetState() const DAQ_NOEXCEPT override
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
StateVariant MakeState(State s) const noexcept
std::optional< std::variant< gsl::not_null< Source< PrimSource > * >, gsl::not_null< Source< MetaSource > * > > > FindSource(std::string_view source_id)
log4cplus::Logger m_logger
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replaces or adds) the list of keywords.
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
OcmDaqController(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
void CancelAwaitPrimarySources()
Cancels the awaiting of primary sources.
void AddInitialKeywords()
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
void SetErrorFlag(bool error) noexcept
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
void SetState(StateVariant &&s) noexcept
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
recif::RecCmdsAsync RrClient
Contains data structure for FITS keywords.
Contains error related declarations for DAQ.
Contains declarations for the helper functions to initiate operations.
constexpr std::string_view REQUEST
Request.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
AlertId MakeAlertId(std::string_view category, std::string key)
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
ErrorPolicy
Error policy supported by certain operations.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and are reported as ...
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
void to_json(nlohmann::json &j, Status const &p)
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Utility class that represents a result and an error.
Contains declaration for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
Event related to an action being requested or performed.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
bool IsValid() const noexcept
std::function< boost::future< void >(op::AsyncOpParams)> start
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
OcmAsyncOperations()
Default constructs object with standard async operations.
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Non-observable status object that stores the status of a data acquisition.
Parameters required for each async operation.
Await-specific parameters that are not provided with AsyncOpParams.