ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
start.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_op
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Contains definition for the StartAsync operation
7 */
8#include <daq/op/start.hpp>
9
10#include <Metadaqif.hpp>
11#include <boost/assert.hpp>
12#include <fmt/format.h>
13#include <fmt/ostream.h>
14#include <log4cplus/loggingmacros.h>
15
16#include <daq/op/util.hpp>
17#include <daq/state.hpp>
18
19namespace {
20class RecPropertiesImpl : public recif::RecProperties {
21public:
22 double getAbsTime() const override {
23 return 0.0;
24 }
25 void setAbsTime(double abs_time) override {
26 }
27
28 std::string getId() const override {
29 return m_id;
30 }
31 void setId(const std::string& id) override {
32 m_id = id;
33 }
34
35 std::string getInfo() const override {
36 return "";
37 }
38 void setInfo(const std::string& info) override {
39 }
40
41 std::vector<std::string> getPublishers() const override {
42 return {};
43 }
44 void setPublishers(const std::vector<std::string>& publishers) override {
45 }
46 bool hasKey() const override {
47 return false;
48 }
49 std::unique_ptr<recif::RecProperties> cloneKey() const override {
50 throw std::runtime_error("not clonable");
51 }
52 std::unique_ptr<recif::RecProperties> clone() const override {
53 throw std::runtime_error("not clonable");
54 }
55 bool keyEquals(const recif::RecProperties& other) const override {
56 return false;
57 }
58
59private:
60 std::string m_id;
61};
62} // namespace
63
64namespace daq::op {
65
66StartAsync::StartAsync(AsyncOpParams params_arg) noexcept : m_params(params_arg) {
67}
68
69boost::future<void> StartAsync::Initiate() {
70 // Start metadata then primary
71 return StartMeta()
72 .then(m_params.executor,
73 [this](boost::future<void>&& f) -> boost::future<void> {
74 // Simply propagate errors, if any
75 if (f.has_exception()) {
76 LOG4CPLUS_INFO(m_params.logger,
77 fmt::format("{}: StartAsync: StartAsync metadaq "
78 "failed. Will not start primary data acquisition.",
79 m_params.status));
80 // Note: Can't throw directly here as we'll unwrap which then discards that
81 // exception
82 return std::move(f);
83 }
84 // If metadaq was started ok, start primary
85 return StartPrim();
86 })
87 .unwrap(); // unwrap outer future since we actually want the inner one.
88}
89
90boost::future<void> StartAsync::StartMeta() {
91 return SendRequestAndCollectReplies<void>(
92 m_params.meta_sources.begin(),
93 m_params.meta_sources.end(),
94 // predicate
95 [](auto&) { return true; },
96 m_params,
97 // Sender
98 [id = m_params.id](Source<MetaSource>& s) {
99 s.SetState(State::Starting, false);
100 return s.GetSource().GetRrClient().StartDaq(id);
101 },
102 // reply handler:
103 // We transform the return type to void here since there's nothing to keep
104 // in the reply
105 [](AsyncOpParams params,
106 Source<MetaSource>& source,
107 boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) -> void {
108 HandleMetaDaqReply<std::shared_ptr<metadaqif::DaqReply>>("StartDaq",
109 State::Starting,
110 State::Acquiring,
111 State::NotStarted,
112 params,
113 source,
114 std::move(fut),
115 {});
116 },
117 std::string_view("StartDaq() (start metadata acquisition)"))
118 .then(UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
119}
120
121boost::future<void> StartAsync::StartPrim() {
122 return SendRequestAndCollectReplies<void>(
123 m_params.prim_sources.begin(),
124 m_params.prim_sources.end(),
125 [](auto&) { return true; },
126 m_params,
127 // sender
128 [id = m_params.id](Source<PrimSource>& s) {
129 auto& client = s.GetSource().GetRrClient();
130#if defined(UNIT_TEST)
131 // ECII-783: MAL suddenly requires that interface *must* be implemented by
132 // matching middleware otherwise it fails.
133 // To avoid complicated mocking we still use our local implementation for unit
134 // testing.
135 auto properties = std::make_shared<RecPropertiesImpl>();
136#else
137
138 auto mal = client.getMal();
139 BOOST_ASSERT_MSG(mal, "MAL RR client returned invalid MAL pointer");
140 auto properties = mal->createDataEntity<recif::RecProperties>();
141 // Initialize all members by assigning from local implementation
142 *properties = RecPropertiesImpl();
143#endif
144 properties->setId(id);
145 s.SetState(State::Starting, false);
146 return client.RecStart(properties);
147 },
148 // reply handler
149 [](AsyncOpParams params,
150 Source<PrimSource>& source,
151 boost::future<std::shared_ptr<recif::RecStatus>>&& fut) -> void {
152 HandlePrimDaqReply<std::shared_ptr<recif::RecStatus>>("RecStart",
153 State::Starting,
154 State::Acquiring,
155 State::NotStarted,
156 params,
157 source,
158 std::move(fut));
159 },
160 std::string_view("RecStart() (start primary data acquisition)"))
161 .then(UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
162}
163
164} // namespace daq::op
Declares daq::State and related functions.
Contains declaration for the StartAsync operation.
Simple class that holds the source and associated state.
Definition: source.hpp:30
Parameters required for each async operation.
rad::IoExecutor & executor
StartAsync(AsyncOpParams params) noexcept
Definition: start.cpp:66
boost::future< void > Initiate()
Initiates operation that starts metadata acquisition and then, if that succeeded, primary data acquisition.
Definition: start.cpp:69
Contains declaration for the async op utilities.