ifw-daq  3.0.0-dev
IFW Data Acquisition modules
ocmDaqService.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_server
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Definition of OcmDaqService
7  */
8 #include "ocmDaqService.hpp"
9 
10 #include <algorithm>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 #include <nlohmann/json.hpp>
15 
16 #include <daq/daqController.hpp>
17 #include <daq/fits/json.hpp>
18 #include <ocmif/conversion.hpp>
19 
20 
21 using boost::enable_current_exception;
22 using boost::make_exceptional_future;
23 using fmt::format;
24 using std::chrono::duration_cast;
25 using std::chrono::milliseconds;
26 
27 namespace {
28 
29 /**
30  * Create meta source instances from user provided string
31  *
32  * {
33  "$schema": "http://json-schema.org/draft-07/schema#",
34  "type": "object",
35  "properties": {
36  "name": {
37  "type": "object",
38  "properties": {
39  "uri": {
40  "type": "string",
41  }
42  }
43  }
44  }
45 }
46  * {"source-id/name" : source-obj }
47  */
48 std::vector<daq::MetaSource> ParseMetaSource(mal::Mal& mal, std::string const& str) {
49  std::vector<daq::MetaSource> sources;
50 
51  std::vector<ParsedSource> raw_sources = ParseSourceUris(str);
52  sources.reserve(raw_sources.size());
53  for (auto const& raw : raw_sources) {
54  sources.emplace_back(
55  raw.name, mal.getClient<daq::MetaSource::RrClient>(mal::Uri(raw.rr_uri), {}, {}));
56  }
57 
58  return sources;
59 }
60 
61 std::vector<daq::PrimSource> ParsePrimSource(mal::Mal& mal, std::string const& str) {
62  std::vector<daq::PrimSource> sources;
63 
64  std::vector<ParsedSource> raw_sources = ParseSourceUris(str);
65  sources.reserve(raw_sources.size());
66  for (auto const& raw : raw_sources) {
67  sources.emplace_back(
68  raw.name, mal.getClient<daq::PrimSource::RrClient>(mal::Uri(raw.rr_uri), {}, {}));
69  }
70 
71  return sources;
72 }
73 
74 } // namespace
75 
76 daq::DaqProperties ParseStartDaqProperties(std::string const& json_properties) {
77  using std::chrono::milliseconds;
78  using std::chrono::duration_cast;
79  using Seconds = std::chrono::duration<double>;
80  daq::DaqProperties properties;
81 
82  if (json_properties.empty()) {
83  // No arguments
84  return properties;
85  }
86 
87  auto json = nlohmann::json::parse(json_properties);
88  if (!json.is_object()) {
89  throw std::invalid_argument(
90  fmt::format("expectd type object but got type {}", json.type_name()));
91  }
92  if (json.contains("keywords")) {
93  properties.keywords = daq::fits::ParseJsonKeywords(json["keywords"]);
94  }
95  if (json.contains("awaitInterval")) {
96  auto& value = json["awaitInterval"];
97  if (!value.is_number()) {
98  throw std::invalid_argument(fmt::format("'awaitInterval': unsupported type: {}",
99  value.type_name()));
100  }
101  auto await_interval = value.get<double>();
102  if (await_interval < 0.0) {
103  throw std::invalid_argument(
104  fmt::format("'awaitInterval' must be positive number, got {}", await_interval));
105  }
106  properties.await_interval = duration_cast<milliseconds>(Seconds(value.get<double>()));
107  }
108  return properties;
109 }
110 
111 bool ParsedSource::operator==(ParsedSource const& rhs) const {
112  return name == rhs.name && rr_uri == rhs.rr_uri;
113 }
114 
115 ParsedSource::ParsedSource(std::string name, std::string rr_uri)
116  : name(std::move(name)), rr_uri(std::move(rr_uri)) {
117 }
118 std::ostream& operator<<(std::ostream& os, ParsedSource const& s) {
119  os << "name: '" << s.name << "', rr_uri='" << s.rr_uri << "'";
120  return os;
121 }
122 
123 ParsedSource ParseSourceUri(std::string_view s) {
124  auto start = s.find_first_not_of(' ');
125  auto name_end_pos = s.find_first_of('@');
126 
127  if (name_end_pos == std::string_view::npos) {
128  throw std::invalid_argument(
129  "separator '@' not found in expected format 'name@rr-uri'");
130  }
131  auto name = s.substr(start, name_end_pos - start);
132  if (name.empty()) {
133  throw std::invalid_argument("name part in 'name@rr-uri' is empty");
134  }
135 
136  start = name_end_pos + 1;
137  if (start >= s.size()) {
138  throw std::invalid_argument("invalid format string, expected format 'name@rr-uri'");
139  }
140  auto rr_uri_end_pos = s.find_first_of(" ,", start);
141  if (name_end_pos == std::string_view::npos) {
142  throw std::invalid_argument("separator ',' not found");
143  }
144 
145  auto rr_uri = s.substr(start, rr_uri_end_pos - start);
146  if (rr_uri.empty()) {
147  throw std::invalid_argument("rr_uri part in 'name@rr-uri' is empty");
148  }
149  return ParsedSource(std::string(name), std::string(rr_uri));
150 }
151 /**
152  * Parse user provided string in the format
153  * "<name>@<rr-uri>[ <name>@...]"
154  *
155  * @throw std::invalid_argument on errors.
156  */
157 std::vector<ParsedSource> ParseSourceUris(std::string_view s) {
158  std::vector<ParsedSource> result;
159  size_t begin = 0;
160 
161  while (begin < s.size()) {
162  const auto end = s.find_first_of(' ', begin);
163 
164  if (begin != end) {
165  result.emplace_back(ParseSourceUri(s.substr(begin, end - begin)));
166  }
167 
168  if (end == std::string_view::npos) {
169  break;
170  }
171 
172  begin = end + 1;
173  }
174 
175  return result;
176 }
177 
178 
/**
 * Construct the service.
 *
 * Stores references to the io_context, MAL instance and manager, takes ownership of the
 * output path, creates the event log and connects the log observer to it.
 *
 * @param io_ctx io_context stored in m_io_ctx and used to construct m_executor.
 * @param mal MAL instance stored in m_mal.
 * @param mgr Manager stored in m_mgr.
 * @param output_path Path stored in m_output_path.
 */
