10 #include <fmt/format.h> 
   11 #include <fmt/ostream.h> 
   12 #include <log4cplus/loggingmacros.h> 
   // Unwraps the collected per-source replies and reports whether all of them succeeded.
   //
   // `futures` resolves to one boost::future<bool> per reply. The function returns true
   // only when the success count (num_ok) equals the number of replies.
   // NOTE(review): only part of this body is visible in this listing (e.g. the declaration
   // and update of num_ok are not shown); the comments cover the visible statements only.
   27 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
 
          // Obtain the vector of individual reply futures.
   28     std::vector<boost::future<bool>> values = futures.get();
 
          // Storage and counter for exceptions found in the replies
          // (presumably filled by the catch handler below - TODO confirm).
   33     std::vector<std::exception_ptr> exceptions;
 
   35     size_t num_exceptions = 0;
 
          // Inspect every reply; a std::exception raised while doing so is caught and
          // formatted into the "Source replied with exception" message.
   36     for (
auto& f : values) {
 
   41         } 
catch (std::exception 
const& e) {
 
   45                 fmt::format(
"daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
 
   51         fmt::format(
"daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} " 
   52                     "replies report success",
 
          // Success means every reply was counted as ok.
   58     return num_ok == values.size();
 
   // File-local implementation of the recif::RecWaitSpec interface, constructed from a
   // timeout value; AwaitOnceAsync() creates instances of it for RecWait requests.
   // NOTE(review): accessor bodies, member declarations and the end of the class are not
   // visible in this listing.
   61 class RecWaitSpec : 
public recif::RecWaitSpec {
 
          // Stores `timeout` in m_timeout and value-initializes m_info.
          // NOTE(review): single-argument constructor is not marked `explicit`, so a float
          // converts implicitly to RecWaitSpec - confirm whether that is intended.
   63     RecWaitSpec(
float timeout) : m_timeout{timeout}, m_info{} {
 
   65     std::string getInfo()
 const override {
 
   68     void setInfo(
const std::string& info)
 override {
 
   72     float getTimeout()
 const override {
 
   75     void setTimeout(
float timeout)
 override {
 
   78     bool hasKey()
 const override {
 
          // Cloning is not supported: throws std::runtime_error("not clonable").
   81     std::unique_ptr<recif::RecWaitSpec> cloneKey()
 const override {
 
   82         throw std::runtime_error(
"not clonable");
 
          // Cloning is not supported: throws std::runtime_error("not clonable").
   84     std::unique_ptr<recif::RecWaitSpec> clone()
 const override {
 
   85         throw std::runtime_error(
"not clonable");
 
   87     bool keyEquals(
const recif::RecWaitSpec& other)
 const override {
 
  // Member-initializer list of a constructor whose signature is not visible in this
  // listing (presumably AwaitPrimAsync's, given the members named here - TODO confirm).
  // Initializes the error and aborted flags to false and m_parts to empty.
  // NOTE(review): `m_params(m_params)` is only correct if the constructor parameter is
  // itself named m_params (it then shadows the member inside the initializer); otherwise
  // the member would be initialized from itself. Confirm against the signature.
  100     : m_params(m_params), m_error(
false), m_parts(), m_aborted(
false) {
 
  // Fragment of AwaitPrimAsync::Initiate (per the log text); the signature and the
  // statements between these lines are not visible in this listing.
  105                     fmt::format(
"AwaitPrimAsync::Initiate: Operation initiating"));
 
          // Hand the caller the future tied to m_promise; InitiateAwait() and Abort()
          // are the visible places that complete that promise.
  107     return m_promise.get_future();
 
  // Runs one wait iteration as a chain of continuations on m_params.common.executor:
  // a request round via AwaitOnceAsync(), followed by a handler that either completes
  // m_promise (value or exception) or starts another iteration.
  // NOTE(review): the head of the chain and the conditions selecting each branch are not
  // visible in this listing. Both continuations capture `this`, so the object must
  // outlive the pending chain.
  110 void AwaitPrimAsync::InitiateAwait() {
 
              // Once the preceding stage completes, send one round of requests.
  114         .then(m_params.
common.
executor, [
this](
auto) { return AwaitOnceAsync(); })
 
              // Evaluate the outcome of that round.
  116         .then(m_params.
common.
executor, [
this](boost::future<bool> fut) -> 
void {
 
  119                 LOG4CPLUS_DEBUG(m_params.common.logger,
 
  120                                 fmt::format(
"AwaitPrimAsync::InitiateAwait: Operation aborted"));
 
                      // get() rethrows an exception stored in the future, if any.
  124                 auto is_ok = fut.get();
 
                          // Complete the operation with the error flag and the collected
                          // parts; m_parts is moved from here.
  127                     this->m_promise.set_value({m_error, std::move(m_parts)});
 
                          // Not finished yet: schedule another iteration.
  130                     this->InitiateAwait();
 
                      // Forward the in-flight exception to whoever holds the future.
  134                 this->m_promise.set_exception(boost::current_exception());
 
  // Returns a future used to pace successive requests: it is already ready when no wait
  // is needed, otherwise it is completed from the handler of a timer wait set up with
  // next_start.
  // NOTE(review): the condition guarding the first early return is not visible in this
  // listing.
  139 boost::future<void> AwaitPrimAsync::MakeInterval() {
 
  140     using std::chrono::duration_cast;
 
  141     using std::chrono::milliseconds;
 
  142     using std::chrono::steady_clock;
 
              // Early exit without waiting (presumably when no previous start time is
              // recorded, since m_last_start is dereferenced below - TODO confirm).
  146         return boost::make_ready_future();
 
  149     auto now = steady_clock::now();
 
          // Earliest start of the next request: previous start plus the configured interval.
  150     auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
 
          // Interval has already elapsed: no waiting required.
  151     if (now >= next_start) {
 
  153         return boost::make_ready_future();
 
  155     LOG4CPLUS_DEBUG(m_params.common.logger,
 
  156                     fmt::format(
"AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
 
  157                                 duration_cast<milliseconds>(next_start - now).count()));
 
          // (Re)create the interval state from the executor's io_context and next_start.
  160     m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
 
          // NOTE(review): the handler captures `this`; whether `ec` (e.g. a cancelled
          // wait) is checked before the promise is completed is not visible here.
  161     m_interval->timer.async_wait([
this](boost::system::error_code 
const& ec) {
 
  164         m_interval->promise.set_value();
 
  166     return m_interval->promise.get_future();
 
  // Aborts the operation: logs the request and completes m_promise with the current
  // error flag and the parts collected so far (m_parts is moved from).
  // NOTE(review): declared noexcept, yet if m_promise is a boost::promise, set_value()
  // throws when the promise is already satisfied, which would terminate the program.
  // Any guard or try/catch is in lines not visible in this listing - confirm.
  169 void AwaitPrimAsync::Abort() noexcept {
 
  170     LOG4CPLUS_INFO(m_params.common.logger,
 
  171                    fmt::format(
"AwaitPrimAsync::Abort: Requested to abort! " 
  172                                "Number of files received so far: {}",
 
  179     m_promise.set_value({m_error, std::move(m_parts)});
 
  // Sends one round of RecWait requests to the primary sources and returns a future for
  // the combined result produced by AwaitUnwrapReplies.
  // NOTE(review): the lambda headers and the abort condition are not visible in this
  // listing; comments describe the visible statements only.
  182 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
 
  183     using Reply = std::shared_ptr<recif::RecWaitStatus>;
 
  184     using Seconds = std::chrono::duration<float>;
 
  185     using std::chrono::duration_cast;
 
              // Abort path: nothing is sent and the round is reported as `true`.
  187         LOG4CPLUS_DEBUG(m_params.common.logger,
 
  188                         fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Operation aborted"));
 
  189         return boost::make_ready_future<bool>(
true);
 
  191     LOG4CPLUS_DEBUG(m_params.common.logger,
 
  192                     fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
 
          // Remember when this round started; MakeInterval() paces the next round from it.
  194     m_last_start = std::chrono::steady_clock::now();
 
          // Arguments: the source range, a predicate on the source state, a request
          // sender, a reply handler and a description of the operation.
  196     return SendRequestAndCollectReplies<bool>(
 
  197                m_params.common.prim_sources.begin(),
 
  198                m_params.common.prim_sources.end(),
 
                         // Predicate comparing the source's state against State::Stopped.
  201                    return IsSubsequentState(State::Stopped, source.GetState());
 
                         // Request: RecWait with the wait interval expressed as float seconds.
  206                    auto spec = std::make_shared<RecWaitSpec>(
 
  207                        duration_cast<Seconds>(this->m_params.wait_interval).count());
 
  208                    return s.GetSource().GetRrClient().RecWait(spec);
 
                         // Reply handling is delegated to HandleRecWaitReply().
  212                    -> 
bool { return HandleRecWaitReply(source, std::move(fut)); },
 
  213                std::string_view(
"AwaitPrimAsync: await primary data acquisition"))
 
              // Reduce the individual replies to a single bool on the executor.
  214         .then(m_params.common.executor, AwaitUnwrapReplies);
 
  // Processes the RecWait reply of one primary source.
  //
  // On recif::Success the source is set to State::Stopped and every file it reports is
  // appended to m_parts together with the source name. Error paths raise an alert on
  // m_params.common.status and throw DaqSourceError.
  // NOTE(review): the return statements, the second catch clause and the end of the
  // function are not visible in this listing.
  217 bool AwaitPrimAsync::HandleRecWaitReply(
 
  218     Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
 
              // get() rethrows an exception carried by the reply future, if any.
  224         auto reply = fut.get();
 
              // A reply was obtained: clear the alert identified by alert_id
              // (declared in lines not visible here).
  226         m_params.common.status.ClearAlert(alert_id);
 
  228         auto status = reply->getStatus();
 
  229         if (status == recif::Success) {
 
  231             source.SetState(State::Stopped);
 
  233             auto rec_status = reply->getRecStatus();
 
  234             LOG4CPLUS_INFO(m_params.common.logger,
 
  235                            fmt::format(
"Data source '{}' replied successfully and provides {} " 
  238                                        rec_status->getDpFiles().size()));
 
                  // A successful reply without files is logged as a warning.
  239             if (rec_status->getDpFiles().empty()) {
 
  240                 LOG4CPLUS_WARN(m_params.common.logger,
 
  241                                fmt::format(
"Data source '{}' replied successfully for " 
  242                                            "RecWait but did not produce any files!",
 
  243                                            source.GetSource()));
 
                  // Record each reported file together with the name of its source.
  245             for (
auto const& file : rec_status->getDpFiles()) {
 
  246                 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
 
  254     } 
catch (recif::ExceptionErr 
const& e) {
 
              // ICD error: raise an alert, then throw DaqSourceError carrying e.what().
  255         m_params.common.status.SetAlert(
 
  257                       fmt::format(
"Primary source '{}' request 'RecWaitStatus' replied " 
  258                                   "with ICD error: ({}) {}",
 
  259                                   source.GetSource().GetName(),
 
  263         throw boost::enable_current_exception(
 
  264             DaqSourceError(
"RecWait", std::string(source.GetSource().GetName()), e.what()));
 
              // Non-ICD error path (its catch clause is not visible here): raise an
              // alert, then throw DaqSourceError with the text "unknown exception".
  267         m_params.common.status.SetAlert(
 
  269                       fmt::format(
"Primary source '{}' request 'RecWaitStatus' replied " 
  270                                   "with non-ICD error: {}",
 
  271                                   source.GetSource().GetName(),
 
  274         throw boost::enable_current_exception(DaqSourceError(
 
  275             "RecWait", std::string(source.GetSource().GetName()), 
"unknown exception"));