ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
daqController.hpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_common_libdaq
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains declaration for DaqController
7 */
8#ifndef OCF_DAQ_DAQ_CONTROLLER_HPP_
9#define OCF_DAQ_DAQ_CONTROLLER_HPP_
10#include "config.hpp"
11
12#include <chrono>
13#include <iostream>
14#include <string>
15#include <type_traits>
16#include <utility>
17#include <variant>
18#include <vector>
19
20#include <Metadaqif.hpp>
21#include <boost/asio/io_context.hpp>
22#include <boost/asio/steady_timer.hpp>
23#include <boost/thread/future.hpp>
24#include <fmt/ostream.h>
25#include <log4cplus/logger.h>
26#include <nlohmann/json.hpp>
27#include <rad/ioExecutor.hpp>
28
29#include <daq/daqContext.hpp>
30#include <daq/dpPart.hpp>
31#include <daq/error.hpp>
32#include <daq/eventLog.hpp>
34#include <daq/op/initiate.hpp>
36#include <daq/source.hpp>
37#include <daq/state.hpp>
38#include <daq/status.hpp>
39#include <daq/utility.hpp>
40
41namespace daq {
42
43class DpmClient;
44class DaqController;
45
46/**
47 * OCM Async operations
48 *
49 * @ingroup daq_common_libdaq
50 */
52 // Await returns a pair of future return value and an abort function.
53 using AwaitReturnType = std::pair<boost::future<Result<DpParts>>, std::function<bool()>>;
54
55 /**
56 * Default constructs object with standard async operations.
57 */
63
64 bool IsValid() const noexcept;
65
66 std::reference_wrapper<rad::IoExecutor> executor;
67
68 std::function<boost::future<void>(op::AsyncOpParams)> start;
69 std::function<boost::future<Result<void>>(ErrorPolicy, op::AsyncOpParams)> abort;
70 std::function<boost::future<Result<DpParts>>(ErrorPolicy, op::AsyncOpParams)> stop;
71 std::function<AwaitReturnType(op::AwaitOpParams)> await_prim;
72};
73
74/**
75 * Abstract factory for DaqControllers.
76 *
77 * Main purpose is to allow daq::Manager to use a factory that creates mocks rather than real
78 * implementations for testing.
79 */
81public:
82 /**
83 * Create instance for the OCM phase of the DAQ process.
84 */
85 virtual auto
87 std::shared_ptr<ObservableStatus> status,
88 std::shared_ptr<ObservableEventLog> event_log,
89 fits::KeywordFormatter const& kw_formatter) -> std::shared_ptr<DaqController> = 0;
90 /**
91 * Create instance for the DPM phase of the DAQ process.
92 */
93 virtual auto MakeDpmPhase(DaqContext daq_ctx,
94 std::shared_ptr<ObservableStatus> status,
95 std::shared_ptr<ObservableEventLog> event_log)
96 -> std::shared_ptr<DaqController> = 0;
97};
98
99/**
100 * Default factory producing "real" implementations.
101 */
103public:
104 DaqControllerFactoryDefault(boost::asio::io_context& io_ctx,
105 elt::mal::Mal& m_mal,
106 std::shared_ptr<DpmClient> dpm_client);
107 auto MakeOcmPhase(DaqContext daq_ctx,
108 std::shared_ptr<ObservableStatus> status,
109 std::shared_ptr<ObservableEventLog> event_log,
110 fits::KeywordFormatter const& kw_formatter)
111 -> std::shared_ptr<DaqController> override;
112 auto MakeDpmPhase(DaqContext daq_ctx,
113 std::shared_ptr<ObservableStatus> status,
114 std::shared_ptr<ObservableEventLog> event_log)
115 -> std::shared_ptr<DaqController> override;
116
117private:
118 boost::asio::io_context& m_io_ctx;
119 rad::IoExecutor m_executor;
120 elt::mal::Mal& m_mal;
121 OcmAsyncOperations m_async_ops;
122 std::shared_ptr<DpmClient> m_dpm_client;
123};
124
125/**
126 * Controls the execution of a single data acquisition that ultimately results in a set of FITS
127 * keywords and/or FITS files.
128 *
129 * Important
130 * ---------
131 *
132 * Due to the dynamic nature of data acquisitions (mostly w.r.t. fatal errors from data sources and
133 * commands from user) the following assumptions are made:
134 *
135 * - Each data source completes only *once*.
136 * - This means that any command that deals with potential completion (mainly the stop and await
137 * operations) *must* record the outcome (DpParts + Keywords) and mark the source state as
138 * completed.
139 * - This ensures that no outcome is lost and that subsequent commands do not include completed
140 * sources.
141 * - Conflicting commands are resolved in the order they are received and w.r.t. state.
142 * - If data sources are awaited on, some of which have already completed, and DaqController is
143 * asked to abort, this means that the DAQ state is `Aborting` and even if the await operation
144 * completes successfully, the result will be discarded because user asked to abort.
145 * - If user asks to stop and primary sources are awaited on, there's a race between which request
146 * to data source completes first.
147 * - Commands are only sent to sources that are in-progress (not completed/aborted/stopped).
148 * - Whichever reply is received first (from await or stop) will determine the result from
149 * that data source.
150 * - Async operations must be able to deal with partial failures.
151 * - This is required to allow user to retry operation.
152 * - Asynchronous errors are published and user response is required.
153 * - If e.g. the await operation fails partially it means user must intervene to decide if it
154 * should try to stop or abort the DAQ (in the future a DAQ-wide policy could be set to decide
155 * what to do automatically).
156 *
157 * Usage
158 * -----
159 *
160 * 1. Starting
161 *
162 * Start with `StartAsync`
163 *
164 * 2. Stopping
165 *
166 * Manual option
167 * -------------
168 *
169 * Stop/abort with `StopAsync` or `AbortAsync`.
170 *
171 * Automatic option
172 * ----------------
173 *
174 * Automatically initiate stop if all primary sources are automatically stopped as
175 * indicated by `ObservableStatus` status update topic.
176 *
177 * If asynchronous operations have been started, it is still possible to abort.
178 * This command will then supersede ongoing commands, and ongoing commands will
179 * be aborted at the next synchronization point (typically when reply is received).
180 *
181 * It is not possible to retry or go backwards by executing `StartAsync` after
182 * `StopAsync` or `AbortAsync`. The reasoning behind this is to avoid the risk
183 * of duplicate data acquisition id being "in flight" at the same time.
184 *
185 * It is possible to abort on the other hand, even though the data acquisition might not have
186 * been started yet.
187 *
188 * For StartAsync, StopAsync, AbortAsync:
189 *
190 * The error handling strategy is to not Set result until we have a result
191 * from each source either by reply or internal error when Sending/receiving
192 * (MAL issues e.g.).
193 *
194 * This implies:
195 *
196 * - That way we can communicate the full result using the promise.
197 * On errors it contains the exception(s) on success the new state.
198 * - We can have a mixed result of partial success and partial failure,
199 * although the future will contain exception(s).
200 * - We let caller decide what to do (e.g. abort).
201 * - We use an error flag to indicate that error occurred, which is
202 * possible in any state.
203 * - We stay in the state that had the error to e.g. allow retries.
204 * So if an error occurred in `Starting` then DaqController remains
205 * in `Starting` with the error flag Set.
206 *
207 * Pending decisions:
208 * - Whether to fail fast or be robust,
209 * or even whether to be configurable.
210 * - This was addressed with the ErrorPolicy option when stopping/aborting.
211 * - Support aborting while there are still replies pending for startDaq?
212 * This would mean that one can be in state Starting and Stopping at the same time.
213 * - This was addressed to ignore errors when forcibly aborting.
214 * - Who connects the clients + configure QoS? Since connect is done implicitly there's
215 * no need actually. Simply specify timeout should be sufficient. And that should
216 * be done by the caller.
217 *
218 * @ingroup daq_common_libdaq
219 */
220class DaqController : public std::enable_shared_from_this<DaqController> {
221public:
222 DaqController() = default;
223 virtual ~DaqController() = default;
224 /**
225 * Starts the data acquisition.
226 *
227 * @returns A future that will be Set once data acquisition has started
228 * or fails.
229 *
230 * @throws std::exception-derived exception if internal error occurs.
231 *
232 * @return Future that is ready once all sources are started or failed.
233 *
234 * @pre `GetState() == State::Notstarted`
235 * @post `GetState() == State::Starting` on success
236 * `GetState() == State::Error` on error
237 */
238 virtual boost::future<State> StartAsync() = 0;
239
240 /**
241 * Stops the data acquisition.
242 *
243 * @pre `GetState() not in (State::Stopped or State::Aborted)`
244 * @post `GetState() == State::Stopping`
245 */
246 virtual boost::future<Status> StopAsync(ErrorPolicy policy) = 0;
247
248 /**
249 * Aborts the data acquisition.
250 *
251 * @param policy Error policy determining if errors are tolerated or not.
252 *
253 * It is possible to issue this request more than once, to e.g. retry a failed abort attempt.
254 *
255 * @pre `GetState() not in (State::Aborted, State::Stopped)`
256 * @post `GetState() == State::Aborting` if a data acquisition was ongoing otherwise `GetState()
257 * == State::Aborted`.
258 */
259 virtual boost::future<Status> AbortAsync(ErrorPolicy policy) = 0;
260
261 /**
262 * Schedules DAQ for merging by sending request to DPM.
263 *
264 * @returns future containing exception:
265 * - std::logic_error if DAQ has already been scheduled
266 * - mal::TimeoutException if DPM is offline (timeout).
267 * - unspecified exception if DPM fails
268 */
269 virtual boost::future<State> ScheduleMergeAsync() = 0;
270
271 /**
272 * Updates (replace or add) list of keywords.
273 *
274 * @param keywords Keywords to add.
275 */
276 virtual void UpdateKeywords(fits::KeywordVector const& keywords) = 0;
277
278 /** Awaits that data acquisition stops or aborts.
279 *
280 * It is possible to await only a subset of data sources by specifying their ids in
281 * `sources`.
282 *
283 * @param sources An optional vector of source-ids to await, if empty all primary sources are
284 * awaited on.
285 *
286 * @returns future set with std::invalid_argument if source-id is not recognized.
287 * @returns future set with boost::broken_promise if DaqController is destroyed before
288 * operation completes.
289 */
290 virtual boost::future<State>
291 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) = 0;
292
293 /**
294 * @returns state of data acquisition.
295 */
296 virtual State GetState() const DAQ_NOEXCEPT = 0;
297
298 /**
299 * @returns status object associated with this daq.
300 */
301 virtual std::shared_ptr<ObservableStatus> GetStatus() DAQ_NOEXCEPT = 0;
302 virtual std::shared_ptr<ObservableStatus const> GetStatus() const DAQ_NOEXCEPT = 0;
303
304 /**
305 * @returns event log associated with this daq.
306 */
307 virtual std::shared_ptr<ObservableEventLog> GetEventLog() DAQ_NOEXCEPT = 0;
308
309 /**
310 * @returns data acquisition identifier.
311 */
312 virtual std::string const& GetId() const DAQ_NOEXCEPT = 0;
313
314 /**
315 * @return Error flag.
316 */
317 virtual bool GetErrorFlag() const DAQ_NOEXCEPT = 0;
318
319 virtual DaqContext const& GetContext() const DAQ_NOEXCEPT = 0;
320
321 using ContextSignal = boost::signals2::signal<void(DaqContext const&)>;
322 /**
323 * Connect observer that is invoked when context is modified.
324 *
325 * @param slot Observer callable invoked on context changes.
326 *
327 * @return signal connection object that can be used to disconnect observer.
328 */
329 virtual boost::signals2::connection ConnectContext(ContextSignal::slot_type const& slot) = 0;
330};
331
// Stream-insertion operator for DaqController; only declared in this header.
332std::ostream& operator<<(std::ostream& os, DaqController const& daq);
333
334/**
335 * Implements common behaviour of OcmDaqController and DpmDaqController.
336 *
337 */
339public:
340 CommonDaqController(boost::asio::io_context& io_context,
341 DaqContext context,
342 std::shared_ptr<ObservableStatus> status,
343 std::shared_ptr<ObservableEventLog> event_log);
344
345 std::shared_ptr<ObservableStatus> GetStatus() DAQ_NOEXCEPT override;
346 std::shared_ptr<ObservableStatus const> GetStatus() const DAQ_NOEXCEPT override;
347 std::shared_ptr<ObservableEventLog> GetEventLog() DAQ_NOEXCEPT override;
348 std::string const& GetId() const DAQ_NOEXCEPT override;
349 bool GetErrorFlag() const DAQ_NOEXCEPT override;
350 DaqContext const& GetContext() const DAQ_NOEXCEPT override;
351 boost::signals2::connection ConnectContext(ContextSignal::slot_type const& slot) override;
352
353protected:
354 template <class T, class... Args>
355 void AddEvent(Args&&... args) {
356 m_event_log->EmplaceEvent<T>(std::forward<Args>(args)...);
357 }
358 boost::asio::io_context& GetIoCtx() noexcept {
359 return m_io_ctx;
360 }
362 return m_executor;
363 }
365 return m_context;
366 }
368 return *m_event_log.get();
369 }
371 return *m_status.get();
372 }
373 ObservableStatus const& GetStatusRef() const noexcept {
374 return *m_status.get();
375 }
377 m_sig_context(m_context);
378 }
379
380private:
381 boost::asio::io_context& m_io_ctx;
382 rad::IoExecutor m_executor;
383 DaqContext m_context;
384 ContextSignal m_sig_context;
385 std::shared_ptr<ObservableStatus> m_status;
386 std::shared_ptr<ObservableEventLog> m_event_log;
387};
388
389/**
390 * Implements `daq::DaqController` for states responsible to be executed by OCM.
391 *
392 * The states executed by DPM are implemented by DpmDaqController.
393 *
394 * @ingroup daq_common_libdaq
395 */
397public:
398 /**
399 * Construct object.
400 *
401 * @param io_context Executor used for continuations and timer.
402 * @param context General context used to control DAQ execution.
403 * @param status Data acquisition status object, also contains identifier (may not be empty).
404 * Caller is responsible for making the id unique.
405 * @param sources Data acquisition sources, i.e. the primary data sources
406 * and the metadata sources.
407 *
408 * @pre `status == true`.
409 * @pre `event_log == true`.
410 * @throws std::invalid_argument if arguments are invalid.
411 */
412 static std::shared_ptr<OcmDaqController> Create(boost::asio::io_context& io_context,
413 fits::KeywordFormatter const& kw_formatter,
414 DaqContext context,
415 DaqSources const& sources,
416 std::shared_ptr<ObservableStatus> status,
417 std::shared_ptr<ObservableEventLog> event_log,
418 OcmAsyncOperations operations);
419
420 boost::future<State> StartAsync() override;
421 boost::future<Status> StopAsync(ErrorPolicy policy) override;
422 boost::future<Status> AbortAsync(ErrorPolicy policy) override;
423 boost::future<State> ScheduleMergeAsync() override;
424 void UpdateKeywords(fits::KeywordVector const& keywords) override;
425 boost::future<State>
426 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) override;
427
428 State GetState() const DAQ_NOEXCEPT override;
429
430 /**
431 * @returns Logger associated with this DaqController.
432 */
433 constexpr log4cplus::Logger const& GetLogger() const noexcept;
434
435protected:
436 struct NotStarted {};
437 struct Starting {};
438 struct Acquiring {};
439 struct Stopping {};
440 struct Stopped {};
441 struct Aborting {};
442 struct Aborted {};
443
445 std::variant<NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted>;
446 StateVariant MakeState(State s) const noexcept;
447#ifdef UNIT_TEST
448public:
449#endif
450 // Protected constructor so user is forced to use shared pointer semantics
451 // which is required because of future continuation style programming.
452 OcmDaqController(boost::asio::io_context& io_context,
453 fits::KeywordFormatter const& kw_formatter,
454 DaqContext context,
455 DaqSources const& sources,
456 std::shared_ptr<ObservableStatus> status,
457 std::shared_ptr<ObservableEventLog> event_log,
459#ifdef UNIT_TEST
460private:
461#endif
462 /**
463 * Constructs the parameters used for asynchronous operations.
464 *
465 * @note OcmAsyncOpParams will bind references to member variables so caller must
466 * guarantee that DaqController outlives the async operation.
467 * This is normally done by holding a shared copy in the last .then continuation
468 * for the async operation, thus guaranteeing that all intermediate continuations
469 * will access a valid object.
470 */
472 op::AwaitOpParams MakeAwaitParams(op::AlertState&);
473
474 /**
475 * Initiate async operation that triggers DAQ stop automatically.
476 *
477 * If there is no condition for automatic stop (e.g. meta data sources only) this method does
478 * nothing.
479 */
480 void InitiateStopCondition();
481 /**
482 * Cancels the awaiting of primary sources.
483 *
484 * This does nothing if no await process is ongoing.
485 */
486 void CancelAwaitPrimarySources();
487
488 void AddInitialKeywords();
489
490 /**
491 * Queries if ctx has stateful sources (primary or metadata source).
492 *
493 * @return true if ctx has 1 or more of primary or metadata sources.
494 * @returns false otherwise.
495 */
496 auto HasStatefulSources() noexcept -> bool;
497
498 /**
499 * Set alert with category alert::DAQ_CONTROLLER.
500 */
501 void SetAlert(std::string key, std::string description) noexcept;
502 void SetState(StateVariant&& s) noexcept;
503
504 std::optional<std::variant<Source<PrimSource>*, Source<MetaSource>*>>
505 FindSource(std::string_view source_id);
506
507 /// Helper to build source vector
508 template <class SourceType>
509 std::vector<Source<SourceType>> MakeSources(std::vector<SourceType> sources);
510
511 fits::KeywordFormatter const& m_kw_formatter;
513
514 std::vector<Source<PrimSource>> m_prim_sources; ///< Note: Consider vector immutable!
515 std::vector<Source<MetaSource>> m_meta_sources; ///< Note: Consider vector immutable!
516
518 std::shared_ptr<PendingReplies> m_pending_replies;
519 std::vector<std::unique_ptr<boost::asio::steady_timer>> m_timers;
520
521 /**
522 * If DaqController is awaiting the completion of primary data sources
523 * this function will hold the abort function.
524 *
525 * @important Users must check if it is valid before invoking it.
526 */
527 std::function<bool()> m_abort_await_primary_sources;
528 log4cplus::Logger m_logger;
529};
530
531/**
532 * Implements behaviour from the state NotScheduled to Completed.
533 *
534 * Once successfully scheduled to be merged by DPM (NotScheduled -> Scheduled) the "location" for
535 * the *true* DAQ state is also transferred. This means:
536 *
537 * - If DPM is offline:
538 * - OCM cannot know the true state of DAQ.
539 * - Aborting with strict error policy cannot be done.
540 * - Aborting with tolerant policy will forcefully abort DAQ locally but the DAQ may anyway be
541 * completed by DPM.
542 *
543 * DpmDaqController is responsible for requesting last known status from DPM (in state Scheduled and
544 * onwards). This is done with the DPM client which aggregates all instances and manages retries.
545 */
547public:
548 /**
549 * Constructor
550 *
551 * @important DpmDaqController must only be used with shared_ptr.
552 *
553 * @param io_context Used for async operations.
554 * @param context DAQ context.
555 * @param status DAQ status.
556 * @param dpm_client Interface to DPM. During construction DpmDaqController will register
557 * signal slot to receive DAQ status updates and mirror that in status accordingly.
558 *
559 * @throws std::invalid_argument if state of status is not NotScheduled or subsequent.
560 */
561 DpmDaqController(boost::asio::io_context& io_context,
562 DaqContext context,
563 std::shared_ptr<ObservableStatus> status,
564 std::shared_ptr<ObservableEventLog> event_log,
565 std::shared_ptr<DpmClient> dpm_client);
566
567 boost::future<State> ScheduleMergeAsync() override;
568
569 /**
570 * @returns future containing std::runtime_error because operation is always invalid.
571 */
572 boost::future<State> StartAsync() override;
573 /**
574 * @returns future containing std::runtime_error because operation is always invalid.
575 */
576 boost::future<Status> StopAsync(ErrorPolicy policy) override;
577
578 /**
579 * @throws std::runtime_error because operation is always invalid (keywords cannot be modified
580 * once delivered to DPM).
581 */
582 void UpdateKeywords(fits::KeywordVector const& keywords) override;
583
584 /**
585 * @returns future containing std::runtime_error because operation is always invalid.
586 */
587 boost::future<State>
588 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) override;
589
590 /**
591 * Attempts to abort Data Acquisition.
592 *
593 * - If Data Acquisition is not-yet-scheduled it will be aborted immediately.
594 * - If Data Acquisition *may* have been scheduled (request has been sent to DPM) the request is
595 * forwarded to DPM.
596 *
597 * - If DPM is not running or otherwise fails the DAQ is still marked as aborted if policy is
598 * ErrorPolicy::Tolerant.
599 */
600 boost::future<Status> AbortAsync(ErrorPolicy policy) override;
601
602 State GetState() const DAQ_NOEXCEPT override;
603
604private:
605 struct NotScheduled {
606 /**
607 * We impose limit so that at most a single request to schedule merge can be outstanding.
608 * If a reply is pending this is indicated by `schedule_reply_pending` being true.
609 *
610 * This is valid in state NotScheduled.
611 */
612 bool schedule_reply_pending;
613 };
614
615 /**
616 * Changes m_state_ctx as necessary based on current state.
617 */
618 void UpdateStateContext();
619 void SetState(State state);
620
621 using StateVariant = std::variant<std::monostate, NotScheduled>;
622
623 /**
624 * An extra liveness object.
625 */
626 std::shared_ptr<bool> m_liveness;
627 std::shared_ptr<DpmClient> m_dpm_client;
628 /**
629 * Connection to disconnect slot from DpmClient::StatusSignal
630 */
631 boost::signals2::scoped_connection m_status_connection;
632
633 // "Cached" representation of the data product specification created from local DaqContext
634 // It is created on the fly in ScheduleMergeAsync as necessary.
635 std::optional<std::string> m_dp_spec;
636 StateVariant m_state_ctx;
637 log4cplus::Logger m_logger;
638};
639
640} // namespace daq
641
// fmt formatter for daq::DaqController, derived from fmt's ostream_formatter.
642template <>
643struct fmt::formatter<daq::DaqController> : ostream_formatter {};
644
// Same formatter for any type T whose pointer is convertible to daq::DaqController*
// (e.g. classes publicly derived from it).
645template <typename T, typename Char>
646struct fmt::formatter<T, Char, std::enable_if_t<std::is_convertible_v<T*, daq::DaqController*>>>
647 : ostream_formatter {};
648
649#endif // OCF_DAQ_DAQ_CONTROLLER_HPP_
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableStatus & GetStatusRef() noexcept
void AddEvent(Args &&... args)
ObservableStatus const & GetStatusRef() const noexcept
boost::asio::io_context & GetIoCtx() noexcept
rad::IoExecutor & GetIoExecutor() noexcept
DaqContext & GetContextMut() noexcept
ObservableEventLog & GetEventLogRef() noexcept
Default factory producing "real" implementations.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, fits::KeywordFormatter const &kw_formatter) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
virtual boost::future< State > StartAsync()=0
Starts the data acquisition.
virtual boost::future< Status > AbortAsync(ErrorPolicy policy)=0
Aborts the data acquisition.
virtual boost::future< Status > StopAsync(ErrorPolicy policy)=0
Stops the data acquisition.
virtual void UpdateKeywords(fits::KeywordVector const &keywords)=0
Updates (replace or add) list of keywords.
virtual boost::future< State > ScheduleMergeAsync()=0
Schedules DAQ for merging by sending request to DPM.
virtual ~DaqController()=default
virtual boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout)=0
Awaits that data acquisition stops or aborts.
boost::signals2::signal< void(DaqContext const &)> ContextSignal
DaqController()=default
virtual State GetState() const DAQ_NOEXCEPT=0
Data acquisition sources.
Definition: source.hpp:186
Implements behaviour from the state NotScheduled to Completed.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:142
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
Implements daq::DaqController for states responsible to be executed by OCM.
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:98
Formats keyword against e.g.
Definition: keyword.hpp:551
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Contains declaration of daq::Context.
Contains declaration for DpPart.
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Contains declarations for the helper functions to initiate operations.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
void UpdateKeywords(fits::KeywordVector &out, fits::KeywordVector const &in, fits::KeywordFormatter const &fmt)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:24
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:77
std::vector< DpPart > DpParts
Definition: dpPart.hpp:66
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:26
State
Observable states of the data acquisition process.
Definition: state.hpp:41
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
Utility class that represents a result and an error.
Definition: utility.hpp:17
Config class header file.
Contains declaration for classes related to pending replies.
Declarations for daq::Source and related classes.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
OCM Async operations.
bool IsValid() const noexcept
OcmAsyncOperations & operator=(OcmAsyncOperations const &)=default
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
std::reference_wrapper< rad::IoExecutor > executor
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
std::function< boost::future< void >(op::AsyncOpParams)> start
OcmAsyncOperations(OcmAsyncOperations &&)=default
std::pair< boost::future< Result< DpParts > >, std::function< bool()> > AwaitReturnType
OcmAsyncOperations(OcmAsyncOperations const &)=default
OcmAsyncOperations & operator=(OcmAsyncOperations &&)=default
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Definition: source.hpp:30
Parameters required for each async operation.
Await specific parameters that is not provided with AsyncOpParams.
Declaration of utilities.