14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16 #include <log4cplus/loggingmacros.h>
17 #include <mal/Mal.hpp>
31 std::chrono::system_clock::time_point creation_time) {
32 auto now = std::chrono::system_clock::now();
37 if (full.state == daqif::StateAcquiring) {
40 if (full.state == daqif::StateMerging) {
48 std::chrono::system_clock::time_point* tp) {
49 using time_point = std::chrono::system_clock::time_point;
50 using duration = std::chrono::system_clock::duration;
51 using seconds = std::chrono::seconds;
52 using microseconds = std::chrono::microseconds;
59 struct timeval jitter_tv {
62 if (gettimeofday(&tv,
nullptr) != 0) {
64 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
69 timeradd(&tv, &jitter_tv, &res);
73 time_t time = tv.tv_sec;
74 if (gmtime_r(&time, &tm_time) ==
nullptr) {
76 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
79 char time_str[31] = {0};
80 int n = snprintf(&time_str[0], 7,
"%.5s.", instrument_id);
82 strftime(&time_str[n], 20,
"%Y-%m-%dT%H:%M:%S", &tm_time);
84 snprintf(&frac_str[0], 5,
".%.3d",
static_cast<int>(tv.tv_usec / 1000.0));
86 strncpy(&time_str[n + 19], &frac_str[0], 4);
90 std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
92 return std::string(time_str, n + 23);
98 std::shared_ptr<ObservableEventLog> event_log,
100 std::shared_ptr<DpmClient> dpm_client,
101 log4cplus::Logger
const& logger)
102 : m_alive_token(std::make_shared<bool>())
103 , m_executor(executor)
104 , m_params(std::move(params))
105 , m_workspace(workspace)
106 , m_event_log(std::move(event_log))
107 , m_daq_factory(daq_factory)
108 , m_dpm_client(std::move(dpm_client))
114 for (
auto& op : m_abort_funcs) {
117 }
catch (std::exception
const& e) {
118 LOG4CPLUS_WARN(m_logger,
119 fmt::format(
"ManagerImpl::~ManagerImpl: Error when aborting "
128 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Starting");
131 decltype(ids) pruned;
133 for (
auto const&
id : ids) {
135 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Loading state for DAQ " <<
id);
140 if (
IsStale(m_params, status.state, context.creation_time)) {
141 LOG4CPLUS_INFO(m_logger,
142 "RestoreFromWorkspace: DAQ " << status <<
" is stale -> archiving");
149 if (full.state == daqif::StateAcquiring) {
150 LOG4CPLUS_INFO(m_logger,
151 "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
153 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
155 AddDaq(
daq, Store::No);
157 pruned.push_back(
id);
158 }
else if (full.state == daqif::StateMerging) {
159 LOG4CPLUS_INFO(m_logger,
160 "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
162 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
164 AddDaq(
daq, Store::No);
167 pruned.push_back(
id);
171 "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
175 LOG4CPLUS_ERROR(m_logger,
176 "RestoreFromWorkspace: Loading state for DAQ "
177 <<
id <<
" failed (ignoring): " << r);
182 LOG4CPLUS_ERROR(m_logger,
183 "RestoreFromWorkspace: Failed to archive DAQ " <<
id
187 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Skipping " <<
id);
194 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Successfully completed");
196 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Failed");
197 std::throw_with_nested(std::runtime_error(
"Failed to restore from workspace"));
201 for (
unsigned jitter = 0;; ++jitter) {
203 if (!
HaveDaq(id_candidate, id_candidate)) {
212 std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](
auto const&
daq) {
215 return daq.id == id ||
216 (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
218 if (it != m_daq_controllers.end()) {
219 LOG4CPLUS_DEBUG(m_logger,
220 "Manager: Found conflicting DAQ: id="
221 <<
id <<
", file_id=" << file_id <<
" with existing: id=" << it->id
222 <<
", file_id=" << it->controller->GetContext().file_id);
228 void ManagerImpl::AddInitialKeywords(
DaqContext& ctx) {
231 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"ORIGIN", m_params.
origin);
232 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"INSTRUME", m_params.
instrument_id);
243 void ManagerImpl::AddDaq(std::shared_ptr<DaqController>
const&
daq, Store store) {
245 LOG4CPLUS_INFO(m_logger,
"Manager: AddDaq: Attempting to add DAQ " <<
daq->GetId());
246 if (
daq->GetId().empty()) {
247 throw boost::enable_current_exception(std::invalid_argument(
"DaqController has empty id!"));
249 if (
daq->GetContext().file_id.empty()) {
250 throw boost::enable_current_exception(
251 std::invalid_argument(
"DaqController has empty file_id"));
254 throw boost::enable_current_exception(
255 std::invalid_argument(
"DaqController with same id already exists"));
258 if (store == Store::Yes) {
267 m_dpm_client->StartMonitorStatus(
daq->GetId());
270 m_daq_controllers.emplace_back(
273 daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
274 prev_state =
daq->GetState(),
275 this](ObservableStatus
const& status)
mutable {
276 if (alive.expired()) {
277 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
281 if (!IsFinalState(prev_state)) {
285 fmt::format(
"DAQ transitioned to a final state -> archiving: {} (prev {})",
288 m_workspace.StoreStatus(status);
290 m_executor.submit([alive = alive, id = status.GetId(), this] {
291 if (alive.expired()) {
292 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
299 m_workspace.StoreStatus(status);
307 m_executor.submit([alive = alive, id = status.GetId(), this] {
308 if (alive.expired()) {
309 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
312 MoveToMergePhase(id);
318 m_dpm_client->StartMonitorStatus(status.GetId());
323 this->m_status_signal.Signal(status);
325 prev_state = status.GetState();
328 [alive = std::weak_ptr<bool>(m_alive_token),
this](DaqContext
const& ctx) {
329 if (alive.expired()) {
330 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
333 m_workspace.StoreContext(ctx);
336 if (store == Store::Yes) {
342 m_status_signal.Signal(*
daq->GetStatus());
344 if (
daq->GetState() == State::NotScheduled) {
349 void ManagerImpl::RemoveDaq(std::string_view
id) {
350 auto it = std::find_if(m_daq_controllers.begin(),
351 m_daq_controllers.end(),
352 [
id](
auto const&
daq) { return daq.id == id; });
353 if (it == m_daq_controllers.end()) {
354 throw boost::enable_current_exception(
355 std::invalid_argument(fmt::format(
"Remove DAQ failed - no id found: {}",
id)));
357 m_daq_controllers.erase(it);
360 void ManagerImpl::ArchiveDaq(std::string
const&
id) {
361 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive: id={}",
id));
364 m_dpm_client->StopMonitorStatus(
id);
367 m_workspace.ArchiveDaq(
id);
373 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive done: id={}",
id));
376 void ManagerImpl::StoreActiveDaqs()
const {
377 LOG4CPLUS_INFO(m_logger,
"StoreActiveDaqs()");
379 std::vector<std::string> daqs;
380 for (Daq
const&
daq : m_daq_controllers) {
381 assert(
daq.controller);
383 daqs.push_back(
daq.id);
386 m_workspace.StoreList(daqs);
389 Status ManagerImpl::GetStatus(std::string_view
id)
const {
390 auto*
daq = FindDaq(
id);
392 return daq->GetStatus()->GetStatus();
395 auto maybe_status = m_workspace.LoadArchivedStatus(std::string(
id));
397 return *maybe_status;
399 throw boost::enable_current_exception(
400 std::invalid_argument(fmt::format(
"DaqController with id '{}' does not exist",
id)));
404 ManagerImpl::Daq::Daq(std::string id_arg,
405 std::shared_ptr<DaqController> controller_arg,
406 boost::signals2::connection conn_status_arg,
407 boost::signals2::connection conn_context_arg) noexcept
408 : id(std::move(id_arg))
409 , controller(std::move(controller_arg))
410 , conn_status(std::move(conn_status_arg))
411 , conn_context(std::move(conn_context_arg)) {
414 DaqController
const* ManagerImpl::FindDaq(std::string_view
id)
const noexcept {
415 return const_cast<ManagerImpl*
>(
this)->FindDaq(
id);
418 DaqController* ManagerImpl::FindDaq(std::string_view
id) noexcept {
419 auto it = std::find_if(m_daq_controllers.begin(),
420 m_daq_controllers.end(),
421 [
id](
auto const&
daq) { return daq.id == id; });
422 if (it != m_daq_controllers.end()) {
423 return it->controller.get();
428 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view
id) {
429 auto daq_ptr = FindDaq(
id);
432 throw boost::enable_current_exception(std::invalid_argument(
433 fmt::format(
"DaqController with id '{}' does not exist", std::string(
id))));
438 DaqController
const& ManagerImpl::FindDaqOrThrow(std::string_view
id)
const {
439 return const_cast<ManagerImpl*
>(
this)->FindDaqOrThrow(
id);
442 void ManagerImpl::MoveToMergePhase(std::string_view
id) {
443 auto*
daq = FindDaq(
id);
445 LOG4CPLUS_WARN(m_logger, fmt::format(
"Daq requested to move does not exist: id={}",
id));
448 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to merge-phase: id={}",
id));
450 auto ctx =
daq->GetContext();
451 auto status =
daq->GetStatus();
452 auto event_log =
daq->GetEventLog();
458 status->SetState(State::NotScheduled);
461 m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
465 boost::future<State> ManagerImpl::StartDaqAsync(
DaqContext ctx) {
467 AddInitialKeywords(ctx);
471 auto daq = m_daq_factory.MakeOcmPhase(
473 std::make_shared<ObservableStatus>(std::move(
id), std::move(file_id)),
477 return daq->StartAsync()
479 [&,
daq](boost::future<State> f) -> boost::future<State> {
480 if (f.has_exception()) {
483 return daq->AbortAsync(ErrorPolicy::Tolerant)
485 [f = std::move(f)](boost::future<Status>) mutable -> State {
489 __builtin_unreachable();
492 return boost::make_ready_future<State>(f.get());
497 return boost::make_exceptional_future<State>();
501 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view
id,
ErrorPolicy policy) {
503 return FindDaqOrThrow(
id).StopAsync(policy);
505 return boost::make_exceptional_future<Status>();
509 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view
id,
ErrorPolicy policy) {
511 return FindDaqOrThrow(
id).AbortAsync(policy);
513 return boost::make_exceptional_future<Status>();
517 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view
id,
519 std::chrono::milliseconds timeout) {
521 auto* maybe_daq = FindDaq(
id);
524 auto status = GetStatus(
id);
526 LOG4CPLUS_INFO(m_logger,
527 fmt::format(
"{}: Await condition already fulfilled (archived).", status));
529 return boost::make_ready_future<Result<Status>>({
false, status});
532 throw boost::enable_current_exception(std::invalid_argument(
533 fmt::format(
"DaqController with id '{}' is archived and cannot be awaited",
id)));
537 auto&
daq = *maybe_daq;
538 auto status =
daq.GetStatus();
540 LOG4CPLUS_INFO(m_logger,
541 fmt::format(
"{}: Await condition already fulfilled.", *status));
543 return boost::make_ready_future<Result<Status>>({
false, *status});
546 auto logger = log4cplus::Logger::getInstance(m_logger.getName() +
".awaitstate");
547 auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
548 m_executor.get_io_context(), status, state, timeout, logger);
550 auto& ref = m_abort_funcs.emplace_back(std::move(abort));
551 LOG4CPLUS_DEBUG(
"daq.manager",
552 fmt::format(
"op::AwaitStateAsync initiated. id={}", ref.GetId()));
555 [
this,
id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](
auto res) {
556 LOG4CPLUS_DEBUG(
"daq.manager",
557 fmt::format(
"op::AwaitStateAsync completed. id={}", id));
560 auto is_alive = !alive.expired();
569 return boost::make_exceptional_future<Result<Status>>();
574 return FindDaqOrThrow(
id).UpdateKeywords(keywords);
578 return m_status_signal;
581 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
582 std::vector<std::shared_ptr<DaqController const>> controllers;
583 controllers.reserve(m_daq_controllers.size());
585 m_daq_controllers.end(),
586 std::back_inserter(controllers),
587 [](
auto const&
daq) { return daq.controller; });
591 void ManagerImpl::RemoveAbortFunc(uint64_t
id) noexcept {
593 m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
595 [
id](
auto const& obj) { return id == obj.GetId(); }),
596 m_abort_funcs.end());
601 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
602 : m_id(NextId()), m_func(std::move(func)) {
606 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
610 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
614 uint64_t ManagerImpl::OpAbortFunc::NextId() {
615 static uint64_t next_id = 0;
619 void ManagerImpl::ScheduleDaqsAsync() {
620 LOG4CPLUS_TRACE(m_logger,
"ScheduleDaqAsync()");
623 m_schedule_retry.reset();
625 for (
auto&
daq : m_daq_controllers) {
629 daq.controller->ScheduleMergeAsync().then(
630 m_executor, [
id =
daq.id,
this](boost::future<State> reply) {
631 if (!reply.has_exception()) {
634 fmt::format(
"ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
640 fmt::format(
"ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
644 if (m_schedule_retry) {
649 boost::posix_time::seconds(60));
650 m_schedule_retry->async_wait([
this](boost::system::error_code
const&
error) {
Contains declaration for the AwaitStateAsync operation.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, std::shared_ptr< DpmClient > dpm_client, log4cplus::Logger const &logger)
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Interface to interact with DPM workspace.
virtual auto LoadList() const -> std::vector< std::string >=0
virtual void ArchiveDaq(std::string const &id)=0
Archives specified DAQ without deleting any files, typically by moving the files to a specific location...
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Get file name of the data product specification stored in StoreSpecification()
virtual void StoreList(std::vector< std::string > const &queue) const =0
virtual void StoreStatus(Status const &status) const =0
Loads last archived DAQ status if any.
virtual auto LoadStatus(std::string const &id) const -> Status=0
Loads last archived DAQ status if any.
virtual void StoreContext(DaqContext const &context) const =0
Get file name of the data product specification stored in StoreSpecification()
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
Contains declarations for the helper functions to initiate operations.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates `to` with keywords from `from`.
@ Skip
Skip keyword that conflicts.
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
bool IsActiveDpmState(State state) noexcept
Query whether state is an active (non-final) state executed by DPM.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
ErrorPolicy
Error policy supported by certain operations.
std::string instrument_id
Instrument identifier.
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Stopped
All data sources have reported they have stopped acquiring data.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Configuration parameters directly related to manager.
Contains declaration for DaqController.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
std::string file_id
Data Product FileId as specified by OLAS ICD.
std::string id
DAQ identifier, possibly provided by user.
Non-observable status object that stores the status of a data acquisition.