ifw-daq  3.0.1
IFW Data Acquisition modules
status.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains declaration for Status and ObservableStatus
7  */
8 #ifndef OCM_DAQ_DAQ_STATUS_HPP_
9 #define OCM_DAQ_DAQ_STATUS_HPP_
10 
11 #include <chrono>
12 #include <iosfwd>
13 #include <map>
14 #include <string>
15 #include <vector>
16 
17 #include <boost/signals2/signal.hpp>
18 
19 #include "dpPart.hpp"
20 #include "state.hpp"
21 #include <daq/fits/keyword.hpp>
22 
23 namespace daq {
24 
25 namespace alert {
26 
27 /**
28  * Alert category "request".
29  *
30  * Key: request-name + source-name
31  */
32 constexpr std::string_view REQUEST = "request";
33 
34 /**
35  * Alert category "collecting.rsync": failure during rsync source copy.
36  * Key: local file path
37  */
38 constexpr std::string_view COLLECTING_RSYNC = "collecting.rsync";
39 
40 /**
41  * Alert category "releasing.rsync": failure during rsync source copy (NOTE(review): same text as COLLECTING_RSYNC; confirm).
42  * Key: receiver index in DP Spec
43  */
44 constexpr std::string_view RELEASING_RSYNC = "releasing.rsync";
45 
46 /**
47  * Alert category "merging.merge", shared by two kinds of alerts. (1) Merging failed.
48  * Key: "" (nothing)
49  *
50  * (2) Warning/alert during merging.
51  * Key: warning-id (e.g. "primary_hdu_resize")
52  */
53 constexpr std::string_view MERGING_MERGE = "merging.merge";
54 } // namespace alert
55 
56 /**
57  * Uniquely identifies an alert by the pair (category, key).
58  */
59 struct AlertId {
60  /**
61  * Standardized category (presumably one of the constants in namespace alert; confirm)
62  */
63  std::string category;
64  /**
65  * Unique key for each alert (expected content per category is documented in namespace alert)
66  */
67  std::string key;
68 };
69 
70 bool operator==(AlertId const& lhs, AlertId const& rhs) noexcept;
71 bool operator!=(AlertId const& lhs, AlertId const& rhs) noexcept;
72 std::ostream& operator<<(std::ostream& os, AlertId const& s);
73 
74 struct PutTimeStruct { // Wraps a time point; has a dedicated operator<< overload declared after this struct
75  std::chrono::system_clock::time_point time;
76 };
77 std::ostream& operator<<(std::ostream& os, PutTimeStruct const& t);
78 
79 PutTimeStruct PutTime(std::chrono::system_clock::time_point const& time);
80 
81 /**
82  * Describes an active Data Acquisition alert.
83  *
84  * category and key make up the unique identifier of an alert.
85  */
86 struct Alert {
87  using Clock = std::chrono::system_clock;
88  using TimePoint = Clock::time_point;
89 
91  std::string description;
93 };
94 
95 /**
96  * Construct alert
97  */
98 Alert MakeAlert(std::string_view category, std::string key, std::string description);
99 Alert MakeAlert(AlertId id, std::string description);
100 AlertId MakeAlertId(std::string_view category, std::string key);
101 
102 /**
103  * Comparison operator for Alert.
104  * @returns true if category and key both compare equal between lhs and rhs (description is not considered).
105  */
106 bool operator==(Alert const& lhs, Alert const& rhs) noexcept;
107 bool operator==(Alert const& lhs, AlertId const& rhs) noexcept;
108 bool operator==(AlertId const& lhs, Alert const& rhs) noexcept;
109 bool operator!=(Alert const& lhs, Alert const& rhs) noexcept;
110 
111 std::ostream& operator<<(std::ostream& os, Alert const& s);
112 std::ostream& operator<<(std::ostream& os, std::vector<Alert> const& s);
113 
114 /**
115  * Set alert.
116  * @param alerts Container of alert to modify.
117  * @param alert to set.
118  * @relatesalso Status
119  */
120 void SetAlert(std::vector<Alert>& alerts, Alert alert);
121 
122 /**
123  * Clear alert.
124  * @param alerts Container of alert to modify.
125  * @param alert to clear.
126  * @returns true if there was an alert to clear, false otherwise.
127  * @relatesalso Status
128  */
129 bool ClearAlert(std::vector<Alert>& alerts, AlertId const& alert);
130 
131 /**
132  * Persistent status for receiver delivery.
133  */
135  enum class State { NotStarted = 0, Started, Success, Failure };
136 
137  bool IsFinalState() const noexcept;
139 };
140 
141 bool operator==(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept;
142 bool operator!=(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept;
143 
144 std::ostream& operator<<(std::ostream& os, ReceiverStatus::State state);
145 
146 /**
147  * Non-observable status object that stores the status of a data acquisition.
148  *
149  * It is also planned to be serializable to allow crash recovery from persistent storage.
150  *
151  * @ingroup daq_common_libdaq
152  */
153 struct Status {
154  using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
155  using Clock = TimePoint::clock;
156 
157  Status() = default;
158  /**
159  * @param id DAQ id
160  * @param file_id File identifier (OLAS compatible).
161  */
162  explicit Status(std::string id, std::string file_id) noexcept;
163  Status(
164  std::string id, std::string file_id, State state, bool error, TimePoint timestamp) noexcept;
165 
166  Status(Status&&) = default;
167  Status(Status const&) = default;
168  Status& operator=(Status&&) = default;
169  Status& operator=(Status const&) = default;
170 
171  bool operator==(Status const& rhs) const noexcept;
172  bool operator!=(Status const& rhs) const noexcept;
173 
174  std::string id;
175  std::string file_id;
177  bool error = false;
178  /**
179  * Active alerts
180  */
181  std::vector<Alert> alerts;
182 
183  /**
184  * Receiver processing (e.g. olas or rsync) status.
185  *
186  * Key represents corresponding DpSpec::receivers index.
187  */
188  std::map<std::size_t, ReceiverStatus> receivers;
189 
190  /**
191  * Path to resulting data product.
192  */
193  std::string result;
194 
195  /**
196  * Timestamp of last update.
197  */
199 };
200 
201 std::ostream& operator<<(std::ostream& os, Status const& s);
202 
203 /**
204  * Stores data acquisition status and allows subscription to status changes.
205  *
206  * DaqController instances will update ObservableStatus as changes occur.
207  *
208  * @ingroup daq_common_libdaq
209  */
211 public:
212  /**
213  * Defer signal changes until later time.
214  *
215  * This is mainly useful to allow making multiple changes with only one signal being emitted.
216  *
217  * Effects:
218  * - Blocks sending signals until the deferrer expires (destroyed or manually reset).
219  * - When a deferrer is expired a signal will be emitted (if there are multiple deferrers each
220  * will emit a signal).
221  */
222  class DeferSignal {
223  public:
224  DeferSignal() = default; // Creates an empty deferrer: IsValid() == false
225  DeferSignal(ObservableStatus* status, bool force = true); // force: presumably forwarded to EnableSignals() on expiry (confirm)
226  ~DeferSignal() noexcept;
227  DeferSignal(DeferSignal&&) noexcept;
228  DeferSignal& operator=(DeferSignal&&) noexcept;
229 
230  /**
231  * If object is valid this will unblock and signal changes and then remove reference to
232  * ObservableStatus.
233  * @post IsValid() == false
234  */
235  void Reset() noexcept;
236 
237  /**
238  * @returns true if object is valid, i.e. it still refers to an ObservableStatus.
239  */
240  bool IsValid() const noexcept {
241  return m_status != nullptr;
242  }
243 
244  private:
245  ObservableStatus* m_status = nullptr;
246  bool m_force = false; // Initialized so a default-constructed object never holds an indeterminate bool
247  };
248 
249  /**
250  * Construct a new object
251  *
252  * @param id Data acquisition identifier.
253  */
254  explicit ObservableStatus(std::string id, std::string file_id) noexcept;
255  explicit ObservableStatus(Status status);
260 
261  bool operator==(ObservableStatus const& rhs) const noexcept;
262  bool operator!=(ObservableStatus const& rhs) const noexcept;
263 
264  bool operator==(Status const& rhs) const noexcept;
265  bool operator!=(Status const& rhs) const noexcept;
266 
267  /**
268  * Assign new status where the new status refers to the same DAQ.
269  *
270  * @param status New status to assign from.
271  * @throws std::invalid_argument if status.id is not equal to this->id.
272  */
273  ObservableStatus& operator=(Status const& status);
274 
275  ///@name Accessors
276  ///@{
277  /**
278  * @return Data acquisition identifier
279  */
280  std::string const& GetId() const noexcept;
281 
282  /**
283  * @return OLAS file id
284  */
285  std::string const& GetFileId() const noexcept;
286 
287  /**
288  * @return Data acquisition state
289  */
290  State GetState() const noexcept;
291 
292  /**
293  * @return Data acquisition error flag.
294  */
295  bool GetError() const noexcept;
296 
297  /**
298  * @return list of alerts.
299  */
300  std::vector<Alert> const& GetAlerts() const noexcept;
301 
302  /**
303  * @return Timestamp.
304  */
305  Status::Clock::time_point GetTimestamp() const noexcept;
306  ///@}
307 
308  ///@name Modifiers
309  ///@{
310  /**
311  * Set state of data acquisition.
312  *
313  * @param s New state
314  * @param error Optional new error flag value.
315  *
316  * @post Connected observers have been signalled.
317  */
318  void SetState(State s, std::optional<bool> error = std::nullopt) noexcept;
319 
320  /**
321  * Set receiver status.
322  * @param index Index of receiver in DpSpec.
323  * @param status Status structure to write.
324  */
325  void SetReceiverStatus(std::size_t index, ReceiverStatus status);
326 
327  /**
328  * Get receiver status.
329  *
330  * @param index Index of receiver in DpSpec.
331  * @return Status structure.
332  */
333  ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept;
334 
335  /**
336  * Set error flag for data acquisition.
337  *
338  * @param error New error flag status.
339  *
340  * @post Connected observers have been signalled.
341  */
342  void SetError(bool error) noexcept;
343 
344  /**
345  * Set resulting data product path.
346  *
347  * @param result
348  */
349  void SetResult(std::string result);
350 
351  /**
352  * Set alert
353  *
354  * @note An existing alert with same ID will be overwritten.
355  *
356  * @param alert to set.
357  */
358  void SetAlert(Alert alert);
359 
360  /**
361  * Clear alert
362  *
363  * @param alert to clear.
364  */
365  void ClearAlert(AlertId const& alert);
366 
367  using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
368  /**
369  * Connect observer that is invoked when state is modified.
370  *
371  * @param o Observer callable invoked on status changes (state or file changes)
372  * Observer must be invocable with signature `void(ObservableStatus const&)`.
373  *
374  * @return signal connection object that can be used to disconnect observer:
375  *
376  * @code
377  * auto c = status.ConnectObserver([](ObservableStatus const& s){});
378  * // later the connection object can be used to disconnect
379  * c.disconnect();
380  * @endcode
381  */
382  template <class Observer>
383  boost::signals2::connection ConnectObserver(Observer o) {
384  return m_signal.connect(std::move(o));
385  }
386  boost::signals2::connection ConnectStatus(SignalType::slot_type const& slot) { // Connects a pre-built slot to the same signal
387  return m_signal.connect(slot);
388  }
389  /**
390  * Emit signal. Unless @a forced == true, the signal is only emitted if not blocked.
391  */
392  void EmitSignal(bool forced = false) noexcept;
393 
394  /**
395  * Query number of changes made since last signal.
396  */
397  unsigned ChangesSinceLastSignal() const noexcept {
398  return m_changes_since_signal;
399  }
400 
401  /**
402  * Allow implicit conversion to non-observable status.
403  */
404  operator Status() const;
405  Status const& GetStatus() const noexcept;
406  ///@}
407 protected:
408  friend class DeferSignal;
409 
410  /**
411  * Decrement signal blocker and if number of blocks reaches zero emit signal if generations are
412  * different, unless forced in which case signal is always emitted.
413  *
414  * @param force Force signal to be emitted even if generations are not different.
415  * @note Returns nothing; a generation counter is not returned by this function.
416  */
417  void EnableSignals(bool force) noexcept;
418 
419  /**
420  * Disable signals from being sent (unless forced).
421  * @note Returns nothing; a generation counter is not returned by this function.
422  */
423  void DisableSignals() noexcept;
424 
425 private:
426  enum class UpdateTimestamp { No = 0, Yes = 1 };
427  /**
428  * Call to report that changes have been made, update timestamp and emit signal
429  * if not blocked.
430  */
431  void RecordChanges(bool forced = false, UpdateTimestamp update = UpdateTimestamp::Yes) noexcept;
432 
433  Status m_status;
434  // const Status should also be observable.
435  mutable SignalType m_signal;
436  /**
437  * If positive there are instances of DeferSignal blocking signals
438  */
439  unsigned m_deferred_signals = 0u;
440  unsigned m_changes_since_signal = 0u;
441 };
442 
443 std::ostream& operator<<(std::ostream& os, ObservableStatus const& s);
444 
445 } // namespace daq
446 #endif // #ifndef OCM_DAQ_DAQ_STATUS_HPP_
Defer signal changes until later time.
Definition: status.hpp:222
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
ObservableStatus(ObservableStatus const &)=delete
ObservableStatus & operator=(ObservableStatus &&)=default
ObservableStatus(ObservableStatus &&)=default
boost::signals2::connection ConnectStatus(SignalType::slot_type const &slot)
Connect observer that is invoked when state is modified.
Definition: status.hpp:386
ObservableStatus & operator=(ObservableStatus const &)=delete
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Connect observer that is invoked when state is modified.
Definition: status.hpp:381
Contains declaration for DpPart.
Contains data structure for FITS keywords.
Declares daq::State and related functions.
constexpr std::string_view RELEASING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:44
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:38
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:53
AlertId id
Definition: status.hpp:90
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
std::string key
Unique key for each alert.
Definition: status.hpp:67
std::string description
Definition: status.hpp:91
bool operator==(DaqContext const &lhs, DaqContext const &rhs) noexcept
Definition: daqContext.cpp:12
TimePoint timestamp
Definition: status.hpp:92
bool operator!=(AlertId const &lhs, AlertId const &rhs) noexcept
Definition: status.cpp:57
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
std::string category
Standardized category.
Definition: status.hpp:63
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
Definition: status.cpp:19
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
Definition: status.cpp:30
State
Observable states of the data acquisition process.
Definition: state.hpp:39
std::chrono::system_clock::time_point time
Definition: status.hpp:75
PutTimeStruct PutTime(std::chrono::system_clock::time_point const &time)
Definition: status.cpp:95
Clock::time_point TimePoint
Definition: status.hpp:88
std::chrono::system_clock Clock
Definition: status.hpp:87
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Describes an active Data Acquisition alert.
Definition: status.hpp:86
Uniquely identifies an alert.
Definition: status.hpp:59
Definition: main.cpp:23
Persistent status for receiver delivery.
Definition: status.hpp:134
bool IsFinalState() const noexcept
Definition: status.cpp:113
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:153
Status & operator=(Status const &)=default
Status & operator=(Status &&)=default
TimePoint::clock Clock
Definition: status.hpp:155
std::string id
Definition: status.hpp:174
std::string result
Path to resulting data product.
Definition: status.hpp:193
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
Definition: status.hpp:188
std::string file_id
Definition: status.hpp:175
Status(Status const &)=default
Status(Status &&)=default
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:181
std::chrono::time_point< std::chrono::system_clock > TimePoint
Definition: status.hpp:154
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:198
Status()=default