ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
abort.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_op
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains definition for the AbortAsync operation
7 */
8#include <daq/op/abort.hpp>
9
10#include <Metadaqif.hpp>
11#include <fmt/format.h>
12#include <fmt/ostream.h>
13#include <log4cplus/loggingmacros.h>
14
15#include <daq/op/util.hpp>
16#include <daq/state.hpp>
17
18namespace daq::op {
19
21 : m_policy(policy), m_params(m_params) {
22}
23
24[[nodiscard]] boost::future<AbortAsync::ResultType> AbortAsync::Initiate() {
25 using boost::future;
26
27 // Sequencing is not important to abort. So we send request to both primary and metadata
28 // sources immediately instead of synchronizing them.
29 return boost::when_all(AbortMeta(), AbortPrim())
30 .then(m_params.executor,
31 [this](future<std::tuple<future<void>, future<void>>> futs) -> ResultType {
32 LOG4CPLUS_DEBUG(
33 m_params.logger,
34 fmt::format("{}: AbortAsync: AbortMeta() and AbortPrim() completed.",
35 m_params.status));
36 // future from when_all never throws according to specification.
37 auto tup = futs.get();
38 auto& prim = std::get<0>(tup);
39 auto& meta = std::get<1>(tup);
40
41 if (prim.has_value() && meta.has_value()) {
42 LOG4CPLUS_INFO(m_params.logger,
43 fmt::format("{}: AbortAsync: DAQ aborted successfully.",
44 m_params.status));
45 return {false};
46 }
47 if (m_policy == ErrorPolicy::Tolerant) {
48 // Errors are non-fatal
49 LOG4CPLUS_WARN(
50 m_params.logger,
51 fmt::format("{}: AbortAsync: Data Acquisition abort has errors that will "
52 "be ignored due to error policy.",
53 m_params.status));
54 return {true};
55 } else {
56 // Errors are fatal
57 LOG4CPLUS_ERROR(m_params.logger,
58 fmt::format("{}: AbortAsync: Data Acquisition abort failed.",
59 m_params.status));
60
61 // Either prim or meta has error, collect them:
62 std::vector<std::variant<DaqSourceError, std::exception_ptr>> errors;
63 try {
64 prim.get(); // throws
65 } catch (DaqSourceError const& e) {
66 errors.emplace_back(e);
67 } catch (...) {
68 errors.emplace_back(std::current_exception());
69 }
70 try {
71 meta.get(); // throws
72 } catch (DaqSourceError const& e) {
73 errors.emplace_back(e);
74 } catch (...) {
75 errors.emplace_back(std::current_exception());
76 }
77 LOG4CPLUS_DEBUG(m_params.logger,
78 fmt::format("{}: Throwing exception.", m_params.status));
79 throw boost::enable_current_exception(DaqSourceErrors(std::move(errors)));
80 }
81 });
82}
83
84[[nodiscard]] boost::future<void> AbortAsync::AbortMeta() {
85 return SendRequestAndCollectReplies<void>(
86 m_params.meta_sources.begin(),
87 m_params.meta_sources.end(),
88 [&](Source<MetaSource>& s) {
89 return s.GetState() != State::Aborted && s.GetState() != State::Stopped;
90 },
91 m_params,
92 // Sender
93 [id = m_params.id](Source<MetaSource>& s) {
94 s.SetState(State::AbortingAcquiring);
95 return s.GetSource().GetRrClient().AbortDaq(id);
96 },
97 // reply handler
98 [](AsyncOpParams m_params,
99 Source<MetaSource>& source,
100 boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) -> void {
101 HandleMetaDaqReply("AbortDaq",
102 State::AbortingAcquiring,
103 State::Aborted,
104 {},
105 m_params,
106 source,
107 std::move(fut));
108 },
109 std::string_view("AbortAsync: abort metadata acquisition"))
110 .then(m_params.executor, UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
111}
112
113[[nodiscard]] boost::future<void> AbortAsync::AbortPrim() {
114 return SendRequestAndCollectReplies<void>(
115 m_params.prim_sources.begin(),
116 m_params.prim_sources.end(),
117 [&](Source<PrimSource>& s) { return !IsFinalState(s.GetState()); },
118 m_params,
119 // Sender
120 [](Source<PrimSource>& s) {
121 s.SetState(State::AbortingAcquiring);
122 // id is not supportd by interface
123 return s.GetSource().GetRrClient().RecAbort();
124 },
125 // reply handler
126 [](AsyncOpParams m_params,
127 Source<PrimSource>& source,
128 boost::future<std::string>&& fut) -> void {
129 HandlePrimDaqReply("RecAbort",
130 State::AbortingAcquiring,
131 State::Aborted,
132 {},
133 m_params,
134 source,
135 std::move(fut));
136 },
137 std::string_view("AbortAsync: abort primary data acquisition"))
138 .then(m_params.executor, UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
139}
140
141} // namespace daq::op
Contains declaration for the AbortAsync operation.
Represents error in single source.
Definition: error.hpp:68
Exception thrown to carry reply errors.
Definition: error.hpp:85
Declares daq::State and related functions.
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
Simple class that holds the source and associated state.
Definition: source.hpp:30
AbortAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Definition: abort.cpp:20
boost::future< ResultType > Initiate()
Initiates operation that aborts metadata and primary data acquisition.
Definition: abort.cpp:24
Parameters required for each async operation.
rad::IoExecutor & executor
ObservableStatus & status
Async operations should not modify state directly DaqController does that.
log4cplus::Logger & logger
Contains declaration for the async op utilities.