14#include <boost/asio/io_context.hpp>
15#include <boost/throw_exception.hpp>
17#include <fmt/format.h>
18#include <log4cplus/loggingmacros.h>
31 : m_executor(executor)
33 , m_workspace(workspace)
34 , m_scheduler(scheduler)
35 , m_work_guard(m_executor.get_io_context().get_executor())
36 , m_sigset(executor.get_io_context(), SIGINT, SIGTERM)
37 , m_logger(log4cplus::Logger::getInstance(
LOGGER_NAME)) {
38 InitiateSignalHandler();
46 std::shared_ptr<daqif::StorageStatus> rep = m_mal.createDataEntity<daqif::StorageStatus>();
47 rep->setCapacity(status.capacity);
48 rep->setFree(status.free);
49 rep->setAvailable(status.available);
51 return boost::make_ready_future<std::shared_ptr<daqif::StorageStatus>>(rep);
54 return boost::make_exceptional_future<std::shared_ptr<daqif::StorageStatus>>(
55 daqif::RuntimeError(r.
Str()));
64 LOG4CPLUS_INFO(m_logger,
"Application termination requested with Exit()");
65 return boost::async(m_executor, [
this]()
mutable -> std::string {
72 return boost::make_ready_future(std::string(VERSION));
75boost::future<std::shared_ptr<::daqif::DaqReply>>
79 "QueueDaq() Queuing new DAQ:\nStatus: " << status <<
"\nSpecification: " << specification);
80 using V = ::daqif::DaqReply;
81 using R = std::shared_ptr<V>;
82 return boost::async(m_executor, [status, specification,
this]()
mutable -> R {
84 auto id = m_scheduler.
QueueDaq(status, specification);
86 auto rep = m_mal.createDataEntity<V>();
92 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.
Str()));
94 }
catch (daqif::DaqException
const& e) {
96 LOG4CPLUS_ERROR(m_logger,
"QueueDaq() failed with ICD error:\n" << r.
Str());
100 LOG4CPLUS_ERROR(m_logger,
"QueueDaq() failed with following error(s):\n" << r);
101 BOOST_THROW_EXCEPTION(daqif::DaqException(
"", r.
Str()));
107 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): Handling request",
id));
108 using V = ::daqif::DaqReply;
109 using R = std::shared_ptr<V>;
110 return boost::async(m_executor, [
id,
this]()
mutable -> R {
114 auto rep = m_mal.createDataEntity<V>();
116 rep->setError(
false);
120 throw daqif::DaqException(
id, r.
Str());
122 }
catch (daqif::DaqException
const& e) {
124 LOG4CPLUS_ERROR(m_logger,
"AbortDaq() failed with following error(s):\n" << r);
128 LOG4CPLUS_ERROR(m_logger,
"AbortDaq() failed with following error(s):\n" << r);
129 BOOST_THROW_EXCEPTION(daqif::DaqException(
"", r.
Str()));
135 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatus({}): Handling request",
id));
136 using V = ::daqif::DaqStatus;
137 using R = std::shared_ptr<V>;
138 return boost::async(m_executor, [
id,
this]()
mutable -> R {
140 return GetDaqStatusSync(
id);
141 }
catch (daqif::DaqException
const& e) {
143 LOG4CPLUS_ERROR(m_logger,
"GetDaqStatus() failed with following error(s):\n" << r);
147 LOG4CPLUS_ERROR(m_logger,
"GetDaqStatus() failed with following error(s):\n" << r);
148 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.
Str()));
154 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetInternalDaqStatus({}): Handling request",
id));
155 return boost::async(m_executor, [
id,
this]()
mutable -> std::string {
158 nlohmann::json j = status;
160 }
catch (daqif::DaqException
const& e) {
162 LOG4CPLUS_ERROR(m_logger,
163 "GetInternalDaqStatus() failed with following error(s):\n"
168 LOG4CPLUS_ERROR(m_logger,
169 "GetInternalDaqStatus() failed with following error(s):\n"
171 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.
Str()));
177 LOG4CPLUS_INFO(m_logger,
"GetActiveDaqs(): Handling request");
178 using V = std::shared_ptr<::daqif::DaqStatus>;
179 using R = std::vector<V>;
180 return boost::async(m_executor, [
this]()
mutable -> R {
184 for (
auto const&
id : ids) {
185 statuses.emplace_back(GetDaqStatusSync(
id));
188 }
catch (daqif::DaqException
const& e) {
190 LOG4CPLUS_ERROR(m_logger,
"GetActiveDaqs() failed with following error(s):\n" << r);
194 LOG4CPLUS_ERROR(m_logger,
"GetActiveDaqs() failed with following error(s):\n" << r);
195 BOOST_THROW_EXCEPTION(daqif::DaqException(
"", r.
Str()));
200void DpmService::InitiateSignalHandler() {
201 m_sigset.async_wait([&](boost::system::error_code
const& ec,
int sig_num) {
205 LOG4CPLUS_INFO(m_logger,
206 fmt::format(
"Application termination requested with signal {}", sig_num));
211void DpmService::HandleExit() {
212 LOG4CPLUS_TRACE(m_logger,
"HandleExit(): Application termination has been requested");
214 if (m_work_guard.owns_work()) {
216 m_work_guard.reset();
221 LOG4CPLUS_INFO(m_logger,
222 "Application termination already requested before. Stopping immediately!");
227std::shared_ptr<daqif::DaqStatus> DpmService::GetDaqStatusSync(
const std::string&
id) {
228 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatusSync({}): Handling request",
id));
229 using V = ::daqif::DaqStatus;
232 auto rep = m_mal.createDataEntity<V>();
236 error::NestedExceptionReporter r(std::current_exception());
237 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.Str()));
DpmService(rad::IoExecutor &executor, elt::mal::Mal &mal, Workspace &workspace, Scheduler &scheduler)
Construct replier.
boost::future< std::string > GetVersion() override
Returns the application version string.
boost::future< std::shared_ptr<::daqif::StorageStatus > > QueryStorageStatus() override
Queries the storage status (capacity, free and available space) and returns it as a daqif::StorageStatus reply.
boost::future< std::string > Exit() override
Cancels any started merges and terminates application.
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveDaqs() override
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
boost::future< std::string > GetInternalDaqStatus(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetDaqStatus(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > QueueDaq(std::string const &status, std::string const &specification) override
Schedules asynchronous activities that result in a merged Data Product and its delivery.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual std::string QueueDaq(std::string const &dp_spec, std::string const &status)=0
Queues DAQ for processing.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
Interface to interact with DPM workspace.
virtual auto QueryStorageStatus() const -> std::filesystem::space_info=0
Queries available storage for workspace.
Adapter object intended to be used in contexts without direct access to the output-stream object.
std::string Str() const
Convenience function for constructing a std::string from the exception.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Declares JSON support for serialization.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
const std::string LOGGER_NAME