ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
manager.hpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Declaration of `daq::Manager`
7 */
8#ifndef OCM_DAQ_MANAGER_HPP_
9#define OCM_DAQ_MANAGER_HPP_
10#include "config.hpp"
11
12#include <chrono>
13#include <cstdint>
14#include <string_view>
15#include <vector>
16
17#include <boost/asio/deadline_timer.hpp>
18#include <boost/thread/future.hpp>
19#include <log4cplus/logger.h>
20#include <rad/ioExecutor.hpp>
21
22#include <daq/daqContext.hpp>
23#include <daq/dpmClient.hpp>
24#include <daq/error.hpp>
25#include <daq/eventLog.hpp>
26#include <daq/fits/keyword.hpp>
27#include <daq/state.hpp>
28#include <daq/status.hpp>
29#include <daq/utility.hpp>
30
31namespace daq {
32
33class Workspace;
34
35/**
36 * Exception indicating the DAQ id was invalid.
37 */
38struct InvalidDaqId : std::invalid_argument {
39 explicit InvalidDaqId(std::string_view id);
40 explicit InvalidDaqId(std::string_view id, std::string_view reason);
41};
42
43/**
44 * Configurations parameters directly related to manager.
45 */
47 /**
48 * Instrument identifier.
49 */
50 std::string instrument_id;
51 std::string origin = "ESO-PARANAL";
52
53 /**
54 * Age of DAQ in acquiring state after which it is automatically considered abandoned and will
55 * be archived without further action at startup.
56 */
57 std::chrono::hours acquiring_stale_age = std::chrono::hours(14);
58
59 /**
60 * Age of DAQ in merging state, after which it is automatically considered abandoned and will be
61 * archived without further action at startup.
62 */
63 std::chrono::hours merging_stale_age = std::chrono::hours(2 * 24);
64};
65
66/**
67 * Creates a DAQ id candidate that may or may not be unique.
68 *
69 * @param instrument_id The instrument ID to use for the id. Only the 5 first characters will be
70 * used if the name is longer than 5.
71 * @param jitter Jitter is a millisecond component added to the current time. This is meant to be
72 * used to find a unique id by adding jitter until a unique id is found.
73 *
74 * @note The function cannot guarantee unique ID and is the responsibility of the caller.
75 * @ingroup daq_common_libdaq
76 */
77std::string MakeIdCandidate(char const* instrument_id,
78 unsigned jitter = 0,
79 std::chrono::system_clock::time_point* out = nullptr);
80
81class DaqController;
83class FitsController;
84
85/**
86 * Observes any status.
87 */
89public:
90 using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
91
92 template <class Observer>
93 boost::signals2::connection ConnectObserver(Observer o) {
94 return m_signal.connect(std::move(o));
95 }
96
97 void Signal(ObservableStatus const& status) {
98 m_signal(status);
99 }
100
101private:
102 SignalType m_signal;
103};
104
105/**
106 * Manager owns DaqController and FitsController (active data acquisitions) instances and
107 * multiplexes requests to them.
108 *
109 *
110 * Important responsibilities:
111 *
112 * - Provide interface that allows multiplexing of multiple DaqControllers and FitsControllers
113 * (identified by unique id). This interface is close to what the MAL ICD interface looks like to
114 * control DaqControllers and FitsControllers.
115 * - Creation of `DaqController` instance as part of a new data acquisition (using factory)
116 * - Replace `DaqController` with another implementation to perform the merging (triggered when DAQ
117 * reaches Stopped state).
118 * - Monitor primary data sources to stop DaqController if a primary source completes (TODO).
119 * - (TBD): On startup query previous execution for incomplete DAQ (only consider DAQs with a
120 * maximum age to discard abandoned DAQs)
121 * - Load from persistant storage.
122 * - Query DPM for in-progress merges.
123 *
124 * FitsController also act as a data source.
125 *
126 * Out of scope:
127 *
128 * - Creation of `FitsController` instances.
129 *
130 * @note: It's a template only to facilitate mocking of DaqController, without requiring it to be
131 * an interface.
132 *
133 * @ingroup daq_common_libdaq
134 */
135class Manager {
136public:
137 virtual ~Manager() {
138 }
139 using Signal = boost::signals2::signal<void(ObservableStatus const&)>;
140
141 /**
142 * Restore from state stored in workspace.
143 *
144 * This should typically only be done after construction and not when running.
145 */
146 virtual void RestoreFromWorkspace() = 0;
147
148 /**
149 * Creates a new unique identifier based on the instrument id and current time.
150 *
151 * If there is a id collision when using current time a millisecond jitter component
152 * is added until a unique id is found.
153 *
154 * The format is the same as ARCFILE - the file extension: "<OLAS_ID>-2020-08-19T09:33:11.951"
155 *
156 * @note OLAS_ID is the first 5 characters of instrument id.
157 * @note The returned ID is guaranteed to be unique for all IDs known by Manager (there is
158 * currently no global repository of previous IDs) and is suitable for both DAQ id and OLAS
159 * FileId.
160 *
161 * @param [out] time Optional time used to produce the ID.
162 * @throws std::system_error on failure.
163 */
164 virtual std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const = 0;
165
166 /**
167 * Query existing data acquisition by @c id and optional @c file_id.
168 *
169 * @param id DAQ id to look up.
170 * @param file_id Optional file_id to look up.
171 *
172 * @returns true if there is a already a DAQ with the same ID (or same file_id if argument is
173 * provided).
174 * @returns false otherwise.
175 */
176 virtual bool HaveDaq(std::string_view id, std::string_view file_id = {}) const DAQ_NOEXCEPT = 0;
177
178 /**
179 * Get status
180 *
181 * @throw std::invalid_argument if no data acquisition exist with provided `id`.
182 */
183 virtual Status GetStatus(std::string_view id) const = 0;
184
185 /**
186 * Start DaqController identified by `id`.
187 *
188 * @param id Data acquisition id.
189 *
190 * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
191 * does not exist.
192 * @returns Future that will eventually be ready when data acquisition has started, or failed to
193 * start.
194 */
195 virtual boost::future<State> StartDaqAsync(DaqContext ctx) = 0;
196
197 /**
198 * Stop DaqController identified by `id`.
199 *
200 * @param id Data acquisition id.
201 * @param policy Error policy determining if errors are tolerated or not.
202 *
203 * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
204 * does not exist.
205 * @returns Future that will eventually be ready when data acquisition has stopped, or failed to
206 * stop.
207 */
208 virtual boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
209
210 /**
211 * Abort DaqController identified by `id`.
212 *
213 * @param id Data acquisition id.
214 * @param policy Error policy determining if errors are tolerated or not.
215 *
216 * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
217 * does not exist.
218 * @returns Future that will eventually be ready when data acquisition has aborted, or failed to
219 * abort.
220 */
221 virtual boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
222
223 /**
224 * Await DAQ state
225 *
226 * @param id Data acquisition id.
227 * @param state target state to await.
228 * @param timeout How long to wait for state to be reached.
229 * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
230 * does not exist or timeout is negative.
231 * @returns Future with a value set when condition is fulfilled or times out.
232 */
233 virtual boost::future<Result<Status>>
234 AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout) = 0;
235 /**
236 * Update FITS keywords for DaqController identified by `id`.
237 * @param id Data acquisition id.
238 * @param keywords FITS keywords to update.
239 *
240 * @throw std::invalid_argument if no data acquisition exist with provided `id`.
241 * @throw std::runtime_error If DaqController state does not allow updating of keywords (it's
242 * e.g. already been submitted for merging).
243 */
244 virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) = 0;
245
246 /**
247 * @returns status observer object.
248 *
249 * The status observer observes any DAQ status changes.
250 */
252
253 /**
254 * @returns current Daq Controllers.
255 */
256 virtual std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() = 0;
257};
258
259/**
260 * Implements `daq::Manager`.
261 *
262 * @ingroup daq_common_libdaq
263 */
264class ManagerImpl : public Manager {
265public:
266 /**
267 * @param instrument_id Instrument id.
268 */
269 explicit ManagerImpl(rad::IoExecutor& executor,
270 ManagerParams params,
271 Workspace& workspace,
272 fits::KeywordFormatter const& formatter,
273 std::shared_ptr<ObservableEventLog> event_log,
274 DaqControllerFactory& daq_factory,
275 std::shared_ptr<DpmClient> dpm_client,
276 log4cplus::Logger const& logger);
277 ~ManagerImpl() noexcept;
278 /**
279 * Loads status and constructs DaqControllers corresponding to stored state.
280 */
281 void RestoreFromWorkspace() override;
282
283 std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const override;
284
285 bool HaveDaq(std::string_view id, std::string_view file_id = {}) const noexcept override;
286
287 Status GetStatus(std::string_view id) const override;
288
289 boost::future<State> StartDaqAsync(DaqContext ctx) override;
290
291 boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) override;
292
293 boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) override;
294
295 boost::future<Result<Status>>
296 AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override;
297
298 void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) override;
299
300 StatusSignal& GetStatusSignal() override;
301
302 std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() override;
303
304private:
305 struct Daq {
306 Daq(std::string id_arg,
307 std::shared_ptr<DaqController> controller_arg,
308 boost::signals2::connection conn_status_arg,
309 boost::signals2::connection conn_context_arg) noexcept;
310
311 std::string id;
312 std::shared_ptr<DaqController> controller;
313 // connection for observer connected to controller.
314 boost::signals2::scoped_connection conn_status;
315 boost::signals2::scoped_connection conn_context;
316 };
317
318 /// Adds unique identifier (unrelated to DAQ id) to op abort functions
319 class OpAbortFunc {
320 public:
321 using Func = std::function<bool()>;
322 OpAbortFunc(Func&& f);
323 OpAbortFunc(OpAbortFunc&&) = default;
324 OpAbortFunc& operator=(OpAbortFunc&&) = default;
325
326 std::uint64_t GetId() const noexcept;
327 bool Abort() noexcept;
328
329 private:
330 static std::uint64_t NextId();
331 std::uint64_t m_id;
332 std::function<bool()> m_func;
333 };
334
335 enum class Store { Yes, No };
336 /**
337 * Adds initial keywords that can be provided by manager.
338 *
339 * - `ORIGIN`
340 * - `INSTRUME`
341 * - `ARCFILE`
342 */
343 void AddInitialKeywords(DaqContext& ctx);
344 void FormatKeywordSources(DaqContext& ctx);
345
346 /**
347 * Adds DaqController to active set and starts monitoring status changes.
348 *
349 * @param daq Data acquisition to add.
350 * @param store Determines if workspace should be updated or not (normally yes but if DAQ is
351 * added from workspace it does not have to be updated immediately again).
352 */
353 void AddDaq(std::shared_ptr<DaqController> const& daq, Store store = Store::Yes);
354
355 /** @name State persistence functions */
356 /// @{
357 /**
358 * Remove daq
359 * @param id DAQ to remove.
360 */
361 void RemoveDaq(std::string_view id);
362 void ArchiveDaq(std::string const& id);
363 /**
364 * Store list of active daqs in workspace.
365 */
366 void StoreActiveDaqs() const;
367 /// @}
368
369 void RemoveAbortFunc(std::uint64_t id) noexcept;
370 DaqController const* FindDaq(std::string_view id) const noexcept;
371 DaqController* FindDaq(std::string_view id) noexcept;
372 DaqController& FindDaqOrThrow(std::string_view id);
373 DaqController const& FindDaqOrThrow(std::string_view id) const;
374 /**
375 * Invoked by Manager when a DAQ has entered Stopped state and DaqController implementation is
376 * changed to the DPM phase version.
377 */
378 void MoveToMergePhase(std::string_view id);
379
380 /**
381 * Iterates data acquisitions and for those in State::NotStarted
382 *
383 * It sends pending requests and returns immediately. Upon failing the completion handler will
384 * schedule a 60s deadline timer to retry.
385 */
386 void ScheduleDaqsAsync();
387
388 std::shared_ptr<bool> m_alive_token;
389 rad::IoExecutor& m_executor;
390 ManagerParams m_params;
391 Workspace& m_workspace;
392 fits::KeywordFormatter const& m_kw_formatter;
393 std::shared_ptr<ObservableEventLog> m_event_log;
394 DaqControllerFactory& m_daq_factory;
395 std::shared_ptr<DpmClient> m_dpm_client;
396
397 StatusSignal m_status_signal;
398 /// Data acquisitions
399 std::vector<Daq> m_daq_controllers;
400
401 /// Pair of DAQ-id and abort function
402 std::vector<OpAbortFunc> m_abort_funcs;
403 log4cplus::Logger m_logger;
404
405 // Used by ScheduleDaqsAsync to retry schedule merges when DPM is offline
406 std::optional<boost::asio::deadline_timer> m_schedule_retry;
407};
408
409} // namespace daq
410#endif // #ifndef OCM_DAQ_MANAGER_HPP_
Abstract factory for DaqControllers.
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Implements daq::Manager.
Definition: manager.hpp:264
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:139
Status GetStatus(std::string_view id) const override
Get status.
Definition: manager.cpp:424
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:554
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:223
~ManagerImpl() noexcept
Definition: manager.cpp:124
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:214
void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords) override
Update FITS keywords for DaqController identified by id.
Definition: manager.cpp:611
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:498
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
Definition: manager.cpp:538
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
Definition: manager.cpp:546
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:615
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:619
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:135
boost::signals2::signal< void(ObservableStatus const &)> Signal
Definition: manager.hpp:139
virtual boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout)=0
Await DAQ state.
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
virtual Status GetStatus(std::string_view id) const =0
Get status.
virtual boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy)=0
Abort DaqController identified by id.
virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords)=0
Update FITS keywords for DaqController identified by id.
virtual boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy)=0
Stop DaqController identified by id.
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
virtual void RestoreFromWorkspace()=0
Restore from state stored in workspace.
virtual ~Manager()
Definition: manager.hpp:137
virtual std::vector< std::shared_ptr< DaqController const > > GetDaqControllers()=0
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
virtual StatusSignal & GetStatusSignal()=0
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
Observes any status.
Definition: manager.hpp:88
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:93
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Definition: manager.hpp:90
void Signal(ObservableStatus const &status)
Definition: manager.hpp:97
Interface to interact with DPM workspace.
Definition: workspace.hpp:32
Formats keyword against e.g.
Definition: keyword.hpp:551
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Contains declaration of daq::Context.
daq::DpmClient
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Contains data structure for FITS keywords.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
std::string origin
Definition: manager.hpp:51
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:56
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:63
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:50
State
Observable states of the data acquisition process.
Definition: state.hpp:41
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:57
Configurations parameters directly related to manager.
Definition: manager.hpp:46
Config class header file.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
Exception indicating the DAQ id was invalid.
Definition: manager.hpp:38
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:164
Declaration of utilities.