ifw-daq  3.0.1
IFW Data Acquisition modules
dpmClient.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief `daq::DpmClient`
7  */
8 #include <daq/dpmClient.hpp>
9 
10 #include <boost/assert.hpp>
11 
12 #include <Daqif.hpp>
13 #include <log4cplus/loggingmacros.h>
14 #include <mal/Mal.hpp>
15 #include <mal/rr/qos/ReplyTime.hpp>
16 
17 #include <daq/conversion.hpp>
18 #include <daqif/subscription.hpp>
19 
20 namespace daq {
21 
22 using DaqReplyPtr = std::shared_ptr<daqif::DaqReply>;
23 
25  std::shared_ptr<bool> alive = std::make_shared<bool>(true);
28 };
29 
30 DpmClientImpl::DpmClientImpl(boost::asio::io_context& io_ctx, mal::Mal& mal, DpmClientParams params)
31  : m_io_ctx(io_ctx)
32  , m_executor(m_io_ctx)
33  , m_mal(mal)
34  , m_params(std::move(params))
35  , m_logger(log4cplus::Logger::getInstance("daq.ocm")) {
36  m_dpmif = m_mal.getClient<daqif::DpmDaqControlAsync>(
37  daqif::MakeServiceUri(m_params.rr_uri, "daq"),
38  {std::make_shared<elt::mal::rr::qos::ReplyTime>(params.timeout)},
39  {});
40  assert(m_dpmif);
41 
42  m_subs = std::make_unique<Subscriptions>();
43  // Add subscription to status notifications, which will invoked from unknown threads,
44  // so we dispatch to asio instead and emit signal safely there.
45  m_subs->status = daqif::MakeSubscription<daqif::DaqStatus>(
46  m_mal,
47  daqif::MakeServiceUri(m_params.ps_uri, "daq/status"),
48  [weak = std::weak_ptr<bool>(m_subs->alive), this](auto& sub, auto const& event) {
49  auto shared = weak.lock();
50  if (!shared) {
51  // Abandoned
52  return;
53  }
54  auto const& sample = event.getData();
55  Status status(sample->getId(), sample->getFileId());
56  status << *sample;
57  LOG4CPLUS_TRACE(m_logger, "Received DaqStatus update: " << status);
58  // Dispatch to main thread
59  m_io_ctx.post([status = std::move(status), weak, this] {
60  auto shared = weak.lock();
61  if (!shared) {
62  // Abandoned
63  return;
64  }
65  UpdateStatus(status);
66  });
67  });
68  // Add subscription to storage notifications, which will invoked from unknown threads,
69  // so we dispatch to asio instead and emit signal safely there.
70  m_subs->storage = daqif::MakeSubscription<daqif::StorageStatus>(
71  m_mal,
72  daqif::MakeServiceUri(m_params.ps_uri, "dpm/storage"),
73  [weak = std::weak_ptr<bool>(m_subs->alive), this](auto& sub, auto const& event) {
74  auto shared = weak.lock();
75  if (!shared) {
76  // Abandoned
77  return;
78  }
79  auto const& sample = event.getData();
80  std::filesystem::space_info space = {};
81  space << *sample;
82  // Dispatch to main thread
83  m_io_ctx.post([space, weak, this] {
84  auto shared = weak.lock();
85  if (!shared) {
86  // Abandoned
87  return;
88  }
89  m_storage_signal(space);
90  });
91  });
92 }
93 
94 DpmClientImpl::~DpmClientImpl() = default;
95 
96 auto DpmClientImpl::ScheduleAsync(std::string const& spec) -> boost::future<State> {
97  return m_dpmif->QueueDaq(spec).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
98  auto reply = f.get();
99  // If it doesn't throw it was successful, until status change is published we can assume
100  // state is Scheduled.
101  return State::Scheduled;
102  });
103 }
104 
105 auto DpmClientImpl::AbortAsync(std::string const& id) -> boost::future<State> {
106  return m_dpmif->AbortDaq(id).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
107  auto reply = f.get();
108  // If it doesn't throw it was successful, until status change is published we can assume
109  // state is Aborted.
110  return State::Aborted;
111  });
112 }
113 
114 void DpmClientImpl::StartMonitorStatus(std::string const& id) {
115  LOG4CPLUS_TRACE(m_logger, "DpmClientImpl::StartMonitorStatus: " << id);
116  RequestStatus(id);
117 }
118 
119 void DpmClientImpl::RequestStatus(std::string const& id) {
120  LOG4CPLUS_TRACE(m_logger, "RequestStatus:" << id);
121  m_res_status.AsyncAcquire().then(
122  m_executor, [&, id, alive = std::weak_ptr(m_liveness)](boost::future<ResourceToken> f_token) {
123  auto is_alive = alive.lock();
124  if (!is_alive) {
125  LOG4CPLUS_TRACE("daq.ocm", "DpmClient: Client abandoned.");
126  return;
127  }
128  LOG4CPLUS_TRACE(m_logger, "AsyncAcquire: Resource available returning ready token");
129  auto token = f_token.get();
130  m_dpmif->GetDaqStatus(id).then(
131  m_executor,
132  [&, id, alive = std::weak_ptr(m_liveness), token = std::move(token)](
133  boost::future<std::shared_ptr<daqif::DaqStatus>> f) {
134  auto is_alive = alive.lock();
135  if (!is_alive) {
136  LOG4CPLUS_TRACE("daq.ocm", "DpmClient: Client abandoned.");
137  return;
138  }
139  try {
140  auto reply = f.get();
141  BOOST_ASSERT_MSG(reply, "MAL returned empty shared pointer!");
142  Status status(reply->getId(), reply->getFileId());
143  status << *reply;
144 
145  UpdateStatus(status);
146  // To query indefinitely we can schedule a
147  // new request here. For now we rely on pub/sub
148  // to keep us updated after a successful
149  // alignment.
150  } catch (elt::mal::TimeoutException const&) {
151  // We only retry on timeouts (DPM not running?)
152  LOG4CPLUS_DEBUG(m_logger,
153  "Timeout from DPM GetDaqStatus() -> reschedule attempt");
154  ScheduleRequestStatus(id);
155  } catch (std::exception const& err) {
156  LOG4CPLUS_ERROR(
157  m_logger,
158  "Failure from GetDaqStatus() -> will not reschedule attempt. Error: "
159  << err.what());
160  }
161  });
162  });
163 }
164 
165 void DpmClientImpl::UpdateStatus(Status const& status) {
166  m_status_signal(status);
167 }
168 
169 void DpmClientImpl::ScheduleRequestStatus(std::string const& id) {
170  auto it = m_timers.find(id);
171  if (it != m_timers.end()) {
172  it->second.cancel();
173  } else {
174  it = m_timers.emplace(id, m_io_ctx).first;
175  }
176  // Set timer
177  it->second.expires_from_now(m_params.status_retry_interval);
178  // note: async_wait never invoked immedately but as if by post()
179  it->second.async_wait(
180  [&, id, alive = std::weak_ptr(m_liveness), this](boost::system::error_code ec) {
181  if (ec == boost::asio::error::operation_aborted) {
182  return;
183  }
184  auto is_alive = alive.lock();
185  if (!is_alive) {
186  LOG4CPLUS_TRACE("daq.ocm", "DpmClient: Client abandoned.");
187  return;
188  }
189  RequestStatus(id);
190  });
191 }
192 
193 void DpmClientImpl::StopMonitorStatus(std::string const& id) {
194  m_timers.erase(id);
195 }
196 
197 auto DpmClientImpl::ConnectStatusSignal(StatusSignal::slot_type const& slot)
198  -> boost::signals2::connection {
199  return m_status_signal.connect(slot);
200 }
201 
202 auto DpmClientImpl::ConnectStorageSignal(StorageSignal::slot_type const& slot)
203  -> boost::signals2::connection {
204  return m_storage_signal.connect(slot);
205 }
206 
207 } // namespace daq
std::shared_ptr< bool > alive
Definition: dpmClient.cpp:25
DpmClientImpl(boost::asio::io_context &io_ctx, mal::Mal &mal, DpmClientParams params)
Definition: dpmClient.cpp:30
daqif::Subscription< daqif::DaqStatus > status
Definition: dpmClient.cpp:26
daqif::Subscription< daqif::StorageStatus > storage
Definition: dpmClient.cpp:27
Contains support functions for daqif.
daq::DpmClient
std::chrono::seconds timeout
Definition: dpmClient.hpp:95
std::string rr_uri
Definition: dpmClient.hpp:93
std::string ps_uri
Definition: dpmClient.hpp:94
std::shared_ptr< daqif::DaqReply > DaqReplyPtr
Definition: dpmClient.cpp:22
Connection parameters for DPM.
Definition: dpmClient.hpp:92
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:153
Contains URI support functions for daqif.