ifw-daq  1.0.0
IFW Data Acquisition modules
manager.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Definition of `daq::ManagerImpl` and related utilities.
7  */
8 #include <daq/manager.hpp>
9 
10 #include <time.h>
11 #include <algorithm>
12 #include <stdexcept>
13 
14 #include <log4cplus/loggingmacros.h>
15 #include <fmt/format.h>
16 #include <fmt/ostream.h>
17 #include <mal/Mal.hpp>
18 
19 #include <daq/daqController.hpp>
20 #include <daq/status.hpp>
21 
22 #include <daq/op/initiate.hpp>
23 #include <daq/op/awaitState.hpp>
24 
25 namespace daq {
26 
/**
 * Creates a DAQ id candidate from the instrument id and the current UTC time.
 *
 * The result has the form `<instrument>.<YYYY-MM-DDTHH:MM:SS>.<mmm>`,
 * e.g. `KMOS.2017-06-02T16:45:55.701`.
 *
 * @param instrument_id NUL-terminated instrument id. Only the first 5
 * characters are used.
 * @param jitter Offset in milliseconds that is added to the current time
 * before formatting.
 * @return The id candidate. It is not checked for uniqueness.
 * @throw std::system_error if `gettimeofday` or `gmtime_r` fails.
 */
std::string MakeDaqIdCandidate(char const* instrument_id, unsigned jitter) {
    // chrono formatting is not provided until C++20
    struct timeval tv;
    if (gettimeofday(&tv, nullptr) != 0) {
        // GCOVR_EXCL_START
        throw std::system_error(errno, std::generic_category());
        // GCOVR_EXCL_STOP
    }
    // Add jitter. It is split into whole seconds and remaining microseconds
    // because timeradd() carries at most one second: putting the complete
    // jitter into tv_usec would leave tv_usec >= 1000000 for jitter >= 1000 ms
    // (and `jitter * 1000` could wrap around for large values).
    struct timeval jitter_tv;
    jitter_tv.tv_sec = jitter / 1000;
    jitter_tv.tv_usec = (jitter % 1000) * 1000;
    struct timeval res;
    timeradd(&tv, &jitter_tv, &res);
    tv = res;

    struct tm tm_time;
    time_t time = tv.tv_sec;
    if (gmtime_r(&time, &tm_time) == nullptr) {
        // GCOVR_EXCL_START
        throw std::system_error(errno, std::generic_category());
        // GCOVR_EXCL_STOP
    }
    // Size: instrument (1-5) + '.' (1) + timestamp (19) + '.' (1) + ms (3)
    // is at most 29 characters, so 31 bytes leaves room for the terminator.
    char time_str[31] = {0};
    int n = snprintf(&time_str[0], 7, "%.5s.", instrument_id);
    // The timestamp is always 19 characters long (20 with terminator)
    strftime(&time_str[n], 20, "%Y-%m-%dT%H:%M:%S", &tm_time);
    // Append the fractional part directly after the timestamp. tv_usec is
    // normalized here so the millisecond value is always in [0, 999].
    snprintf(&time_str[n + 19], 5, ".%.3d", static_cast<int>(tv.tv_usec / 1000));
    return std::string(time_str, n + 23);
}
62 
63 ManagerImpl::ManagerImpl(rad::IoExecutor& executor, std::string instrument_id)
64  : m_alive_token(std::make_shared<bool>())
65  , m_executor(executor)
66  , m_instrument_id(std::move(instrument_id))
67  , m_logger(log4cplus::Logger::getInstance("daq.manager")) {
68 }
69 
70 
72  // Abort any ongoing operations
73  for (auto& op : m_abort_funcs) {
74  try {
75  op.Abort();
76  } catch (std::exception const& e) {
77  LOG4CPLUS_WARN(m_logger,
78  fmt::format("ManagerImpl::~ManagerImpl: Error when aborting "
79  "operation {}: {}",
80  op.GetId(),
81  e.what()));
82  }
83  }
84 }
85 
86 
87 std::string ManagerImpl::MakeDaqId() const {
88  for (unsigned jitter=0; ; ++jitter) {
89  auto id_candidate = daq::MakeDaqIdCandidate(m_instrument_id.c_str(), jitter);
90  if (!HaveDaq(id_candidate)) {
91  return id_candidate;
92  }
93  }
94 }
95 
96 bool ManagerImpl::HaveDaq(std::string_view id) const noexcept {
97  return FindDaq(id);
98 }
99 
100 void ManagerImpl::AddDaq(std::shared_ptr<DaqController> daq) {
101  if (HaveDaq(daq->GetId())) {
102  throw std::invalid_argument("DaqController with same id already exists");
103  }
104  m_daq_controllers.emplace_back(daq->GetId(),
105  daq,
106  daq->GetStatus()->ConnectObserver(
107  [this](ObservableStatus const& status) {
108  this->m_status_signal.Signal(status);
109  }));
110  // Notify observers that DAQ was added.
111  m_status_signal.Signal(*daq->GetStatus());
112 }
113 
114 Status ManagerImpl::GetStatus(std::string_view id) const {
115  auto& daq = FindDaqOrThrow(id);
116  return daq.GetStatus()->GetStatus();
117 }
118 
119 ManagerImpl::Daq::Daq(std::string id_arg,
120  std::shared_ptr<DaqController> controller_arg,
121  boost::signals2::connection connection_arg) noexcept
122  : id(std::move(id_arg))
123  , controller(std::move(controller_arg))
124  , connection(std::move(connection_arg)) {
125 }
126 
127 DaqController const* ManagerImpl::FindDaq(std::string_view id) const noexcept {
128  return const_cast<ManagerImpl*>(this)->FindDaq(id);
129 }
130 
131 DaqController* ManagerImpl::FindDaq(std::string_view id) noexcept {
132  auto it = std::find_if(m_daq_controllers.begin(),
133  m_daq_controllers.end(),
134  [id](auto const& daq) { return daq.id == id; });
135  if (it != m_daq_controllers.end()) {
136  return it->controller.get();
137  }
138  return nullptr;
139 }
140 
141 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view id) {
142  auto daq_ptr = FindDaq(id);
143 
144  if (!daq_ptr) {
145  throw boost::enable_current_exception(std::invalid_argument(
146  fmt::format("DaqController with id '{}' does not exist", std::string(id))));
147  }
148  return *daq_ptr;
149 }
150 
151 DaqController const& ManagerImpl::FindDaqOrThrow(std::string_view id) const {
152  return const_cast<ManagerImpl*>(this)->FindDaqOrThrow(id);
153 }
154 
155 boost::future<State> ManagerImpl::StartDaqAsync(std::string_view id) {
156  try {
157  return FindDaqOrThrow(id).StartAsync();
158  } catch (...) {
159  return boost::make_exceptional_future<State>();
160  }
161 }
162 
163 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view id, ErrorPolicy policy) {
164  try {
165  return FindDaqOrThrow(id).StopAsync(policy);
166  } catch (...) {
167  return boost::make_exceptional_future<Status>();
168  }
169 }
170 
171 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view id, ErrorPolicy policy) {
172  try {
173  return FindDaqOrThrow(id).AbortAsync(policy);
174  } catch (...) {
175  return boost::make_exceptional_future<Status>();
176  }
177 }
178 
179 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view id,
180  State state,
181  std::chrono::milliseconds timeout) {
182  try {
183  auto& daq = FindDaqOrThrow(id);
184  auto status = daq.GetStatus();
185  if (!IsSubsequentState(state, daq.GetState())) {
186  LOG4CPLUS_INFO(m_logger,
187  fmt::format("{}: Await condition already fulfilled.", *status));
188  // Condition already fulfilled.
189  return boost::make_ready_future<Result<Status>>({false, *status});
190  } else {
191  auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
192  m_executor.get_io_context(), status, state, timeout);
193  // Store abort function so when Manager is deleted it will
194  auto& ref = m_abort_funcs.emplace_back(std::move(abort));
195  LOG4CPLUS_DEBUG("daq.manager",
196  fmt::format("op::AwaitStateAsync initiated. id={}",
197  ref.GetId()));
198  return fut.then(
199  m_executor,
200  [this, id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](auto res) {
201  LOG4CPLUS_DEBUG("daq.manager",
202  fmt::format("op::AwaitStateAsync completed. id={}",
203  id));
204  // Remove abort function since operation completed, but only if
205  // object is alive.
206  auto is_alive = !alive.expired();
207  if (is_alive) {
208  // Manager is still alive, so we remove abort function
209  RemoveAbortFunc(id);
210  }
211  return res.get();
212  });
213  }
214  } catch (...) {
215  return boost::make_exceptional_future<Result<Status>>();
216  }
217 }
218 
219 void ManagerImpl::UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) {
220  return FindDaqOrThrow(id).UpdateKeywords(keywords);
221 }
222 
223 StatusSignal& ManagerImpl::GetStatusSignal() {
224  return m_status_signal;
225 }
226 
227 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
228  std::vector<std::shared_ptr<DaqController const>> controllers;
229  controllers.reserve(m_daq_controllers.size());
230  std::transform(m_daq_controllers.begin(),
231  m_daq_controllers.end(),
232  std::back_inserter(controllers),
233  [](auto const& daq) { return daq.controller; });
234  return controllers;
235 }
236 
237 void ManagerImpl::RemoveAbortFunc(uint64_t id) noexcept {
238  try {
239  m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
240  m_abort_funcs.end(),
241  [id](auto const& obj) { return id == obj.GetId(); }),
242  m_abort_funcs.end());
243  } catch(...) {}
244 }
245 
246 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
247  : m_id(NextId())
248  , m_func(std::move(func)) {
249  assert(m_func);
250 }
251 
252 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
253  return m_id;
254 }
255 
256 
257 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
258  return m_func();
259 }
260 
261 uint64_t ManagerImpl::OpAbortFunc::NextId() {
262  static uint64_t next_id = 0;
263  return next_id++;
264 }
265 
266 } // namespace daq
rad::IoExecutor::get_io_context
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
initiate.hpp
Contains declarations for the helper functions to initiate operations.
awaitState.hpp
Contains declaration for the AwaitStateAsync operation.
daq::ManagerImpl::AbortDaqAsync
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
Definition: manager.cpp:171
daq::StatusSignal::Signal
void Signal(ObservableStatus const &status)
Definition: manager.hpp:55
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::ManagerImpl::MakeDaqId
std::string MakeDaqId() const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:87
manager.hpp
Declaration of daq::Manager
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:68
daq::fits::UpdateKeywords
void UpdateKeywords(KeywordVector &to, KeywordVector const &from)
Updates `to` with keywords from `from`.
Definition: keyword.cpp:120
daq::DaqController::StartAsync
virtual boost::future< State > StartAsync()=0
Starts the data acquisition.
daq::DaqController::AbortAsync
virtual boost::future< Status > AbortAsync(ErrorPolicy policy)=0
Aborts the data acquisition.
daq
Definition: daqController.cpp:18
daq::StatusSignal
Observes any status.
Definition: manager.hpp:46
daq::ManagerImpl::AwaitDaqStateAsync
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:179
daq::ManagerImpl::ManagerImpl
ManagerImpl(rad::IoExecutor &executor, std::string instrument_id)
Definition: manager.cpp:63
daq::IsSubsequentState
bool IsSubsequentState(State state, State after) noexcept
Compares states and returns whether `state` occurs after `after`.
Definition: state.cpp:25
daq::ManagerImpl::StopDaqAsync
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
Definition: manager.cpp:163
daq::ManagerImpl::~ManagerImpl
~ManagerImpl() noexcept
Definition: manager.cpp:71
daqController.hpp
Contains declaration for DaqController.
daq::ManagerImpl::StartDaqAsync
boost::future< State > StartDaqAsync(std::string_view id) override
Start DaqController identified by id.
Definition: manager.cpp:155
status.hpp
Contains declaration for Status and ObservableStatus.
daq::ManagerImpl::AddDaq
void AddDaq(std::shared_ptr< DaqController > daq) override
Add data acquisition.
Definition: manager.cpp:100
daq::Status
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:32
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:138
daq::ManagerImpl::GetStatus
Status GetStatus(std::string_view id) const override
Get status.
Definition: manager.cpp:114
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::MakeDaqIdCandidate
std::string MakeDaqIdCandidate(char const *instrument_id, unsigned jitter=0)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:27
daq::ManagerImpl::HaveDaq
bool HaveDaq(std::string_view id) const noexcept override
Query existing data acquisition by id.
Definition: manager.cpp:96
daq::DaqController::StopAsync
virtual boost::future< Status > StopAsync(ErrorPolicy policy)=0
Stops the data acquisition.