ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
status.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright
5 * (c) Copyright ESO 2023
6 * All Rights Reserved
7 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
8 */
9#include <algorithm>
10#include <ctime>
11#include <daq/status.hpp>
12#include <iomanip>
13#include <ostream>
14
15#include <boost/assert.hpp>
16#include <fmt/format.h>
17
18namespace daq {
19
20void SetAlert(std::vector<Alert>& alerts, Alert alert) {
21 auto it = find(alerts.begin(), alerts.end(), alert);
22 if (it != alerts.end()) {
23 // Replace
24 *it = std::move(alert);
25 } else {
26 // Or add
27 alerts.emplace_back(std::move(alert));
28 }
29}
30
31bool ClearAlert(std::vector<Alert>& alerts, AlertId const& alert) {
32 auto it =
33 alert.key == "*"
34 ? std::remove_if(alerts.begin(),
35 alerts.end(),
36 [&](Alert const& cmp) { return cmp.id.category == alert.category; })
37 : std::remove(alerts.begin(), alerts.end(), alert);
38 if (it != alerts.end()) {
39 alerts.erase(it, alerts.end());
40 return true;
41 }
42 return false;
43}
44
45Alert MakeAlert(std::string_view category, std::string key, std::string description) {
46 return Alert{{std::string(category), std::move(key)},
47 std::move(description),
48 Alert::TimePoint::clock::now()};
49}
50
51Alert MakeAlert(AlertId id, std::string description) {
52 return Alert{std::move(id), std::move(description), Alert::TimePoint::clock::now()};
53}
54
55AlertId MakeAlertId(std::string_view category, std::string key) {
56 return AlertId{std::string(category), std::move(key)};
57}
58
59bool operator==(AlertId const& lhs, AlertId const& rhs) noexcept {
60 return lhs.key == rhs.key && lhs.category == rhs.category;
61}
62
63bool operator!=(AlertId const& lhs, AlertId const& rhs) noexcept {
64 return !(lhs == rhs);
65}
66
67std::ostream& operator<<(std::ostream& os, AlertId const& s) {
68 os << s.category << "-" << s.key;
69 return os;
70}
71
72bool operator==(Alert const& lhs, Alert const& rhs) noexcept {
73 return lhs.id == rhs.id;
74}
75
76bool operator==(Alert const& lhs, AlertId const& rhs) noexcept {
77 return lhs.id == rhs;
78}
79
80bool operator==(AlertId const& lhs, Alert const& rhs) noexcept {
81 return lhs == rhs.id;
82}
83
84bool operator!=(Alert const& lhs, Alert const& rhs) noexcept {
85 return !(lhs == rhs);
86}
87
88std::ostream& operator<<(std::ostream& os, PutTimeStruct const& t) {
89 auto timet = std::chrono::system_clock::to_time_t(t.time);
90 using milli = std::chrono::milliseconds;
91
92 std::ios restore(nullptr);
93 restore.copyfmt(os);
94 os << std::put_time(std::gmtime(&timet), "%FT%T.") << std::setfill('0') << std::setw(3)
95 << std::chrono::duration_cast<milli>(t.time.time_since_epoch()).count() % 1000;
96
97 os.copyfmt(restore);
98 return os;
99}
100
101PutTimeStruct PutTime(std::chrono::system_clock::time_point const& time) {
102 return PutTimeStruct{time};
103}
104
105std::ostream& operator<<(std::ostream& os, Alert const& alert) {
106 os << PutTime(alert.timestamp) << " (" << alert.id << "): " << alert.description;
107 return os;
108}
109
110std::ostream& operator<<(std::ostream& os, std::vector<Alert> const& alerts) {
111 auto count = 0u;
112 for (auto const& alert : alerts) {
113 os << "[" << count << "]: " << alert << "\n";
114 ++count;
115 }
116 return os;
117}
118
119bool ReceiverStatus::IsFinalState() const noexcept {
121}
122
123bool operator==(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept {
124 return lhs.state == rhs.state;
125}
126
127bool operator!=(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept {
128 return !(lhs == rhs);
129}
130
131std::ostream& operator<<(std::ostream& os, ReceiverStatus::State state) {
132 switch (state) {
134 os << "NotStarted";
135 break;
137 os << "Started";
138 break;
140 os << "Success";
141 break;
143 os << "Failure";
144 break;
145 default:
146 os << "Unknown";
147 break;
148 };
149 return os;
150}
151
152Status::Status(std::string id, std::string file_id, State state, TimePoint timestamp) noexcept
153 : id(std::move(id))
154 , file_id(std::move(file_id))
155 , state(state)
156 , alerts()
157 , receivers()
158 , timestamp(timestamp) {
159}
160
161Status::Status(std::string id, std::string file_id) noexcept
162 : id(std::move(id))
163 , file_id(std::move(file_id))
164 , state(State::NotStarted)
165 , alerts()
166 , receivers()
167 , timestamp(TimePoint::clock::now()) {
168}
169
170bool Status::operator==(Status const& rhs) const noexcept {
171 return id == rhs.id && file_id == rhs.file_id && state == rhs.state && alerts == rhs.alerts &&
172 receivers == rhs.receivers && result == rhs.result;
173}
174
175bool Status::operator!=(Status const& rhs) const noexcept {
176 return !(*this == rhs);
177}
178
179bool HasError(Status const& status) noexcept {
180 return !status.alerts.empty();
181}
182
183std::ostream& operator<<(std::ostream& os, Status const& s) {
184 os << "Status(id='" << s.id << "', file_id='" << s.file_id << "', state=" << s.state
185 << ", error=" << (HasError(s) ? "true" : "false") << ", result='" << s.result
186 << "', timestamp=" << PutTime(s.timestamp) << ")";
187 return os;
188}
189
191 : m_status(status), m_force(force) {
192 if (m_status) {
193 m_status->DisableSignals();
194 }
195}
196
198 : m_status(nullptr) {
199 std::swap(m_status, other.m_status);
200 std::swap(m_force, other.m_force);
201}
202
205 // Reset this before stealing resources from other
206 Reset();
207
208 std::swap(m_status, other.m_status);
209 std::swap(m_force, other.m_force);
210 return *this;
211}
212
214 Reset();
215}
216
218 if (!m_status) {
219 return;
220 }
221 m_status->EnableSignals(m_force);
222 m_status = nullptr;
223}
224
226 : m_status(status), m_defer(status), m_id(std::move(id)) {
227}
228
230 std::string_view category,
231 std::string key)
232 : m_status(status), m_defer(status), m_id{std::string(category), std::move(key)} {
233}
234
235void ObservableStatus::AlertActivator::Set(std::string description) {
236 BOOST_ASSERT_MSG(m_status != nullptr, "ObservableStatus is a nullptr");
237 m_status->SetAlert(MakeAlert(m_id, std::move(description)));
238}
239
241 BOOST_ASSERT_MSG(m_status != nullptr, "ObservableStatus is a nullptr");
242 m_status->ClearAlert(m_id);
243}
244
245ObservableStatus::ObservableStatus(Status status) : m_status(std::move(status)) {
246}
247
248ObservableStatus::ObservableStatus(std::string id, std::string file_id) noexcept
249 : m_status(std::move(id), std::move(file_id)) {
250}
251
253 if (m_status == status && m_status.timestamp == status.timestamp) {
254 return *this;
255 }
256 if (m_status.id != status.id) {
257 throw std::invalid_argument(fmt::format(
258 "Precondition not met (equality of DAQ IDs): {} != {}", m_status.id, status.id));
259 }
260 m_status = status;
261 RecordChanges(true, UpdateTimestamp::No);
262 return *this;
263}
264
265bool ObservableStatus::operator==(ObservableStatus const& rhs) const noexcept {
266 return m_status == rhs.m_status;
267}
268
269bool ObservableStatus::operator!=(ObservableStatus const& rhs) const noexcept {
270 return !(*this == rhs);
271}
272
273bool ObservableStatus::operator==(Status const& rhs) const noexcept {
274 return m_status == rhs;
275}
276
277bool ObservableStatus::operator!=(Status const& rhs) const noexcept {
278 return !(*this == rhs);
279}
280
281std::string const& ObservableStatus::GetId() const noexcept {
282 return m_status.id;
283}
284
285std::vector<Alert> const& ObservableStatus::GetAlerts() const noexcept {
286 return m_status.alerts;
287}
288
289Status::Clock::time_point ObservableStatus::GetTimestamp() const noexcept {
290 return m_status.timestamp;
291}
292
293std::string const& ObservableStatus::GetFileId() const noexcept {
294 return m_status.file_id;
295}
296
298 return m_status.state;
299}
300
301bool ObservableStatus::HasError() const noexcept {
302 return daq::HasError(m_status);
303}
304
306 if (s == m_status.state) {
307 return;
308 }
309 m_status.state = s;
310 RecordChanges();
311}
312
314 auto [it, inserted] = m_status.receivers.try_emplace(index, status);
315 if (inserted) {
316 RecordChanges();
317 } else if (it->second != status) {
318 it->second = status;
319 RecordChanges();
320 }
321}
322
323ReceiverStatus ObservableStatus::GetReceiverStatus(std::size_t index) const noexcept {
324 if (auto it = m_status.receivers.find(index); it != m_status.receivers.end()) {
325 return it->second;
326 } else {
328 }
329}
330
331void ObservableStatus::SetResult(std::string result) {
332 m_status.result = std::move(result);
333 RecordChanges();
334}
335
337 ::daq::SetAlert(m_status.alerts, std::move(alert));
338 RecordChanges();
339}
340
342 if (::daq::ClearAlert(m_status.alerts, alert)) {
343 RecordChanges();
344 }
345}
346
347void ObservableStatus::EmitSignal(bool force) noexcept {
348 // Unless forced, emit signal if not blocked and there are changes.
349 if (force || (m_deferred_signals == 0 && m_changes_since_signal > 0u)) {
350 try {
351 m_signal(*this);
352 } catch (...) {
353 }
354 // Reset number of changes since emitted signal
355 m_changes_since_signal = 0u;
356 }
357}
358
359void ObservableStatus::RecordChanges(bool force, UpdateTimestamp update) noexcept {
360 m_changes_since_signal++;
361 if (update == UpdateTimestamp::Yes) {
362 m_status.timestamp = Status::TimePoint::clock::now();
363 }
364 EmitSignal(force);
365}
366
367ObservableStatus::operator Status() const {
368 return m_status;
369}
370
371Status const& ObservableStatus::GetStatus() const noexcept {
372 return m_status;
373}
374
375void ObservableStatus::EnableSignals(bool force) noexcept {
376 assert(m_deferred_signals > 0);
377 --m_deferred_signals;
378
379 EmitSignal(force);
380}
381
383 ++m_deferred_signals;
384}
385
386std::ostream& operator<<(std::ostream& os, ObservableStatus const& s) {
387 os << "ObservableStatus(id='" << s.GetId() << "', file_id='" << s.GetFileId()
388 << "', state=" << s.GetState() << ", error=" << (s.HasError() ? "true" : "false") << ")";
389 return os;
390}
391
392} // namespace daq
AlertActivator(ObservableStatus *, AlertId id)
Definition: status.cpp:225
void Set(std::string description)
Definition: status.cpp:235
Defer signal changes until later time.
Definition: status.hpp:236
void Reset() noexcept
If object is valid this will unblock and signal changes and then remove reference to ObservableStatus...
Definition: status.cpp:217
DeferSignal & operator=(DeferSignal &&) noexcept
Definition: status.cpp:204
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
void EnableSignals(bool force) noexcept
Decrement signal blocker and if number of blocks reaches zero emit signal if generations are differnt...
Definition: status.cpp:375
State GetState() const noexcept
Definition: status.cpp:297
void EmitSignal(bool forced=false) noexcept
Emit signal, which unless force == true, will only emit signal unless blocked.
Definition: status.cpp:347
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Definition: status.cpp:313
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:371
void ClearAlert(AlertId const &alert)
Clear alert.
Definition: status.cpp:341
bool operator==(ObservableStatus const &rhs) const noexcept
Definition: status.cpp:265
void DisableSignals() noexcept
Disable signals from being sent (unless forced).
Definition: status.cpp:382
ObservableStatus & operator=(ObservableStatus &&)=default
void SetState(State s) noexcept
Set state of data acquisition.
Definition: status.cpp:305
void SetResult(std::string result)
Set resulting data product path.
Definition: status.cpp:331
std::string const & GetFileId() const noexcept
Definition: status.cpp:293
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
Definition: status.cpp:323
bool operator!=(ObservableStatus const &rhs) const noexcept
Definition: status.cpp:269
Status::Clock::time_point GetTimestamp() const noexcept
Definition: status.cpp:289
void SetAlert(Alert alert)
Set alert.
Definition: status.cpp:336
Alerts const & GetAlerts() const noexcept
Definition: status.cpp:285
bool HasError() const noexcept
Definition: status.cpp:301
ObservableStatus(std::string id, std::string file_id) noexcept
Construct a new object.
Definition: status.cpp:248
std::string const & GetId() const noexcept
Definition: status.cpp:281
AlertId id
Definition: status.hpp:99
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:55
std::string key
Unique key for each alert.
Definition: status.hpp:76
std::string description
Definition: status.hpp:100
bool operator==(DaqContext const &lhs, DaqContext const &rhs) noexcept
Definition: daqContext.cpp:12
TimePoint timestamp
Definition: status.hpp:101
bool operator!=(AlertId const &lhs, AlertId const &rhs) noexcept
Definition: status.cpp:63
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
bool HasError(Status const &status) noexcept
Definition: status.cpp:179
std::string category
Standardized category.
Definition: status.hpp:72
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
Definition: status.cpp:20
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
Definition: status.cpp:31
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ NotStarted
Initial state of data acquisition.
std::chrono::system_clock::time_point time
Definition: status.hpp:84
PutTimeStruct PutTime(std::chrono::system_clock::time_point const &time)
Definition: status.cpp:101
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
Describes an active Data Acquisition alert.
Definition: status.hpp:95
Uniquely identfies an alert.
Definition: status.hpp:68
Contains declaration for Status and ObservableStatus.
Persistent status for receiver delivery.
Definition: status.hpp:145
bool IsFinalState() const noexcept
Definition: status.cpp:119
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:164
bool operator!=(Status const &rhs) const noexcept
Definition: status.cpp:175
bool operator==(Status const &rhs) const noexcept
Definition: status.cpp:170
State state
Definition: status.hpp:186
std::string id
Definition: status.hpp:184
std::string result
Path to resulting data product.
Definition: status.hpp:202
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
Definition: status.hpp:197
std::string file_id
Definition: status.hpp:185
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
Definition: status.cpp:31
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:190
std::chrono::time_point< std::chrono::system_clock > TimePoint
Definition: status.hpp:165
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:207
Status()=default
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
Definition: status.cpp:20