179 OcmDaqService::OcmDaqService(boost::asio::io_context& io_ctx, mal::Mal& mal, daq::Manager& mgr,
180  std::string output_path)
181  : m_io_ctx(io_ctx)
182  , m_executor(m_io_ctx)
183  , m_mal(mal)
184  , m_mgr(mgr)
185  , m_output_path(std::move(output_path))
186  , m_event_log(std::make_shared<daq::ObservableEventLog>())
 // Default-initialized here; the actual connection is assigned in the constructor body.
187  , m_log_observer_connection()
188  , m_log_observer(log4cplus::Logger::getInstance("daq.eventlog"))
189  , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME)) {
 // The observer is passed by reference wrapper, i.e. m_log_observer itself (not a copy) is
 // connected. The returned connection is kept in m_log_observer_connection.
190  m_log_observer_connection =
191  m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
192 }
193 
195  m_log_observer_connection.disconnect();
196 }
197 
/**
 * Handle the StartDaq request.
 *
 * The work is posted to m_executor. The task:
 *  1. records a UserActionEvent describing the request,
 *  2. parses @a json_properties with ParseStartDaqProperties(),
 *  3. uses @a id, or asks the manager for a new id if @a id is empty; a non-empty id that the
 *     manager already knows is rejected,
 *  4. creates primary and metadata sources from the source strings,
 *  5. creates a controller, adds it to the manager and starts it asynchronously.
 *
 * @param id Data acquisition id; empty to have one generated.
 * @param file_prefix Assigned to `DaqProperties::dp_name_prefix`.
 * @param primary_sources Source list passed to ParsePrimSource().
 * @param metadata_sources Source list passed to ParseMetaSource().
 * @param json_properties JSON passed to ParseStartDaqProperties().
 * @return Future that holds a DaqReply with the effective id and error=false on success, or an
 * ocmif::DaqException otherwise.
 */
