ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
status.hpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains declaration for Status and ObservableStatus
7 */
8#ifndef OCM_DAQ_DAQ_STATUS_HPP_
9#define OCM_DAQ_DAQ_STATUS_HPP_
10
11#include <chrono>
12#include <iosfwd>
13#include <map>
14#include <string>
15#include <vector>
16
17#include <boost/signals2/signal.hpp>
18#include <fmt/ostream.h>
19
20#include "state.hpp"
21#include <daq/fits/keyword.hpp>
22
23namespace daq {
24
25namespace alert {
26
27/**
28 * Request
29 *
30 * Key: request-name + source-name
31 */
32constexpr std::string_view REQUEST = "request";
33
34/**
35 * Failure during rsync source copy.
36 * Key: local file path
37 */
38constexpr std::string_view COLLECTING_RSYNC = "collecting.rsync";
39
40/**
41 * Failure during rsync copy to a receiver (release).
42 * Key: receiver index in DP Spec
43 */
44constexpr std::string_view RELEASING_RSYNC = "releasing.rsync";
45
46/**
47 * Merging failed.
48 * Key: "" (nothing)
49 *
50 * Warning/alert during merging.
51 * Key: warning-id (e.g. "primary_hdu_resize")
52 */
53constexpr std::string_view MERGING_MERGE = "merging.merge";
54
55/**
56 * Daq controller command failed, which normally happens when an async operation raised an
57 * exception. The alert is not set for invalid commands (e.g. invalid arguments).
58 *
59 * Key: command, e.g. `StartAsync()`
60 */
61constexpr std::string_view DAQ_CONTROLLER = "daqcontroller";
62
63} // namespace alert
64
65/**
66 * Uniquely identifies an alert.
67 */
68struct AlertId {
69 /**
70 * Standardized category, e.g. one of the constants declared in namespace alert.
71 */
72 std::string category;
73 /**
74 * Unique key for each alert. What the key holds depends on the category.
75 */
76 std::string key;
77};
78
79bool operator==(AlertId const& lhs, AlertId const& rhs) noexcept;
80bool operator!=(AlertId const& lhs, AlertId const& rhs) noexcept;
81std::ostream& operator<<(std::ostream& os, AlertId const& s);
82
84 std::chrono::system_clock::time_point time;
85};
86std::ostream& operator<<(std::ostream& os, PutTimeStruct const& t);
87
88PutTimeStruct PutTime(std::chrono::system_clock::time_point const& time);
89
90/**
91 * Describes an active Data Acquisition alert.
92 *
93 * category and key make up the unique identifier of an alert.
94 */
95struct Alert {
96 using Clock = std::chrono::system_clock;
97 using TimePoint = Clock::time_point;
98
100 std::string description;
102};
103
104using Alerts = std::vector<Alert>;
105/**
106 * Construct alert
107 */
108Alert MakeAlert(std::string_view category, std::string key, std::string description);
109Alert MakeAlert(AlertId id, std::string description);
110AlertId MakeAlertId(std::string_view category, std::string key);
111
112/**
113 * Comparison operator for Alert.
114 * @returns true if category and key both compare equal between lhs and rhs (description is not part of the comparison).
115 */
116bool operator==(Alert const& lhs, Alert const& rhs) noexcept;
117bool operator==(Alert const& lhs, AlertId const& rhs) noexcept;
118bool operator==(AlertId const& lhs, Alert const& rhs) noexcept;
119bool operator!=(Alert const& lhs, Alert const& rhs) noexcept;
120
121std::ostream& operator<<(std::ostream& os, Alert const& s);
122std::ostream& operator<<(std::ostream& os, Alerts const& s);
123
124/**
125 * Set alert.
126 * @param alerts Container of alert to modify.
127 * @param alert to set.
128 * @relatesalso Status
129 */
130void SetAlert(std::vector<Alert>& alerts, Alert alert);
131
132/**
133 * Clear alert.
134 *
135 * @param alerts Container of alert to modify.
136 * @param alert to clear. If `key` is '*' then all alerts with matching category are cleared.
137 * @returns true if there was an alert to clear, false otherwise.
138 * @relatesalso Status
139 */
140bool ClearAlert(std::vector<Alert>& alerts, AlertId const& alert);
141
142/**
143 * Persistent status for receiver delivery.
144 */
146 enum class State { NotStarted = 0, Started, Success, Failure };
147
148 bool IsFinalState() const noexcept;
150};
151
152bool operator==(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept;
153bool operator!=(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept;
154
155std::ostream& operator<<(std::ostream& os, ReceiverStatus::State state);
156
157/**
158 * Non-observable status object that stores the status of a data acquisition.
159 *
160 * It is also planned to be serializable to allow crash recovery from persistent storage.
161 *
162 * @ingroup daq_common_libdaq
163 */
164struct Status {
165 using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
166 using Clock = TimePoint::clock;
167
168 Status() = default;
169 /**
170 * @param id DAQ id
171 * @param file_id File identifier (OLAS compatible).
172 */
173 explicit Status(std::string id, std::string file_id) noexcept;
174 Status(std::string id, std::string file_id, State state, TimePoint timestamp) noexcept;
175
176 Status(Status&&) = default;
177 Status(Status const&) = default;
178 Status& operator=(Status&&) = default;
179 Status& operator=(Status const&) = default;
180
181 bool operator==(Status const& rhs) const noexcept;
182 bool operator!=(Status const& rhs) const noexcept;
183
184 std::string id;
185 std::string file_id;
187 /**
188 * Active alerts
189 */
190 std::vector<Alert> alerts;
191
192 /**
193 * Receiver processing (e.g. olas or rsync) status.
194 *
195 * Key represents corresponding DpSpec::receivers index.
196 */
197 std::map<std::size_t, ReceiverStatus> receivers;
198
199 /**
200 * Path to resulting data product.
201 */
202 std::string result;
203
204 /**
205 * Timestamp of last update.
206 */
208};
209
210/**
211 * @return true if there are alerts with error.
212 */
213bool HasError(Status const& status) noexcept;
214
215std::ostream& operator<<(std::ostream& os, Status const& s);
216
217/**
218 * Stores data acquisition status and allows subscription to status changes.
219 *
220 * DaqController instances will update ObservableStatus as changes occur.
221 *
222 * @ingroup daq_common_libdaq
223 */
225public:
226 /**
227 * Defer signal changes until later time.
228 *
229 * This is mainly useful to allow making multiple changes with only one signal being emitted.
230 *
231 * Effects:
232 * - Blocks sending signals until Deferrer expires (destroyed or manually reset).
233 * - When a deferrer is expired a signal will be emitted (if there are multiple deferrers each
234 * will emit a signal).
235 */
237 public:
238 DeferSignal() = default;
239 DeferSignal(ObservableStatus* status, bool force = true);
240 ~DeferSignal() noexcept;
241 DeferSignal(DeferSignal&&) noexcept;
242 DeferSignal& operator=(DeferSignal&&) noexcept;
243
244 /**
245 * If object is valid this will unblock and signal changes and then remove reference to
246 * ObservableStatus.
247 * @post IsValid() == false
248 */
249 void Reset() noexcept;
250
251 /**
252 * @returns true if object is valid.
253 */
254 bool IsValid() const noexcept {
255 return m_status != nullptr;
256 }
257
258 private:
259 ObservableStatus* m_status = nullptr; // nullptr means "not valid", see IsValid()
260 bool m_force; // NOTE(review): no default member initializer, so `DeferSignal() = default` leaves this indeterminate; consider `= false`.
261 };
262
263 /**
264 * Provide a more concise way to set/clear alerts.
265 */
267 public:
269 explicit AlertActivator(ObservableStatus* status,
270 std::string_view category,
271 std::string key);
272
273 void Set(std::string description);
274 void Clear();
275
276 private:
277 ObservableStatus* m_status;
278 DeferSignal m_defer;
279 AlertId m_id;
280 };
281
282 /**
283 * Construct a new object
284 *
285 * @param id Data acquisition identifier.
286 */
287 explicit ObservableStatus(std::string id, std::string file_id) noexcept;
288 explicit ObservableStatus(Status status);
293
294 bool operator==(ObservableStatus const& rhs) const noexcept;
295 bool operator!=(ObservableStatus const& rhs) const noexcept;
296
297 bool operator==(Status const& rhs) const noexcept;
298 bool operator!=(Status const& rhs) const noexcept;
299
300 /**
301 * Assign new status where the new status refers to the same DAQ.
302 *
303 * @param status New status to assign from.
304 * @throws std::invalid_argument if status.id is not equal to this->id.
305 */
306 ObservableStatus& operator=(Status const& status);
307
308 ///@name Accessors
309 ///@{
310 /**
311 * @return Data acquisition identifier
312 */
313 std::string const& GetId() const noexcept;
314
315 /**
316 * @return OLAS file id
317 */
318 std::string const& GetFileId() const noexcept;
319
320 /**
321 * @return Data acquisition state
322 */
323 State GetState() const noexcept;
324
325 /**
326 * @return Data acquisition error flag.
327 */
328 bool HasError() const noexcept;
329
330 /**
331 * @return list of alerts.
332 */
333 Alerts const& GetAlerts() const noexcept;
334
335 /**
336 * @return Timestamp.
337 */
338 Status::Clock::time_point GetTimestamp() const noexcept;
339 ///@}
340
341 ///@name Modifiers
342 ///@{
343 /**
344 * Set state of data acquisition.
345 *
346 * @param s New state
347 *
348 * @post Connected observers have been signalled.
349 */
350 void SetState(State s) noexcept;
351
352 /**
353 * Set receiver status.
354 * @param index Index of receiver in DpSpec.
355 * @param status Status structure to write.
356 */
357 void SetReceiverStatus(std::size_t index, ReceiverStatus status);
358
359 /**
360 * Get receiver status.
361 *
362 * @param index Index of receiver in DpSpec.
363 * @return Status structure.
364 */
365 ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept;
366
367 /**
368 * Set resulting data product path.
369 *
370 * @param result
371 */
372 void SetResult(std::string result);
373
374 /**
375 * Set alert
376 *
377 * @note An existing alert with same ID will be overwritten.
378 *
379 * @param alert to set.
380 */
381 void SetAlert(Alert alert);
382
383 /**
384 * Clear alert
385 *
386 * @param alert to clear.
387 */
388 void ClearAlert(AlertId const& alert);
389
390 /**
391 * Connect observer that is invoked when state is modified.
392 *
393 * @param o Observer callable invoked on status changes (state or file changes)
394 * Observer must be invocable with signature `void(ObservableStatus const&)`.
395 *
396 * @return signal connection object that can be used to disconnect observer:
397 *
398 * @code
399 * auto c = status.ConnectObserver([](ObservableStatus const& s){});
400 * // later the connection object can be used to disconnect
401 * c.disconnect();
402 * @endcode
403 */
404 using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
405 template <class Observer>
406 boost::signals2::connection ConnectObserver(Observer o) {
407 return m_signal.connect(std::move(o));
408 }
409 boost::signals2::connection ConnectStatus(SignalType::slot_type const& slot) {
410 return m_signal.connect(slot);
411 }
412 /**
413 * Emit signal. Unless @a forced == true, the signal is only emitted if signals are not blocked.
414 */
415 void EmitSignal(bool forced = false) noexcept;
416
417 /**
418 * Query number of changes made since last signal.
419 */
420 unsigned ChangesSinceLastSignal() const noexcept {
421 return m_changes_since_signal;
422 }
423
424 /**
425 * Allow implicit conversion to non-observable status.
426 */
427 operator Status() const;
428 Status const& GetStatus() const noexcept;
429 ///@}
430protected:
431 friend class DeferSignal;
432
433 /**
434 * Decrement signal blocker and if number of blocks reaches zero emit signal if generations are
435 * different, unless forced in which case signal is always emitted.
436 *
437 * @param force Force signal to be emitted even if generations are not different.
438 * @note Declared `void`: no generation counter is returned to the caller.
439 */
440 void EnableSignals(bool force) noexcept;
441
442 /**
443 * Disable signals from being sent (unless forced).
444 * @note Declared `void`: no generation counter is returned to the caller.
445 */
446 void DisableSignals() noexcept;
447
448private:
449 enum class UpdateTimestamp { No = 0, Yes = 1 };
450 /**
451 * Call to report that changes have been made, update timestamp and emit signal
452 * if not blocked.
453 */
454 void RecordChanges(bool forced = false, UpdateTimestamp update = UpdateTimestamp::Yes) noexcept;
455
456 Status m_status;
457 // const Status should also be observable.
458 mutable SignalType m_signal;
459 /**
460 * If positive there are instances of DeferSignal blocking signals
461 */
462 unsigned m_deferred_signals = 0u;
463 unsigned m_changes_since_signal = 0u;
464};
465
466std::ostream& operator<<(std::ostream& os, ObservableStatus const& s);
467
468} // namespace daq
469
470template <>
471struct fmt::formatter<daq::AlertId> : ostream_formatter {};
472template <>
473struct fmt::formatter<daq::PutTimeStruct> : ostream_formatter {};
474template <>
475struct fmt::formatter<daq::Alert> : ostream_formatter {};
476template <>
477struct fmt::formatter<daq::Alerts> : ostream_formatter {};
478template <>
479struct fmt::formatter<daq::ReceiverStatus> : ostream_formatter {};
480template <>
481struct fmt::formatter<daq::Status> : ostream_formatter {};
482template <>
483struct fmt::formatter<daq::ObservableStatus> : ostream_formatter {};
484
485#endif // #ifndef OCM_DAQ_DAQ_STATUS_HPP_
Provide a more concise way to set/clear alerts.
Definition: status.hpp:266
Defer signal changes until later time.
Definition: status.hpp:236
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
ObservableStatus(ObservableStatus const &)=delete
ObservableStatus & operator=(ObservableStatus &&)=default
ObservableStatus(ObservableStatus &&)=default
boost::signals2::connection ConnectStatus(SignalType::slot_type const &slot)
Connect observer that is invoked when state is modified.
Definition: status.hpp:409
ObservableStatus & operator=(ObservableStatus const &)=delete
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Connect observer that is invoked when state is modified.
Definition: status.hpp:404
Contains data structure for FITS keywords.
Declares daq::State and related functions.
constexpr std::string_view RELEASING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:44
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
constexpr std::string_view DAQ_CONTROLLER
Daq controller command failed which is normally when there was an exception from async operations.
Definition: status.hpp:61
constexpr std::string_view COLLECTING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:38
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:53
AlertId id
Definition: status.hpp:99
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:55
std::string key
Unique key for each alert.
Definition: status.hpp:76
std::string description
Definition: status.hpp:100
bool operator==(DaqContext const &lhs, DaqContext const &rhs) noexcept
Definition: daqContext.cpp:12
TimePoint timestamp
Definition: status.hpp:101
bool operator!=(AlertId const &lhs, AlertId const &rhs) noexcept
Definition: status.cpp:63
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
bool HasError(Status const &status) noexcept
Definition: status.cpp:179
std::string category
Standardized category.
Definition: status.hpp:72
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
Definition: status.cpp:20
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
Definition: status.cpp:31
State
Observable states of the data acquisition process.
Definition: state.hpp:41
std::chrono::system_clock::time_point time
Definition: status.hpp:84
PutTimeStruct PutTime(std::chrono::system_clock::time_point const &time)
Definition: status.cpp:101
std::vector< Alert > Alerts
Definition: status.hpp:104
Clock::time_point TimePoint
Definition: status.hpp:97
std::chrono::system_clock Clock
Definition: status.hpp:96
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
Describes an active Data Acquisition alert.
Definition: status.hpp:95
Uniquely identifies an alert.
Definition: status.hpp:68
Persistent status for receiver delivery.
Definition: status.hpp:145
bool IsFinalState() const noexcept
Definition: status.cpp:119
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
TimePoint::clock Clock
Definition: status.hpp:166
Status & operator=(Status const &)=default
std::string id
Definition: status.hpp:184
std::string result
Path to resulting data product.
Definition: status.hpp:202
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
Definition: status.hpp:197
std::string file_id
Definition: status.hpp:185
Status(Status const &)=default
Status(Status &&)=default
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:190
Status & operator=(Status &&)=default
std::chrono::time_point< std::chrono::system_clock > TimePoint
Definition: status.hpp:165
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:207
Status()=default