15#include <fmt/format.h>
16#include <fmt/ostream.h>
17#include <log4cplus/loggingmacros.h>
32 : std::invalid_argument(fmt::format(
"DAQ with id '{}' not found", id)) {
36 : std::invalid_argument(fmt::format(
"DAQ with id '{}' not found ({})", id, reason)) {
41 std::chrono::system_clock::time_point creation_time) {
42 auto now = std::chrono::system_clock::now();
47 if (full.state == daqif::StateAcquiring) {
50 if (full.state == daqif::StateMerging) {
58 std::chrono::system_clock::time_point* tp) {
59 using time_point = std::chrono::system_clock::time_point;
60 using duration = std::chrono::system_clock::duration;
61 using seconds = std::chrono::seconds;
62 using microseconds = std::chrono::microseconds;
69 struct timeval jitter_tv {
72 if (gettimeofday(&tv,
nullptr) != 0) {
74 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
79 timeradd(&tv, &jitter_tv, &res);
83 time_t time = tv.tv_sec;
84 if (gmtime_r(&time, &tm_time) ==
nullptr) {
86 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
89 char time_str[31] = {0};
90 int n = snprintf(&time_str[0], 7,
"%.5s.", instrument_id);
92 strftime(&time_str[n], 20,
"%Y-%m-%dT%H:%M:%S", &tm_time);
94 snprintf(&frac_str[0], 5,
".%.3d",
static_cast<int>(tv.tv_usec / 1000.0));
96 strncpy(&time_str[n + 19], &frac_str[0], 4);
100 std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
102 return std::string(time_str, n + 23);
109 std::shared_ptr<ObservableEventLog> event_log,
111 std::shared_ptr<DpmClient> dpm_client,
112 log4cplus::Logger
const& logger)
113 : m_alive_token(std::make_shared<bool>())
114 , m_executor(executor)
115 , m_params(std::move(params))
116 , m_workspace(workspace)
117 , m_kw_formatter(kw_formatter)
118 , m_event_log(std::move(event_log))
119 , m_daq_factory(daq_factory)
120 , m_dpm_client(std::move(dpm_client))
126 for (
auto& op : m_abort_funcs) {
129 }
catch (std::exception
const& e) {
130 LOG4CPLUS_WARN(m_logger,
131 fmt::format(
"ManagerImpl::~ManagerImpl: Error when aborting "
140 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Starting");
143 decltype(ids) pruned;
145 for (
auto const&
id : ids) {
147 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Loading state for DAQ " <<
id);
152 if (
IsStale(m_params, status.state, context.creation_time)) {
153 LOG4CPLUS_INFO(m_logger,
154 "RestoreFromWorkspace: DAQ " << status <<
" is stale -> archiving");
161 if (full.state == daqif::StateAcquiring) {
162 LOG4CPLUS_INFO(m_logger,
163 "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
165 std::make_shared<ObservableStatus>(status),
169 AddDaq(
daq, Store::No);
171 pruned.push_back(
id);
172 }
else if (full.state == daqif::StateMerging) {
173 LOG4CPLUS_INFO(m_logger,
174 "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
176 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
178 AddDaq(
daq, Store::No);
181 pruned.push_back(
id);
185 "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
189 LOG4CPLUS_ERROR(m_logger,
190 "RestoreFromWorkspace: Loading state for DAQ "
191 <<
id <<
" failed (ignoring): " << r);
196 LOG4CPLUS_ERROR(m_logger,
197 "RestoreFromWorkspace: Failed to archive DAQ " <<
id
201 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Skipping " <<
id);
208 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Successfully completed");
210 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Failed");
211 std::throw_with_nested(std::runtime_error(
"Failed to restore from workspace"));
215 for (
unsigned jitter = 0;; ++jitter) {
217 if (!
HaveDaq(id_candidate, id_candidate)) {
226 std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](
auto const&
daq) {
229 return daq.id == id ||
230 (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
232 if (it != m_daq_controllers.end()) {
233 LOG4CPLUS_DEBUG(m_logger,
234 "Manager: Found conflicting DAQ: id="
235 <<
id <<
", file_id=" << file_id <<
" with existing: id=" << it->id
236 <<
", file_id=" << it->controller->GetContext().file_id);
242void ManagerImpl::AddInitialKeywords(
DaqContext& ctx) {
245 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"ORIGIN", m_params.
origin);
246 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"INSTRUME", m_params.
instrument_id);
257void ManagerImpl::FormatKeywordSources(DaqContext& ctx) {
261 auto const& fmt = m_kw_formatter;
264 for (DpPart & part : ctx.results) {
265 if (
auto * ds = std::get_if<fits::KeywordVector>(&part.Part()); ds !=
nullptr) {
266 LOG4CPLUS_INFO(m_logger,
268 <<
"): Validating & reformatting keywords for source '"
269 << part.SourceName() <<
"'");
270 for (
auto& kw : *ds) {
271 auto fkw = fmt.Format(kw);
272 LOG4CPLUS_TRACE(m_logger,
"Before and after formatting: " << kw <<
" -> " << fkw);
279void ManagerImpl::AddDaq(std::shared_ptr<DaqController>
const&
daq, Store store) {
281 LOG4CPLUS_INFO(m_logger,
"Manager: AddDaq: Attempting to add DAQ " <<
daq->GetId());
282 if (
daq->GetId().empty()) {
283 throw boost::enable_current_exception(InvalidDaqId(
"",
"DAQ id cannot be empty"));
285 if (
daq->GetContext().file_id.empty()) {
286 throw boost::enable_current_exception(
287 std::invalid_argument(
"DaqController has empty file_id"));
290 throw boost::enable_current_exception(
291 InvalidDaqId(
daq->GetId(),
"DAQ with same id already exists"));
294 if (store == Store::Yes) {
303 m_dpm_client->StartMonitorStatus(
daq->GetId());
306 m_daq_controllers.emplace_back(
309 daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
310 prev_state =
daq->GetState(),
311 this](ObservableStatus
const& status)
mutable {
312 if (alive.expired()) {
313 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
317 if (!IsFinalState(prev_state)) {
321 fmt::format(
"DAQ transitioned to a final state -> archiving: {} (prev {})",
324 m_workspace.StoreStatus(status);
326 m_executor.submit([alive = alive, id = status.GetId(), this] {
327 if (alive.expired()) {
328 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
335 m_workspace.StoreStatus(status);
343 m_executor.submit([alive = alive, id = status.GetId(), this] {
344 if (alive.expired()) {
345 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
348 MoveToMergePhase(id);
354 m_dpm_client->StartMonitorStatus(status.GetId());
359 this->m_status_signal.Signal(status);
361 prev_state = status.GetState();
364 [alive = std::weak_ptr<bool>(m_alive_token),
this](DaqContext
const& ctx) {
365 if (alive.expired()) {
366 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
369 m_workspace.StoreContext(ctx);
372 if (store == Store::Yes) {
378 m_status_signal.Signal(*
daq->GetStatus());
380 if (
daq->GetState() == State::NotScheduled) {
385void ManagerImpl::RemoveDaq(std::string_view
id) {
386 auto it = std::find_if(m_daq_controllers.begin(),
387 m_daq_controllers.end(),
388 [
id](
auto const&
daq) { return daq.id == id; });
389 if (it == m_daq_controllers.end()) {
390 throw boost::enable_current_exception(InvalidDaqId(
id));
392 m_daq_controllers.erase(it);
395void ManagerImpl::ArchiveDaq(std::string
const&
id) {
396 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive: id={}",
id));
399 m_dpm_client->StopMonitorStatus(
id);
402 m_workspace.ArchiveDaq(
id);
408 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive done: id={}",
id));
411void ManagerImpl::StoreActiveDaqs()
const {
412 LOG4CPLUS_INFO(m_logger,
"StoreActiveDaqs()");
414 std::vector<std::string> daqs;
415 for (Daq
const&
daq : m_daq_controllers) {
416 assert(
daq.controller);
418 daqs.push_back(
daq.id);
421 m_workspace.StoreList(daqs);
424Status ManagerImpl::GetStatus(std::string_view
id)
const {
425 auto*
daq = FindDaq(
id);
427 return daq->GetStatus()->GetStatus();
430 auto maybe_status = m_workspace.LoadArchivedStatus(std::string(
id));
432 return *maybe_status;
434 throw boost::enable_current_exception(
InvalidDaqId(
id));
438ManagerImpl::Daq::Daq(std::string id_arg,
439 std::shared_ptr<DaqController> controller_arg,
440 boost::signals2::connection conn_status_arg,
441 boost::signals2::connection conn_context_arg) noexcept
442 : id(std::move(id_arg))
443 , controller(std::move(controller_arg))
444 , conn_status(std::move(conn_status_arg))
445 , conn_context(std::move(conn_context_arg)) {
448DaqController
const* ManagerImpl::FindDaq(std::string_view
id)
const noexcept {
449 return const_cast<ManagerImpl*
>(
this)->FindDaq(
id);
452DaqController* ManagerImpl::FindDaq(std::string_view
id)
noexcept {
453 auto it = std::find_if(m_daq_controllers.begin(),
454 m_daq_controllers.end(),
455 [
id](
auto const&
daq) { return daq.id == id; });
456 if (it != m_daq_controllers.end()) {
457 return it->controller.get();
462DaqController& ManagerImpl::FindDaqOrThrow(std::string_view
id) {
463 auto daq_ptr = FindDaq(
id);
466 throw boost::enable_current_exception(InvalidDaqId(
id));
471DaqController
const& ManagerImpl::FindDaqOrThrow(std::string_view
id)
const {
472 return const_cast<ManagerImpl*
>(
this)->FindDaqOrThrow(
id);
475void ManagerImpl::MoveToMergePhase(std::string_view
id) {
476 auto*
daq = FindDaq(
id);
478 LOG4CPLUS_WARN(m_logger, fmt::format(
"Daq requested to move does not exist: id={}",
id));
481 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to merge-phase: id={}",
id));
483 auto ctx =
daq->GetContext();
484 auto status =
daq->GetStatus();
485 auto event_log =
daq->GetEventLog();
491 status->SetState(State::NotScheduled);
494 m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
498boost::future<State> ManagerImpl::StartDaqAsync(
DaqContext ctx) {
501 FormatKeywordSources(ctx);
503 AddInitialKeywords(ctx);
507 auto daq = m_daq_factory.MakeOcmPhase(
509 std::make_shared<ObservableStatus>(std::move(
id), std::move(file_id)),
514 return daq->StartAsync()
516 [&,
daq](boost::future<State> f) -> boost::future<State> {
517 if (f.has_exception()) {
520 return daq->AbortAsync(ErrorPolicy::Tolerant)
522 [f = std::move(f)](boost::future<Status>) mutable -> State {
526 __builtin_unreachable();
529 return boost::make_ready_future<State>(f.get());
534 return boost::make_exceptional_future<State>();
538boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view
id,
ErrorPolicy policy) {
540 return FindDaqOrThrow(
id).StopAsync(policy);
542 return boost::make_exceptional_future<Status>();
546boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view
id,
ErrorPolicy policy) {
548 return FindDaqOrThrow(
id).AbortAsync(policy);
550 return boost::make_exceptional_future<Status>();
554boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view
id,
556 std::chrono::milliseconds timeout) {
558 auto* maybe_daq = FindDaq(
id);
561 auto status = GetStatus(
id);
565 fmt::format(
"{}: Await condition already fulfilled (archived).", status));
567 return boost::make_ready_future<Result<Status>>({
false, status});
570 throw boost::enable_current_exception(
571 InvalidDaqId(
id,
"DAQ is archived and cannot be awaited"));
575 auto&
daq = *maybe_daq;
576 auto status =
daq.GetStatus();
578 LOG4CPLUS_INFO(m_logger,
579 fmt::format(
"{}: Await condition already fulfilled.", *status));
581 return boost::make_ready_future<Result<Status>>({
false, *status});
584 auto logger = log4cplus::Logger::getInstance(m_logger.getName() +
".awaitstate");
585 auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
586 m_executor, m_executor.get_io_context(), status, state, timeout, logger);
588 auto& ref = m_abort_funcs.emplace_back(std::move(abort));
589 LOG4CPLUS_DEBUG(
"daq.manager",
590 fmt::format(
"op::AwaitStateAsync initiated. id={}", ref.GetId()));
593 [
this,
id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](
auto res) {
594 LOG4CPLUS_DEBUG(
"daq.manager",
595 fmt::format(
"op::AwaitStateAsync completed. id={}", id));
598 auto is_alive = !alive.expired();
607 return boost::make_exceptional_future<Result<Status>>();
612 return FindDaqOrThrow(
id).UpdateKeywords(keywords);
616 return m_status_signal;
619std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
620 std::vector<std::shared_ptr<DaqController const>> controllers;
621 controllers.reserve(m_daq_controllers.size());
622 std::transform(m_daq_controllers.begin(),
623 m_daq_controllers.end(),
624 std::back_inserter(controllers),
625 [](
auto const&
daq) { return daq.controller; });
629void ManagerImpl::RemoveAbortFunc(std::uint64_t
id)
noexcept {
631 m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
633 [
id](
auto const& obj) { return id == obj.GetId(); }),
634 m_abort_funcs.end());
639ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
640 : m_id(NextId()), m_func(std::move(func)) {
644std::uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
648bool ManagerImpl::OpAbortFunc::Abort() noexcept {
652std::uint64_t ManagerImpl::OpAbortFunc::NextId() {
653 static std::uint64_t next_id = 0;
657void ManagerImpl::ScheduleDaqsAsync() {
658 LOG4CPLUS_TRACE(m_logger,
"ScheduleDaqAsync()");
661 m_schedule_retry.reset();
663 for (
auto&
daq : m_daq_controllers) {
667 daq.controller->ScheduleMergeAsync().then(
668 m_executor, [
id =
daq.id,
this](boost::future<State> reply) {
669 if (!reply.has_exception()) {
672 fmt::format(
"ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
678 fmt::format(
"ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
682 if (m_schedule_retry) {
687 boost::posix_time::seconds(60));
688 m_schedule_retry->async_wait([
this](boost::system::error_code
const&
error) {
Contains declaration for the AwaitStateAsync operation.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, fits::KeywordFormatter const &kw_formatter) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, fits::KeywordFormatter const &formatter, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, std::shared_ptr< DpmClient > dpm_client, log4cplus::Logger const &logger)
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Interface to interact with DPM workspace.
virtual auto LoadList() const -> std::vector< std::string >=0
virtual void ArchiveDaq(std::string const &id)=0
Archives the specified DAQ without deleting any files, typically by moving it to a specific locatio...
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Get file name of the data product specification stored in StoreSpecification()
virtual void StoreList(std::vector< std::string > const &queue) const =0
virtual void StoreStatus(Status const &status) const =0
Loads last archived DAQ status if any.
virtual auto LoadStatus(std::string const &id) const -> Status=0
Loads last archived DAQ status if any.
virtual void StoreContext(DaqContext const &context) const =0
Get file name of the data product specification stored in StoreSpecification()
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
Contains declarations for the helper functions to initiate operations.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates to with keywords from from.
@ Skip
Skip keyword that conflicts.
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
bool IsActiveDpmState(State state) noexcept
Query whether state is an active (non-final) state executed by DPM.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
ErrorPolicy
Error policy supported by certain operations.
std::string instrument_id
Instrument identifier.
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Stopped
All data sources have reported they have stopped acquiring data.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Configuration parameters directly related to manager.
Contains declaration for DaqController.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
std::string file_id
Data Product FileId as specified by OLAS ICD.
std::string id
DAQ identifier, possibly provided by user.
Exception indicating the DAQ id was invalid.
InvalidDaqId(std::string_view id)
Non-observable status object that stores the status of data acquisition.