15 #include <fmt/format.h>
20 auto it = find(alerts.begin(), alerts.end(), alert);
21 if (it != alerts.end()) {
23 *it = std::move(alert);
26 alerts.emplace_back(std::move(alert));
31 auto it = std::remove(alerts.begin(), alerts.end(), alert);
32 if (it != alerts.end()) {
33 alerts.erase(it, alerts.end());
39 Alert MakeAlert(std::string_view category, std::string key, std::string description) {
40 return Alert{{std::string(category), std::move(key)},
41 std::move(description),
42 Alert::TimePoint::clock::now()};
46 return Alert{std::move(
id), std::move(description), Alert::TimePoint::clock::now()};
50 return AlertId{std::string(category), std::move(key)};
54 return lhs.key == rhs.key && lhs.category == rhs.category;
67 return lhs.id == rhs.id;
83 auto timet = std::chrono::system_clock::to_time_t(t.
time);
84 using milli = std::chrono::milliseconds;
86 std::ios restore(
nullptr);
88 os << std::put_time(std::gmtime(&timet),
"%FT%T.") << std::setfill(
'0') << std::setw(3)
89 << std::chrono::duration_cast<milli>(t.
time.time_since_epoch()).count() % 1000;
104 std::ostream&
operator<<(std::ostream& os, std::vector<Alert>
const& alerts) {
106 for (
auto const& alert : alerts) {
107 os <<
"[" << count <<
"]: " << alert <<
"\n";
118 return lhs.state == rhs.state;
122 return !(lhs == rhs);
147 std::string
id, std::string file_id,
State state,
bool error,
TimePoint timestamp) noexcept
149 , file_id(std::move(file_id))
154 , timestamp(timestamp) {
159 , file_id(std::move(file_id))
164 , timestamp(TimePoint::clock::now()) {
168 return id == rhs.id && file_id == rhs.file_id && state == rhs.state &&
error == rhs.error &&
169 alerts == rhs.alerts && receivers == rhs.receivers && result == rhs.result;
173 return !(*
this == rhs);
177 os <<
"Status(id='" << s.
id <<
"', file_id='" << s.
file_id <<
"', state=" << s.
state
178 <<
", error=" << (s.
error ?
"true" :
"false") <<
", result='" << s.
result
184 : m_status(status), m_force(force) {
191 : m_status(
nullptr) {
192 std::swap(m_status, other.m_status);
193 std::swap(m_force, other.m_force);
201 std::swap(m_status, other.m_status);
202 std::swap(m_force, other.m_force);
214 m_status->EnableSignals(m_force);
222 : m_status(std::move(
id), std::move(file_id)) {
229 if (m_status.
id != status.
id) {
230 throw std::invalid_argument(fmt::format(
231 "Precondition not met (equality of DAQ IDs): {} != {}", m_status.
id, status.
id));
234 RecordChanges(
true, UpdateTimestamp::No);
239 return m_status == rhs.m_status;
243 return !(*
this == rhs);
247 return m_status == rhs;
251 return !(*
this == rhs);
271 return m_status.
state;
275 return m_status.
error;
279 if (s == m_status.state) {
280 if (!
error ||
error.value() == m_status.error) {
285 if (
error.has_value()) {
286 m_status.error = *
error;
292 auto [it, inserted] = m_status.
receivers.try_emplace(index, status);
295 }
else if (it->second != status) {
302 if (
auto it = m_status.receivers.find(index); it != m_status.receivers.end()) {
310 if (
error == m_status.error) {
313 m_status.error =
error;
318 m_status.
result = std::move(result);
335 if (force || (m_deferred_signals == 0 && m_changes_since_signal > 0u)) {
341 m_changes_since_signal = 0u;
345 void ObservableStatus::RecordChanges(
bool force, UpdateTimestamp update) noexcept {
346 m_changes_since_signal++;
347 if (update == UpdateTimestamp::Yes) {
348 m_status.timestamp = Status::TimePoint::clock::now();
353 ObservableStatus::operator
Status()
const {
362 assert(m_deferred_signals > 0);
363 --m_deferred_signals;
369 ++m_deferred_signals;
373 os <<
"ObservableStatus(id='" << s.
GetId() <<
"', file_id='" << s.
GetFileId()
374 <<
"', state=" << s.
GetState() <<
", error=" << (s.
GetError() ?
"true" :
"false") <<
")";
Defer signal changes until later time.
void Reset() noexcept
If object is valid this will unblock and signal changes and then remove reference to ObservableStatus...
DeferSignal & operator=(DeferSignal &&) noexcept
Stores data acquisition status and allows subscription to status changes.
void EnableSignals(bool force) noexcept
Decrement signal blocker and if number of blocks reaches zero emit signal if generations are different...
State GetState() const noexcept
void SetError(bool error) noexcept
Set error flag for data acquisition.
ObservableStatus & operator=(ObservableStatus &&)=default
void EmitSignal(bool forced=false) noexcept
Emit signal; unless force == true, the signal is only emitted if not blocked.
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void ClearAlert(AlertId const &alert)
Clear alert.
bool operator==(ObservableStatus const &rhs) const noexcept
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
void DisableSignals() noexcept
Disable signals from being sent (unless forced).
void SetResult(std::string result)
Set resulting data product path.
std::string const & GetFileId() const noexcept
bool GetError() const noexcept
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
bool operator!=(ObservableStatus const &rhs) const noexcept
Status::Clock::time_point GetTimestamp() const noexcept
void SetAlert(Alert alert)
Set alert.
std::vector< Alert > const & GetAlerts() const noexcept
ObservableStatus(std::string id, std::string file_id) noexcept
Construct a new object.
std::string const & GetId() const noexcept
AlertId MakeAlertId(std::string_view category, std::string key)
std::string key
Unique key for each alert.
bool operator==(DaqContext const &lhs, DaqContext const &rhs) noexcept
bool operator!=(AlertId const &lhs, AlertId const &rhs) noexcept
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
std::string category
Standardized category.
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
State
Observable states of the data acquisition process.
@ NotStarted
Initial state of data acquisition.
std::chrono::system_clock::time_point time
PutTimeStruct PutTime(std::chrono::system_clock::time_point const &time)
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Describes an active Data Acquisition alert.
Uniquely identifies an alert.
Contains declaration for Status and ObservableStatus.
Persistent status for receiver delivery.
bool IsFinalState() const noexcept
Non-observable status object that stores the status of data acquisition.
bool operator!=(Status const &rhs) const noexcept
bool operator==(Status const &rhs) const noexcept
std::string result
Path to resulting data product.
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
std::vector< Alert > alerts
Active alerts.
std::chrono::time_point< std::chrono::system_clock > TimePoint
TimePoint timestamp
Timestamp of last update.