10#include <boost/assert.hpp>
13#include <log4cplus/loggingmacros.h>
15#include <mal/rr/qos/ReplyTime.hpp>
26 std::shared_ptr<bool>
alive = std::make_shared<bool>(
true);
33 , m_executor(m_io_ctx)
35 , m_params(std::move(params))
36 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm")) {
37 m_dpmif = m_mal.getClient<daqif::DpmDaqControlAsync>(
39 {std::make_shared<elt::mal::rr::qos::ReplyTime>(params.
timeout)},
43 m_subs = std::make_unique<Subscriptions>();
46 m_subs->status = daqif::MakeSubscription<daqif::InternalDaqStatus>(
49 [weak = std::weak_ptr<bool>(m_subs->alive),
this](
auto& sub,
auto const& event) {
50 auto shared = weak.lock();
55 auto const& sample = event.getData();
58 LOG4CPLUS_TRACE(m_logger,
"Received DaqStatus update: " << status);
60 m_io_ctx.post([status = std::move(status), weak,
this] {
61 auto shared = weak.lock();
71 m_subs->storage = daqif::MakeSubscription<daqif::StorageStatus>(
74 [weak = std::weak_ptr<bool>(m_subs->alive),
this](
auto& sub,
auto const& event) {
75 auto shared = weak.lock();
80 auto const& sample = event.getData();
81 std::filesystem::space_info space = {};
84 m_io_ctx.post([space, weak,
this] {
85 auto shared = weak.lock();
90 m_storage_signal(space);
95DpmClientImpl::~DpmClientImpl() =
default;
97auto DpmClientImpl::ScheduleAsync(std::string
const& spec, std::optional<std::string>
const& status)
98 -> boost::future<State> {
99 return m_dpmif->QueueDaq(spec, status.value_or(
""))
100 .then(m_executor, [&](boost::future<DaqReplyPtr> f) {
101 auto reply = f.get();
104 return State::Scheduled;
108auto DpmClientImpl::AbortAsync(std::string
const&
id) -> boost::future<State> {
109 return m_dpmif->AbortDaq(
id).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
110 auto reply = f.get();
113 return State::Aborted;
117void DpmClientImpl::StartMonitorStatus(std::string
const&
id) {
118 LOG4CPLUS_TRACE(m_logger,
"DpmClientImpl::StartMonitorStatus: " <<
id);
122void DpmClientImpl::RequestStatus(std::string
const&
id) {
123 LOG4CPLUS_TRACE(m_logger,
"RequestStatus:" <<
id);
124 m_res_status.AsyncAcquire().then(
126 [&,
id, alive = std::weak_ptr(m_liveness)](boost::future<ResourceToken> f_token) {
127 auto is_alive = alive.lock();
129 LOG4CPLUS_TRACE(
"daq.ocm",
"DpmClient: Client abandoned.");
132 LOG4CPLUS_TRACE(m_logger,
"RequestStatus: Resource available returning ready token");
133 auto token = f_token.get();
134 m_dpmif->GetInternalDaqStatus(
id).then(
136 [&,
id, alive = std::weak_ptr(m_liveness), token = std::move(token)](
137 boost::future<std::string> f) {
138 auto is_alive = alive.lock();
140 LOG4CPLUS_TRACE(
"daq.ocm",
"DpmClient: Client abandoned.");
144 auto reply = f.get();
145 auto json = nlohmann::json::parse(reply);
147 Status status = json.get<Status>();
149 UpdateStatus(status);
154 }
catch (elt::mal::TimeoutException
const&) {
156 LOG4CPLUS_DEBUG(m_logger,
157 "Timeout from DPM GetDaqStatus() -> reschedule attempt");
158 ScheduleRequestStatus(
id);
159 }
catch (std::exception
const& err) {
162 "Failure from GetDaqStatus() -> will not reschedule attempt. Error: "
169void DpmClientImpl::UpdateStatus(Status
const& status) {
170 m_status_signal(status);
173void DpmClientImpl::ScheduleRequestStatus(std::string
const&
id) {
174 auto it = m_timers.find(
id);
175 if (it != m_timers.end()) {
178 it = m_timers.emplace(
id, m_io_ctx).first;
181 it->second.expires_from_now(m_params.status_retry_interval);
183 it->second.async_wait(
184 [&,
id, alive = std::weak_ptr(m_liveness),
this](boost::system::error_code ec) {
185 if (ec == boost::asio::error::operation_aborted) {
188 auto is_alive = alive.lock();
190 LOG4CPLUS_TRACE(
"daq.ocm",
"DpmClient: Client abandoned.");
197void DpmClientImpl::StopMonitorStatus(std::string
const&
id) {
201auto DpmClientImpl::ConnectStatusSignal(StatusSignal::slot_type
const& slot)
202 -> boost::signals2::connection {
203 return m_status_signal.connect(slot);
206auto DpmClientImpl::ConnectStorageSignal(StorageSignal::slot_type
const& slot)
207 -> boost::signals2::connection {
208 return m_storage_signal.connect(slot);
std::shared_ptr< bool > alive
daqif::Subscription< daqif::InternalDaqStatus > status
DpmClientImpl(boost::asio::io_context &io_ctx, mal::Mal &mal, DpmClientParams params)
daqif::Subscription< daqif::StorageStatus > storage
Declares JSON support for serialization.
Contains support functions for daqif.
std::chrono::seconds timeout
std::shared_ptr< daqif::DaqReply > DaqReplyPtr
Connection parameters for DPM.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Non observable status object that keeps stores status of data acquisition.
Contains URI support functions for daqif.