8#ifndef OCF_DAQ_OP_UTIL_HPP_
9#define OCF_DAQ_OP_UTIL_HPP_
10#include "../config.hpp"
17#include <Metadaqif.hpp>
18#include <boost/thread/future.hpp>
19#include <fmt/format.h>
20#include <fmt/ostream.h>
21#include <log4cplus/logger.h>
22#include <log4cplus/loggingmacros.h>
24#include "../error.hpp"
25#include "../state.hpp"
30template <
class SourceType>
34 fmt::format(!
error ?
"{}: Got reply from '{}'" :
"{}: Got error or timeout from '{}'",
39template <
class SourceType>
43 fmt::format(
"{}: Sending request to'{}'", description, source),
49 return std::count_if(futures.begin(), futures.end(), [](F
const& future) noexcept ->
bool {
50 return future.has_exception();
84template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
85boost::future<std::vector<boost::future<R>>>
91 ReplyHandler reply_handler,
92 std::string_view logging_description);
96 std::vector<std::exception_ptr> exceptions;
97 for (
auto& f : futures) {
100 }
catch (std::exception
const& e) {
101 LOG4CPLUS_DEBUG(
"TEST",
"E:" << e.what());
102 exceptions.push_back(std::current_exception());
123std::vector<R>
UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
144template <
class ReplyType>
146 std::optional<State> expected_state,
148 std::optional<State> error_state,
149 AsyncOpParams params,
151 boost::future<ReplyType>&& fut,
152 std::function<
void(ReplyType
const&)> func = {});
169template <
class ReplyType>
171 std::optional<State> expected_state,
173 std::optional<State> error_state,
174 AsyncOpParams params,
175 Source<PrimSource>& source,
176 boost::future<ReplyType>&& fut);
179template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
180boost::future<std::vector<boost::future<R>>>
186 ReplyHandler reply_handler,
187 std::string_view logging_description) {
188 LOG4CPLUS_INFO(params.
logger,
189 fmt::format(
"{}: {}: Sending requests.", params.
status, logging_description));
190 using Sourceype =
typename Iterator::value_type;
192 std::vector<boost::future<R>> replies;
197 auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
198 Sourceype& source) -> boost::future<R> {
207 std::string(source.
GetSource().GetName()), std::string(logging_description)),
208 reply_handler = std::move(reply_handler)](
auto fut)
mutable -> R {
211 reply_token.Release();
212 return std::invoke(reply_handler, params, source, std::move(fut));
219 std::invoke(std::move(sender), source).then(params.
executor, std::move(cont));
221 }
catch (std::exception
const& e) {
228 LOG4CPLUS_ERROR(params.
logger,
229 fmt::format(
"{}: {}: "
230 "Error sending request to source '{}': {}",
235 return boost::make_exceptional_future<R>();
240 for (
auto it = begin; it != end; ++it) {
244 if (!filter_pred(*it)) {
248 replies.emplace_back(send_func(*it));
254 "{}: {}: Awaiting {} replies.", params.
status, logging_description, replies.size()));
258 return boost::when_all(replies.begin(), replies.end())
260 [params, descr = std::string(logging_description)](
261 boost::future<std::vector<boost::future<R>>>&& replies_fut)
262 -> std::vector<boost::future<R>> {
263 auto replies = replies_fut.get();
266 fmt::format(
"{}: {}: All replies received or timed out (num errors {}/{})",
269 eventlog::CountExceptions(replies),
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
ReplyToken Acquire(std::string source_id, std::string request)
Acquire token.
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
size_t CountExceptions(std::vector< F > const &futures) noexcept
void Request(AsyncOpParams &params, SourceType const &source, std::string description)
std::vector< R > UnwrapReplies(boost::future< std::vector< boost::future< R > > > &&futures)
Unwrap replies.
std::optional< ReplyType > HandlePrimDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
std::vector< std::exception_ptr > ExtractExceptions(std::vector< boost::future< T > > &futures)
boost::future< std::vector< boost::future< R > > > SendRequestAndCollectReplies(Iterator begin, Iterator end, Pred filter_pred, AsyncOpParams params, Sender sender, ReplyHandler reply_handler, std::string_view logging_description)
Utility function to send requests and collect replies.
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void > > > futures)
Unwrap futures to extract errors.
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut, std::function< void(ReplyType const &)> func={})
Reply handler that checks for exceptions in reply.
State
Observable states of the data acquisition process.
Represents a generic event if a more specific event is not usable.
Simple class that holds the source and associated state.
Parameters required for each async operation.
ObservableEventLog & event_log
PendingReplies & pending_replies
rad::IoExecutor & executor
ObservableStatus & status
Async operations should not modify state directly; DaqController does that.
log4cplus::Logger & logger