ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
stop.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_op
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains definition for the StopAsync operation
7 */
8#include <daq/op/stop.hpp>
9
10#include <Metadaqif.hpp>
11#include <fmt/format.h>
12#include <fmt/ostream.h>
13#include <log4cplus/loggingmacros.h>
14
15#include <daq/daqContext.hpp>
16#include <daq/fits/json.hpp>
17#include <daq/op/util.hpp>
18#include <daq/state.hpp>
19
20namespace daq::op {
21
23 : m_policy(policy), m_params(m_params), m_error(false) {
24}
25
26[[nodiscard]] boost::future<Result<DpParts>> StopAsync::Initiate() {
27 using boost::future;
28 using boost::make_exceptional_future;
29
30 // stop primary then metadata.
31 return StopPrim()
32 .then(m_params.executor,
33 [this](future<void> prim_result) -> future<void> {
34 // Simply propagate errors, if any
35 if (prim_result.has_exception()) {
36 m_error = true;
37 if (m_policy == ErrorPolicy::Strict) {
38 LOG4CPLUS_INFO(m_params.logger,
39 fmt::format("{}: StopAsync: primary daq "
40 "failed. Will not stop metadata acquisition.",
41 m_params.status));
42 // Note: Can't throw directly here as we'll unwrap which then
43 // discards that exception -> simply propagate future containing
44 // error instead.
45 return make_exceptional_future<void>(prim_result.get_exception_ptr());
46 }
47 LOG4CPLUS_INFO(m_params.logger,
48 fmt::format("{}: StopAsync: primary daq "
49 "failed. Ignoring this because of "
50 "ErrorPolicy::Tolerant.",
51 m_params.status));
52 }
53 return StopMeta();
54 })
55 .unwrap() // transform future<future<T>> to future<T>
56 .then([this](future<void> res) -> Result<DpParts> {
57 if (res.has_exception()) {
58 m_error = true;
59 if (m_policy == ErrorPolicy::Strict) {
60 LOG4CPLUS_ERROR(m_params.logger,
61 fmt::format("{}: StopAsync: stopping failed", m_params.status));
62 (void)res.get(); // throws
63 }
64 LOG4CPLUS_ERROR(m_params.logger,
65 fmt::format("{}: StopAsync: meta daq "
66 "failed. Ignoring this because of "
67 "ErrorPolicy::Tolerant.",
68 m_params.status));
69 }
70 return {m_error, std::move(m_parts)};
71 });
72}
73
74boost::future<void> StopAsync::StopMeta() {
75 return SendRequestAndCollectReplies<void>(
76 m_params.meta_sources.begin(),
77 m_params.meta_sources.end(),
78 [](auto& source) {
79 // Only send requests to sources that are not stopped.
80 return IsSubsequentState(State::Stopped, source.GetState());
81 },
82 m_params,
83 // Sender
84 [id = m_params.id](Source<MetaSource>& s) {
85 s.SetState(State::Stopping);
86 return s.GetSource().GetRrClient().StopDaq(id);
87 },
88 // reply handler (note that caller must keep this alive until future is set)
89 [this](AsyncOpParams params,
90 Source<MetaSource>& source,
91 boost::future<std::shared_ptr<metadaqif::DaqStopReply>>&& fut) -> void {
92 if (source.GetState() == State::Stopped) {
93 LOG4CPLUS_INFO(params.logger,
94 fmt::format("{}: StopMeta: Source already stopped, ignoring "
95 "reply.",
96 params.status));
97 return;
98 }
99 auto reply = HandleMetaDaqReply(
100 "StopDaq",
103 std::nullopt,
104 params,
105 source,
106 std::move(fut),
107 std::function(
108 [&](std::shared_ptr<metadaqif::DaqStopReply> const& rep) -> void {
109 // Add files and keywords from reply. As reply was received we set
110 // source state to Stopped to prevent possiblity of sending request
111 // again.
112 source.SetState(State::Stopped);
113 std::string keywords = rep->getKeywords();
114 if (!keywords.empty()) {
115 // Decode & validate
116 fits::KeywordVector keyword_vec;
117 UpdateKeywords(keyword_vec,
118 fits::ParseJsonKeywords(keywords.c_str()),
119 m_params.kw_formatter);
120 m_parts.emplace_back(std::string(source.GetSource().GetName()),
121 std::move(keyword_vec));
122 }
123 for (auto const& file : rep->getFiles()) {
124 m_parts.emplace_back(std::string(source.GetSource().GetName()),
125 file);
126 }
127 }));
128 if (!reply.has_value()) {
129 return;
130 }
131 },
132 std::string_view("StopAsync: stop metadata acquisition"))
133 .then(UnwrapVoidReplies);
134}
135
136boost::future<void> StopAsync::StopPrim() {
137 return SendRequestAndCollectReplies<void>(
138 m_params.prim_sources.begin(),
139 m_params.prim_sources.end(),
140 [](Source<PrimSource> const& source) -> bool {
141 /* only send to sources that are not already stopped */
142 return IsSubsequentState(State::Stopped, source.GetState());
143 },
144 m_params,
145 // Sender
146 [](Source<PrimSource>& s) {
147 // id is not supported by recif
148 s.SetState(State::Stopping);
149 return s.GetSource().GetRrClient().RecStop();
150 },
151 // reply handler
152 [this](AsyncOpParams params,
153 Source<PrimSource>& source,
154 boost::future<std::shared_ptr<recif::RecStatus>>&& fut) -> void {
155 auto reply = HandlePrimDaqReply("RecStop",
156 State::Stopping,
157 State::Stopped,
158 State::Stopping,
159 params,
160 source,
161 std::move(fut));
162 if (!reply.has_value()) {
163 return;
164 }
165 // Add files and keywords from reply:
166 for (auto const& file : (**reply).getDpFiles()) {
167 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
168 }
169 },
170 std::string_view("StopAsync: stop primary data acquisition"))
171 .then(UnwrapVoidReplies);
172}
173
174} // namespace daq::op
Contains data structure for FITS keywords.
Contains declaration of daq::Context.
Declares daq::State and related functions.
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut, std::function< void(ReplyType const &)> func={})
Reply handler that checks for exceptions in reply.
Definition: util.cpp:70
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
@ Stopping
Transitional state between Acquiring and Stopped.
@ Stopped
All data sources have reported they have stopped acquiring data.
Utility class that represents a result and an error.
Definition: utility.hpp:17
Contains declaration for the StopAsync operation.
Parameters required for each async operation.
rad::IoExecutor & executor
boost::future< Result< DpParts > > Initiate()
Initiates operation that stop metadata acquisition.
Definition: stop.cpp:26
StopAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Definition: stop.cpp:22
Contains declaration for the async op utilities.