ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
manager.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Definition of `daq::ManagerImpl` and related utilities.
7 */
8#include <daq/manager.hpp>
9
10#include <algorithm>
11#include <functional>
12#include <stdexcept>
13#include <time.h>
14
15#include <fmt/format.h>
16#include <fmt/ostream.h>
17#include <log4cplus/loggingmacros.h>
18#include <mal/Mal.hpp>
19
20#include <daq/conversion.hpp>
21#include <daq/daqController.hpp>
22#include <daq/error/report.hpp>
23#include <daq/status.hpp>
24#include <daq/workspace.hpp>
25
26#include <daq/op/awaitState.hpp>
27#include <daq/op/initiate.hpp>
28
29namespace daq {
30
31InvalidDaqId::InvalidDaqId(std::string_view id)
32 : std::invalid_argument(fmt::format("DAQ with id '{}' not found", id)) {
33}
34
35InvalidDaqId::InvalidDaqId(std::string_view id, std::string_view reason)
36 : std::invalid_argument(fmt::format("DAQ with id '{}' not found ({})", id, reason)) {
37}
38
39bool IsStale(ManagerParams const& params,
40 State state,
41 std::chrono::system_clock::time_point creation_time) {
42 auto now = std::chrono::system_clock::now();
43 if (IsFinalState(state)) {
44 return true;
45 }
46 auto full = MakeState(state);
47 if (full.state == daqif::StateAcquiring) {
48 return now > creation_time + params.acquiring_stale_age;
49 }
50 if (full.state == daqif::StateMerging) {
51 return now > creation_time + params.merging_stale_age;
52 }
53 return false;
54}
55
56std::string MakeIdCandidate(char const* instrument_id,
57 unsigned jitter,
58 std::chrono::system_clock::time_point* tp) {
59 using time_point = std::chrono::system_clock::time_point;
60 using duration = std::chrono::system_clock::duration;
61 using seconds = std::chrono::seconds;
62 using microseconds = std::chrono::microseconds;
63 // 'KMOS.2017-06-02T16:45:55.701'
64 // olas-id must be <= 5 characters
65 // Format: <olas>-<strfmtime>.<frac>
66 // Size: 1-5 1 20 1 3 = 30
67 // chrono formatting is not provided until C++20
68 struct timeval tv;
69 struct timeval jitter_tv {
70 0, jitter * 1000
71 }; // 1 ms
72 if (gettimeofday(&tv, nullptr) != 0) {
73 // GCOVR_EXCL_START
74 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
75 // GCOVR_EXCL_STOP
76 }
77 // Add jitter
78 struct timeval res;
79 timeradd(&tv, &jitter_tv, &res);
80 tv = res;
81
82 struct tm tm_time;
83 time_t time = tv.tv_sec;
84 if (gmtime_r(&time, &tm_time) == nullptr) {
85 // GCOVR_EXCL_START
86 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
87 // GCOVR_EXCL_STOP
88 }
89 char time_str[31] = {0};
90 int n = snprintf(&time_str[0], 7, "%.5s.", instrument_id);
91 // This part is always 20 characters long
92 strftime(&time_str[n], 20, "%Y-%m-%dT%H:%M:%S", &tm_time);
93 char frac_str[5];
94 snprintf(&frac_str[0], 5, ".%.3d", static_cast<int>(tv.tv_usec / 1000.0));
95 // Append the fractional part
96 strncpy(&time_str[n + 19], &frac_str[0], 4);
97 if (tp != nullptr) {
98 // Store resulting time in out optional out param
99 *tp = time_point(
100 std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
101 }
102 return std::string(time_str, n + 23);
103}
104
106 ManagerParams params,
107 Workspace& workspace,
108 fits::KeywordFormatter const& kw_formatter,
109 std::shared_ptr<ObservableEventLog> event_log,
110 DaqControllerFactory& daq_factory,
111 std::shared_ptr<DpmClient> dpm_client,
112 log4cplus::Logger const& logger)
113 : m_alive_token(std::make_shared<bool>())
114 , m_executor(executor)
115 , m_params(std::move(params))
116 , m_workspace(workspace)
117 , m_kw_formatter(kw_formatter)
118 , m_event_log(std::move(event_log))
119 , m_daq_factory(daq_factory)
120 , m_dpm_client(std::move(dpm_client))
121 , m_logger(logger) {
122}
123
125 // Abort any ongoing operations
126 for (auto& op : m_abort_funcs) {
127 try {
128 op.Abort();
129 } catch (std::exception const& e) {
130 LOG4CPLUS_WARN(m_logger,
131 fmt::format("ManagerImpl::~ManagerImpl: Error when aborting "
132 "operation {}: {}",
133 op.GetId(),
134 e.what()));
135 }
136 }
137}
138
140 LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Starting");
141 auto ids = m_workspace.LoadList();
142 // New list of pruned DAQs
143 decltype(ids) pruned;
144
145 for (auto const& id : ids) {
146 try {
147 LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Loading state for DAQ " << id);
148 auto context = m_workspace.LoadContext(id);
149 auto status = m_workspace.LoadStatus(id);
150
151 // Ignore stale DAQs
152 if (IsStale(m_params, status.state, context.creation_time)) {
153 LOG4CPLUS_INFO(m_logger,
154 "RestoreFromWorkspace: DAQ " << status << " is stale -> archiving");
155 m_workspace.ArchiveDaq(id);
156 continue;
157 }
158
159 auto full = MakeState(status.state);
160 // DAQ should be loaded.
161 if (full.state == daqif::StateAcquiring) {
162 LOG4CPLUS_INFO(m_logger,
163 "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
164 auto daq = m_daq_factory.MakeOcmPhase(std::move(context),
165 std::make_shared<ObservableStatus>(status),
166 m_event_log,
167 m_kw_formatter);
168 assert(daq);
169 AddDaq(daq, Store::No);
170 // We keep this
171 pruned.push_back(id);
172 } else if (full.state == daqif::StateMerging) {
173 LOG4CPLUS_INFO(m_logger,
174 "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
175 auto daq = m_daq_factory.MakeDpmPhase(
176 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
177 assert(daq);
178 AddDaq(daq, Store::No);
179
180 // We keep this
181 pruned.push_back(id);
182 } else {
183 LOG4CPLUS_INFO(
184 m_logger,
185 "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
186 }
187 } catch (...) {
188 error::NestedExceptionReporter r(std::current_exception());
189 LOG4CPLUS_ERROR(m_logger,
190 "RestoreFromWorkspace: Loading state for DAQ "
191 << id << " failed (ignoring): " << r);
192 try {
193 m_workspace.ArchiveDaq(id);
194 } catch (...) {
195 error::NestedExceptionReporter r(std::current_exception());
196 LOG4CPLUS_ERROR(m_logger,
197 "RestoreFromWorkspace: Failed to archive DAQ " << id
198 << "(ignoring): \n"
199 << r);
200 }
201 LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Skipping " << id);
202 }
203 }
204
205 // Write back pruned list of DAQs
206 m_workspace.StoreList(pruned);
207
208 LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Successfully completed");
209} catch (...) {
210 LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Failed");
211 std::throw_with_nested(std::runtime_error("Failed to restore from workspace"));
212}
213
214std::string ManagerImpl::MakeDaqId(std::chrono::system_clock::time_point* time) const {
215 for (unsigned jitter = 0;; ++jitter) {
216 auto id_candidate = daq::MakeIdCandidate(m_params.instrument_id.c_str(), jitter, time);
217 if (!HaveDaq(id_candidate, id_candidate)) {
218 return id_candidate;
219 }
220 }
221}
222
223bool ManagerImpl::HaveDaq(std::string_view id, std::string_view file_id) const noexcept {
224 assert(!id.empty());
225 auto it =
226 std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](auto const& daq) {
227 // Return true if daq is equal
228 // Return true if file_id is non-empty and equal
229 return daq.id == id ||
230 (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
231 });
232 if (it != m_daq_controllers.end()) {
233 LOG4CPLUS_DEBUG(m_logger,
234 "Manager: Found conflicting DAQ: id="
235 << id << ", file_id=" << file_id << " with existing: id=" << it->id
236 << ", file_id=" << it->controller->GetContext().file_id);
237 return true;
238 }
239 return false;
240}
241
242void ManagerImpl::AddInitialKeywords(DaqContext& ctx) {
243 // note: ARCFILE and ORIGFILE is added by daqDpmMerge
245 kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "ORIGIN", m_params.origin);
246 kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "INSTRUME", m_params.instrument_id);
247 // @todo:
248 // - TELESCOP
249 // - DATE?
250
251 // Update so that OCM keywords are added first (DaqContext may already contain keywords from
252 // request).
254 ctx.keywords.swap(kws);
255}
256
257void ManagerImpl::FormatKeywordSources(DaqContext& ctx) {
258 // note: Validation of complete sources is only done before DAQ is started and on failues the
259 // command is rejected as a whole. As such we don't care to create a complete transaction of all
260 // changes.
261 auto const& fmt = m_kw_formatter;
262
263 // Check each source and reformat keywords.
264 for (DpPart & part : ctx.results) {
265 if (auto * ds = std::get_if<fits::KeywordVector>(&part.Part()); ds != nullptr) {
266 LOG4CPLUS_INFO(m_logger,
267 "Manager(" << ctx.id
268 << "): Validating & reformatting keywords for source '"
269 << part.SourceName() << "'");
270 for (auto& kw : *ds) {
271 auto fkw = fmt.Format(kw);
272 LOG4CPLUS_TRACE(m_logger, "Before and after formatting: " << kw << " -> " << fkw);
273 kw = fkw;
274 }
275 }
276 }
277}
278
279void ManagerImpl::AddDaq(std::shared_ptr<DaqController> const& daq, Store store) {
280 assert(daq);
281 LOG4CPLUS_INFO(m_logger, "Manager: AddDaq: Attempting to add DAQ " << daq->GetId());
282 if (daq->GetId().empty()) {
283 throw boost::enable_current_exception(InvalidDaqId("", "DAQ id cannot be empty"));
284 }
285 if (daq->GetContext().file_id.empty()) {
286 throw boost::enable_current_exception(
287 std::invalid_argument("DaqController has empty file_id"));
288 }
289 if (HaveDaq(daq->GetId(), daq->GetContext().file_id)) {
290 throw boost::enable_current_exception(
291 InvalidDaqId(daq->GetId(), "DAQ with same id already exists"));
292 }
293
294 if (store == Store::Yes) {
295 // Requested to store DAQ (i.e. it was not loaded from workspace)
296 m_workspace.StoreContext(daq->GetContext());
297 m_workspace.StoreStatus(*daq->GetStatus());
298 }
299
300 if (IsActiveDpmState(daq->GetState())) {
301 // Start monitoring to recover from e.g. state deviation due to OCM being offline
302 // and not receiving published status updates from DPM.
303 m_dpm_client->StartMonitorStatus(daq->GetId());
304 }
305
306 m_daq_controllers.emplace_back(
307 daq->GetId(),
308 daq,
309 daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
310 prev_state = daq->GetState(),
311 this](ObservableStatus const& status) mutable {
312 if (alive.expired()) {
313 LOG4CPLUS_INFO("daq", "Manager has expired");
314 return;
315 }
316 if (IsFinalState(status.GetState())) {
317 if (!IsFinalState(prev_state)) {
318 // Transition to final state -> Archive DAQ
319 LOG4CPLUS_INFO(
320 m_logger,
321 fmt::format("DAQ transitioned to a final state -> archiving: {} (prev {})",
322 status,
323 prev_state));
324 m_workspace.StoreStatus(status);
325
326 m_executor.submit([alive = alive, id = status.GetId(), this] {
327 if (alive.expired()) {
328 LOG4CPLUS_INFO("daq", "Manager has expired");
329 return;
330 }
331 ArchiveDaq(id);
332 });
333 }
334 } else { // Not any final state
335 m_workspace.StoreStatus(status);
336
337 // Handle handover
338 if (prev_state != State::Stopped && status.GetState() == State::Stopped) {
339 // If DAQ is stopped we need to move it to the merging phase
340 // To be safe we defer the execution.
341 // Manager lives as long as executor so we don't have to
342 // check liveness.
343 m_executor.submit([alive = alive, id = status.GetId(), this] {
344 if (alive.expired()) {
345 LOG4CPLUS_INFO("daq", "Manager has expired");
346 return;
347 }
348 MoveToMergePhase(id);
349 });
350 } else if (prev_state == State::NotScheduled &&
351 IsActiveDpmState(status.GetState())) {
352 // Transition from NotScheduled -> Scheduled+ transfers responsibility to DPM.
353 // As such we want to monitor status in case status updates are not received.
354 m_dpm_client->StartMonitorStatus(status.GetId());
355 }
356 }
357
358 // Signal other observers.
359 this->m_status_signal.Signal(status);
360
361 prev_state = status.GetState();
362 }),
363 daq->ConnectContext(
364 [alive = std::weak_ptr<bool>(m_alive_token), this](DaqContext const& ctx) {
365 if (alive.expired()) {
366 LOG4CPLUS_INFO("daq", "Manager has expired");
367 return;
368 }
369 m_workspace.StoreContext(ctx);
370 }));
371
372 if (store == Store::Yes) {
373 // Store daq list
374 StoreActiveDaqs();
375 }
376
377 // Notify observers that DAQ was added.
378 m_status_signal.Signal(*daq->GetStatus());
379
380 if (daq->GetState() == State::NotScheduled) {
381 ScheduleDaqsAsync();
382 }
383}
384
385void ManagerImpl::RemoveDaq(std::string_view id) {
386 auto it = std::find_if(m_daq_controllers.begin(),
387 m_daq_controllers.end(),
388 [id](auto const& daq) { return daq.id == id; });
389 if (it == m_daq_controllers.end()) {
390 throw boost::enable_current_exception(InvalidDaqId(id));
391 }
392 m_daq_controllers.erase(it);
393}
394
395void ManagerImpl::ArchiveDaq(std::string const& id) {
396 LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive: id={}", id));
397
398 // Since DAQ has been completed we can stop monitoring for changes.
399 m_dpm_client->StopMonitorStatus(id);
400
401 // Archive persistent storage
402 m_workspace.ArchiveDaq(id);
403
404 // Remove daq controller
405 RemoveDaq(id);
406
407 StoreActiveDaqs();
408 LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive done: id={}", id));
409}
410
411void ManagerImpl::StoreActiveDaqs() const {
412 LOG4CPLUS_INFO(m_logger, "StoreActiveDaqs()");
413 // And remove from
414 std::vector<std::string> daqs;
415 for (Daq const& daq : m_daq_controllers) {
416 assert(daq.controller);
417 if (!IsFinalState(daq.controller->GetState())) {
418 daqs.push_back(daq.id);
419 }
420 }
421 m_workspace.StoreList(daqs);
422}
423
424Status ManagerImpl::GetStatus(std::string_view id) const {
425 auto* daq = FindDaq(id);
426 if (daq) {
427 return daq->GetStatus()->GetStatus();
428 }
429 // Try to find archived status
430 auto maybe_status = m_workspace.LoadArchivedStatus(std::string(id));
431 if (maybe_status) {
432 return *maybe_status;
433 } else {
434 throw boost::enable_current_exception(InvalidDaqId(id));
435 }
436}
437
438ManagerImpl::Daq::Daq(std::string id_arg,
439 std::shared_ptr<DaqController> controller_arg,
440 boost::signals2::connection conn_status_arg,
441 boost::signals2::connection conn_context_arg) noexcept
442 : id(std::move(id_arg))
443 , controller(std::move(controller_arg))
444 , conn_status(std::move(conn_status_arg))
445 , conn_context(std::move(conn_context_arg)) {
446}
447
448DaqController const* ManagerImpl::FindDaq(std::string_view id) const noexcept {
449 return const_cast<ManagerImpl*>(this)->FindDaq(id);
450}
451
452DaqController* ManagerImpl::FindDaq(std::string_view id) noexcept {
453 auto it = std::find_if(m_daq_controllers.begin(),
454 m_daq_controllers.end(),
455 [id](auto const& daq) { return daq.id == id; });
456 if (it != m_daq_controllers.end()) {
457 return it->controller.get();
458 }
459 return nullptr;
460}
461
462DaqController& ManagerImpl::FindDaqOrThrow(std::string_view id) {
463 auto daq_ptr = FindDaq(id);
464
465 if (!daq_ptr) {
466 throw boost::enable_current_exception(InvalidDaqId(id));
467 }
468 return *daq_ptr;
469}
470
471DaqController const& ManagerImpl::FindDaqOrThrow(std::string_view id) const {
472 return const_cast<ManagerImpl*>(this)->FindDaqOrThrow(id);
473}
474
475void ManagerImpl::MoveToMergePhase(std::string_view id) {
476 auto* daq = FindDaq(id);
477 if (!daq) {
478 LOG4CPLUS_WARN(m_logger, fmt::format("Daq requested to move does not exist: id={}", id));
479 return;
480 }
481 LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to merge-phase: id={}", id));
482 // Copy state we want to keep.
483 auto ctx = daq->GetContext();
484 auto status = daq->GetStatus();
485 auto event_log = daq->GetEventLog();
486 // Delete old DAQ before creating new
487 // note: this invalidates "daq"
488 RemoveDaq(id);
489
490 // Manually transition to first state in Merging
491 status->SetState(State::NotScheduled);
492
493 auto new_daq =
494 m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
495 AddDaq(new_daq);
496}
497
498boost::future<State> ManagerImpl::StartDaqAsync(DaqContext ctx) {
499 try {
500 // Will throw if any keyword in DpParts results is invalid
501 FormatKeywordSources(ctx);
502
503 AddInitialKeywords(ctx);
504
505 auto id = ctx.id;
506 auto file_id = ctx.file_id;
507 auto daq = m_daq_factory.MakeOcmPhase(
508 std::move(ctx),
509 std::make_shared<ObservableStatus>(std::move(id), std::move(file_id)),
510 m_event_log,
511 m_kw_formatter);
512 assert(daq);
513 AddDaq(daq);
514 return daq->StartAsync()
515 .then(m_executor,
516 [&, daq](boost::future<State> f) -> boost::future<State> {
517 if (f.has_exception()) {
518 // Any error during start may lead to partially started acquisition that
519 // we need to abort
520 return daq->AbortAsync(ErrorPolicy::Tolerant)
521 .then(m_executor,
522 [f = std::move(f)](boost::future<Status>) mutable -> State {
523 // We ignore errors from AbortAsync as we can't do anything
524 // about it Then we return original error
525 f.get(); // throws
526 __builtin_unreachable();
527 });
528 } else {
529 return boost::make_ready_future<State>(f.get());
530 }
531 })
532 .unwrap();
533 } catch (...) {
534 return boost::make_exceptional_future<State>();
535 }
536}
537
538boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view id, ErrorPolicy policy) {
539 try {
540 return FindDaqOrThrow(id).StopAsync(policy);
541 } catch (...) {
542 return boost::make_exceptional_future<Status>();
543 }
544}
545
546boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view id, ErrorPolicy policy) {
547 try {
548 return FindDaqOrThrow(id).AbortAsync(policy);
549 } catch (...) {
550 return boost::make_exceptional_future<Status>();
551 }
552}
553
554boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view id,
555 State state,
556 std::chrono::milliseconds timeout) {
557 try {
558 auto* maybe_daq = FindDaq(id);
559 if (!maybe_daq) {
560 // It could be the DAQ just completed and await should be released because of that.
561 auto status = GetStatus(id);
562 if (!IsSubsequentState(state, status.state)) {
563 LOG4CPLUS_INFO(
564 m_logger,
565 fmt::format("{}: Await condition already fulfilled (archived).", status));
566 // Condition already fulfilled.
567 return boost::make_ready_future<Result<Status>>({false, status});
568 }
569 // Since DAQ was archived we cannot wait anyway so we return exception
570 throw boost::enable_current_exception(
571 InvalidDaqId(id, "DAQ is archived and cannot be awaited"));
572 }
573
574 // DAQ is active
575 auto& daq = *maybe_daq;
576 auto status = daq.GetStatus();
577 if (!IsSubsequentState(state, daq.GetState())) {
578 LOG4CPLUS_INFO(m_logger,
579 fmt::format("{}: Await condition already fulfilled.", *status));
580 // Condition already fulfilled.
581 return boost::make_ready_future<Result<Status>>({false, *status});
582 } else {
583 // Create child logger for await state.
584 auto logger = log4cplus::Logger::getInstance(m_logger.getName() + ".awaitstate");
585 auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
586 m_executor, m_executor.get_io_context(), status, state, timeout, logger);
587 // Store abort function so when Manager is deleted it will
588 auto& ref = m_abort_funcs.emplace_back(std::move(abort));
589 LOG4CPLUS_DEBUG("daq.manager",
590 fmt::format("op::AwaitStateAsync initiated. id={}", ref.GetId()));
591 return fut.then(
592 m_executor,
593 [this, id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](auto res) {
594 LOG4CPLUS_DEBUG("daq.manager",
595 fmt::format("op::AwaitStateAsync completed. id={}", id));
596 // Remove abort function since operation completed, but only if
597 // object is alive.
598 auto is_alive = !alive.expired();
599 if (is_alive) {
600 // Manager is still alive, so we remove abort function
601 RemoveAbortFunc(id);
602 }
603 return res.get();
604 });
605 }
606 } catch (...) {
607 return boost::make_exceptional_future<Result<Status>>();
608 }
609}
610
611void ManagerImpl::UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) {
612 return FindDaqOrThrow(id).UpdateKeywords(keywords);
613}
614
615StatusSignal& ManagerImpl::GetStatusSignal() {
616 return m_status_signal;
617}
618
619std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
620 std::vector<std::shared_ptr<DaqController const>> controllers;
621 controllers.reserve(m_daq_controllers.size());
622 std::transform(m_daq_controllers.begin(),
623 m_daq_controllers.end(),
624 std::back_inserter(controllers),
625 [](auto const& daq) { return daq.controller; });
626 return controllers;
627}
628
629void ManagerImpl::RemoveAbortFunc(std::uint64_t id) noexcept {
630 try {
631 m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
632 m_abort_funcs.end(),
633 [id](auto const& obj) { return id == obj.GetId(); }),
634 m_abort_funcs.end());
635 } catch (...) {
636 }
637}
638
639ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
640 : m_id(NextId()), m_func(std::move(func)) {
641 assert(m_func);
642}
643
644std::uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
645 return m_id;
646}
647
648bool ManagerImpl::OpAbortFunc::Abort() noexcept {
649 return m_func();
650}
651
652std::uint64_t ManagerImpl::OpAbortFunc::NextId() {
653 static std::uint64_t next_id = 0;
654 return next_id++;
655}
656
657void ManagerImpl::ScheduleDaqsAsync() {
658 LOG4CPLUS_TRACE(m_logger, "ScheduleDaqAsync()");
659 // Regardless if caller was invoked from timer or manually we reset the deadline timer
660 // so that timer is restarted when ScheduleMergeAsync completes as necessary.
661 m_schedule_retry.reset();
662
663 for (auto& daq : m_daq_controllers) {
664 if (daq.controller->GetState() != State::NotScheduled) {
665 continue;
666 }
667 daq.controller->ScheduleMergeAsync().then(
668 m_executor, [id = daq.id, this](boost::future<State> reply) {
669 if (!reply.has_exception()) {
670 LOG4CPLUS_INFO(
671 m_logger,
672 fmt::format("ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
673 // Success
674 return;
675 }
676 LOG4CPLUS_WARN(
677 m_logger,
678 fmt::format("ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
679 id));
680 // Some kind of error happened, at this point we don't care what it is
681 // but simply schedule a new attempt using deadline timer, unless already scheduled.
682 if (m_schedule_retry) {
683 // Already scheduled..
684 return;
685 }
686 m_schedule_retry.emplace(m_executor.get_io_context(),
687 boost::posix_time::seconds(60));
688 m_schedule_retry->async_wait([this](boost::system::error_code const& error) {
689 if (error) {
690 return;
691 }
692 ScheduleDaqsAsync();
693 });
694 });
695 }
696}
697
698} // namespace daq
Contains declaration for the AwaitStateAsync operation.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, fits::KeywordFormatter const &kw_formatter) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, fits::KeywordFormatter const &formatter, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, std::shared_ptr< DpmClient > dpm_client, log4cplus::Logger const &logger)
Definition: manager.cpp:105
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:139
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:223
~ManagerImpl() noexcept
Definition: manager.cpp:124
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:214
Observes any status.
Definition: manager.hpp:88
Interface to interact with DPM workspace.
Definition: workspace.hpp:32
virtual auto LoadList() const -> std::vector< std::string >=0
virtual void ArchiveDaq(std::string const &id)=0
Archives specified DAQ without deleting any files, typically by moving files it to a specific locatio...
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Get file name of the data product specification stored in StoreSpecification()
virtual void StoreList(std::vector< std::string > const &queue) const =0
virtual void StoreStatus(Status const &status) const =0
Loads last archived DAQ status if any.
virtual auto LoadStatus(std::string const &id) const -> Status=0
Loads last archived DAQ status if any.
virtual void StoreContext(DaqContext const &context) const =0
Get file name of the data product specification stored in StoreSpecification()
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Formats keyword against e.g.
Definition: keyword.hpp:551
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
Contains declarations for the helper functions to initiate operations.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates to with keywords from from.
Definition: keyword.cpp:679
@ Skip
Skip keyword that conflicts.
std::string origin
Definition: manager.hpp:51
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:56
bool IsActiveDpmState(State state) noexcept
Query whether state is an active (non-final) state executed by DPM.
Definition: state.cpp:30
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:77
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
Definition: manager.cpp:39
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:63
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
Definition: state.cpp:43
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:50
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Stopped
All data sources have reported they have stopped acquiring data.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:57
Configuration parameters directly related to manager.
Definition: manager.hpp:46
Definition: main.cpp:24
Contains declaration for DaqController.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:85
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:63
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:58
Exception indicating the DAQ id was invalid.
Definition: manager.hpp:38
InvalidDaqId(std::string_view id)
Definition: manager.cpp:31
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164