ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
dpmService.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_dpm
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief daq::dpm::DpmService definition.
9 */
11
12#include <exception>
13
14#include <boost/asio/io_context.hpp>
15#include <boost/throw_exception.hpp>
16
17#include <fmt/format.h>
18#include <log4cplus/loggingmacros.h>
19
20#include <daq/conversion.hpp>
21#include <daq/dpm/config.hpp>
22#include <daq/error/report.hpp>
23#include <daq/json.hpp>
24
25namespace daq::dpm {
26
28 elt::mal::Mal& mal,
29 Workspace& workspace,
30 Scheduler& scheduler)
31 : m_executor(executor)
32 , m_mal(mal)
33 , m_workspace(workspace)
34 , m_scheduler(scheduler)
35 , m_work_guard(m_executor.get_io_context().get_executor())
36 , m_sigset(executor.get_io_context(), SIGINT, SIGTERM)
37 , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME)) {
38 InitiateSignalHandler();
39}
40
41boost::future<std::shared_ptr<::daqif::StorageStatus>> DpmService::QueryStorageStatus() {
42 try {
43 // Note: QueryStorageStatus() is thread-safe
44 auto status = m_workspace.QueryStorageStatus();
45
46 std::shared_ptr<daqif::StorageStatus> rep = m_mal.createDataEntity<daqif::StorageStatus>();
47 rep->setCapacity(status.capacity);
48 rep->setFree(status.free);
49 rep->setAvailable(status.available);
50
51 return boost::make_ready_future<std::shared_ptr<daqif::StorageStatus>>(rep);
52 } catch (...) {
53 error::NestedExceptionReporter r(std::current_exception());
54 return boost::make_exceptional_future<std::shared_ptr<daqif::StorageStatus>>(
55 daqif::RuntimeError(r.Str()));
56 }
57}
58
59boost::future<std::string> DpmService::Exit() {
60 // Removing guard which will cause asio::io_context::run to finish when it runs out of pending
61 // work.
62 // Note: If e.g. timers are added continously then there will always be an outstanding operation
63 // so it won't complete.
64 LOG4CPLUS_INFO(m_logger, "Application termination requested with Exit()");
65 return boost::async(m_executor, [this]() mutable -> std::string {
66 HandleExit();
67 return "OK";
68 });
69}
70
71boost::future<std::string> DpmService::GetVersion() {
72 return boost::make_ready_future(std::string(VERSION));
73}
74
75boost::future<std::shared_ptr<::daqif::DaqReply>>
76DpmService::QueueDaq(std::string const& status, std::string const& specification) {
77 LOG4CPLUS_DEBUG(
78 m_logger,
79 "QueueDaq() Queuing new DAQ:\nStatus: " << status << "\nSpecification: " << specification);
80 using V = ::daqif::DaqReply;
81 using R = std::shared_ptr<V>;
82 return boost::async(m_executor, [status, specification, this]() mutable -> R {
83 try {
84 auto id = m_scheduler.QueueDaq(status, specification);
85 try {
86 auto rep = m_mal.createDataEntity<V>();
87 rep->setId(id);
88 rep->setError(false);
89 return rep;
90 } catch (...) {
91 error::NestedExceptionReporter r(std::current_exception());
92 BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
93 }
94 } catch (daqif::DaqException const& e) {
96 LOG4CPLUS_ERROR(m_logger, "QueueDaq() failed with ICD error:\n" << r.Str());
97 throw;
98 } catch (...) {
99 error::NestedExceptionReporter r(std::current_exception());
100 LOG4CPLUS_ERROR(m_logger, "QueueDaq() failed with following error(s):\n" << r);
101 BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
102 }
103 });
104}
105
106boost::future<std::shared_ptr<::daqif::DaqReply>> DpmService::AbortDaq(const std::string& id) {
107 LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Handling request", id));
108 using V = ::daqif::DaqReply;
109 using R = std::shared_ptr<V>;
110 return boost::async(m_executor, [id, this]() mutable -> R {
111 try {
112 m_scheduler.AbortDaq(id);
113 try {
114 auto rep = m_mal.createDataEntity<V>();
115 rep->setId(id);
116 rep->setError(false);
117 return rep;
118 } catch (...) {
119 error::NestedExceptionReporter r(std::current_exception());
120 throw daqif::DaqException(id, r.Str());
121 }
122 } catch (daqif::DaqException const& e) {
124 LOG4CPLUS_ERROR(m_logger, "AbortDaq() failed with following error(s):\n" << r);
125 throw;
126 } catch (...) {
127 error::NestedExceptionReporter r(std::current_exception());
128 LOG4CPLUS_ERROR(m_logger, "AbortDaq() failed with following error(s):\n" << r);
129 BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
130 }
131 });
132}
133
134boost::future<std::shared_ptr<::daqif::DaqStatus>> DpmService::GetDaqStatus(const std::string& id) {
135 LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): Handling request", id));
136 using V = ::daqif::DaqStatus;
137 using R = std::shared_ptr<V>;
138 return boost::async(m_executor, [id, this]() mutable -> R {
139 try {
140 return GetDaqStatusSync(id);
141 } catch (daqif::DaqException const& e) {
143 LOG4CPLUS_ERROR(m_logger, "GetDaqStatus() failed with following error(s):\n" << r);
144 throw;
145 } catch (...) {
146 error::NestedExceptionReporter r(std::current_exception());
147 LOG4CPLUS_ERROR(m_logger, "GetDaqStatus() failed with following error(s):\n" << r);
148 BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
149 }
150 });
151}
152
153boost::future<std::string> DpmService::GetInternalDaqStatus(const std::string& id) {
154 LOG4CPLUS_INFO(m_logger, fmt::format("GetInternalDaqStatus({}): Handling request", id));
155 return boost::async(m_executor, [id, this]() mutable -> std::string {
156 try {
157 auto status = m_scheduler.GetDaqStatus(id);
158 nlohmann::json j = status;
159 return j.dump(2);
160 } catch (daqif::DaqException const& e) {
162 LOG4CPLUS_ERROR(m_logger,
163 "GetInternalDaqStatus() failed with following error(s):\n"
164 << r);
165 throw;
166 } catch (...) {
167 error::NestedExceptionReporter r(std::current_exception());
168 LOG4CPLUS_ERROR(m_logger,
169 "GetInternalDaqStatus() failed with following error(s):\n"
170 << r);
171 BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
172 }
173 });
174}
175
176boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> DpmService::GetActiveDaqs() {
177 LOG4CPLUS_INFO(m_logger, "GetActiveDaqs(): Handling request");
178 using V = std::shared_ptr<::daqif::DaqStatus>;
179 using R = std::vector<V>;
180 return boost::async(m_executor, [this]() mutable -> R {
181 try {
182 auto ids = m_scheduler.GetQueue();
183 R statuses;
184 for (auto const& id : ids) {
185 statuses.emplace_back(GetDaqStatusSync(id));
186 }
187 return statuses;
188 } catch (daqif::DaqException const& e) {
190 LOG4CPLUS_ERROR(m_logger, "GetActiveDaqs() failed with following error(s):\n" << r);
191 throw;
192 } catch (...) {
193 error::NestedExceptionReporter r(std::current_exception());
194 LOG4CPLUS_ERROR(m_logger, "GetActiveDaqs() failed with following error(s):\n" << r);
195 BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
196 }
197 });
198}
199
200void DpmService::InitiateSignalHandler() {
201 m_sigset.async_wait([&](boost::system::error_code const& ec, int sig_num) {
202 if (ec) {
203 return;
204 }
205 LOG4CPLUS_INFO(m_logger,
206 fmt::format("Application termination requested with signal {}", sig_num));
207 HandleExit();
208 });
209}
210
211void DpmService::HandleExit() {
212 LOG4CPLUS_TRACE(m_logger, "HandleExit(): Application termination has been requested");
213 // If we still have a work-guard we try a nice shutdown, if there's no work guard
214 if (m_work_guard.owns_work()) {
215 // Reset work guard and cancel pending operations
216 m_work_guard.reset();
217 m_scheduler.Stop();
218 m_sigset.cancel();
219 m_sigset.clear();
220 } else {
221 LOG4CPLUS_INFO(m_logger,
222 "Application termination already requested before. Stopping immediately!");
223 m_executor.get_io_context().stop();
224 }
225}
226
227std::shared_ptr<daqif::DaqStatus> DpmService::GetDaqStatusSync(const std::string& id) {
228 LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatusSync({}): Handling request", id));
229 using V = ::daqif::DaqStatus;
230 try {
231 auto status = m_scheduler.GetDaqStatus(id);
232 auto rep = m_mal.createDataEntity<V>();
233 *rep << status;
234 return rep;
235 } catch (...) {
236 error::NestedExceptionReporter r(std::current_exception());
237 BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
238 }
239}
240
241} // namespace daq::dpm
DpmService(rad::IoExecutor &executor, elt::mal::Mal &mal, Workspace &workspace, Scheduler &scheduler)
Construct replier.
Definition: dpmService.cpp:27
boost::future< std::string > GetVersion() override
Returns the application version string.
Definition: dpmService.cpp:71
boost::future< std::shared_ptr<::daqif::StorageStatus > > QueryStorageStatus() override
Queries the storage status (capacity, free and available space) of the workspace.
Definition: dpmService.cpp:41
boost::future< std::string > Exit() override
Cancels any started merges and terminates application.
Definition: dpmService.cpp:59
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveDaqs() override
Definition: dpmService.cpp:176
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
Definition: dpmService.cpp:106
boost::future< std::string > GetInternalDaqStatus(const std::string &id) override
Definition: dpmService.cpp:153
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetDaqStatus(const std::string &id) override
Definition: dpmService.cpp:134
boost::future< std::shared_ptr<::daqif::DaqReply > > QueueDaq(std::string const &status, std::string const &specification) override
Definition: dpmService.cpp:76
Schedules asynchronous activities that results in merged Data Product and delivery.
Definition: scheduler.hpp:222
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual std::string QueueDaq(std::string const &dp_spec, std::string const &status)=0
Queues DAQ for processing.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
virtual void Stop()=0
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
Interface to interact with DPM workspace.
Definition: workspace.hpp:133
virtual auto QueryStorageStatus() const -> std::filesystem::space_info=0
Queries available storage for workspace.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: report.cpp:97
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
Declares JSON support for serialization.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
DPM server config.
const std::string LOGGER_NAME
Definition: config.hpp:22