198 boost::future<std::shared_ptr<::ocmif::DaqReply>>
199 OcmDaqService::StartDaq(const std::string& id,
200  const std::string& file_prefix,
201  const std::string& primary_sources,
202  const std::string& metadata_sources,
203  const std::string& json_properties) {
204  return boost::
205  async(m_executor,
 // [=] copies the string arguments into the task; `self` shares ownership of the service.
 // NOTE(review): m_mal, m_output_path and m_event_log are used unqualified further down,
 // i.e. through the implicitly captured `this`, while the rest uses `self->`.
206  [=, self = shared_from_this()]() mutable {
 // NOTE(review): the format string below has no closing ')' after json_properties.
207  self->m_event_log->AddEvent(daq::UserActionEvent(
208  id,
209  fmt::format("Request received: "
210  "StartDaq(id='{0}', file_prefix='{1}', "
211  "primary_sources='{2}', metadata_sources='{3}', "
212  "json_properties='{4}'",
213  id,
214  file_prefix,
215  primary_sources,
216  metadata_sources,
217  json_properties),
218  std::nullopt));
219  // Parse provided JSON
220  // Validation that require state is performed in m_executor for thread safety.
221  daq::DaqProperties properties;
222  try {
223  properties = ParseStartDaqProperties(json_properties);
224  } catch (std::exception const& e) {
 // Parse failures are recorded as an ErrorEvent and reported via an exceptional future.
225  auto msg =
226  fmt::format("Failed to parse StartDaq JSON properties: {}", e.what());
227  self->m_event_log->AddEvent(daq::ErrorEvent(id, msg, std::nullopt, "user"));
228  return boost::make_exceptional_future<
229  std::shared_ptr<::ocmif::DaqReply>>(
230  ocmif::DaqException(id, msg));
231  }
232 
 // Effective id: the user provided one, or one generated by the manager.
233  auto validated_id = id;
234  if (validated_id.empty()) {
235  validated_id = self->m_mgr.MakeDaqId();
236  LOG4CPLUS_INFO(
237  self->m_logger,
238  fmt::format("StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
239  validated_id));
240  } else {
241  // Check that an instance does not already exist with id
242  if (self->m_mgr.HaveDaq(id)) {
243  LOG4CPLUS_INFO(
244  self->m_logger,
245  fmt::format("StartDaq(id='{0}'): DAQ with id='{0}' already exist",
246  validated_id));
247  return boost::make_exceptional_future<
248  std::shared_ptr<ocmif::DaqReply>>(ocmif::DaqException(
249  id, "Data acquisition with same id already exist"));
250  }
251  }
252 
 // Everything below that throws is converted to an exceptional future carrying an
 // ocmif::DaqException by the catch clauses at the end of this try block.
253  try {
254  properties.id = validated_id;
255  properties.dp_name_prefix = file_prefix;
256  // Create primary sources
257  properties.prim_sources = ParsePrimSource(m_mal, primary_sources);
258  // Create metadata sources
259  properties.meta_sources = ParseMetaSource(m_mal, metadata_sources);
260 
 // Fall back to the service-wide output path when no root was set.
261  if (properties.ocm_dppart_root.empty()) {
262  properties.ocm_dppart_root = m_output_path;
263  }
264 
265  // Status
266  auto status = std::make_shared<daq::ObservableStatus>(validated_id);
267 
268  // Operations
 // NOTE(review): listing lines 269-270 are absent from this view; `ops` and `daq` used
 // below are presumably declared there (the line below is the tail of a call).
271  self->m_io_ctx, std::move(properties), status, m_event_log, ops);
272  assert(daq);
273 
274  self->m_mgr.AddDaq(daq);
275 
276  // Start
277  return self->m_mgr.StartDaqAsync(validated_id)
278  .then(
279  self->m_executor,
 // The continuation only holds a weak reference, so it does not keep the service alive
 // while the start operation is pending.
280  [weak_self = std::weak_ptr(self->shared_from_this()),
281  id = validated_id](boost::future<daq::State> f)
282  -> std::shared_ptr<ocmif::DaqReply> {
283  auto self = weak_self.lock();
284  if (!self) {
285  LOG4CPLUS_WARN(
286  LOGGER_NAME,
287  fmt::format("StartDaq(id='{}'): StartDaqAsync is "
288  "complete but MAL service has "
289  "been abandoned. Throwing exception.",
290  id));
291  throw boost::enable_current_exception(
292  ocmif::DaqException(id,
293  "Service has been abandoned"));
294  }
 // The stored exception is not inspected; the caller only gets "Start failed".
295  if (f.has_exception()) {
296  LOG4CPLUS_INFO(self->m_logger,
297  fmt::format("StartDaq(id='{}'): StartDaqAsync "
298  "completed with failure",
299  id));
300  throw boost::enable_current_exception(
301  ocmif::DaqException(id, "Start failed"));
302  }
303  auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
304  assert(rep);
305  rep->setId(id);
306  rep->setError(false);
307  return rep;
308  });
309  } catch (std::invalid_argument const& e) {
310  LOG4CPLUS_INFO(self->m_logger,
311  fmt::format("StartDaq(id='{}'): Invalid argument error while "
312  "processing request: {}",
313  validated_id,
314  e.what()));
315  return boost::make_exceptional_future<std::shared_ptr<ocmif::DaqReply>>(
316  ocmif::DaqException(validated_id, e.what()));
317  } catch (std::exception const& e) {
 // NOTE(review): the two literals below concatenate to "Error whileprocessing request"
 // (missing space).
318  LOG4CPLUS_INFO(self->m_logger,
319  fmt::format("StartDaq(id='{}'): Error while"
320  "processing request: {}",
321  validated_id,
322  e.what()));
323  return boost::make_exceptional_future<std::shared_ptr<ocmif::DaqReply>>(
324  ocmif::DaqException(validated_id, e.what()));
325  } catch (...) {
326  LOG4CPLUS_INFO(
327  self->m_logger,
328  fmt::format("StartDaq(id='{}'): Unknown error while processing request",
329  validated_id));
 // NOTE(review): "Uknown" in the message below is misspelled.
330  return boost::make_exceptional_future<std::shared_ptr<ocmif::DaqReply>>(
331  ocmif::DaqException(validated_id, "Uknown error"));
332  }
333  })
334  // unwrap outer future from async() to get the future we want from m_mgr.StartDaqAsync()
335  .unwrap();
336 }
337 
338 boost::future<std::shared_ptr<::ocmif::DaqReply>>
339 OcmDaqService::StopDaq(const std::string& id) {
340  return boost::async(
341  m_executor,
342  [self = shared_from_this(), id]() {
343  self->m_event_log->AddEvent(daq::UserActionEvent(id,
344  fmt::format("Request received: "
345  "StopDaq(id='{0}')",
346  id),
347  std::nullopt));
348  return self->StopDaq(id, false);
349  }).unwrap();
350 }
351 
352 boost::future<std::shared_ptr<::ocmif::DaqReply>>
353 OcmDaqService::ForceStopDaq(const std::string& id) {
354  return boost::async(
355  m_executor,
356  [self = shared_from_this(), id]() {
357  self->m_event_log->AddEvent(daq::UserActionEvent(id,
358  fmt::format("Request received: "
359  "ForceStopDaq(id='{0}')",
360  id),
361  std::nullopt));
362  return self->StopDaq(id, true);
363  }).unwrap();
364 }
365 
366 boost::future<std::shared_ptr<::ocmif::DaqReply>>
367 OcmDaqService::StopDaq(const std::string& id, bool forced) {
368  return boost::async(
369  m_executor,
370  [self = shared_from_this(), id, forced]() {
371  return self->m_mgr
372  .StopDaqAsync(id,
373  forced ? daq::ErrorPolicy::Tolerant : daq::ErrorPolicy::Strict)
374  .then(
375  self->m_executor,
376  [weak_self = std::weak_ptr(self->shared_from_this()),
377  id](boost::future<daq::Status> f)
378  -> std::shared_ptr<ocmif::DaqReply> {
379  auto self = weak_self.lock();
380  if (!self) {
381  LOG4CPLUS_WARN(LOGGER_NAME,
382  fmt::format("StopDaq(id='{}'): StopDaqAsync is "
383  "complete but MAL service has "
384  "been abandoned. Throwing exception.",
385  id));
386  throw boost::enable_current_exception(
387  ocmif::DaqException(id, "Service has been abandoned"));
388  }
389  if (f.has_exception()) {
390  throw boost::enable_current_exception(
391  ocmif::DaqException(id, "Stop failed"));
392  }
393  auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
394  assert(rep);
395  rep->setId(id);
396  rep->setError(false);
397  return rep;
398  });
399  })
400  // unwrap outer future from async() to get the future we want from m_mgr.StopDaqAsync()
401  .unwrap();
402 }
403 
404 boost::future<std::shared_ptr<::ocmif::DaqReply>>
405 OcmDaqService::AbortDaq(const std::string& id) {
406  return boost::async(
407  m_executor,
408  [self = shared_from_this(), id]() {
409  self->m_event_log->AddEvent(daq::UserActionEvent(id,
410  fmt::format("Request received: "
411  "AbortDaq(id='{0}')",
412  id),
413  std::nullopt));
414  return self->AbortDaq(id, false);
415  }).unwrap();
416 }
417 
418 boost::future<std::shared_ptr<::ocmif::DaqReply>>
419 OcmDaqService::ForceAbortDaq(const std::string& id) {
420  return boost::async(
421  m_executor,
422  [self = shared_from_this(), id]() {
423  self->m_event_log->AddEvent(daq::UserActionEvent(id,
424  fmt::format("Request received: "
425  "ForceAbortDaq(id='{0}')",
426  id),
427  std::nullopt));
428  return self->AbortDaq(id, true);
429  }).unwrap();
430 }
431 
432 boost::future<std::shared_ptr<::ocmif::DaqReply>>
433 OcmDaqService::AbortDaq(const std::string& id, bool forced) {
434  return boost::async(
435  m_executor,
436  [self = shared_from_this(), id, forced]() {
437  return self->m_mgr
438  .AbortDaqAsync(
439  id, forced ? daq::ErrorPolicy::Tolerant : daq::ErrorPolicy::Strict)
440  .then(
441  self->m_executor,
442  [weak_self = std::weak_ptr(self->shared_from_this()), id, forced](
443  boost::future<daq::Status> f)
444  -> std::shared_ptr<ocmif::DaqReply> {
445  auto self = weak_self.lock();
446  if (!self) {
447  LOG4CPLUS_WARN(
448  LOGGER_NAME,
449  fmt::format("AbortDaq(id='{}', forced={}): AbortDaqAsync is "
450  "complete but MAL service has "
451  "been abandoned. Throwing exception.",
452  id,
453  forced));
454  throw boost::enable_current_exception(
455  ocmif::DaqException(id, "Service has been abandoned"));
456  }
457  if (f.has_exception()) {
458  LOG4CPLUS_INFO(self->m_logger,
459  fmt::format("AbortDaq(id='{}', forced={}): "
460  "AbortDaqAsync Completed "
461  "with fatal error",
462  id,
463  forced));
464  throw boost::enable_current_exception(
465  ocmif::DaqException(id, "Abort failed"));
466  }
467  auto result = f.get();
468  LOG4CPLUS_INFO(self->m_logger,
469  fmt::format("AbortDaq(id='{}', forced={}): "
470  "AbortDaqAsync Completed successfully",
471  id,
472  forced));
473  auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
474  assert(rep);
475  rep->setId(id);
476  rep->setError(result.error);
477  LOG4CPLUS_DEBUG(
478  self->m_logger,
479  fmt::format("AbortDaq(id='{}', forced={}): "
480  "AbortDaqAsync Completed, returning reply now.",
481  id,
482  forced));
483  return rep;
484  }); // m_msg.AbortDaqAsync cont
485  }) // boost::async()
486  // unwrap outer future from async() to get the future we want from m_mgr.AbortDaqAsync()
487  .unwrap();
488 }
489 
/**
 * Handle the UpdateKeywords request.
 *
 * On m_executor: records the request in the event log, parses @a keywords with
 * daq::fits::ParseJsonKeywords() and forwards the result to the manager for the data
 * acquisition @a id.
 *
 * @param id Data acquisition id.
 * @param keywords Keywords as a JSON string.
 * @return Future of a DaqReply with error=false on success. All failures are rethrown from
 * the task as ocmif::DaqException.
 */
