ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
dpmClient.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm_libdaq_test
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief `daq::DpmClient`
7 */
8#include <daq/dpmClient.hpp>
9
10#include <boost/assert.hpp>
11
12#include <Daqif.hpp>
13#include <log4cplus/loggingmacros.h>
14#include <mal/Mal.hpp>
15#include <mal/rr/qos/ReplyTime.hpp>
16
17#include <daq/conversion.hpp>
18#include <daq/json.hpp>
20
21namespace daq {
22
23using DaqReplyPtr = std::shared_ptr<daqif::DaqReply>;
24
26 std::shared_ptr<bool> alive = std::make_shared<bool>(true);
29};
30
31DpmClientImpl::DpmClientImpl(boost::asio::io_context& io_ctx, mal::Mal& mal, DpmClientParams params)
32 : m_io_ctx(io_ctx)
33 , m_executor(m_io_ctx)
34 , m_mal(mal)
35 , m_params(std::move(params))
36 , m_logger(log4cplus::Logger::getInstance("daq.ocm")) {
37 m_dpmif = m_mal.getClient<daqif::DpmDaqControlAsync>(
38 daqif::MakeServiceUri(m_params.rr_uri, "daq"),
39 {std::make_shared<elt::mal::rr::qos::ReplyTime>(params.timeout)},
40 {});
41 assert(m_dpmif);
42
43 m_subs = std::make_unique<Subscriptions>();
44 // Add subscription to status notifications, which will invoked from unknown threads,
45 // so we dispatch to asio instead and emit signal safely there.
46 m_subs->status = daqif::MakeSubscription<daqif::InternalDaqStatus>(
47 m_mal,
48 daqif::MakeServiceUri(m_params.ps_uri, "internal/daq/status"),
49 [weak = std::weak_ptr<bool>(m_subs->alive), this](auto& sub, auto const& event) {
50 auto shared = weak.lock();
51 if (!shared) {
52 // Abandoned
53 return;
54 }
55 auto const& sample = event.getData();
56 Status status;
57 status << *sample;
58 LOG4CPLUS_TRACE(m_logger, "Received DaqStatus update: " << status);
59 // Dispatch to main thread
60 m_io_ctx.post([status = std::move(status), weak, this] {
61 auto shared = weak.lock();
62 if (!shared) {
63 // Abandoned
64 return;
65 }
66 UpdateStatus(status);
67 });
68 });
69 // Add subscription to storage notifications, which will invoked from unknown threads,
70 // so we dispatch to asio instead and emit signal safely there.
71 m_subs->storage = daqif::MakeSubscription<daqif::StorageStatus>(
72 m_mal,
73 daqif::MakeServiceUri(m_params.ps_uri, "dpm/storage"),
74 [weak = std::weak_ptr<bool>(m_subs->alive), this](auto& sub, auto const& event) {
75 auto shared = weak.lock();
76 if (!shared) {
77 // Abandoned
78 return;
79 }
80 auto const& sample = event.getData();
81 std::filesystem::space_info space = {};
82 space << *sample;
83 // Dispatch to main thread
84 m_io_ctx.post([space, weak, this] {
85 auto shared = weak.lock();
86 if (!shared) {
87 // Abandoned
88 return;
89 }
90 m_storage_signal(space);
91 });
92 });
93}
94
95DpmClientImpl::~DpmClientImpl() = default;
96
97auto DpmClientImpl::ScheduleAsync(std::string const& spec, std::optional<std::string> const& status)
98 -> boost::future<State> {
99 return m_dpmif->QueueDaq(spec, status.value_or(""))
100 .then(m_executor, [&](boost::future<DaqReplyPtr> f) {
101 auto reply = f.get();
102 // If it doesn't throw it was successful, until status change is published we can assume
103 // state is Scheduled.
104 return State::Scheduled;
105 });
106}
107
108auto DpmClientImpl::AbortAsync(std::string const& id) -> boost::future<State> {
109 return m_dpmif->AbortDaq(id).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
110 auto reply = f.get();
111 // If it doesn't throw it was successful, until status change is published we can assume
112 // state is Aborted.
113 return State::Aborted;
114 });
115}
116
117void DpmClientImpl::StartMonitorStatus(std::string const& id) {
118 LOG4CPLUS_TRACE(m_logger, "DpmClientImpl::StartMonitorStatus: " << id);
119 RequestStatus(id);
120}
121
122void DpmClientImpl::RequestStatus(std::string const& id) {
123 LOG4CPLUS_TRACE(m_logger, "RequestStatus:" << id);
124 m_res_status.AsyncAcquire().then(
125 m_executor,
126 [&, id, alive = std::weak_ptr(m_liveness)](boost::future<ResourceToken> f_token) {
127 auto is_alive = alive.lock();
128 if (!is_alive) {
129 LOG4CPLUS_TRACE("daq.ocm", "DpmClient: Client abandoned.");
130 return;
131 }
132 LOG4CPLUS_TRACE(m_logger, "RequestStatus: Resource available returning ready token");
133 auto token = f_token.get();
134 m_dpmif->GetInternalDaqStatus(id).then(
135 m_executor,
136 [&, id, alive = std::weak_ptr(m_liveness), token = std::move(token)](
137 boost::future<std::string> f) {
138 auto is_alive = alive.lock();
139 if (!is_alive) {
140 LOG4CPLUS_TRACE("daq.ocm", "DpmClient: Client abandoned.");
141 return;
142 }
143 try {
144 auto reply = f.get();
145 auto json = nlohmann::json::parse(reply);
146
147 Status status = json.get<Status>();
148
149 UpdateStatus(status);
150 // To query indefinitely we can schedule a
151 // new request here. For now we rely on pub/sub
152 // to keep us updated after a successful
153 // alignment.
154 } catch (elt::mal::TimeoutException const&) {
155 // We only retry on timeouts (DPM not running?)
156 LOG4CPLUS_DEBUG(m_logger,
157 "Timeout from DPM GetDaqStatus() -> reschedule attempt");
158 ScheduleRequestStatus(id);
159 } catch (std::exception const& err) {
160 LOG4CPLUS_ERROR(
161 m_logger,
162 "Failure from GetDaqStatus() -> will not reschedule attempt. Error: "
163 << err.what());
164 }
165 });
166 });
167}
168
169void DpmClientImpl::UpdateStatus(Status const& status) {
170 m_status_signal(status);
171}
172
173void DpmClientImpl::ScheduleRequestStatus(std::string const& id) {
174 auto it = m_timers.find(id);
175 if (it != m_timers.end()) {
176 it->second.cancel();
177 } else {
178 it = m_timers.emplace(id, m_io_ctx).first;
179 }
180 // Set timer
181 it->second.expires_from_now(m_params.status_retry_interval);
182 // note: async_wait never invoked immedately but as if by post()
183 it->second.async_wait(
184 [&, id, alive = std::weak_ptr(m_liveness), this](boost::system::error_code ec) {
185 if (ec == boost::asio::error::operation_aborted) {
186 return;
187 }
188 auto is_alive = alive.lock();
189 if (!is_alive) {
190 LOG4CPLUS_TRACE("daq.ocm", "DpmClient: Client abandoned.");
191 return;
192 }
193 RequestStatus(id);
194 });
195}
196
197void DpmClientImpl::StopMonitorStatus(std::string const& id) {
198 m_timers.erase(id);
199}
200
201auto DpmClientImpl::ConnectStatusSignal(StatusSignal::slot_type const& slot)
202 -> boost::signals2::connection {
203 return m_status_signal.connect(slot);
204}
205
206auto DpmClientImpl::ConnectStorageSignal(StorageSignal::slot_type const& slot)
207 -> boost::signals2::connection {
208 return m_storage_signal.connect(slot);
209}
210
211} // namespace daq
std::shared_ptr< bool > alive
Definition: dpmClient.cpp:26
daqif::Subscription< daqif::InternalDaqStatus > status
Definition: dpmClient.cpp:27
DpmClientImpl(boost::asio::io_context &io_ctx, mal::Mal &mal, DpmClientParams params)
Definition: dpmClient.cpp:31
daqif::Subscription< daqif::StorageStatus > storage
Definition: dpmClient.cpp:28
Declares JSON support for serialization.
Contains support functions for daqif.
daq::DpmClient
std::chrono::seconds timeout
Definition: dpmClient.hpp:96
std::string rr_uri
Definition: dpmClient.hpp:94
std::string ps_uri
Definition: dpmClient.hpp:95
std::shared_ptr< daqif::DaqReply > DaqReplyPtr
Definition: dpmClient.cpp:23
Connection parameters for DPM.
Definition: dpmClient.hpp:93
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:164
Contains URI support functions for daqif.