10#include <Metadaqif.hpp>
11#include <fmt/format.h>
12#include <fmt/ostream.h>
13#include <log4cplus/loggingmacros.h>
23 : m_policy(policy), m_params(m_params), m_error(
false) {
28 using boost::make_exceptional_future;
33 [
this](future<void> prim_result) -> future<void> {
35 if (prim_result.has_exception()) {
37 if (m_policy == ErrorPolicy::Strict) {
38 LOG4CPLUS_INFO(m_params.logger,
39 fmt::format(
"{}: StopAsync: primary daq "
40 "failed. Will not stop metadata acquisition.",
45 return make_exceptional_future<void>(prim_result.get_exception_ptr());
47 LOG4CPLUS_INFO(m_params.logger,
48 fmt::format(
"{}: StopAsync: primary daq "
49 "failed. Ignoring this because of "
50 "ErrorPolicy::Tolerant.",
57 if (res.has_exception()) {
59 if (m_policy == ErrorPolicy::Strict) {
60 LOG4CPLUS_ERROR(m_params.logger,
61 fmt::format(
"{}: StopAsync: stopping failed", m_params.status));
64 LOG4CPLUS_ERROR(m_params.logger,
65 fmt::format(
"{}: StopAsync: meta daq "
66 "failed. Ignoring this because of "
67 "ErrorPolicy::Tolerant.",
70 return {m_error, std::move(m_parts)};
74boost::future<void> StopAsync::StopMeta() {
75 return SendRequestAndCollectReplies<void>(
76 m_params.meta_sources.begin(),
77 m_params.meta_sources.end(),
80 return IsSubsequentState(State::Stopped, source.GetState());
84 [
id = m_params.id](Source<MetaSource>& s) {
85 s.SetState(State::Stopping);
86 return s.GetSource().GetRrClient().StopDaq(id);
89 [
this](AsyncOpParams params,
90 Source<MetaSource>& source,
91 boost::future<std::shared_ptr<metadaqif::DaqStopReply>>&& fut) ->
void {
92 if (source.GetState() == State::Stopped) {
93 LOG4CPLUS_INFO(params.logger,
94 fmt::format(
"{}: StopMeta: Source already stopped, ignoring "
108 [&](std::shared_ptr<metadaqif::DaqStopReply>
const& rep) ->
void {
113 std::string keywords = rep->getKeywords();
114 if (!keywords.empty()) {
116 fits::KeywordVector keyword_vec;
117 UpdateKeywords(keyword_vec,
118 fits::ParseJsonKeywords(keywords.c_str()),
119 m_params.kw_formatter);
120 m_parts.emplace_back(std::string(source.GetSource().GetName()),
121 std::move(keyword_vec));
123 for (
auto const& file : rep->getFiles()) {
124 m_parts.emplace_back(std::string(source.GetSource().GetName()),
128 if (!reply.has_value()) {
132 std::string_view(
"StopAsync: stop metadata acquisition"))
133 .then(UnwrapVoidReplies);
136boost::future<void> StopAsync::StopPrim() {
137 return SendRequestAndCollectReplies<void>(
138 m_params.prim_sources.begin(),
139 m_params.prim_sources.end(),
140 [](Source<PrimSource>
const& source) ->
bool {
142 return IsSubsequentState(State::Stopped, source.GetState());
146 [](Source<PrimSource>& s) {
148 s.SetState(State::Stopping);
149 return s.GetSource().GetRrClient().RecStop();
152 [
this](AsyncOpParams params,
153 Source<PrimSource>& source,
154 boost::future<std::shared_ptr<recif::RecStatus>>&& fut) ->
void {
155 auto reply = HandlePrimDaqReply(
"RecStop",
162 if (!reply.has_value()) {
166 for (
auto const& file : (**reply).getDpFiles()) {
167 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
170 std::string_view(
"StopAsync: stop primary data acquisition"))
171 .then(UnwrapVoidReplies);
Contains data structure for FITS keywords.
Contains declaration of daq::Context.
Declares daq::State and related functions.
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut, std::function< void(ReplyType const &)> func={})
Reply handler that checks for exceptions in reply.
ErrorPolicy
Error policy supported by certain operations.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Stopped
All data sources have reported they have stopped acquiring data.
Utility class that represents a result and an error.
Contains declaration for the StopAsync operation.
Parameters required for each async operation.
rad::IoExecutor & executor
boost::future< Result< DpParts > > Initiate()
Initiates operation that stop metadata acquisition.
StopAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Contains declaration for the async op utilities.