15#include <boost/assert.hpp>
16#include <fmt/format.h>
21 auto it = find(alerts.begin(), alerts.end(), alert);
22 if (it != alerts.end()) {
24 *it = std::move(alert);
27 alerts.emplace_back(std::move(alert));
34 ? std::remove_if(alerts.begin(),
36 [&](
Alert const& cmp) { return cmp.id.category == alert.category; })
37 : std::remove(alerts.begin(), alerts.end(), alert);
38 if (it != alerts.end()) {
39 alerts.erase(it, alerts.end());
45Alert MakeAlert(std::string_view category, std::string key, std::string description) {
46 return Alert{{std::string(category), std::move(key)},
47 std::move(description),
48 Alert::TimePoint::clock::now()};
52 return Alert{std::move(
id), std::move(description), Alert::TimePoint::clock::now()};
56 return AlertId{std::string(category), std::move(key)};
60 return lhs.key == rhs.key && lhs.category == rhs.category;
73 return lhs.id == rhs.id;
89 auto timet = std::chrono::system_clock::to_time_t(t.
time);
90 using milli = std::chrono::milliseconds;
92 std::ios restore(
nullptr);
94 os << std::put_time(std::gmtime(&timet),
"%FT%T.") << std::setfill(
'0') << std::setw(3)
95 << std::chrono::duration_cast<milli>(t.
time.time_since_epoch()).count() % 1000;
110std::ostream&
operator<<(std::ostream& os, std::vector<Alert>
const& alerts) {
112 for (
auto const& alert : alerts) {
113 os <<
"[" << count <<
"]: " << alert <<
"\n";
124 return lhs.state == rhs.state;
128 return !(lhs == rhs);
154 , file_id(std::move(file_id))
158 , timestamp(timestamp) {
163 , file_id(std::move(file_id))
167 , timestamp(TimePoint::clock::now()) {
171 return id == rhs.id && file_id == rhs.file_id && state == rhs.state && alerts == rhs.alerts &&
172 receivers == rhs.receivers && result == rhs.result;
176 return !(*
this == rhs);
180 return !status.alerts.empty();
184 os <<
"Status(id='" << s.
id <<
"', file_id='" << s.
file_id <<
"', state=" << s.
state
185 <<
", error=" << (
HasError(s) ?
"true" :
"false") <<
", result='" << s.
result
191 : m_status(status), m_force(force) {
198 : m_status(
nullptr) {
199 std::swap(m_status, other.m_status);
200 std::swap(m_force, other.m_force);
208 std::swap(m_status, other.m_status);
209 std::swap(m_force, other.m_force);
221 m_status->EnableSignals(m_force);
226 : m_status(status), m_defer(status), m_id(std::move(id)) {
230 std::string_view category,
232 : m_status(status), m_defer(status), m_id{std::string(category), std::move(key)} {
236 BOOST_ASSERT_MSG(m_status !=
nullptr,
"ObservableStatus is a nullptr");
241 BOOST_ASSERT_MSG(m_status !=
nullptr,
"ObservableStatus is a nullptr");
249 : m_status(std::move(
id), std::move(file_id)) {
256 if (m_status.
id != status.
id) {
257 throw std::invalid_argument(fmt::format(
258 "Precondition not met (equality of DAQ IDs): {} != {}", m_status.
id, status.
id));
261 RecordChanges(
true, UpdateTimestamp::No);
266 return m_status == rhs.m_status;
270 return !(*
this == rhs);
274 return m_status == rhs;
278 return !(*
this == rhs);
298 return m_status.
state;
306 if (s == m_status.state) {
314 auto [it, inserted] = m_status.
receivers.try_emplace(index, status);
317 }
else if (it->second != status) {
324 if (
auto it = m_status.receivers.find(index); it != m_status.receivers.end()) {
332 m_status.
result = std::move(result);
349 if (force || (m_deferred_signals == 0 && m_changes_since_signal > 0u)) {
355 m_changes_since_signal = 0u;
359void ObservableStatus::RecordChanges(
bool force, UpdateTimestamp update)
noexcept {
360 m_changes_since_signal++;
361 if (update == UpdateTimestamp::Yes) {
362 m_status.timestamp = Status::TimePoint::clock::now();
367ObservableStatus::operator
Status()
const {
376 assert(m_deferred_signals > 0);
377 --m_deferred_signals;
383 ++m_deferred_signals;
387 os <<
"ObservableStatus(id='" << s.
GetId() <<
"', file_id='" << s.
GetFileId()
388 <<
"', state=" << s.
GetState() <<
", error=" << (s.
HasError() ?
"true" :
"false") <<
")";
AlertActivator(ObservableStatus *, AlertId id)
void Set(std::string description)
Defer signal changes until later time.
void Reset() noexcept
If object is valid this will unblock and signal changes and then remove reference to ObservableStatus...
DeferSignal & operator=(DeferSignal &&) noexcept
Stores data acquisition status and allows subscription to status changes.
void EnableSignals(bool force) noexcept
Decrement signal blocker and if number of blocks reaches zero emit signal if generations are differnt...
State GetState() const noexcept
void EmitSignal(bool forced=false) noexcept
Emit signal, which unless force == true, will only emit signal unless blocked.
void SetReceiverStatus(std::size_t index, ReceiverStatus status)
Set receiver status.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void ClearAlert(AlertId const &alert)
Clear alert.
bool operator==(ObservableStatus const &rhs) const noexcept
void DisableSignals() noexcept
Disable signals from being sent (unless forced).
ObservableStatus & operator=(ObservableStatus &&)=default
void SetState(State s) noexcept
Set state of data acquisition.
void SetResult(std::string result)
Set resulting data product path.
std::string const & GetFileId() const noexcept
ReceiverStatus GetReceiverStatus(std::size_t index) const noexcept
Get receiver status.
bool operator!=(ObservableStatus const &rhs) const noexcept
Status::Clock::time_point GetTimestamp() const noexcept
void SetAlert(Alert alert)
Set alert.
Alerts const & GetAlerts() const noexcept
bool HasError() const noexcept
ObservableStatus(std::string id, std::string file_id) noexcept
Construct a new object.
std::string const & GetId() const noexcept
AlertId MakeAlertId(std::string_view category, std::string key)
std::string key
Unique key for each alert.
bool operator==(DaqContext const &lhs, DaqContext const &rhs) noexcept
bool operator!=(AlertId const &lhs, AlertId const &rhs) noexcept
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
bool HasError(Status const &status) noexcept
std::string category
Standardized category.
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
State
Observable states of the data acquisition process.
@ NotStarted
Initial state of data acquisition.
std::chrono::system_clock::time_point time
PutTimeStruct PutTime(std::chrono::system_clock::time_point const &time)
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Describes an active Data Acquisition alert.
Uniquely identfies an alert.
Contains declaration for Status and ObservableStatus.
Persistent status for receiver delivery.
bool IsFinalState() const noexcept
Non observable status object that keeps stores status of data acquisition.
bool operator!=(Status const &rhs) const noexcept
bool operator==(Status const &rhs) const noexcept
std::string result
Path to resulting data product.
std::map< std::size_t, ReceiverStatus > receivers
Receiver processing (e.g.
bool ClearAlert(std::vector< Alert > &alerts, AlertId const &alert)
Clear alert.
std::vector< Alert > alerts
Active alerts.
std::chrono::time_point< std::chrono::system_clock > TimePoint
TimePoint timestamp
Timestamp of last update.
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.