ifw-daq  3.0.1
IFW Data Acquisition modules
manager.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Declaration of `daq::Manager`
7  */
8 #ifndef OCM_DAQ_MANAGER_HPP_
9 #define OCM_DAQ_MANAGER_HPP_
10 #include "config.hpp"
11 
12 #include <chrono>
13 #include <string_view>
14 #include <vector>
15 
16 #include <boost/asio/deadline_timer.hpp>
17 #include <boost/thread/future.hpp>
18 #include <log4cplus/logger.h>
19 #include <rad/ioExecutor.hpp>
20 
21 #include <daq/daqContext.hpp>
22 #include <daq/dpmClient.hpp>
23 #include <daq/error.hpp>
24 #include <daq/eventLog.hpp>
25 #include <daq/state.hpp>
26 #include <daq/status.hpp>
27 #include <daq/utility.hpp>
28 
29 namespace daq {
30 
31 class Workspace;
32 
33 /**
34  * Configurations parameters directly related to manager.
35  */
36 struct ManagerParams {
37  /**
38  * Instrument identifier.
39  */
40  std::string instrument_id;
41  std::string origin = "ESO-PARANAL";
42 
43  /**
44  * Age of DAQ in acquiring state after which it is automatically considered abandoned and will
45  * be archived without further action at startup.
46  */
47  std::chrono::hours acquiring_stale_age = std::chrono::hours(14);
48 
49  /**
50  * Age of DAQ in merging state, after which it is automatically considered abandoned and will be
51  * archived without further action at startup.
52  */
53  std::chrono::hours merging_stale_age = std::chrono::hours(2 * 24);
54 };
55 
56 /**
57  * Creates a DAQ id candidate that may or may not be unique.
58  *
59  * @param instrument_id The instrument ID to use for the id. Only the 5 first characters will be
60  * used if the name is longer than 5.
61  * @param jitter Jitter is a millisecond component added to the current time. This is meant to be
62  * used to find a unique id by adding jitter until a unique id is found.
63  *
64  * @note The function cannot guarantee unique ID and is the responsibility of the caller.
65  * @ingroup daq_common_libdaq
66  */
67 std::string MakeIdCandidate(char const* instrument_id,
68  unsigned jitter = 0,
69  std::chrono::system_clock::time_point* out = nullptr);
70 
71 class DaqController;
73 class FitsController;
74 
75 /**
76  * Observes any status.
77  */
78 class StatusSignal {
79 public:
80  using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
81 
82  template <class Observer>
83  boost::signals2::connection ConnectObserver(Observer o) {
84  return m_signal.connect(std::move(o));
85  }
86 
87  void Signal(ObservableStatus const& status) {
88  m_signal(status);
89  }
90 
91 private:
92  SignalType m_signal;
93 };
94 
95 /**
96  * Manager owns DaqController and FitsController (active data acquisitions) instances and
97  * multiplexes requests to them.
98  *
99  *
100  * Important responsibilities:
101  *
102  * - Provide interface that allows multiplexing of multiple DaqControllers and FitsControllers
103  * (identified by unique id). This interface is close to what the MAL ICD interface looks like to
104  * control DaqControllers and FitsControllers.
105  * - Creation of `DaqController` instance as part of a new data acquisition (using factory)
106  * - Replace `DaqController` with another implementation to perform the merging (triggered when DAQ
107  * reaches Stopped state).
108  * - Monitor primary data sources to stop DaqController if a primary source completes (TODO).
109  * - (TBD): On startup query previous execution for incomplete DAQ (only consider DAQs with a
110  * maximum age to discard abandoned DAQs)
111  * - Load from persistant storage.
112  * - Query DPM for in-progress merges.
113  *
114  * FitsController also act as a data source.
115  *
116  * Out of scope:
117  *
118  * - Creation of `FitsController` instances.
119  *
120  * @note: It's a template only to facilitate mocking of DaqController, without requiring it to be
121  * an interface.
122  *
123  * @ingroup daq_common_libdaq
124  */
125 class Manager {
126 public:
127  virtual ~Manager() {
128  }
129  using Signal = boost::signals2::signal<void(ObservableStatus const&)>;
130 
131  /**
132  * Restore from state stored in workspace.
133  *
134  * This should typically only be done after construction and not when running.
135  */
136  virtual void RestoreFromWorkspace() = 0;
137 
138  /**
139  * Creates a new unique identifier based on the instrument id and current time.
140  *
141  * If there is a id collision when using current time a millisecond jitter component
142  * is added until a unique id is found.
143  *
144  * The format is the same as ARCFILE - the file extension: "<OLAS_ID>-2020-08-19T09:33:11.951"
145  *
146  * @note OLAS_ID is the first 5 characters of instrument id.
147  * @note The returned ID is guaranteed to be unique for all IDs known by Manager (there is
148  * currently no global repository of previous IDs) and is suitable for both DAQ id and OLAS
149  * FileId.
150  *
151  * @param [out] time Optional time used to produce the ID.
152  * @throws std::system_error on failure.
153  */
154  virtual std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const = 0;
155 
156  /**
157  * Query existing data acquisition by @c id and optional @c file_id.
158  *
159  * @param id DAQ id to look up.
160  * @param file_id Optional file_id to look up.
161  *
162  * @returns true if there is a already a DAQ with the same ID (or same file_id if argument is
163  * provided).
164  * @returns false otherwise.
165  */
166  virtual bool HaveDaq(std::string_view id, std::string_view file_id = {}) const DAQ_NOEXCEPT = 0;
167 
168  /**
169  * Get status
170  *
171  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
172  */
173  virtual Status GetStatus(std::string_view id) const = 0;
174 
175  /**
176  * Start DaqController identified by `id`.
177  *
178  * @param id Data acquisition id.
179  *
180  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
181  * does not exist.
182  * @returns Future that will eventually be ready when data acquisition has started, or failed to
183  * start.
184  */
185  virtual boost::future<State> StartDaqAsync(DaqContext ctx) = 0;
186 
187  /**
188  * Stop DaqController identified by `id`.
189  *
190  * @param id Data acquisition id.
191  * @param policy Error policy determining if errors are tolerated or not.
192  *
193  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
194  * does not exist.
195  * @returns Future that will eventually be ready when data acquisition has stopped, or failed to
196  * stop.
197  */
198  virtual boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
199 
200  /**
201  * Abort DaqController identified by `id`.
202  *
203  * @param id Data acquisition id.
204  * @param policy Error policy determining if errors are tolerated or not.
205  *
206  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
207  * does not exist.
208  * @returns Future that will eventually be ready when data acquisition has aborted, or failed to
209  * abort.
210  */
211  virtual boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
212 
213  /**
214  * Await DAQ state
215  *
216  * @param id Data acquisition id.
217  * @param state target state to await.
218  * @param timeout How long to wait for state to be reached.
219  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
220  * does not exist or timeout is negative.
221  * @returns Future with a value set when condition is fulfilled or times out.
222  */
223  virtual boost::future<Result<Status>>
224  AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout) = 0;
225  /**
226  * Update FITS keywords for DaqController identified by `id`.
227  * @param id Data acquisition id.
228  * @param keywords FITS keywords to update.
229  *
230  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
231  * @throw std::runtime_error If DaqController state does not allow updating of keywords (it's
232  * e.g. already been submitted for merging).
233  */
234  virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) = 0;
235 
236  /**
237  * @returns status observer object.
238  *
239  * The status observer observes any DAQ status changes.
240  */
242 
243  /**
244  * @returns current Daq Controllers.
245  */
246  virtual std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() = 0;
247 };
248 
249 /**
250  * Implements `daq::Manager`.
251  *
252  * @ingroup daq_common_libdaq
253  */
254 class ManagerImpl : public Manager {
255 public:
256  /**
257  * @param instrument_id Instrument id.
258  */
259  explicit ManagerImpl(rad::IoExecutor& executor,
260  ManagerParams params,
261  Workspace& workspace,
262  std::shared_ptr<ObservableEventLog> event_log,
263  DaqControllerFactory& daq_factory,
264  std::shared_ptr<DpmClient> dpm_client,
265  log4cplus::Logger const& logger);
266  ~ManagerImpl() noexcept;
267  /**
268  * Loads status and constructs DaqControllers corresponding to stored state.
269  */
270  void RestoreFromWorkspace() override;
271 
272  std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const override;
273 
274  bool HaveDaq(std::string_view id, std::string_view file_id = {}) const noexcept override;
275 
276  Status GetStatus(std::string_view id) const override;
277 
278  boost::future<State> StartDaqAsync(DaqContext ctx) override;
279 
280  boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) override;
281 
282  boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) override;
283 
284  boost::future<Result<Status>>
285  AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override;
286 
287  void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) override;
288 
289  StatusSignal& GetStatusSignal() override;
290 
291  std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() override;
292 
293 private:
294  struct Daq {
295  Daq(std::string id_arg,
296  std::shared_ptr<DaqController> controller_arg,
297  boost::signals2::connection conn_status_arg,
298  boost::signals2::connection conn_context_arg) noexcept;
299 
300  std::string id;
301  std::shared_ptr<DaqController> controller;
302  // connection for observer connected to controller.
303  boost::signals2::scoped_connection conn_status;
304  boost::signals2::scoped_connection conn_context;
305  };
306 
307  /// Adds unique identifier (unrelated to DAQ id) to op abort functions
308  class OpAbortFunc {
309  public:
310  using Func = std::function<bool()>;
311  OpAbortFunc(Func&& f);
312  OpAbortFunc(OpAbortFunc&&) = default;
313  OpAbortFunc& operator=(OpAbortFunc&&) = default;
314 
315  uint64_t GetId() const noexcept;
316  bool Abort() noexcept;
317 
318  private:
319  static uint64_t NextId();
320  uint64_t m_id;
321  std::function<bool()> m_func;
322  };
323 
324  enum class Store { Yes, No };
325  /**
326  * Adds initial keywords that can be provided by manager.
327  *
328  * - `ORIGIN`
329  * - `INSTRUME`
330  * - `ARCFILE`
331  */
332  void AddInitialKeywords(DaqContext& ctx);
333 
334  /**
335  * Adds DaqController to active set and starts monitoring status changes.
336  *
337  * @param daq Data acquisition to add.
338  * @param store Determines if workspace should be updated or not (normally yes but if DAQ is
339  * added from workspace it does not have to be updated immediately again).
340  */
341  void AddDaq(std::shared_ptr<DaqController> const& daq, Store store = Store::Yes);
342 
343  /** @name State persistence functions */
344  /// @{
345  /**
346  * Remove daq
347  * @param id DAQ to remove.
348  */
349  void RemoveDaq(std::string_view id);
350  void ArchiveDaq(std::string const& id);
351  /**
352  * Store list of active daqs in workspace.
353  */
354  void StoreActiveDaqs() const;
355  /// @}
356 
357  void RemoveAbortFunc(uint64_t id) noexcept;
358  DaqController const* FindDaq(std::string_view id) const noexcept;
359  DaqController* FindDaq(std::string_view id) noexcept;
360  DaqController& FindDaqOrThrow(std::string_view id);
361  DaqController const& FindDaqOrThrow(std::string_view id) const;
362  /**
363  * Invoked by Manager when a DAQ has entered Stopped state and DaqController implementation is
364  * changed to the DPM phase version.
365  */
366  void MoveToMergePhase(std::string_view id);
367 
368  /**
369  * Iterates data acquisitions and for those in State::NotStarted
370  *
371  * It sends pending requests and returns immediately. Upon failing the completion handler will
372  * schedule a 60s deadline timer to retry.
373  */
374  void ScheduleDaqsAsync();
375 
376  std::shared_ptr<bool> m_alive_token;
377  rad::IoExecutor& m_executor;
378  ManagerParams m_params;
379  Workspace& m_workspace;
380  std::shared_ptr<ObservableEventLog> m_event_log;
381  DaqControllerFactory& m_daq_factory;
382  std::shared_ptr<DpmClient> m_dpm_client;
383 
384  StatusSignal m_status_signal;
385  /// Data acquisitions
386  std::vector<Daq> m_daq_controllers;
387 
388  /// Pair of DAQ-id and abort function
389  std::vector<OpAbortFunc> m_abort_funcs;
390  log4cplus::Logger m_logger;
391 
392  // Used by ScheduleDaqsAsync to retry schedule merges when DPM is offline
393  std::optional<boost::asio::deadline_timer> m_schedule_retry;
394 };
395 
396 } // namespace daq
397 #endif // #ifndef OCM_DAQ_MANAGER_HPP_
Abstract factory for DaqControllers.
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Implements daq::Manager.
Definition: manager.hpp:254
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:127
Status GetStatus(std::string_view id) const override
Get status.
Definition: manager.cpp:389
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:517
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:209
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, std::shared_ptr< DpmClient > dpm_client, log4cplus::Logger const &logger)
Definition: manager.cpp:95
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:581
~ManagerImpl() noexcept
Definition: manager.cpp:112
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:200
void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords) override
Update FITS keywords for DaqController identified by id.
Definition: manager.cpp:573
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:465
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
Definition: manager.cpp:501
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
Definition: manager.cpp:509
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:577
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:125
virtual boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout)=0
Await DAQ state.
boost::signals2::signal< void(ObservableStatus const &)> Signal
Definition: manager.hpp:129
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
virtual StatusSignal & GetStatusSignal()=0
virtual boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy)=0
Stop DaqController identified by id.
virtual Status GetStatus(std::string_view id) const =0
Get status.
virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords)=0
Update FITS keywords for DaqController identified by id.
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
virtual void RestoreFromWorkspace()=0
Restore from state stored in workspace.
virtual ~Manager()
Definition: manager.hpp:127
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
virtual boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy)=0
Abort DaqController identified by id.
virtual std::vector< std::shared_ptr< DaqController const > > GetDaqControllers()=0
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
Observes any status.
Definition: manager.hpp:78
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:83
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Definition: manager.hpp:80
void Signal(ObservableStatus const &status)
Definition: manager.hpp:87
Interface to interact with DPM workspace.
Definition: workspace.hpp:32
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Contains declaration of daq::Context.
daq::DpmClient
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
std::string origin
Definition: manager.hpp:41
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:53
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:40
State
Observable states of the data acquisition process.
Definition: state.hpp:39
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:47
Configurations parameters directly related to manager.
Definition: manager.hpp:36
Config class header file.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:153
Declaration of utilities.