8#ifndef OCF_DAQ_DAQ_CONTROLLER_HPP_
9#define OCF_DAQ_DAQ_CONTROLLER_HPP_
20#include <Metadaqif.hpp>
21#include <boost/asio/io_context.hpp>
22#include <boost/asio/steady_timer.hpp>
23#include <boost/thread/future.hpp>
24#include <fmt/ostream.h>
25#include <log4cplus/logger.h>
26#include <nlohmann/json.hpp>
53 using AwaitReturnType = std::pair<boost::future<Result<DpParts>>, std::function<bool()>>;
68 std::function<boost::future<
void>(op::AsyncOpParams)>
start;
87 std::shared_ptr<ObservableStatus> status,
88 std::shared_ptr<ObservableEventLog> event_log,
94 std::shared_ptr<ObservableStatus> status,
95 std::shared_ptr<ObservableEventLog> event_log)
96 -> std::shared_ptr<DaqController> = 0;
105 elt::mal::Mal& m_mal,
106 std::shared_ptr<DpmClient> dpm_client);
108 std::shared_ptr<ObservableStatus> status,
109 std::shared_ptr<ObservableEventLog> event_log,
111 -> std::shared_ptr<DaqController>
override;
113 std::shared_ptr<ObservableStatus> status,
114 std::shared_ptr<ObservableEventLog> event_log)
115 -> std::shared_ptr<DaqController>
override;
118 boost::asio::io_context& m_io_ctx;
120 elt::mal::Mal& m_mal;
122 std::shared_ptr<DpmClient> m_dpm_client;
290 virtual boost::future<State>
291 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) = 0;
329 virtual boost::signals2::connection ConnectContext(
ContextSignal::slot_type const& slot) = 0;
342 std::shared_ptr<ObservableStatus> status,
343 std::shared_ptr<ObservableEventLog> event_log);
345 std::shared_ptr<ObservableStatus> GetStatus()
DAQ_NOEXCEPT override;
346 std::shared_ptr<ObservableStatus const> GetStatus()
const DAQ_NOEXCEPT override;
347 std::shared_ptr<ObservableEventLog> GetEventLog()
DAQ_NOEXCEPT override;
351 boost::signals2::connection ConnectContext(ContextSignal::slot_type
const& slot)
override;
354 template <
class T,
class... Args>
356 m_event_log->EmplaceEvent<T>(std::forward<Args>(args)...);
368 return *m_event_log.get();
371 return *m_status.get();
374 return *m_status.get();
377 m_sig_context(m_context);
381 boost::asio::io_context& m_io_ctx;
384 ContextSignal m_sig_context;
385 std::shared_ptr<ObservableStatus> m_status;
386 std::shared_ptr<ObservableEventLog> m_event_log;
412 static std::shared_ptr<OcmDaqController> Create(boost::asio::io_context& io_context,
416 std::shared_ptr<ObservableStatus> status,
417 std::shared_ptr<ObservableEventLog> event_log,
420 boost::future<State> StartAsync()
override;
421 boost::future<Status> StopAsync(
ErrorPolicy policy)
override;
422 boost::future<Status> AbortAsync(
ErrorPolicy policy)
override;
423 boost::future<State> ScheduleMergeAsync()
override;
426 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout)
override;
433 constexpr log4cplus::Logger
const& GetLogger()
const noexcept;
445 std::variant<NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted>;
456 std::shared_ptr<ObservableStatus> status,
457 std::shared_ptr<ObservableEventLog> event_log,
480 void InitiateStopCondition();
486 void CancelAwaitPrimarySources();
488 void AddInitialKeywords();
496 auto HasStatefulSources() noexcept ->
bool;
501 void SetAlert(std::
string key, std::
string description) noexcept;
505 FindSource(std::string_view source_id);
508 template <class SourceType>
509 std::vector<
Source<SourceType>> MakeSources(std::vector<SourceType> sources);
519 std::vector<std::unique_ptr<boost::asio::steady_timer>> m_timers;
527 std::function<
bool()> m_abort_await_primary_sources;
528 log4cplus::Logger m_logger;
563 std::shared_ptr<ObservableStatus> status,
564 std::shared_ptr<ObservableEventLog> event_log,
565 std::shared_ptr<DpmClient> dpm_client);
567 boost::future<State> ScheduleMergeAsync()
override;
572 boost::future<State> StartAsync()
override;
576 boost::future<Status> StopAsync(
ErrorPolicy policy)
override;
588 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout)
override;
600 boost::future<Status> AbortAsync(
ErrorPolicy policy)
override;
612 bool schedule_reply_pending;
618 void UpdateStateContext();
619 void SetState(
State state);
621 using StateVariant = std::variant<std::monostate, NotScheduled>;
626 std::shared_ptr<bool> m_liveness;
627 std::shared_ptr<DpmClient> m_dpm_client;
631 boost::signals2::scoped_connection m_status_connection;
635 std::optional<std::string> m_dp_spec;
636 StateVariant m_state_ctx;
637 log4cplus::Logger m_logger;
643struct fmt::formatter<
daq::DaqController> : ostream_formatter {};
645template <
typename T,
typename Char>
646struct fmt::formatter<T, Char, std::enable_if_t<std::is_convertible_v<T*, daq::DaqController*>>>
647 : ostream_formatter {};
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableStatus & GetStatusRef() noexcept
void AddEvent(Args &&... args)
ObservableStatus const & GetStatusRef() const noexcept
boost::asio::io_context & GetIoCtx() noexcept
rad::IoExecutor & GetIoExecutor() noexcept
DaqContext & GetContextMut() noexcept
ObservableEventLog & GetEventLogRef() noexcept
Default factory producing "real" implementations.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, fits::KeywordFormatter const &kw_formatter) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
virtual boost::future< State > StartAsync()=0
Starts the data acquisition.
virtual boost::future< Status > AbortAsync(ErrorPolicy policy)=0
Aborts the data acquisition.
virtual boost::future< Status > StopAsync(ErrorPolicy policy)=0
Stops the data acquisition.
virtual void UpdateKeywords(fits::KeywordVector const &keywords)=0
Updates (replace or add) list of keywords.
virtual boost::future< State > ScheduleMergeAsync()=0
Schedules DAQ for merging by sending request to DPM.
virtual ~DaqController()=default
virtual boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout)=0
Awaits that data acquisition stops or aborts.
boost::signals2::signal< void(DaqContext const &)> ContextSignal
virtual State GetState() const DAQ_NOEXCEPT=0
Data acquisition sources.
Implements behaviour from the state NotScheduled to Completed.
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
Implements daq::DaqController for states responsible to be executed by OCM.
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Contains declaration of daq::Context.
Contains declaration for DpPart.
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Contains declarations for the helper functions to initiate operations.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void UpdateKeywords(fits::KeywordVector &out, fits::KeywordVector const &in, fits::KeywordFormatter const &fmt)
Updates (adds or replaces) primary HDU keywords.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
std::vector< DpPart > DpParts
ErrorPolicy
Error policy supported by certain operations.
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
Utility class that represents a result and an error.
Config class header file.
Contains declaration for classes related to pending replies.
Declarations for daq::Source and related classes.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
bool IsValid() const noexcept
OcmAsyncOperations & operator=(OcmAsyncOperations const &)=default
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
std::reference_wrapper< rad::IoExecutor > executor
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
std::function< boost::future< void >(op::AsyncOpParams)> start
OcmAsyncOperations(OcmAsyncOperations &&)=default
std::pair< boost::future< Result< DpParts > >, std::function< bool()> > AwaitReturnType
OcmAsyncOperations(OcmAsyncOperations const &)=default
OcmAsyncOperations & operator=(OcmAsyncOperations &&)=default
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Parameters required for each async operation.
Await specific parameters that is not provided with AsyncOpParams.
Declaration of utilities.