ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
daqController.cpp
Go to the documentation of this file.
2
#include <algorithm>
#include <iostream>
#include <stdexcept>
#include <type_traits>

#include <fmt/format.h>
#include <fmt/ostream.h>
#include <log4cplus/loggingmacros.h>
#include <mal/MalException.hpp>
#include <mal/rr/qos/ReplyTime.hpp>
#include <nlohmann/json.hpp>

#include <daq/dpmClient.hpp>
#include <daq/error.hpp>
#include <daq/fits/json.hpp>
#include <daq/json.hpp>
#include <daq/makeDpSpec.hpp>
#include <daq/op/abort.hpp>
#include <daq/op/awaitPrim.hpp>
#include <daq/op/initiate.hpp>
#include <daq/op/start.hpp>
#include <daq/op/stop.hpp>
24
25namespace daq {
26namespace {
27
28DaqSources MakeSources(mal::Mal& mal, DaqContext const& ctx) {
29 std::vector<PrimSource> psources;
30 std::vector<MetaSource> msources;
31
32 psources.reserve(ctx.prim_sources.size());
33 for (auto const& raw : ctx.prim_sources) {
34 psources.emplace_back(
35 raw.name,
37 mal::Uri(raw.rr_uri),
38 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
39 {}));
40 }
41
42 msources.reserve(ctx.meta_sources.size());
43 for (auto const& raw : ctx.meta_sources) {
44 msources.emplace_back(
45 raw.name,
47 mal::Uri(raw.rr_uri),
48 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(15))},
49 {}));
50 }
51
52 return DaqSources(std::move(psources), std::move(msources));
53}
54
// Type-dependent `false`: lets a static_assert in the final `else` branch of an
// `if constexpr` visitor fire only when that branch is actually instantiated.
template <class T>
inline constexpr bool always_false_v = false;  // NOLINT
57
58template <class Sources>
59bool AllInState(Sources sources, State state) {
60 return std::all_of(
61 sources.begin(), sources.end(), [=](auto const& s) -> bool { return s.state == state; });
62}
63
64} // namespace
65
67 : executor(ex)
68 , start([this](auto par) {
69 return daq::op::InitiateOperation<daq::op::StartAsync>(executor, par);
70 })
71 , abort([this](ErrorPolicy policy, auto par) {
72 return daq::op::InitiateOperation<op::AbortAsync>(executor, policy, std::move(par));
73 })
74 , stop([this](ErrorPolicy policy, auto par) {
75 return daq::op::InitiateOperation<op::StopAsync>(executor, policy, std::move(par));
76 })
77 , await_prim([this](auto par) {
78 return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(executor, std::move(par));
79 }) {
80}
81
82bool OcmAsyncOperations::IsValid() const noexcept {
83 return start && stop && abort && await_prim;
84}
85
87 elt::mal::Mal& mal,
88 std::shared_ptr<DpmClient> dpm_client)
89 : m_io_ctx(io_ctx)
90 , m_executor(io_ctx)
91 , m_mal(mal)
92 , m_async_ops(m_executor)
93 , m_dpm_client(std::move(dpm_client)) {
94 assert(m_dpm_client);
95}
96
98 std::shared_ptr<ObservableStatus> status,
99 std::shared_ptr<ObservableEventLog> event_log,
100 fits::KeywordFormatter const& kw_formatter)
101 -> std::shared_ptr<DaqController> {
102 DaqSources sources = MakeSources(m_mal, daq_ctx);
103 return OcmDaqController::Create(m_io_ctx,
104 kw_formatter,
105 std::move(daq_ctx),
106 sources,
107 std::move(status),
108 std::move(event_log),
109 m_async_ops);
110}
111
113 std::shared_ptr<ObservableStatus> status,
114 std::shared_ptr<ObservableEventLog> event_log)
115 -> std::shared_ptr<DaqController> {
116 return std::make_shared<DpmDaqController>(
117 m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
118}
119
120std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
121 os << "DaqController(id='" << daq.GetId() << "', state=" << daq.GetState() << ")";
122 return os;
123}
124
125CommonDaqController::CommonDaqController(boost::asio::io_context& io_context,
126 DaqContext context,
127 std::shared_ptr<ObservableStatus> status,
128 std::shared_ptr<ObservableEventLog> event_log)
129 : m_io_ctx(io_context)
130 , m_executor(m_io_ctx)
131 , m_context(std::move(context))
132 , m_status(std::move(status))
133 , m_event_log(std::move(event_log)) {
134 assert(m_status);
135 assert(m_event_log);
136}
137
138std::shared_ptr<ObservableStatus> CommonDaqController::GetStatus() DAQ_NOEXCEPT {
139 return m_status;
140}
141
142std::shared_ptr<ObservableStatus const> CommonDaqController::GetStatus() const DAQ_NOEXCEPT {
143 return m_status;
144}
145
146std::shared_ptr<ObservableEventLog> CommonDaqController::GetEventLog() DAQ_NOEXCEPT {
147 return m_event_log;
148}
149
150std::string const& CommonDaqController::GetId() const DAQ_NOEXCEPT {
151 return m_status->GetId();
152}
153
155 return m_status->HasError();
156}
157
159 return m_context;
160}
161
162boost::signals2::connection
163CommonDaqController::ConnectContext(ContextSignal::slot_type const& slot) {
164 return m_sig_context.connect(slot);
165}
166
167std::shared_ptr<OcmDaqController>
168OcmDaqController::Create(boost::asio::io_context& io_context,
169 fits::KeywordFormatter const& kw_formatter,
170 DaqContext context,
171 DaqSources const& sources,
172 std::shared_ptr<ObservableStatus> status,
173 std::shared_ptr<ObservableEventLog> event_log,
174 OcmAsyncOperations ops) {
175 // note: make_shared doesn't work since constructor is protected,
176 // to protect against non-shared ownership.
177 LOG4CPLUS_TRACE("daq", fmt::format("OcmDaqController::Create"));
178 return std::shared_ptr<OcmDaqController>(new OcmDaqController(io_context,
179 kw_formatter,
180 std::move(context),
181 sources,
182 std::move(status),
183 std::move(event_log),
184 std::move(ops)));
185}
186
187OcmDaqController::OcmDaqController(boost::asio::io_context& io_context,
188 fits::KeywordFormatter const& kw_formatter,
189 DaqContext context,
190 DaqSources const& sources,
191 std::shared_ptr<ObservableStatus> status,
192 std::shared_ptr<ObservableEventLog> event_log,
194 : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
195 , m_kw_formatter(kw_formatter)
196 , m_state(MakeState(GetStatusRef().GetState()))
197 , m_prim_sources(MakeSources<PrimSource>(sources.GetPrimarySources()))
198 , m_meta_sources(MakeSources<MetaSource>(sources.GetMetadataSources()))
199 , m_async_ops(std::move(ops))
200 , m_pending_replies(PendingReplies::Create())
201 , m_logger(log4cplus::Logger::getInstance("daq.ocm.controller")) {
202 if (GetContext().id != GetStatusRef().GetId()) {
203 throw boost::enable_current_exception(
204 std::invalid_argument("Data acquisition id mismatch between DaqContext "
205 "and ObservableStatus"));
206 }
207
208 if (!HasStatefulSources() && GetContext().results.empty()) {
209 throw boost::enable_current_exception(
210 std::invalid_argument("No data sources or data to merge is provided"));
211 }
212
213 if (!m_async_ops.IsValid()) {
214 throw boost::enable_current_exception(
215 std::invalid_argument("OcmAsyncOperations is invalid"));
216 }
217}
218
220 State s;
221 std::visit(
222 [&](auto const& var) {
223 using T = std::decay_t<decltype(var)>;
224 if constexpr (std::is_same_v<T, NotStarted>) {
226 } else if constexpr (std::is_same_v<T, Starting>) {
227 s = State::Starting;
228 } else if constexpr (std::is_same_v<T, Acquiring>) {
230 } else if constexpr (std::is_same_v<T, Stopping>) {
231 s = State::Stopping;
232 } else if constexpr (std::is_same_v<T, Stopped>) {
233 s = State::Stopped;
234 } else if constexpr (std::is_same_v<T, Aborting>) {
236 } else if constexpr (std::is_same_v<T, Aborted>) {
237 s = State::Aborted;
238 } else {
239 static_assert(always_false_v<T>, "non-exhaustive visitor!");
240 }
241 },
242 m_state);
243 return s;
244}
245
246constexpr log4cplus::Logger const& OcmDaqController::GetLogger() const noexcept {
247 return m_logger;
248}
249
251 switch (s) {
253 return StateVariant(NotStarted());
254 case State::Starting:
255 return StateVariant(Starting());
256 case State::Acquiring:
257 return StateVariant(Acquiring());
258 case State::Stopping:
259 return StateVariant(Stopping());
260 case State::Stopped:
261 return StateVariant(Stopped());
263 return StateVariant(Aborting());
264 case State::Aborted:
265 return StateVariant(Aborted());
266 default:
267 break;
268 };
269 LOG4CPLUS_FATAL(m_logger, fmt::format("Invalid state provided: '{}'", s));
270 std::terminate();
271}
272
274 m_state = s;
275 // Publish changes
276 GetStatusRef().SetState(GetState());
277}
278
282 alerts,
284 m_logger,
285 GetId(),
286 *m_pending_replies.get(),
290}
291
295 alerts,
297 m_logger,
298 GetId(),
299 *m_pending_replies.get(),
303 GetContext().await_interval);
304}
305
307 // Nothing yet.
308}
309
311 return !(m_prim_sources.empty() && m_meta_sources.empty());
312}
313
314boost::future<State> OcmDaqController::StartAsync() {
316 ActionEvent(GetId(), "DaqController::StartAsync()", GetStatusRef().GetStatus()));
317
318 // Make sure we're not already started.
319 if (!std::holds_alternative<NotStarted>(m_state)) {
320 return boost::make_exceptional_future<State>(
321 std::runtime_error("Data acquisition is already started"));
322 }
323
325
327
328 auto alerts = std::make_unique<op::AlertState>();
329 return m_async_ops.start(MakeParams(*alerts.get()))
330 .then(GetIoExecutor(),
331 [alerts = std::move(alerts),
332 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
333 this](boost::future<void> f) {
334 LOG4CPLUS_INFO(daq->GetLogger(),
335 fmt::format("{}: StartAsync: Completed {}",
336 *daq,
337 f.has_value() ? "successfully" : "with error"));
338 auto& status = GetStatusRef();
340 &status, alert::DAQ_CONTROLLER, "StartAsync()");
341 try {
342 // Merge alerts from async op
343 op::MergeAlerts(status, *alerts);
344
345 // Re-raise any exception
346 if (f.has_exception()) {
347 (void)f.get();
348 }
349 // Await completion if there are primary data sources
351 // No exception -> all done! update and return Acquiring state
353 alert.Clear();
354 return GetState();
355 } catch (std::exception const& e) {
356 LOG4CPLUS_ERROR(m_logger,
357 fmt::format("{}: AsyncStartDaq failed: {}", *this, e.what()));
358 alert.Set(fmt::format("Failed to start DAQ: {}", e.what()));
359 throw;
360 }
361 });
362}
363
364boost::future<Status> OcmDaqController::StopAsync(ErrorPolicy policy) {
366 ActionEvent(GetId(), "DaqController::StopAsync()", GetStatusRef().GetStatus()));
367
368 if (!(std::holds_alternative<Acquiring>(m_state) ||
369 std::holds_alternative<Stopping>(m_state))) {
370 return boost::make_exceptional_future<Status>(
371 std::runtime_error(fmt::format("Data acquisition can only be stopped while Acquiring "
372 "or being Stopped, current state: {}",
373 GetState())));
374 }
375
376 // @todo If we're in Starting this will potentially mess up assumption that we're
377 // in Starting. Revisit to remove this assumption?
378 // @todo: Store produced files
379 // m_status.AddFiles(reply.getFiles());
381
382 // As we're asked to stop manually we stop waiting for primary sources
384
385 auto alerts = std::make_unique<op::AlertState>();
386 return m_async_ops.stop(policy, MakeParams(*alerts.get()))
387 .then(GetIoExecutor(),
388 [alerts = std::move(alerts),
389 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
390 this](boost::future<Result<DpParts>> f) mutable -> Status {
391 if (GetState() != State::Stopping) {
392 // It can happen that a request to stop was superseded and completed before
393 // reply was received. In this case we treat it as a failure as we cannot
394 // determine if post conditions are met.
395 LOG4CPLUS_WARN(
396 GetLogger(),
397 fmt::format("{}: StopAsync: Data acquisition modified by other commands. "
398 "Do nothing else (errors are ignored). ",
399 *this));
400 throw std::runtime_error(fmt::format(
401 "Stop command could not be completed because Data Acquisitions state was "
402 "modified in the meantime (current state: {})",
403 GetState()));
404 }
405 LOG4CPLUS_INFO(GetLogger(),
406 fmt::format("{}: StopAsync: Completed {}",
407 *this,
408 f.has_value() ? "successfully" : "with error"));
409 auto& status = GetStatusRef();
411 &status, alert::DAQ_CONTROLLER, "StopAsync()");
412 try {
413 op::MergeAlerts(status, *alerts);
414
415 // Also may throw to propagate error
416 auto result = f.get();
417 if (result.error) {
418 alert.Set("Async operation Stop completed with error(s)");
419 } else {
420 alert.Clear();
421 }
422 AddDpParts(GetContextMut(), result.result);
424
425 // If there were no exceptions we're all done.
426 // Update and return Stopped state
427 SetState(Stopped{});
428 return GetStatus()->GetStatus();
429 } catch (std::exception const& e) {
430 LOG4CPLUS_ERROR(m_logger,
431 fmt::format("{}: StopAsync failed: {}", *this, e.what()));
432 alert.Set(fmt::format("Failed to stop DAQ: {}", e.what()));
433 throw;
434 }
435 });
436}
437
438boost::future<Status> OcmDaqController::AbortAsync(ErrorPolicy policy) {
440 GetId(), fmt::format("DaqController::AbortAsync({})", policy), GetStatusRef().GetStatus()));
441
442 if (std::holds_alternative<NotStarted>(m_state)) {
443 LOG4CPLUS_INFO(m_logger, fmt::format("{}: Aborting not started data acquisition", *this));
444 SetState(Aborted{});
445 return boost::make_ready_future<Status>(GetStatus()->GetStatus());
446 }
447
448 if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
449 return boost::make_exceptional_future<Status>(
450 std::runtime_error("Data acquisition already stopped or aborted"));
451 }
452
454
455 // As we're asked to stop manually we stop waiting for primary sources
457
458 auto alerts = std::make_unique<op::AlertState>();
459 return m_async_ops.abort(policy, MakeParams(*alerts.get()))
460 .then(GetIoExecutor(),
461 [alerts = std::move(alerts),
462 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
463 this](boost::future<Result<void>> f) -> Status {
464 if (GetState() == State::Aborted) {
465 // It can happen that a request to abort was superseded and
466 // finalized before reply was received.
467 LOG4CPLUS_INFO(
468 GetLogger(),
469 fmt::format("{}: AbortAsync: Data acquisition already aborted. "
470 "Do nothing else (errors are ignored). ",
471 *this));
472 // @todo: Should throw instead as the command was superseeded by
473 // other command.
474 return GetStatus()->GetStatus();
475 }
476 LOG4CPLUS_DEBUG(GetLogger(),
477 fmt::format("{}: AbortAsync: Completed, updating DAQ status and "
478 "set reply remaining. Has fatal error={}",
479 *daq,
480 f.has_exception()));
481 auto& status = GetStatusRef();
482 // Merge alerts from saync op
484 &status, alert::DAQ_CONTROLLER, "AbortAsync()");
485 try {
486 op::MergeAlerts(status, *alerts);
487
488 auto result = f.get();
489 if (result.error) {
490 alert.Set("Async operation Abort completed with error(s)");
491 } else {
492 alert.Clear();
493 }
494 // Success
495 daq->SetState(Aborted{});
496 LOG4CPLUS_INFO(GetLogger(),
497 fmt::format("{}: AbortAsync: Completed successfully.", *daq));
498 return GetStatus()->GetStatus();
499 } catch (std::exception const& e) {
500 LOG4CPLUS_ERROR(m_logger,
501 fmt::format("{}: AbortAsync failed: {}", *this, e.what()));
502 alert.Set(fmt::format("Failed to abort DAQ: {}", e.what()));
503 throw;
504 }
505 });
506}
507
509 return boost::make_exceptional_future<State>(std::runtime_error(
510 fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
511}
512
515 fmt::format("DaqController::UpdateKeywords(<omitted>)"),
517
518 if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
519 throw boost::enable_current_exception(
520 std::runtime_error("Data acquisition already stopped or aborted"));
521 }
522
525}
526
527boost::future<State>
528OcmDaqController::AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) {
530 GetId(),
531 fmt::format("DaqController::AwaitAsync({}, {} ms)",
532 sources.empty() ? "all primary sources" : "a user defined list of sources",
533 timeout.count()),
535
536 std::vector<std::variant<Source<PrimSource>*, Source<MetaSource>*>> await_on;
537
538 if (!sources.empty()) {
539 for (auto const& source_id : sources) {
540 auto source_var = FindSource(source_id);
541 if (source_var) {
542 await_on.emplace_back(*source_var);
543 } else {
544 return boost::make_exceptional_future<State>(
545 std::invalid_argument(fmt::format("Source with id='{}' not found", source_id)));
546 }
547 }
548 } else {
549 // Use all primary sources by default
550 for (auto& source : m_prim_sources) {
551 await_on.emplace_back(&source);
552 }
553 }
554
555 // note that condition references sources in OcmDaqController and should not be invoked
556 // unless OcmDaqController is alive.
557 auto condition = [sources = await_on]() {
558 return std::all_of(sources.begin(), sources.end(), [](auto var) {
559 return std::visit(
560 [](auto v) {
561 return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
562 },
563 var);
564 });
565 };
566
567 // Test if condition is already satified
568 if (condition()) {
569 return boost::make_ready_future<State>(GetState());
570 }
571
572 // Wait is not already satisfied, attach state listeners to all sources.
573 // daq is captured with a weak ptr to avoid keeping OcmDaqController alive if no state changes
574 // occur on monitored components.
575 auto promise = std::make_shared<boost::promise<State>>();
576
577 if (timeout > std::chrono::milliseconds(0)) {
578 auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
579 timer->async_wait([promise,
580 timer_ptr = timer.get(),
581 daq_weak = std::weak_ptr<OcmDaqController>(
582 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
583 boost::system::error_code ec) {
584 // Promise might already be fulfilled, there's no way to check if it is though.
585 try {
586 auto daq = daq_weak.lock();
587 if (ec) {
588 if (daq) {
589 LOG4CPLUS_DEBUG(
590 daq->m_logger,
591 fmt::format("{}: AsyncWait: Operation abandoned before completing.",
592 *daq));
593 }
594 // Timer deleted or was cancelled, set exception in promise
595 promise->set_exception(DaqOperationAborted(""));
596 } else {
597 // Normal timeout
598 // For this case we also want to delete the timer itself
599 if (daq) {
600 LOG4CPLUS_DEBUG(
601 daq->m_logger,
602 fmt::format("{}: AsyncWait: Operation timed out before completing.",
603 *daq));
604 daq->m_timers.erase(
605 std::remove_if(
606 daq->m_timers.begin(),
607 daq->m_timers.end(),
608 [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
609 return val.get() == timer_ptr;
610 }),
611 daq->m_timers.end());
612 }
613
614 promise->set_exception(DaqOperationTimeout(""));
615 }
616 } catch (...) {
617 }
618 });
619 m_timers.emplace_back(std::move(timer));
620 }
621
622 auto listener = [condition,
623 promise,
624 daq_weak = std::weak_ptr<OcmDaqController>(
625 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](State,
626 bool) {
627 auto daq = daq_weak.lock();
628 if (!daq) {
629 LOG4CPLUS_WARN("daq", "OcmDaqController deleted before await condition was fulfulled");
630 // this async op was abandoned. Do nothing.
631 return;
632 }
633
634 if (condition()) {
635 LOG4CPLUS_INFO(daq->m_logger,
636 fmt::format("{}: AwaitAsync: Await condition fulfilled", *daq));
637 try {
638 promise->set_value(daq->GetState());
639 } catch (...) {
640 // Exception might be thrown because promise is already fulfilled, which is
641 // expected to happen.
642 }
643 }
644 };
645
646 // Connect listeners to sources that should be awaited.
647 for (auto& source : await_on) {
648 std::visit(
649 [daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
650 listener](auto s) {
651 LOG4CPLUS_DEBUG(
652 daq->m_logger,
653 fmt::format("{}: AsyncWait: Attaching listener on source '{}'.", *daq, *s));
654 // Use automatic connection disconnect if daq is destroyed. (i.e. track_foreign).
655 using SlotType =
656 typename std::remove_reference<decltype(*s)>::type::StateSignal::slot_type;
657 s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(daq));
658 },
659 source);
660 }
661 return promise->get_future();
662}
663
664std::optional<std::variant<Source<PrimSource>*, Source<MetaSource>*>>
665OcmDaqController::FindSource(std::string_view source_id) {
666 {
667 auto it =
668 std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](auto& source) {
669 return source.GetSource().GetName() == source_id;
670 });
671 if (it != m_prim_sources.end()) {
672 return &(*it);
673 }
674 }
675 {
676 auto it =
677 std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](auto& source) {
678 return source.GetSource().GetName() == source_id;
679 });
680 if (it != m_meta_sources.end()) {
681 return &(*it);
682 }
683 }
684
685 // Not found
686 return {};
687}
688
689template <class SourceType>
690std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
691 //@todo: Check for duplicates
692 std::vector<Source<SourceType>> dest;
693 dest.reserve(sources.size());
694 std::transform(
695 std::make_move_iterator(sources.begin()),
696 std::make_move_iterator(sources.end()),
697 std::back_inserter(dest),
698 [](SourceType&& s) -> Source<SourceType> { return Source<SourceType>{std::move(s)}; });
699 return dest;
700}
701
702void OcmDaqController::InitiateStopCondition() {
703 AddEvent<ActionEvent>(
704 GetId(), "DaqController::InitiateStopCondition(): Initiating", GetStatusRef().GetStatus());
705 if (!HasStatefulSources()) {
706 LOG4CPLUS_DEBUG(
707 m_logger,
708 fmt::format("{}: InitiateStopCondition: No stateful sources to wait for -> stopping",
709 *this));
710 // If we don't have any stateful data sources at all we stop automatically. This needs to be
711 // done asynchronously though to match expectation that this is an async initiation
712 // function.
713 boost::async(GetIoExecutor(),
714 [daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())] {
715 daq->StopAsync(ErrorPolicy::Strict);
716 });
717 return;
718 }
719 if (m_prim_sources.empty()) {
720 LOG4CPLUS_DEBUG(
721 m_logger,
722 fmt::format("{}: InitiateStopCondition: No primary sources to monitor.", *this));
723 return;
724 }
725 LOG4CPLUS_DEBUG(m_logger, fmt::format("{}: InitiateStopCondition: Starting operation.", *this));
726 auto alerts = std::make_unique<op::AlertState>();
727 auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
728 m_abort_await_primary_sources = abort;
729 // Set up continuation that stops data acquisition automatically
730 future.then(
731 GetIoExecutor(),
732 [alerts = std::move(alerts),
733 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
734 boost::future<Result<DpParts>> fut) {
735 // Remove abort callback as async op has completed
736 daq->m_abort_await_primary_sources = nullptr;
737
738 // Add FITS files from the stopped sources
739 auto result = fut.get();
740 LOG4CPLUS_DEBUG(daq->m_logger,
741 fmt::format("{}: InitiateStopCondition: Adding {} files from "
742 "primary sources",
743 *daq,
744 result.result.size()));
745 // @todo Confirm if we want to add parts even though state has transitioned past
746 // Acquiring (which may mean manual stop and that parts have already been added)
747 // @todo: How to treat errors from await?
748 daq::AddDpParts(daq->GetContextMut(), result.result);
749 daq->EmitContextSignal();
750
751 // If daq is still acquiring we stop using strict error policy, otherwise do nothing
752 if (!std::holds_alternative<Acquiring>(daq->m_state)) {
753 LOG4CPLUS_DEBUG(
754 daq->m_logger,
755 fmt::format(
756 "{}: InitiateStopCondition: "
757 "AwaitAsync completed but another operation has already transitioned "
758 "DAQ from Acquiring so automatic stop will not be performed.",
759 *daq));
760 return;
761 }
762 daq->AddEvent<ActionEvent>(
763 daq->GetId(),
764 "DaqController::InitiateStopCondition(): "
765 "Primary sources completed. Performing automatic stop of metadata sources",
766 daq->GetStatusRef().GetStatus());
767
768 // Merge alerts from async op
769 auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
770 op::MergeAlerts(daq->GetStatusRef(), *alerts);
771
772 // No continuation is necessary. If an error occurs that information is published
773 // and user needs to intervene.
774 // If a separate topic is created for errors only it may be published with a
775 // continuation attached to the result of StopAsync (or more likely implemented inside
776 // StopAsync).
777 daq->StopAsync(ErrorPolicy::Strict);
778 });
779}
780
781void OcmDaqController::CancelAwaitPrimarySources() {
782 if (m_abort_await_primary_sources) {
783 LOG4CPLUS_TRACE(m_logger, "OcmDaqController::CancelAwaitPrimarySources()");
784 m_abort_await_primary_sources();
785 m_abort_await_primary_sources = nullptr;
786 }
787}
788
789DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
790 DaqContext context,
791 std::shared_ptr<ObservableStatus> status,
792 std::shared_ptr<ObservableEventLog> event_log,
793 std::shared_ptr<DpmClient> dpm_client)
794 : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
795 , m_liveness(std::make_shared<bool>(true))
796 , m_dpm_client(std::move(dpm_client))
797 , m_logger(log4cplus::Logger::getInstance("daq.ocm.controller")) {
798 assert(m_dpm_client);
799 UpdateStateContext();
800
801 // Connect slot to status signal which mirrors DAQ status of *this* from DPM
802 //
803 // Note: weak_from_this cannot be used in constructor as the reference count is not yet
804 // incremented by the holding shared_ptr. For this reason the m_liveness is used instead.
805 //
806 // Additionally the status provided may from a past update, so we only update if received status
807 // is newer than current.
808 m_status_connection = m_dpm_client->ConnectStatusSignal(
809 [id = GetId(), weak = std::weak_ptr<bool>(m_liveness), this](Status const& status) {
810 if (id != status.id) {
811 // Update for another DAQ
812 return;
813 }
814 auto lock = weak.lock();
815 if (!lock) {
816 // Abandoned
817 LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
818 return;
819 }
820
821 auto const& current = GetStatusRef().GetStatus();
822 if (current.timestamp > status.timestamp) {
823 LOG4CPLUS_TRACE(
824 m_logger,
825 "Ignoring DAQ status update: From DPM: " << status << ", current: " << current);
826 return;
827 }
828 // Assign status of DAQ from DPM.
829 LOG4CPLUS_TRACE(m_logger, *this << ": Assigning new DAQ status from DPM: " << status);
830 GetStatusRef() = status;
831 UpdateStateContext();
832 });
833}
834
835boost::future<State> DpmDaqController::StartAsync() {
836 return boost::make_exceptional_future<State>(
837 std::runtime_error(fmt::format("StartAsync() is invalid in state: {}", GetState())));
838}
839
841 return boost::make_exceptional_future<Status>(
842 std::runtime_error(fmt::format("StopAsync() is invalid in state: {}", GetState())));
843}
844
845boost::future<State>
846DpmDaqController::AwaitAsync(std::vector<std::string>, std::chrono::milliseconds) {
847 return boost::make_exceptional_future<State>(std::runtime_error(
848 fmt::format("AwaitAsync() with sources is invalid in state: {}", GetState())));
849}
850
852 throw std::runtime_error(fmt::format("UpdateKeywords() is invalid in state: {}", GetState()));
853}
854
856 return GetStatusRef().GetState();
857}
858
860 LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::ScheduleMergeAsync");
861
862 // If we are in NotScheduled and no request in-flight already: Send request to DPM to merge.
863 // otherwise fail.
864 auto starting_state = GetState();
865 if (starting_state != State::NotScheduled) {
866 return boost::make_exceptional_future<State>(std::runtime_error(
867 fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
868 }
869
870 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
871 auto& ctx = std::get<NotScheduled>(m_state_ctx);
872 if (ctx.schedule_reply_pending) {
873 // A request has already been sent
874 return boost::make_exceptional_future<State>(
875 std::logic_error("ScheduleMergeAsync() a request is already in flight"));
876 }
877 if (!m_dp_spec) {
878 auto dp_spec = MakeDataProductSpecification(GetContext(), m_logger);
879 nlohmann::json j;
880 to_json(j, dp_spec);
881 m_dp_spec = j.dump(2); // indent 2
882 LOG4CPLUS_DEBUG(m_logger, "Created DpSpec:\n" << *m_dp_spec);
883 }
884 auto status = std::string();
885 {
886 nlohmann::json j;
887 j = GetStatus()->GetStatus();
888 status = j.dump(2);
889 }
890 return m_dpm_client->ScheduleAsync(*m_dp_spec, status)
891 .then(GetIoExecutor(),
892 [starting_state, weak = weak_from_this(), this](boost::future<State> f) -> State {
893 auto alert_id = MakeAlertId(alert::REQUEST, "dpm.QueueDaq()");
894 auto shared = weak.lock();
895 try {
896 auto state = f.get();
897 if (shared) {
898 auto& status = GetStatusRef();
900 // Update state only if DAQ has not changed state from starting_state.
901 //
902 // This can e.g. happen if OCM receive published state change data sample
903 // from DPM before receiving command reply.
904 // @todo: Use timestamp instead?
905 if (GetState() == starting_state) {
906 SetState(state);
907 }
908 status.ClearAlert(alert_id);
909 LOG4CPLUS_INFO(m_logger,
910 fmt::format("{}: Scheduled DAQ successfully", *this));
911 } else {
912 LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
913 }
914 return state;
915 } catch (elt::mal::TimeoutException const&) {
916 // Timeout errors does not result in error flag being set.
917 if (shared) {
918 LOG4CPLUS_INFO(m_logger,
919 fmt::format("{}: Schedule DAQ failed with timeout "
920 "(no error flag set for this condition)",
921 *this));
922 }
923 throw;
924 } catch (std::exception const& e) {
925 if (shared) {
926 auto& status = GetStatusRef();
928 status.SetAlert(MakeAlert(
929 alert_id,
930 fmt::format("Failed to schedule DAQ for merging: {}", e.what())));
931 LOG4CPLUS_ERROR(m_logger, fmt::format("{}: Scheduled DAQ failed", *this));
932 }
933 throw;
934 }
935 });
936}
937
938boost::future<Status> DpmDaqController::AbortAsync(ErrorPolicy policy) {
939 LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::AbortAsync");
940
941 // 1. If we are sure DAQ has not been scheduled we can abort immediately.
942 // @todo: Handle case where request to schedule has been sent, but no acknowledgement received.
943 auto state = GetState();
944 if (state == State::NotScheduled) {
945 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
946 if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
947 SetState(State::Aborted);
948 return boost::make_ready_future<Status>(*GetStatus());
949 }
950 }
951
952 // 2. Otherwise we must delegate to DPM to abort (this is also why we are not setting state
953 // AbortingMerging as we cannot know until we communiate with DPM).
954 // If abort fails but is forced to complete an alert is added.
955 return m_dpm_client->AbortAsync(GetId()).then(
956 GetIoExecutor(), [policy, weak = weak_from_this(), this](boost::future<State> f) -> Status {
957 auto shared = weak.lock();
958 if (!shared) {
959 // Abandonded
960 LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
961 throw boost::enable_current_exception(std::runtime_error("Operation aborted"));
962 }
963 auto& status = GetStatusRef();
964 auto alert =
965 ObservableStatus::AlertActivator(&status, alert::REQUEST, "dpm.AbortAsync()");
966 try {
967 auto result = f.get();
968 SetState(result);
969 alert.Clear();
970
971 return *GetStatus();
972 } catch (std::exception const& e) {
973 // Even if DPM failed we mark it as aborted if forced
974 // Unless we force completion we don't add the alert to DAQ as DPM owns the state.
975 if (policy == ErrorPolicy::Tolerant) {
976 LOG4CPLUS_ERROR(m_logger,
977 *this
978 << ": Request to abort DAQ to DPM returned with "
979 "error. ErrorPolicy::Tolerant is used so DAQ is marked "
980 "as aborted anyway");
981 alert.Set(
982 fmt::format("Async operation Abort completed with error(s): {}", e.what()));
983 SetState(State::Aborted);
984 return *GetStatus();
985 }
986 throw;
987 }
988 });
989}
990
991void DpmDaqController::UpdateStateContext() {
992 auto state = GetState();
993 switch (state) {
995 m_state_ctx = NotScheduled{false};
996 return;
997 default:
998 m_state_ctx = std::monostate();
999 };
1000}
1001
1002void DpmDaqController::SetState(State state) {
1003 auto prev = GetState();
1004 GetStatusRef().SetState(state);
1005 if (prev != state) {
1006 UpdateStateContext();
1007 }
1008}
1009
1010} // namespace daq
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableStatus & GetStatusRef() noexcept
DaqContext const & GetContext() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
bool GetErrorFlag() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
rad::IoExecutor & GetIoExecutor() noexcept
std::string const & GetId() const DAQ_NOEXCEPT override
DaqContext & GetContextMut() noexcept
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
ObservableEventLog & GetEventLogRef() noexcept
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, fits::KeywordFormatter const &kw_formatter) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Controls the execution of a single data acquisition that ultimately results in a set of FITS keywords an...
Data acquisition sources.
Definition: source.hpp:186
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
void UpdateKeywords(fits::KeywordVector const &keywords) override
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
State GetState() const DAQ_NOEXCEPT override
boost::future< Status > StopAsync(ErrorPolicy policy) override
boost::future< State > StartAsync() override
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:142
metadaqif::MetaDaqAsync RrClient
Definition: source.hpp:144
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:56
Provides a more concise way to set/clear alerts.
Definition: status.hpp:266
Defers signal changes until a later time.
Definition: status.hpp:236
State GetState() const noexcept
Definition: status.cpp:297
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:371
void SetState(State s) noexcept
Set state of data acquisition.
Definition: status.cpp:305
Implements daq::DaqController for states responsible to be executed by OCM.
boost::future< State > StartAsync() override
Starts the data acquisition.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, fits::KeywordFormatter const &kw_formatter, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
constexpr log4cplus::Logger const & GetLogger() const noexcept
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
State GetState() const DAQ_NOEXCEPT override
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
StateVariant MakeState(State s) const noexcept
log4cplus::Logger m_logger
OcmDaqController(boost::asio::io_context &io_context, fits::KeywordFormatter const &kw_formatter, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
std::optional< std::variant< Source< PrimSource > *, Source< MetaSource > * > > FindSource(std::string_view source_id)
auto HasStatefulSources() noexcept -> bool
Queries if ctx has stateful sources (primary or metadata source).
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replace or add) list of keywords.
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
void CancelAwaitPrimarySources()
Cancels the awaiting of primary sources.
void InitiateStopCondition()
Initiate async operation that triggers DAQ stop automatically.
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
fits::KeywordFormatter const & m_kw_formatter
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
void SetState(StateVariant &&s) noexcept
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:98
recif::RecCmdsAsync RrClient
Definition: source.hpp:100
Formats keyword against e.g.
Definition: keyword.hpp:551
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Declares JSON support for serialization.
Contains data structure for FITS keywords.
daq::DpmClient
Contains error related declarations for DAQ.
Contains declarations for the helper functions to initiate operations.
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:32
constexpr std::string_view DAQ_CONTROLLER
DAQ controller command failed, which normally happens when an async operation threw an exception.
Definition: status.hpp:61
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
void UpdateKeywords(fits::KeywordVector &out, fits::KeywordVector const &in, fits::KeywordFormatter const &fmt)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:24
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:55
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:77
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
Definition: daqContext.cpp:47
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and are reported as ...
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ NotScheduled
Before the DAQ is acknowledged by DPM it remains in NotScheduled.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
void to_json(nlohmann::json &j, Status const &p)
Definition: json.cpp:37
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: makeDpSpec.cpp:282
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:45
Utility class that represents a result and an error.
Definition: utility.hpp:17
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
OCM Async operations.
bool IsValid() const noexcept
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
std::reference_wrapper< rad::IoExecutor > executor
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
std::function< boost::future< void >(op::AsyncOpParams)> start
OcmAsyncOperations(rad::IoExecutor &executor)
Default-constructs the object with the standard async operations.
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Definition: source.hpp:30
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
Parameters required for each async operation.
Await-specific parameters that are not provided by AsyncOpParams.