490 boost::future<std::shared_ptr<::ocmif::DaqReply>>
491 OcmDaqService::UpdateKeywords(const std::string& id, const std::string& keywords) {
492  return boost::async(
493  m_executor,
494  [self = shared_from_this(), id, keywords]() -> std::shared_ptr<::ocmif::DaqReply> {
495  self->m_event_log->AddEvent(
 // NOTE(review): listing line 496 is absent from this view; presumably it opens the
 // event object whose arguments follow.
497  fmt::format("Request received: "
498  "UpdateKeywords(id='{0}', keywords='{1}')",
499  id,
500  keywords),
501  std::nullopt));
 // Step 1: parse. Each failure category is logged and mapped to its own message.
502  daq::fits::KeywordVector parsed_keywords;
503  try {
504  daq::fits::ParseJsonKeywords(keywords.c_str()).swap(parsed_keywords);
505  } catch (nlohmann::json::exception const& e) {
506  LOG4CPLUS_ERROR(
507  self->m_logger,
508  fmt::format("UpdateKeywords(id='{}', ...): Failed to parse JSON", id));
509  throw boost::enable_current_exception(ocmif::DaqException(
510  id, fmt::format("Invalid JSON string: {}", e.what())));
511  } catch (std::invalid_argument const& e) {
512  LOG4CPLUS_ERROR(
513  self->m_logger,
514  fmt::format(
515  "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid "
516  "schema",
517  id));
518  throw boost::enable_current_exception(ocmif::DaqException(
519  id, fmt::format("Invalid JSON schema: {}", e.what())));
520  } catch (std::exception const& e) {
521  LOG4CPLUS_ERROR(
522  self->m_logger,
523  fmt::format(
524  "UpdateKeywords(id='{}', ...): std::exception: '{}'",
525  id, e.what()));
526  throw boost::enable_current_exception(ocmif::DaqException(
527  id, fmt::format("std::exception: {}", e.what())));
528  } catch (...) {
 // Unlike the handlers above, this one does not log.
529  throw boost::enable_current_exception(ocmif::DaqException(id, "unknown error"));
530  }
 // Step 2: apply. Here std::invalid_argument is reported as an unknown id.
531  try {
532  self->m_mgr.UpdateKeywords(id, parsed_keywords);
533  auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
534  rep->setId(id);
535  rep->setError(false);
536  return rep;
537  } catch (std::invalid_argument const& e) {
538  LOG4CPLUS_ERROR(
539  self->m_logger,
540  fmt::format("UpdateKeywords(id='{}'): Invalid data acquisition id", id));
541  throw boost::enable_current_exception(ocmif::DaqException(
542  id, fmt::format("No data acquisition with id='{}'", id)));
543  } catch (std::exception const& e) {
544  LOG4CPLUS_ERROR(
545  self->m_logger,
546  fmt::format(
547  "UpdateKeywords(id='{}', ...): std::exception: '{}'",
548  id, e.what()));
549  throw boost::enable_current_exception(ocmif::DaqException(
550  id, fmt::format("std::exception: {}", e.what())));
551  } catch (...) {
552  throw boost::enable_current_exception(ocmif::DaqException(id, "unknown error"));
553  }
554  });
555 }
556 
/**
 * Handle the GetStatus request.
 *
 * On m_executor: records the request in the event log, queries the manager for the status
 * of data acquisition @a id and converts it to an ocmif::DaqStatus.
 *
 * @param id Data acquisition id.
 * @return Future holding the status, or an ocmif::DaqException if the manager throws
 * std::invalid_argument for @a id.
 */
