ifw-daq 3.1.0
IFW Data Acquisition modules
awaitPrim.hpp
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains declaration for the AwaitPrimAsync operation
7 */
8#ifndef OCM_DAQ_OP_AWAIT_HPP_
9#define OCM_DAQ_OP_AWAIT_HPP_
10#include <daq/config.hpp>
11
12#include <boost/asio/steady_timer.hpp>
13#include <boost/thread/future.hpp>
14
15#include <daq/dpPart.hpp>
17#include <daq/utility.hpp>
18
19namespace daq::op {
20
21/**
22 * A composite async operation that awaits primary data sources.
23 *
24 * This is used by OCM to await completion of primary data sources.
25 * Since sources may acquire data for a long time, the operation is implemented by
26 * sending await commands to the client with a shorter timeout and resending the request
27 * on timeout. This keeps the MAL timeout short enough to detect network-related
28 * issues.
29 *
30 * Notes
31 * -----
32 *
33 * The await operation returns files produced from awaited-on sources (using DpParts result).
34 *
35 * Await requests are only sent to a source if its observed state requires it (i.e. if the source is not
36 * already observed to be stopped, from a previous AwaitPrimAsync operation).
37 *
38 * Await operation is completed when any of the following conditions is fulfilled:
39 *
40 * - All sources reply that recording is complete.
41 * - Any source replies with a fatal error.
42 * - Operation is aborted.
43 *
44 * The operation does not allow a configurable error policy but behaves as if ErrorPolicy::Robust
45 * is set. The following are considered fatal errors (causing the result of the operation to be an
46 * exception):
47 * - Internal errors.
48 *
49 * @todo Do not assume commands honor the timeout parameter. Instead keep track of the send
50 * interval. Otherwise a malfunctioning subsystem that immediately returns an exception will be spammed
51 * with requests.
52 * @ingroup daq_common_libdaq
53 */
54class AwaitPrimAsync {
55public:
56 /**
57 * Constructs operation with the provided parameters.
58 *
59 * @param params parameters.
60 */
61 explicit AwaitPrimAsync(AwaitOpParams params) noexcept;
62
63 /**
64 * Initiates operation that awaits acquisition completion.
65 *
66 * @note Caller is responsible for keeping object alive until
67 * result is set.
68 */
69 [[nodiscard]] boost::future<Result<DpParts>> Initiate();
70
71 /**
72 * Aborts the operation.
73 */
74 void Abort() noexcept;
75
76private:
77 /**
78 * Initiates the await operation that will eventually set the m_promise value or exception
79 * and returns.
80 */
81 void InitiateAwait();
82 /**
83 * Send request to all incomplete primary sources and wait for reply.
84 *
85 * @returns future set when all sources reply.
86 * - True means all awaited-on sources have completed.
87 * - False if one or more sources are not completed.
88 * - Contains exception on fatal error.
89 *
90 * Source state is updated using status from reply, including produced files.
91 */
92 boost::future<bool> AwaitOnceAsync();
93 /**
94 * Callback for each reply that
95 * - updates source state,
96 * - adds data products
97 *
98 * @returns true if RecWait completed successfully (data acquisition is complete)
99 * @returns false if RecWait timed out (data acquisition is still ongoing)
100 * @throws DaqSourceError if RecWait contains exception.
101 */
102 bool HandleRecWaitReply(Source<PrimSource>& source,
103 boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut);
104
105 /**
106 * Returns future with value set when requests should be sent.
107 */
108 boost::future<void> MakeInterval();
109 AwaitOpParams m_params;
110
111 struct IntervalTimer {
112 IntervalTimer(boost::asio::io_context& io_ctx, std::chrono::steady_clock::time_point next)
113 : timer(io_ctx, next), promise() {
114 }
115 ~IntervalTimer() {
116 timer.cancel();
117 }
118 boost::asio::steady_timer timer;
119 boost::promise<void> promise;
120 };
121 /**
122 * Indicates if it completed with error.
123 */
124 bool m_error;
125
126 /**
127 * Holds result from data sources.
128 */
129 DpParts m_parts;
130
131 /**
132 * Indicates whether operation was aborted or not.
133 *
134 * If this is true it means that m_promise has already been fulfilled.
135 */
136 bool m_abort_requested;
137
138 /**
139 * Promise for future returned from `Initiate()`
140 */
141 boost::promise<Result<DpParts>> m_promise;
142 /**
143 * Time of last InitiateAwait
144 */
145 std::optional<IntervalTimer> m_interval;
146 std::optional<std::chrono::steady_clock::time_point> m_last_start;
147};
148
149} // namespace daq::op
150#endif // #ifndef OCM_DAQ_OP_AWAIT_HPP_
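The class documentation describes awaiting long-running sources by sending requests with a short timeout and resending on timeout, while skipping sources already observed to be stopped. The following is a minimal, synchronous sketch of that idea using only the standard library. `FakeSource`, `AwaitResult` and `AwaitAll` are hypothetical names invented for illustration and are not part of ifw-daq; the real operation is asynchronous and uses `boost::future`.

```cpp
#include <cassert>
#include <chrono>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a primary data source (not an ifw-daq type).
struct FakeSource {
    std::string name;
    std::chrono::milliseconds remaining;  // simulated time until recording completes
    bool stopped = false;                 // observed state, kept between rounds
    std::vector<std::string> files;       // files reported once recording completes

    // Simulates one await request bounded by `timeout`.
    // Returns true if recording completed within the timeout, false on timeout.
    bool RecWait(std::chrono::milliseconds timeout) {
        if (remaining <= timeout) {
            remaining = std::chrono::milliseconds{0};
            return true;
        }
        remaining -= timeout;
        return false;
    }
};

// Hypothetical result type: number of request rounds and collected files.
struct AwaitResult {
    int rounds = 0;
    std::vector<std::string> files;
};

// Repeats short await requests until every source has completed.
// Sources already observed as stopped are not asked again.
AwaitResult AwaitAll(std::vector<FakeSource>& sources, std::chrono::milliseconds timeout) {
    if (timeout <= std::chrono::milliseconds::zero()) {
        throw std::invalid_argument("timeout must be positive");
    }
    AwaitResult result;
    bool all_done = false;
    while (!all_done) {
        ++result.rounds;
        all_done = true;
        for (auto& source : sources) {
            if (source.stopped) {
                continue;  // observed state does not require a new request
            }
            if (source.RecWait(timeout)) {
                source.stopped = true;
                result.files.insert(result.files.end(), source.files.begin(),
                                    source.files.end());
            } else {
                all_done = false;  // timed out: resend in the next round
            }
        }
    }
    return result;
}
```

With a 300 ms timeout, a source that needs 250 ms completes in the first round, while one that needs 900 ms is asked three times. The sketch omits abort handling and fatal errors, both of which end the real operation early.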
Contains declaration for DpPart.
std::vector< DpPart > DpParts
Definition: dpPart.hpp:66
Simple class that holds the source and associated state.
Definition: source.hpp:30
Await-specific parameters that are not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
Definition: awaitPrim.hpp:54
boost::future< Result< DpParts > > Initiate()
Initiates operation that awaits acquisition completion.
Definition: awaitPrim.cpp:105
void Abort() noexcept
Aborts the operation.
Definition: awaitPrim.cpp:182
Declaration of utilities.
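The `@todo` in the class documentation suggests tracking the send interval rather than trusting that commands honor their timeout, and the header keeps an optional `m_last_start` time point alongside `MakeInterval()`. One way such a computation could look is sketched below. `NextSendTime` is a hypothetical helper, not the implementation in awaitPrim.cpp, and it assumes the interval is measured from the start of the previous round.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

using Clock = std::chrono::steady_clock;

// Hypothetical helper: earliest time at which the next round of await
// requests may be sent, given when the previous round started.
Clock::time_point NextSendTime(std::optional<Clock::time_point> last_start,
                               Clock::duration interval,
                               Clock::time_point now) {
    if (!last_start) {
        return now;  // first round: send immediately
    }
    const Clock::time_point earliest = *last_start + interval;
    // If replies came back early (e.g. an immediate exception), wait out the
    // rest of the interval; otherwise the next round may start right away.
    return earliest > now ? earliest : now;
}
```

A source that fails immediately then causes at most one request per interval, because the next round is delayed until `last_start + interval`.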