10 #include <boost/assert.hpp>
13 #include <log4cplus/loggingmacros.h>
14 #include <mal/Mal.hpp>
15 #include <mal/rr/qos/ReplyTime.hpp>
25 std::shared_ptr<bool>
alive = std::make_shared<bool>(
true);
32 , m_executor(m_io_ctx)
34 , m_params(std::move(params))
35 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm")) {
36 m_dpmif = m_mal.getClient<daqif::DpmDaqControlAsync>(
38 {std::make_shared<elt::mal::rr::qos::ReplyTime>(params.
timeout)},
42 m_subs = std::make_unique<Subscriptions>();
45 m_subs->status = daqif::MakeSubscription<daqif::DaqStatus>(
48 [weak = std::weak_ptr<bool>(m_subs->alive),
this](
auto& sub,
auto const& event) {
49 auto shared = weak.lock();
54 auto const& sample = event.getData();
55 Status status(sample->getId(), sample->getFileId());
57 LOG4CPLUS_TRACE(m_logger,
"Received DaqStatus update: " << status);
59 m_io_ctx.post([status = std::move(status), weak,
this] {
60 auto shared = weak.lock();
70 m_subs->storage = daqif::MakeSubscription<daqif::StorageStatus>(
73 [weak = std::weak_ptr<bool>(m_subs->alive),
this](
auto& sub,
auto const& event) {
74 auto shared = weak.lock();
79 auto const& sample = event.getData();
80 std::filesystem::space_info space = {};
83 m_io_ctx.post([space, weak,
this] {
84 auto shared = weak.lock();
89 m_storage_signal(space);
94 DpmClientImpl::~DpmClientImpl() =
default;
96 auto DpmClientImpl::ScheduleAsync(std::string
const& spec) -> boost::future<State> {
97 return m_dpmif->QueueDaq(spec).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
101 return State::Scheduled;
105 auto DpmClientImpl::AbortAsync(std::string
const&
id) -> boost::future<State> {
106 return m_dpmif->AbortDaq(
id).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
107 auto reply = f.get();
110 return State::Aborted;
114 void DpmClientImpl::StartMonitorStatus(std::string
const&
id) {
115 LOG4CPLUS_TRACE(m_logger,
"DpmClientImpl::StartMonitorStatus: " <<
id);
119 void DpmClientImpl::RequestStatus(std::string
const&
id) {
120 LOG4CPLUS_TRACE(m_logger,
"RequestStatus:" <<
id);
121 m_res_status.AsyncAcquire().then(
122 m_executor, [&,
id, alive = std::weak_ptr(m_liveness)](boost::future<ResourceToken> f_token) {
123 auto is_alive = alive.lock();
125 LOG4CPLUS_TRACE(
"daq.ocm",
"DpmClient: Client abandoned.");
128 LOG4CPLUS_TRACE(m_logger,
"AsyncAcquire: Resource available returning ready token");
129 auto token = f_token.get();
130 m_dpmif->GetDaqStatus(
id).then(
132 [&,
id, alive = std::weak_ptr(m_liveness), token = std::move(token)](
133 boost::future<std::shared_ptr<daqif::DaqStatus>> f) {
134 auto is_alive = alive.lock();
136 LOG4CPLUS_TRACE(
"daq.ocm",
"DpmClient: Client abandoned.");
140 auto reply = f.get();
141 BOOST_ASSERT_MSG(reply,
"MAL returned empty shared pointer!");
142 Status status(reply->getId(), reply->getFileId());
145 UpdateStatus(status);
150 }
catch (elt::mal::TimeoutException
const&) {
152 LOG4CPLUS_DEBUG(m_logger,
153 "Timeout from DPM GetDaqStatus() -> reschedule attempt");
154 ScheduleRequestStatus(
id);
155 }
catch (std::exception
const& err) {
158 "Failure from GetDaqStatus() -> will not reschedule attempt. Error: "
165 void DpmClientImpl::UpdateStatus(Status
const& status) {
166 m_status_signal(status);
169 void DpmClientImpl::ScheduleRequestStatus(std::string
const&
id) {
170 auto it = m_timers.find(
id);
171 if (it != m_timers.end()) {
174 it = m_timers.emplace(
id, m_io_ctx).first;
177 it->second.expires_from_now(m_params.status_retry_interval);
179 it->second.async_wait(
180 [&,
id, alive = std::weak_ptr(m_liveness),
this](boost::system::error_code ec) {
181 if (ec == boost::asio::error::operation_aborted) {
184 auto is_alive = alive.lock();
186 LOG4CPLUS_TRACE(
"daq.ocm",
"DpmClient: Client abandoned.");
193 void DpmClientImpl::StopMonitorStatus(std::string
const&
id) {
197 auto DpmClientImpl::ConnectStatusSignal(StatusSignal::slot_type
const& slot)
198 -> boost::signals2::connection {
199 return m_status_signal.connect(slot);
202 auto DpmClientImpl::ConnectStorageSignal(StorageSignal::slot_type
const& slot)
203 -> boost::signals2::connection {
204 return m_storage_signal.connect(slot);
std::shared_ptr< bool > alive
DpmClientImpl(boost::asio::io_context &io_ctx, mal::Mal &mal, DpmClientParams params)
daqif::Subscription< daqif::DaqStatus > status
daqif::Subscription< daqif::StorageStatus > storage
Contains support functions for daqif.
std::chrono::seconds timeout
std::shared_ptr< daqif::DaqReply > DaqReplyPtr
Connection parameters for DPM.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Non observable status object that keeps stores status of data acquisition.
Contains URI support functions for daqif.