557 boost::future<std::shared_ptr<::ocmif::DaqStatus>>
558 OcmDaqService::GetStatus(const std::string& id) {
559  return boost::async(
560  m_executor,
561  [self = shared_from_this(), id]() {
562  self->m_event_log->AddEvent(
 // NOTE(review): listing line 563 is absent from this view; presumably it opens the
 // event object whose arguments follow.
564  fmt::format("Request received: "
565  "GetStatus(id='{0}')",
566  id),
567  std::nullopt));
 // Only std::invalid_argument is handled here; any other exception leaves the task as is.
568  try {
569  LOG4CPLUS_INFO(self->m_logger, fmt::format("GetStatus(id='{}'): Enter", id));
570  auto status = self->m_mgr.GetStatus(id);
571  auto rep = self->m_mal.createDataEntity<ocmif::DaqStatus>();
572  assert(rep);
 // Fill the MAL entity from the status via the stream operator.
573  *rep << status;
574  LOG4CPLUS_INFO(self->m_logger,
575  fmt::format("GetStatus(id='{}'): Set result -> {}",
576  id,
577  status.state));
578  return boost::make_ready_future<std::shared_ptr<ocmif::DaqStatus>>(
579  rep);
580  } catch (std::invalid_argument const&) {
581  LOG4CPLUS_ERROR(
582  self->m_logger,
583  fmt::format("GetStatus(id='{}'): Invalid data acquisition id", id));
584  return boost::make_exceptional_future<
585  std::shared_ptr<ocmif::DaqStatus>>(
586  boost::enable_current_exception(ocmif::DaqException(
587  id, fmt::format("No data acquisition with id='{}'", id))));
588  }
589  })
 // Both branches return a future, so the outer future from async() is unwrapped.
