ifw-daq  3.0.1
IFW Data Acquisition modules
status.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright
5  * (c) Copyright ESO 2023
6  * All Rights Reserved
7  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
8  */
9 #include <algorithm>
10 #include <ctime>
11 #include <daq/status.hpp>
12 #include <iomanip>
13 #include <ostream>
14 
15 #include <fmt/format.h>
16 
17 namespace daq {
18 
19 void SetAlert(std::vector<Alert>& alerts, Alert alert) {
20  auto it = find(alerts.begin(), alerts.end(), alert);
21  if (it != alerts.end()) {
22  // Replace
23  *it = std::move(alert);
24  } else {
25  // Or add
26  alerts.emplace_back(std::move(alert));
27  }
28 }
29 
30 bool ClearAlert(std::vector<Alert>& alerts, AlertId const& alert) {
31  auto it = std::remove(alerts.begin(), alerts.end(), alert);
32  if (it != alerts.end()) {
33  alerts.erase(it, alerts.end());
34  return true;
35  }
36  return false;
37 }
38 
39 Alert MakeAlert(std::string_view category, std::string key, std::string description) {
40  return Alert{{std::string(category), std::move(key)},
41  std::move(description),
42  Alert::TimePoint::clock::now()};
43 }
44 
45 Alert MakeAlert(AlertId id, std::string description) {
46  return Alert{std::move(id), std::move(description), Alert::TimePoint::clock::now()};
47 }
48 
49 AlertId MakeAlertId(std::string_view category, std::string key) {
50  return AlertId{std::string(category), std::move(key)};
51 }
52 
53 bool operator==(AlertId const& lhs, AlertId const& rhs) noexcept {
54  return lhs.key == rhs.key && lhs.category == rhs.category;
55 }
56 
57 bool operator!=(AlertId const& lhs, AlertId const& rhs) noexcept {
58  return !(lhs == rhs);
59 }
60 
61 std::ostream& operator<<(std::ostream& os, AlertId const& s) {
62  os << s.category << "-" << s.key;
63  return os;
64 }
65 
66 bool operator==(Alert const& lhs, Alert const& rhs) noexcept {
67  return lhs.id == rhs.id;
68 }
69 
70 bool operator==(Alert const& lhs, AlertId const& rhs) noexcept {
71  return lhs.id == rhs;
72 }
73 
74 bool operator==(AlertId const& lhs, Alert const& rhs) noexcept {
75  return lhs == rhs.id;
76 }
77 
78 bool operator!=(Alert const& lhs, Alert const& rhs) noexcept {
79  return !(lhs == rhs);
80 }
81 
82 std::ostream& operator<<(std::ostream& os, PutTimeStruct const& t) {
83  auto timet = std::chrono::system_clock::to_time_t(t.time);
84  using milli = std::chrono::milliseconds;
85 
86  std::ios restore(nullptr);
87  restore.copyfmt(os);
88  os << std::put_time(std::gmtime(&timet), "%FT%T.") << std::setfill('0') << std::setw(3)
89  << std::chrono::duration_cast<milli>(t.time.time_since_epoch()).count() % 1000;
90 
91  os.copyfmt(restore);
92  return os;
93 }
94 
95 PutTimeStruct PutTime(std::chrono::system_clock::time_point const& time) {
96  return PutTimeStruct{time};
97 }
98 
99 std::ostream& operator<<(std::ostream& os, Alert const& alert) {
100  os << PutTime(alert.timestamp) << " (" << alert.id << "): " << alert.description;
101  return os;
102 }
103 
104 std::ostream& operator<<(std::ostream& os, std::vector<Alert> const& alerts) {
105  auto count = 0u;
106  for (auto const& alert : alerts) {
107  os << "[" << count << "]: " << alert << "\n";
108  ++count;
109  }
110  return os;
111 }
112 
113 bool ReceiverStatus::IsFinalState() const noexcept {
115 }
116 
117 bool operator==(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept {
118  return lhs.state == rhs.state;
119 }
120 
121 bool operator!=(ReceiverStatus const& lhs, ReceiverStatus const& rhs) noexcept {
122  return !(lhs == rhs);
123 }
124 
125 std::ostream& operator<<(std::ostream& os, ReceiverStatus::State state) {
126  switch (state) {
128  os << "NotStarted";
129  break;
131  os << "Started";
132  break;
134  os << "Success";
135  break;
137  os << "Failure";
138  break;
139  default:
140  os << "Unknown";
141  break;
142  };
143  return os;
144 }
145 
147  std::string id, std::string file_id, State state, bool error, TimePoint timestamp) noexcept
148  : id(std::move(id))
149  , file_id(std::move(file_id))
150  , state(state)
151  , error(error)
152  , alerts()
153  , receivers()
154  , timestamp(timestamp) {
155 }
156 
157 Status::Status(std::string id, std::string file_id) noexcept
158  : id(std::move(id))
159  , file_id(std::move(file_id))
160  , state(State::NotStarted)
161  , error(false)
162  , alerts()
163  , receivers()
164  , timestamp(TimePoint::clock::now()) {
165 }
166 
167 bool Status::operator==(Status const& rhs) const noexcept {
168  return id == rhs.id && file_id == rhs.file_id && state == rhs.state && error == rhs.error &&
169  alerts == rhs.alerts && receivers == rhs.receivers && result == rhs.result;
170 }
171 
172 bool Status::operator!=(Status const& rhs) const noexcept {
173  return !(*this == rhs);
174 }
175 
176 std::ostream& operator<<(std::ostream& os, Status const& s) {
177  os << "Status(id='" << s.id << "', file_id='" << s.file_id << "', state=" << s.state
178  << ", error=" << (s.error ? "true" : "false") << ", result='" << s.result
179  << "', timestamp=" << PutTime(s.timestamp) << ")";
180  return os;
181 }
182 
184  : m_status(status), m_force(force) {
185  if (m_status) {
186  m_status->DisableSignals();
187  }
188 }
189 
191  : m_status(nullptr) {
192  std::swap(m_status, other.m_status);
193  std::swap(m_force, other.m_force);
194 }
195 
198  // Reset this before stealing resources from other
199  Reset();
200 
201  std::swap(m_status, other.m_status);
202  std::swap(m_force, other.m_force);
203  return *this;
204 }
205 
207  Reset();
208 }
209 
211  if (!m_status) {
212  return;
213  }
214  m_status->EnableSignals(m_force);
215  m_status = nullptr;
216 }
217 
218 ObservableStatus::ObservableStatus(Status status) : m_status(std::move(status)) {
219 }
220 
221 ObservableStatus::ObservableStatus(std::string id, std::string file_id) noexcept
222  : m_status(std::move(id), std::move(file_id)) {
223 }
224 
226  if (m_status == status && m_status.timestamp == status.timestamp) {
227  return *this;
228  }
229  if (m_status.id != status.id) {
230  throw std::invalid_argument(fmt::format(
231  "Precondition not met (equality of DAQ IDs): {} != {}", m_status.id, status.id));
232  }
233  m_status = status;
234  RecordChanges(true, UpdateTimestamp::No);
235  return *this;
236 }
237 
238 bool ObservableStatus::operator==(ObservableStatus const& rhs) const noexcept {
239  return m_status == rhs.m_status;
240 }
241 
242 bool ObservableStatus::operator!=(ObservableStatus const& rhs) const noexcept {
243  return !(*this == rhs);
244 }
245 
246 bool ObservableStatus::operator==(Status const& rhs) const noexcept {
247  return m_status == rhs;
248 }
249 
250 bool ObservableStatus::operator!=(Status const& rhs) const noexcept {
251  return !(*this == rhs);
252 }
253 
254 std::string const& ObservableStatus::GetId() const noexcept {
255  return m_status.id;
256 }
257 
258 std::vector<Alert> const& ObservableStatus::GetAlerts() const noexcept {
259  return m_status.alerts;
260 }
261 
262 Status::Clock::time_point ObservableStatus::GetTimestamp() const noexcept {
263  return m_status.timestamp;
264 }
265 
266 std::string const& ObservableStatus::GetFileId() const noexcept {
267  return m_status.file_id;
268 }
269 
271  return m_status.state;
272 }
273 
274 bool ObservableStatus::GetError() const noexcept {
275  return m_status.error;
276 }
277 
278 void ObservableStatus::SetState(State s, std::optional<bool> error) noexcept {
279  if (s == m_status.state) {
280  if (!error || error.value() == m_status.error) {
281  return;
282  }
283  }
284  m_status.state = s;
285  if (error.has_value()) {
286  m_status.error = *error;
287  }
288  RecordChanges();
289 }
290 
291 void ObservableStatus::SetReceiverStatus(std::size_t index, ReceiverStatus status) {
292  auto [it, inserted] = m_status.receivers.try_emplace(index, status);
293  if (inserted) {
294  RecordChanges();
295  } else if (it->second != status) {
296  it->second = status;
297  RecordChanges();
298  }
299 }
300 
301 ReceiverStatus ObservableStatus::GetReceiverStatus(std::size_t index) const noexcept {
302  if (auto it = m_status.receivers.find(index); it != m_status.receivers.end()) {
303  return it->second;
304  } else {
306  }
307 }
308 
309 void ObservableStatus::SetError(bool error) noexcept {
310  if (error == m_status.error) {
311  return;
312  }
313  m_status.error = error;
314  RecordChanges();
315 }
316 
317 void ObservableStatus::SetResult(std::string result) {
318  m_status.result = std::move(result);
319  RecordChanges();
320 }
321 
323  ::daq::SetAlert(m_status.alerts, std::move(alert));
324  RecordChanges();
325 }
326 
328  if (::daq::ClearAlert(m_status.alerts, alert)) {
329  RecordChanges();
330  }
331 }
332 
333 void ObservableStatus::EmitSignal(bool force) noexcept {
334  // Unless forced, emit signal if not blocked and there are changes.
335  if (force || (m_deferred_signals == 0 && m_changes_since_signal > 0u)) {
336  try {
337  m_signal(*this);
338  } catch (...) {
339  }
340  // Reset number of changes since emitted signal
341  m_changes_since_signal = 0u;
342  }
343 }
344 
345 void ObservableStatus::RecordChanges(bool force, UpdateTimestamp update) noexcept {
346  m_changes_since_signal++;
347  if (update == UpdateTimestamp::Yes) {
348  m_status.timestamp = Status::TimePoint::clock::now();
349  }
350  EmitSignal(force);
351 }
352 
353 ObservableStatus::operator Status() const {
354  return m_status;
355 }
356 
357 Status const& ObservableStatus::GetStatus() const noexcept {
358  return m_status;
359 }
360 
361 void ObservableStatus::EnableSignals(bool force) noexcept {
362  assert(m_deferred_signals > 0);
363  --m_deferred_signals;
364 
365  EmitSignal(force);
366 }
367 
369  ++m_deferred_signals;
370 }
371 
372 std::ostream& operator<<(std::ostream& os, ObservableStatus const& s) {
373  os << "ObservableStatus(id='" << s.GetId() << "', file_id='" << s.GetFileId()
374  << "', state=" << s.GetState() << ", error=" << (s.GetError() ? "true" : "false") << ")";
375  return os;
376 }
377 
378 } // namespace daq
Defer signal changes until later time.
Definition: status.hpp:222
void Reset() noexcept
If object is valid this will unblock and signal changes and then remove reference to ObservableStatus...
Definition: status.cpp:210
DeferSignal & operator=(DeferSignal &&) noexcept
Definition: status.cpp:197
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
void EnableSignals(bool force) noexcept
Decrement signal blocker and, if the number of blocks reaches zero, emit signal if generations are different...
Definition: status.cpp:361
State GetState() const noexcept
Definition: status.cpp:270
void SetError(bool error) noexcept
Set error flag for data acquisition.
Definition: status.cpp:309
ObservableStatus & operator=(ObservableStatus &&)=default
void EmitSignal(bool forced=false) noexcept
Emit signal; unless force == true, the signal is only emitted if signals are not blocked and there are changes.
Definition: status.cpp:333
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Definition: status.cpp:291
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:357
void ClearAlert(AlertId const &alert)
Clear alert.
Definition: status.cpp:327
bool operator==(ObservableStatus const &rhs) const noexcept
Definition: status.cpp:238
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:278
void DisableSignals() noexcept
Disable signals from being sent (unless forced).
Definition: status.cpp:368
void SetResult(std::string result)
Set resulting data product path.
Definition: status.cpp:317
std::string const & GetFileId() const noexcept
Definition: status.cpp:266
bool GetError() const noexcept
Definition: status.cpp:274
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
Definition: status.cpp:301
bool operator!=(ObservableStatus const &rhs) const noexcept
Definition: status.cpp:242
Status::Clock::time_point GetTimestamp() const noexcept
Definition: status.cpp:262
void SetAlert(Alert alert)
Set alert.
Definition: status.cpp:322
std::vector< Alert > const & GetAlerts() const noexcept
Definition: status.cpp:258
ObservableStatus(std::string id, std::string file_id) noexcept
Construct a new object.
Definition: status.cpp:221
std::string const & GetId() const noexcept
Definition: status.cpp:254
AlertId id
Definition: status.hpp:90
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
std::string key
Unique key for each alert.
Definition: status.hpp:67
std::string description
Definition: status.hpp:91
bool operator==(DaqContext const &lhs, DaqContext const &rhs) noexcept
Definition: daqContext.cpp:12
TimePoint timestamp
Definition: status.hpp:92
bool operator!=(AlertId const &lhs, AlertId const &rhs) noexcept
Definition: status.cpp:57
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
std::string category
Standardized category.
Definition: status.hpp:63
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
Definition: status.cpp:19
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
Definition: status.cpp:30
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ NotStarted
Initial state of data acquisition.
std::chrono::system_clock::time_point time
Definition: status.hpp:75
PutTimeStruct PutTime(std::chrono::system_clock::time_point const &time)
Definition: status.cpp:95
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Describes an active Data Acquisition alert.
Definition: status.hpp:86
Uniquely identifies an alert.
Definition: status.hpp:59
Definition: main.cpp:23
Contains declaration for Status and ObservableStatus.
Persistent status for receiver delivery.
Definition: status.hpp:134
bool IsFinalState() const noexcept
Definition: status.cpp:113
Non observable status object that stores status of data acquisition.
Definition: status.hpp:153
bool operator!=(Status const &rhs) const noexcept
Definition: status.cpp:172
bool operator==(Status const &rhs) const noexcept
Definition: status.cpp:167
State state
Definition: status.hpp:176
std::string id
Definition: status.hpp:174
std::string result
Path to resulting data product.
Definition: status.hpp:193
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
Definition: status.hpp:188
std::string file_id
Definition: status.hpp:175
bool error
Definition: status.hpp:177
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:181
std::chrono::time_point< std::chrono::system_clock > TimePoint
Definition: status.hpp:154
TimePoint timestamp
Timestamp of last update.
Definition: status.hpp:198
Status()=default