ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
ocmDaqService.hpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Declaration of OcmDaqService
7 */
8#ifndef DAQ_OCM_OCM_DAQ_SERVICE_HPP_
9#define DAQ_OCM_OCM_DAQ_SERVICE_HPP_
10#include <daq/config.hpp>
11#include <rad/ioExecutor.hpp>
12
13#include <Daqif.hpp>
14
15#include <boost/asio/io_context.hpp>
16#include <daq/daqContext.hpp>
17#include <daq/eventLog.hpp>
19#include <daq/manager.hpp>
20#include <log4cplus/logger.h>
21#include <mal/Mal.hpp>
22
23namespace mal = ::elt::mal;
24
26 ParsedSource(std::string name, std::string rr_uri);
27 ParsedSource() = default;
28 ParsedSource(ParsedSource const&) = default;
30 ParsedSource& operator=(ParsedSource const& rhs) = default;
32
33 bool operator==(ParsedSource const& rhs) const;
34 std::string name;
35 std::string rr_uri;
36};
37
38std::ostream& operator<<(std::ostream& os, ParsedSource const& s);
39
40/**
41 * Parse user provided string in the format
42 * "<name>@<rr-uri>"
43 *
44 * @throw std::invalid_argument on errors.
45 */
46ParsedSource ParseSourceUri(std::string_view s);
47
48/**
49 * Parse user provided string in the format
50 * "<name>@<rr-uri>[ <name>@...]"
51 *
52 * @throw std::invalid_argument on errors.
53 */
54std::vector<ParsedSource> ParseSourceUris(std::string_view s);
55
56/**
57 * Parse the JSON properties user provides with StartDaq
58 *
59 * {"keywords": KEYWORDS,
60 * "awaitInterval": DURATION}
61 *
62 * @throws nlohmann::json::exception on parsing errors.
63 * @throws std::invalid_argument on argument errors.
64 */
65daq::DaqContext ParseStartDaqContext(std::string const& properties);
66
67/**
68 * Parses the JSON specification and returns the corresponding DaqContext.
69 *
70 * @throws daq::json::StartDaqV2SpecError On schema error
71 * @throws daq::json::SchemaError On schema error
72 * @throws nlohmann::json::exception on parsing error
73 * @throws std::invalid_argument on argument errors.
74 * @throws std::exception-derived exception on other errors.
75 */
76daq::DaqContext ParseStartDaqV2(std::string const& specification);
77
78/**
79 * Implements the MAL interface daqif::OcmDaq (async version).
80 *
81 * The server is only safe to use as a shared pointer, as weak pointers to this object are
82 * stored in asynchronous continuations, apart from the initiating operation.
83 * Provided boost::asio::io_context must outlive object and any pending operations.
84 *
85 * This object can be deleted while there are operations that have not completed yet.
86 * In that case, when the operation finally completes, an exception is sent as reply, even if
87 * the requested operation would have succeeded otherwise. The io_context must stay alive however,
88 * as it will be used as an executor when an asynchronous operation is continued.
89 *
90 * A note on thread safety:
91 *
92 * The public (asynchronous) interface will immediately defer the execution to the provided
93 * executor, so depending on the executor this can provide thread safety.
94 *
95 *
96 * @ingroup daq_ocm_server
97 */
98class OcmDaqService : public daqif::AsyncOcmDaqControl,
99 public std::enable_shared_from_this<OcmDaqService> {
100public:
101 static constexpr char const* LOGGER_NAME = "ocm.service.daq";
102
103 /**
104 * Constructs the service. TODO: document proc_name, output_path and event_log.
105 * @param io_ctx ASIO io context to use as an executor. This object must outlive OcmDaqService.
106 * The io context will be propagated to any created `daq::DaqController` instances as well.
107 * @param mal The mal instance to use to create instances. This object must outlive
108 * OcmDaqService.
109 * @param mgr The data acquisition manager to use. This object must outlive OcmDaqService.
110 */
111 OcmDaqService(boost::asio::io_context& io_ctx,
112 mal::Mal& mal,
113 daq::Manager& mgr,
114 std::string proc_name,
115 std::string output_path,
116 std::shared_ptr<daq::ObservableEventLog> event_log);
118
119 boost::future<std::shared_ptr<::daqif::DaqReply>>
120 StartDaq(const std::string& id,
121 const std::string& file_prefix,
122 const std::string& primary_sources,
123 const std::string& metadata_sources,
124 const std::string& properties) override;
125 boost::future<std::shared_ptr<::daqif::DaqReply>>
126 StartDaqV2(const std::string& specification) override;
127
128 boost::future<std::shared_ptr<::daqif::DaqReply>> StopDaq(const std::string& id) override;
129
130 boost::future<std::shared_ptr<::daqif::DaqReply>> ForceStopDaq(const std::string& id) override;
131
132 boost::future<std::shared_ptr<::daqif::DaqReply>> AbortDaq(const std::string& id) override;
133
134 boost::future<std::shared_ptr<::daqif::DaqReply>> ForceAbortDaq(const std::string& id) override;
135
136 boost::future<std::shared_ptr<::daqif::DaqReply>>
137 UpdateKeywords(const std::string& id, const std::string& keywords) override;
138
139 boost::future<std::shared_ptr<::daqif::AwaitDaqReply>>
140 AwaitDaqState(const std::string& id,
141 daqif::DaqState state,
142 daqif::DaqSubState substate,
143 double timeout) override;
144
145 boost::future<std::shared_ptr<::daqif::DaqStatus>> GetStatus(const std::string& id) override;
146
147 boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> GetActiveList() override;
148
149private:
150 std::string MakeExceptionMessageWithStatus(std::string const& id,
151 std::exception_ptr const& exception) const;
152
153 /**
154 * Common Start function for StartDaq and StartDaqV2.
155 *
156 * @param context DaqContext describing the DAQ to start.
157 * @param function Pointer to static string (presumably names the calling operation - TODO confirm).
158 */
159 boost::future<std::shared_ptr<::daqif::DaqReply>>
160 StartDaq(daq::DaqContext const& context, char const* function);
161 boost::future<std::shared_ptr<::daqif::DaqReply>> StopDaq(const std::string& id, bool forced);
162 boost::future<std::shared_ptr<::daqif::DaqReply>> AbortDaq(const std::string& id, bool forced);
163
164 /**
165 * @throws std::invalid_argument on validation errors.
166 */
167 void UpdateFrom(daq::DaqContext& context, daq::json::StartDaqV2Spec const& spec);
168
169 boost::asio::io_context& m_io_ctx;
170 rad::IoExecutor m_executor;
171 mal::Mal& m_mal;
172 daq::Manager& m_mgr;
173 /**
174 * Process name
175 */
176 std::string m_proc_name;
177
178 /**
179 * Directory in which FITS files are written
180 */
181 std::string m_output_path;
182
183 std::shared_ptr<daq::ObservableEventLog> m_event_log;
184 boost::signals2::connection m_log_observer_connection;
185 /**
186 * Observer for m_event_log.
187 */
188 daq::EventLogObserverLogger m_log_observer;
189
190 log4cplus::Logger m_logger;
191};
192
193#endif // #ifndef DAQ_OCM_OCM_DAQ_SERVICE_HPP_
A simple daq::ObservableEventLog observer that logs observed events to provided logger.
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:135
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Contains declaration of daq::Context.
Contains declaration for EventLogObserverLogger.
Contains declaration for EventLog, ObservableEventLog and related events.
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveList() override
boost::future< std::shared_ptr<::daqif::DaqReply > > StopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::AwaitDaqReply > > AwaitDaqState(const std::string &id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) override
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaqV2(const std::string &specification) override
boost::future< std::shared_ptr<::daqif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceStopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetStatus(const std::string &id) override
static constexpr char const * LOGGER_NAME
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceAbortDaq(const std::string &id) override
Implements the MAL interface daqif::OcmDaq (async version).
Declaration of daq::Manager
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
daq::DaqContext ParseStartDaqContext(std::string const &properties)
Parse the JSON properties user provides with StartDaq.
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
daq::DaqContext ParseStartDaqV2(std::string const &specification)
Parses the JSON specification and returns the corresponding DaqContext.
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
bool operator==(ParsedSource const &rhs) const
std::string name
ParsedSource()=default
ParsedSource(ParsedSource &&)=default
ParsedSource(ParsedSource const &)=default
ParsedSource & operator=(ParsedSource &&rhs)=default
std::string rr_uri
ParsedSource & operator=(ParsedSource const &rhs)=default
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42