12#include <fmt/format.h>
13#include <fmt/ostream.h>
14#include <log4cplus/loggingmacros.h>
29bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
30 std::vector<boost::future<bool>> values = futures.get();
35 std::vector<std::exception_ptr> exceptions;
36 std::size_t num_ok = 0;
37 std::size_t num_exceptions = 0;
38 for (
auto& f : values) {
43 }
catch (std::exception
const& e) {
47 fmt::format(
"daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
53 fmt::format(
"daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
54 "replies report success",
60 return num_ok == values.size();
63class RecWaitSpec :
public recif::RecWaitSpec {
65 RecWaitSpec(
float timeout) : m_timeout{timeout}, m_info{} {
67 std::string getInfo()
const override {
70 void setInfo(
const std::string& info)
override {
74 float getTimeout()
const override {
77 void setTimeout(
float timeout)
override {
80 bool hasKey()
const override {
83 std::unique_ptr<recif::RecWaitSpec> cloneKey()
const override {
84 throw std::runtime_error(
"not clonable");
86 std::unique_ptr<recif::RecWaitSpec> clone()
const override {
87 throw std::runtime_error(
"not clonable");
89 bool keyEquals(
const recif::RecWaitSpec& other)
const override {
102 : m_params(m_params), m_error(
false), m_parts(), m_abort_requested(
false) {
107 fmt::format(
"AwaitPrimAsync::Initiate: Operation initiating"));
109 return m_promise.get_future();
112void AwaitPrimAsync::InitiateAwait() {
116 .then(m_params.
common.
executor, [
this](
auto) { return AwaitOnceAsync(); })
118 .then(m_params.
common.
executor, [
this](boost::future<bool> fut) ->
void {
119 if (m_abort_requested) {
121 m_params.common.logger,
122 "AwaitPrimAsync::InitiateAwait: Operation aborted -> setting promise");
124 m_promise.set_value({m_error, std::move(m_parts)});
125 } catch (boost::promise_already_satisfied const&) {
126 LOG4CPLUS_DEBUG(m_params.common.logger,
127 "AwaitPrimAsync::InitiateAwait: Promise already satisfied");
134 auto is_ok = fut.get();
137 m_promise.set_value({m_error, std::move(m_parts)});
142 }
catch (boost::promise_already_satisfied
const&) {
143 LOG4CPLUS_DEBUG(m_params.common.logger,
144 "AwaitPrimAsync::InitiateAwait: Promise already satisfied");
147 m_promise.set_exception(boost::current_exception());
152boost::future<void> AwaitPrimAsync::MakeInterval() {
153 using std::chrono::duration_cast;
154 using std::chrono::milliseconds;
155 using std::chrono::steady_clock;
159 return boost::make_ready_future();
162 auto now = steady_clock::now();
163 auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
164 if (now >= next_start) {
166 return boost::make_ready_future();
168 LOG4CPLUS_DEBUG(m_params.common.logger,
169 fmt::format(
"AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
170 duration_cast<milliseconds>(next_start - now).count()));
173 m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
174 m_interval->timer.async_wait([
this](boost::system::error_code
const& ec) {
177 m_interval->promise.set_value();
179 return m_interval->promise.get_future();
182void AwaitPrimAsync::Abort() noexcept {
183 if (m_abort_requested) {
187 LOG4CPLUS_INFO(m_params.common.logger,
188 fmt::format(
"AwaitPrimAsync::Abort: Requested to abort! "
189 "Number of files received so far: {}",
191 m_abort_requested =
true;
193 m_interval->timer.cancel();
197boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
198 using Reply = std::shared_ptr<recif::RecWaitStatus>;
199 using Seconds = std::chrono::duration<float>;
200 using std::chrono::duration_cast;
201 if (m_abort_requested) {
203 m_params.common.logger,
204 fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Operation requested to abort"));
205 return boost::make_ready_future<bool>(
true);
207 LOG4CPLUS_DEBUG(m_params.common.logger,
208 fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
210 m_last_start = std::chrono::steady_clock::now();
212 return SendRequestAndCollectReplies<bool>(
213 m_params.common.prim_sources.begin(),
214 m_params.common.prim_sources.end(),
217 return IsSubsequentState(State::Stopped, source.GetState());
222 auto& client = s.GetSource().GetRrClient();
223 auto seconds = duration_cast<Seconds>(this->m_params.wait_interval).count();
224#if defined(UNIT_TEST)
229 auto spec = std::make_shared<RecWaitSpec>(seconds);
231 auto mal = client.getMal();
232 BOOST_ASSERT_MSG(mal,
"MAL RR client returned invalid MAL pointer");
233 auto spec = mal->createDataEntity<recif::RecWaitSpec>();
235 *spec = RecWaitSpec(seconds);
237 return client.RecWait(spec);
241 ->
bool { return HandleRecWaitReply(source, std::move(fut)); },
242 std::string_view(
"AwaitPrimAsync: await primary data acquisition"))
243 .then(m_params.common.executor, AwaitUnwrapReplies);
246bool AwaitPrimAsync::HandleRecWaitReply(
247 Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
253 auto reply = fut.get();
255 m_params.common.status.ClearAlert(alert_id);
257 auto status = reply->getStatus();
258 if (status == recif::Success) {
262 auto rec_status = reply->getRecStat();
263 LOG4CPLUS_INFO(m_params.common.logger,
264 fmt::format(
"Data source '{}' replied successfully and provides {} "
267 rec_status->getDpFiles().size()));
268 if (rec_status->getDpFiles().empty()) {
269 LOG4CPLUS_WARN(m_params.common.logger,
270 fmt::format(
"Data source '{}' replied successfully for "
271 "RecWait but did not produce any files!",
272 source.GetSource()));
274 for (
auto const& file : rec_status->getDpFiles()) {
275 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
283 }
catch (recif::ExceptionErr
const& e) {
284 m_params.common.status.SetAlert(
286 fmt::format(
"Primary source '{}' request 'RecWaitStatus' replied "
287 "with ICD error: ({}) {}",
288 source.GetSource().GetName(),
292 throw boost::enable_current_exception(
293 DaqSourceError(
"RecWait", std::string(source.GetSource().GetName()), e.what()));
296 m_params.common.status.SetAlert(
298 fmt::format(
"Primary source '{}' request 'RecWaitStatus' replied "
299 "with non-ICD error: {}",
300 source.GetSource().GetName(),
303 throw boost::enable_current_exception(DaqSourceError(
304 "RecWait", std::string(source.GetSource().GetName()),
"unknown exception"));
Contains declaration for the AwaitPrimAsync operation.
Declares daq::State and related functions.
constexpr std::string_view REQUEST
Request.
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
AlertId MakeAlertId(std::string_view category, std::string key)
@ Stopped
All data sources have reported they have stopped acquiring data.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Simple class that holds the source and associated state.
rad::IoExecutor & executor
log4cplus::Logger & logger
Await-specific parameters that are not provided with AsyncOpParams.
boost::future< Result< DpParts > > Initiate()
Initiates operation that awaits acquisition completion.
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs operation with the provided parameters.
Contains declaration for the async op utilities.