590  .unwrap();
591 }
592 
593 boost::future<std::vector<std::shared_ptr<::ocmif::DaqStatus>>> OcmDaqService::GetActiveList() {
594  return boost::async(
595  m_executor,
596  [self = shared_from_this()]() -> std::vector<std::shared_ptr<::ocmif::DaqStatus>> {
597  self->m_event_log->AddEvent(daq::UserActionEvent("",
598  "Request received: "
599  "GetActiveList()",
600  std::nullopt));
601  auto daqs = self->m_mgr.GetDaqControllers();
602  std::vector<std::shared_ptr<daq::DaqController const>> active;
603  std::vector<std::shared_ptr<ocmif::DaqStatus>> reply;
604  std::copy_if(daqs.begin(),
605  daqs.end(),
606  std::back_inserter(active),
607  [](auto daq_ctl) { return !daq::IsFinalState(daq_ctl->GetState()); });
608  std::transform(
609  active.begin(),
610  active.end(),
611  std::back_inserter(reply),
612  [&mal = self->m_mal](auto daq_ctl) {
613  auto mal_status = mal.createDataEntity<ocmif::DaqStatus>();
614  *mal_status << daq_ctl->GetStatus()->GetStatus();
615  return mal_status;
616  });
617  return reply;
618  });
619 }
620 
621 boost::future<std::shared_ptr<::ocmif::AwaitDaqReply>> OcmDaqService::AwaitDaqState(
622  const std::string& id, ocmif::DaqState state, ocmif::DaqSubState substate, double timeout) {
623  using Seconds = std::chrono::duration<double>;
624 
625  return boost::async(m_executor, [=, self = shared_from_this()]() {
626  self->m_event_log->AddEvent(daq::UserActionEvent(id,
627  format("Request received: "
628  "AwaitDaqState(id='{}', "
629  "state={}, substate={}, "
630  "timeout={})",
631  id,
632  ocmif::ToString(state),
633  ocmif::ToString(substate),
634  timeout),
635  std::nullopt));
636  if (timeout <= 0) {
637  return boost::make_exceptional_future<daq::Result<daq::Status>>(
638  std::invalid_argument(format("Invalid argument `timeout`. Must be > 0", timeout)));
639  }
640  if (state != ocmif::StateAcquiring && substate != ocmif::Aborted) {
641  return boost::make_exceptional_future<daq::Result<daq::Status>>(
642  std::invalid_argument("Invalid argument `state`: "
643  "Only ocmif::StateAcquiring is supported"));
644  }
645  auto daq_state = ocmif::MakeState(substate);
646  return self->m_mgr.AwaitDaqStateAsync(id,
647  daq_state,
648  duration_cast<milliseconds>(Seconds(timeout)));
649  })
650  .unwrap()
651  .then(m_executor, [id, self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
652  try {
653  auto [timeout, status] = fut.get();
654  auto mal_reply = self->m_mal.createDataEntity<ocmif::AwaitDaqReply>();
655  assert(mal_reply);
656  mal_reply->setTimeout(timeout);
657  auto mal_status_ptr = mal_reply->getStatus();
658  assert(mal_status_ptr);
659  *mal_status_ptr << status;
660  daq::ActionEvent(id,
661  fmt::format("Request completed: {}",
662  (timeout ? "condition not yet satisfied (timeout)"
663  : "condition satisfied")),
664  std::nullopt);
665  return mal_reply;
666  } catch (std::exception const& e) {
667  self->m_event_log->AddEvent(
668  daq::ActionEvent(id,
669  fmt::format("Request completed exceptionally: {}", e.what()),
670  std::nullopt));
671  throw boost::enable_current_exception(ocmif::DaqException(id, e.what()));
672  } catch (...) {
673  daq::ActionEvent(id,
674  "Request completed exceptionally: Unknown exception",
675  std::nullopt);
676  throw boost::enable_current_exception(ocmif::DaqException(id, "Uknown exception"));
677  }
678  });
679 }
ocmif::ToString
std::string_view ToString(ocmif::DaqState state) noexcept
Definition: conversion.cpp:78
ParseSourceUri
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
Definition: ocmDaqService.cpp:123
ParsedSource::operator==
bool operator==(ParsedSource const &rhs) const
Definition: ocmDaqService.cpp:111
daq::DaqProperties::dp_name_prefix
std::string dp_name_prefix
Data product file name prefix.
Definition: daqProperties.hpp:44
ParseSourceUris
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
Definition: ocmDaqService.cpp:157
daq::DaqProperties
Structure carrying properties needed to start a DataAcquisition.
Definition: daqProperties.hpp:28
daq::DaqControllerImpl::Create
static std::shared_ptr< DaqControllerImpl > Create(boost::asio::io_context &io_context, DaqProperties properties, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, AsyncOperations operations)
Construct object.
Definition: daqController.cpp:86
daq::ErrorEvent
Definition: eventLog.hpp:69
OcmDaqService::ForceStopDaq
boost::future< std::shared_ptr<::ocmif::DaqReply > > ForceStopDaq(const std::string &id) override
Definition: ocmDaqService.cpp:353
daq::DaqProperties::meta_sources
std::vector< MetaSource > meta_sources
Definition: daqProperties.hpp:50
operator<<
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
Definition: ocmDaqService.cpp:118
conversion.hpp
Contains support functions for ocmif.
OcmDaqService::ForceAbortDaq
boost::future< std::shared_ptr<::ocmif::DaqReply > > ForceAbortDaq(const std::string &id) override
Definition: ocmDaqService.cpp:419
daq
Definition: daqController.cpp:18
ocmif::MakeState
ocmif::DaqSubState MakeState(daq::State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:32
ParsedSource::name
std::string name
Definition: ocmDaqService.hpp:34
daq::DaqProperties::id
std::string id
Definition: daqProperties.hpp:35
daqController.hpp
Contains declaration for for DaqController.
OcmDaqService::AbortDaq
boost::future< std::shared_ptr<::ocmif::DaqReply > > AbortDaq(const std::string &id) override
Definition: ocmDaqService.cpp:405
OcmDaqService::GetActiveList
boost::future< std::vector< std::shared_ptr<::ocmif::DaqStatus > > > GetActiveList() override
Definition: ocmDaqService.cpp:593
ocmDaqService.hpp
Declaration of OcmDaqService.
OcmDaqService::AwaitDaqState
boost::future< std::shared_ptr<::ocmif::AwaitDaqReply > > AwaitDaqState(const std::string &id, ocmif::DaqState state, ocmif::DaqSubState substate, double timeout) override
Definition: ocmDaqService.cpp:621
json.hpp
Contains data structure for FITS keywords.
OcmDaqService::OcmDaqService
OcmDaqService(boost::asio::io_context &io_ctx, mal::Mal &mal, daq::Manager &mgr, std::string output_path)
Definition: ocmDaqService.cpp:179
ParsedSource::ParsedSource
ParsedSource()=default
daq::DaqProperties::keywords
std::vector< daq::fits::KeywordVariant > keywords
Initial list of keywords provided by user when starting data acquisitions.
Definition: daqProperties.hpp:54
OcmDaqService::StopDaq
boost::future< std::shared_ptr<::ocmif::DaqReply > > StopDaq(const std::string &id) override
Definition: ocmDaqService.cpp:339
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:138
OcmDaqService::~OcmDaqService
~OcmDaqService()
Definition: ocmDaqService.cpp:194
daq::Manager
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:86
OcmDaqService::UpdateKeywords
boost::future< std::shared_ptr<::ocmif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
Definition: ocmDaqService.cpp:491
daq::MetaSource::RrClient
metadaqif::MetaDaqAsync RrClient
Definition: source.hpp:141
server::LOGGER_NAME
const std::string LOGGER_NAME
Definition: logger.hpp:17
OcmDaqService::StartDaq
boost::future< std::shared_ptr<::ocmif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
Definition: ocmDaqService.cpp:199
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
daq::ActionEvent
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
daq::UserActionEvent
Event directly related to user action, such as a command to do something.
Definition: eventLog.hpp:65
daq::DaqProperties::await_interval
std::chrono::milliseconds await_interval
Interval (and thus duration) of the requests sent to primary sources to await end of recording.
Definition: daqProperties.hpp:61
daq::DaqProperties::ocm_dppart_root
std::string ocm_dppart_root
Root directory for OCM data product part output.
Definition: daqProperties.hpp:48
ParsedSource::rr_uri
std::string rr_uri
Definition: ocmDaqService.hpp:35
daq::PrimSource::RrClient
recif::RecCmdsAsync RrClient
Definition: source.hpp:98
daq::AsyncOperations
Async operations.
Definition: daqController.hpp:46
daq::fits::ParseJsonKeywords
std::vector< KeywordVariant > ParseJsonKeywords(char const *keywords)
Parse and return FITS keywords.
Definition: json.cpp:73
daq::DaqProperties::prim_sources
std::vector< PrimSource > prim_sources
Definition: daqProperties.hpp:49
ParsedSource
Definition: ocmDaqService.hpp:25
ParseStartDaqProperties
daq::DaqProperties ParseStartDaqProperties(std::string const &json_properties)
Parse the JSON properties user provides with StartDaq.
Definition: ocmDaqService.cpp:76
OcmDaqService::GetStatus
boost::future< std::shared_ptr<::ocmif::DaqStatus > > GetStatus(const std::string &id) override
Definition: ocmDaqService.cpp:558