ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
ocmDaqService.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_ocm_server
4 * @copyright 2022 ESO - European Southern Observatory
5 *
6 * @brief Definition of OcmDaqService
7 */
8#include "ocmDaqService.hpp"
9
10#include <algorithm>
11#include <cstddef>
12#include <regex>
13
14#include <fmt/format.h>
15#include <fmt/ostream.h>
16#include <log4cplus/loggingmacros.h>
17#include <mal/rr/qos/ConnectionTime.hpp>
18#include <nlohmann/json.hpp>
19
20#include <daq/conversion.hpp>
21#include <daq/daqController.hpp>
22#include <daq/error/report.hpp>
23#include <daq/fits/json.hpp>
24#include <daqif/state.hpp>
25
27
28#include "logger.hpp"
29
30using boost::enable_current_exception;
31using boost::make_exceptional_future;
33using fmt::format;
34using std::chrono::duration_cast;
35using std::chrono::milliseconds;
36
37namespace {
38
39auto MakeDaqifException(std::string const& id,
40 std::string const& msg,
41 std::exception const& exception) {
42 auto nested_msg = NestedExceptionReporter(exception).Str();
43 if (msg.empty()) {
44 return boost::enable_current_exception(
45 daqif::DaqException(id, fmt::format("{}", nested_msg)));
46
47 } else {
48 return boost::enable_current_exception(
49 daqif::DaqException(id, fmt::format("{}\n{}", msg, nested_msg)));
50 }
51}
52
53std::vector<daq::DaqContext::Source> ParseSource(std::string const& str) {
54 std::vector<daq::DaqContext::Source> sources;
55
56 std::vector<ParsedSource> raw_sources = ParseSourceUris(str);
57 sources.reserve(raw_sources.size());
58 for (auto const& raw : raw_sources) {
59 sources.push_back({raw.name, raw.rr_uri});
60 }
61
62 return sources;
63}
64
65void ValidateFilePrefix(char const* file_prefix) {
66 auto file_regex = std::regex(R"(^[-a-zA-Z0-9_\.]*$)");
67
68 if (!std::regex_match(file_prefix, file_regex)) {
69 throw std::invalid_argument(
70 fmt::format("file_prefix \"{}\" contains illegal characters, allowed: [a-zA-Z-0-9-_.]",
71 file_prefix));
72 }
73}
74
75/**
76 * Validate arguments.
77 *
78 * TODO:
79 * - Check merge target.
80 * - Source names must be unique (and shouldn't use reserved names)
81 *
82 * @throw std::invalid_argument on validation error.
83 */
84void ValidateDaqContext(daq::DaqContext const& ctx) {
85 ValidateFilePrefix(ctx.dp_name_prefix.c_str());
86}
87
88} // namespace
89
90daq::DaqContext ParseStartDaqContext(std::string const& json_properties) {
91 using std::chrono::duration_cast;
92 using std::chrono::milliseconds;
93 using Seconds = std::chrono::duration<double>;
94 daq::DaqContext properties;
95
96 if (json_properties.empty()) {
97 // No arguments
98 return properties;
99 }
100
101 auto json = nlohmann::json::parse(json_properties);
102 if (!json.is_object()) {
103 throw boost::enable_current_exception(std::invalid_argument(
104 fmt::format("expected type object but got type {}", json.type_name())));
105 }
106 if (json.contains("keywords")) {
107 properties.keywords = daq::fits::ParseJsonKeywords(json["keywords"]);
108 }
109 if (json.contains("awaitInterval")) {
110 auto& value = json["awaitInterval"];
111 if (!value.is_number()) {
112 throw boost::enable_current_exception(std::invalid_argument(
113 fmt::format("'awaitInterval': unsupported type: {}", value.type_name())));
114 }
115 auto await_interval = value.get<double>();
116 if (await_interval < 0.0) {
117 throw boost::enable_current_exception(std::invalid_argument(
118 fmt::format("'awaitInterval' must be positive number, got {}", await_interval)));
119 }
120 properties.await_interval = duration_cast<milliseconds>(Seconds(value.get<double>()));
121 }
122 return properties;
123}
124
126 return name == rhs.name && rr_uri == rhs.rr_uri;
127}
128
129ParsedSource::ParsedSource(std::string name, std::string rr_uri)
130 : name(std::move(name)), rr_uri(std::move(rr_uri)) {
131}
132std::ostream& operator<<(std::ostream& os, ParsedSource const& s) {
133 os << "name: '" << s.name << "', rr_uri='" << s.rr_uri << "'";
134 return os;
135}
136
137ParsedSource ParseSourceUri(std::string_view s) {
138 auto start = s.find_first_not_of(' ');
139 auto name_end_pos = s.find_first_of('@');
140
141 if (name_end_pos == std::string_view::npos) {
142 throw boost::enable_current_exception(
143 std::invalid_argument("separator '@' not found in expected format 'name@rr-uri'"));
144 }
145 auto name = s.substr(start, name_end_pos - start);
146 if (name.empty()) {
147 throw boost::enable_current_exception(
148 std::invalid_argument("name part in 'name@rr-uri' is empty"));
149 }
150
151 start = name_end_pos + 1;
152 if (start >= s.size()) {
153 throw boost::enable_current_exception(
154 std::invalid_argument("invalid format string, expected format 'name@rr-uri'"));
155 }
156 auto rr_uri_end_pos = s.find_first_of(" ,", start);
157 if (name_end_pos == std::string_view::npos) {
158 throw boost::enable_current_exception(std::invalid_argument("separator ',' not found"));
159 }
160
161 auto rr_uri = s.substr(start, rr_uri_end_pos - start);
162 if (rr_uri.empty()) {
163 throw boost::enable_current_exception(
164 std::invalid_argument("rr_uri part in 'name@rr-uri' is empty"));
165 }
166 return ParsedSource(std::string(name), std::string(rr_uri));
167}
168/**
169 * Parse user provided string in the format
170 * "<name>@<rr-uri>[ <name>@...]"
171 *
172 * @throw std::invalid_argument on errors.
173 */
174std::vector<ParsedSource> ParseSourceUris(std::string_view s) {
175 std::vector<ParsedSource> result;
176 std::size_t begin = 0;
177
178 while (begin < s.size()) {
179 const auto end = s.find_first_of(' ', begin);
180
181 if (begin != end) {
182 result.emplace_back(ParseSourceUri(s.substr(begin, end - begin)));
183 }
184
185 if (end == std::string_view::npos) {
186 break;
187 }
188
189 begin = end + 1;
190 }
191
192 return result;
193}
194
195OcmDaqService::OcmDaqService(boost::asio::io_context& io_ctx,
196 mal::Mal& mal,
197 daq::Manager& mgr,
198 std::string proc_name,
199 std::string output_path,
200 std::shared_ptr<daq::ObservableEventLog> event_log)
201 : m_io_ctx(io_ctx)
202 , m_executor(m_io_ctx)
203 , m_mal(mal)
204 , m_mgr(mgr)
205 , m_proc_name(std::move(proc_name))
206 , m_output_path(std::move(output_path))
207 , m_event_log(std::move(event_log))
208 , m_log_observer_connection()
209 , m_log_observer(log4cplus::Logger::getInstance(server::LOGGER_NAME_EVENTLOG))
210 , m_logger(log4cplus::Logger::getInstance(server::LOGGER_NAME)) {
211 m_log_observer_connection =
212 m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
213 if (m_proc_name.empty()) {
214 throw boost::enable_current_exception(
215 std::invalid_argument("OcmDaqService: Process name cannot be empty"));
216 }
217}
218
220 m_log_observer_connection.disconnect();
221}
222
223boost::future<std::shared_ptr<::daqif::DaqReply>>
224OcmDaqService::StartDaq(const std::string& id,
225 const std::string& file_prefix,
226 const std::string& primary_sources,
227 const std::string& metadata_sources,
228 const std::string& json_properties) {
229 return boost::async(
230 m_executor,
231 [=, self = shared_from_this()]() mutable {
232 self->m_event_log->AddEvent(daq::UserActionEvent(
233 id,
234 fmt::format("Request received: "
235 "StartDaq(id='{0}', file_prefix='{1}', "
236 "primary_sources='{2}', metadata_sources='{3}', "
237 "json_properties='{4}'",
238 id,
239 file_prefix,
240 primary_sources,
241 metadata_sources,
242 json_properties),
243 std::nullopt));
244 try {
245 ValidateFilePrefix(file_prefix.c_str());
246 } catch (std::exception const& e) {
247 self->m_event_log->AddEvent(
248 daq::ErrorEvent(id, e.what(), std::nullopt, "user"));
249 return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
250 daqif::DaqException(id, e.what()));
251 }
252 // Parse provided JSON
253 // Validation that require state is performed in m_executor for thread safety.
254 daq::DaqContext context;
255 try {
256 context = ParseStartDaqContext(json_properties);
257 } catch (std::exception const& e) {
258 auto msg =
259 fmt::format("Failed to parse StartDaq JSON properties: {}", e.what());
260 self->m_event_log->AddEvent(daq::ErrorEvent(id, msg, std::nullopt, "user"));
261 return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
262 daqif::DaqException(id, msg));
263 }
264
265 context.file_id = self->m_mgr.MakeDaqId(&context.creation_time);
266 auto validated_id = id;
267 if (validated_id.empty()) {
268 // User did not provide an ID -> use file_id as DAQ id
269 validated_id = context.file_id;
270 LOG4CPLUS_INFO(
271 self->m_logger,
272 fmt::format("StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
273 context.file_id));
274 } else {
275 // Check that an instance does not already exist with id
276 if (self->m_mgr.HaveDaq(validated_id)) {
277 LOG4CPLUS_INFO(
278 self->m_logger,
279 fmt::format("StartDaq(id='{0}'): DAQ with id='{0}' already exist",
280 validated_id));
281 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
282 daqif::DaqException(id,
283 "Data acquisition with same id already exist"));
284 }
285 }
286
287 try {
288 context.id = validated_id;
289 context.process_name = m_proc_name;
290 context.dp_name_prefix = file_prefix;
291 // Create primary sources
292 context.prim_sources = ParseSource(primary_sources);
293 // Create metadata sources
294 context.meta_sources = ParseSource(metadata_sources);
295
296 // Start
297 return self->StartDaq(context, "StartDaq");
298 } catch (daqif::DaqException const&) {
299 // Already correct exception
300 throw;
301 } catch (std::invalid_argument const& e) {
302 LOG4CPLUS_INFO(self->m_logger,
303 fmt::format("StartDaq(id='{}'): Invalid argument error while "
304 "processing request: {}",
305 validated_id,
306 e.what()));
307 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
308 daqif::DaqException(validated_id, e.what()));
309 } catch (std::exception const& e) {
310 LOG4CPLUS_INFO(self->m_logger,
311 fmt::format("StartDaq(id='{}'): Error while"
312 "processing request: {}",
313 validated_id,
314 e.what()));
315 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
316 daqif::DaqException(validated_id, e.what()));
317 } catch (...) {
318 LOG4CPLUS_INFO(
319 self->m_logger,
320 fmt::format("StartDaq(id='{}'): Unknown error while processing request",
321 validated_id));
322 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
323 daqif::DaqException(validated_id, "Uknown error"));
324 }
325 })
326 // unwrap outer future from async() to get the future we want from m_mgr.StartDaqAsync()
327 .unwrap();
328}
329
330boost::future<std::shared_ptr<::daqif::DaqReply>>
331OcmDaqService::StartDaqV2(const std::string& specification) {
332 return boost::async(m_executor,
333 [=, self = shared_from_this()]() mutable {
334 self->m_event_log->AddEvent(daq::UserActionEvent(
335 "",
336 fmt::format("Request received: "
337 "StartDaqV2(specification=\n'{0}')",
338 specification),
339 std::nullopt));
340 try {
341 // Parse provided JSON
342 auto parsed = daq::json::ParseStartDaqV2Spec(
343 nlohmann::json::parse(specification));
344
345 // Validation that require state is performed in m_executor for
346 // thread safety.
347 daq::DaqContext context;
348 UpdateFrom(context, parsed);
349
350 assert(context.specification.has_value());
351
352 ValidateDaqContext(context);
353
354 {
355 nlohmann::json j;
356 to_json(j, *context.specification);
357 LOG4CPLUS_DEBUG(self->m_logger,
358 "Resulting specification after parsing: \n"
359 << j.dump(2));
360 }
361
362 // Start
363 return self->StartDaq(context, "StartDaqV2");
364 } catch (daqif::DaqException const&) {
365 // Already correct exception
366 throw;
367 } catch (nlohmann::json::exception const& e) {
368 throw MakeDaqifException("", "JSON parsing error", e);
369 } catch (daq::json::SchemaError const& e) {
370 throw MakeDaqifException("", "JSON Schema error", e);
371 } catch (std::invalid_argument const& e) {
372 throw MakeDaqifException("", "Invalid argument", e);
373 } catch (std::exception const& e) {
374 throw MakeDaqifException("", "", e);
375 }
376 })
377 .unwrap();
378}
379
380boost::future<std::shared_ptr<::daqif::DaqReply>> OcmDaqService::StopDaq(const std::string& id) {
381 return boost::async(
382 m_executor,
383 [self = shared_from_this(), id]() {
384 self->m_event_log->AddEvent(daq::UserActionEvent(id,
385 fmt::format("Request received: "
386 "StopDaq(id='{0}')",
387 id),
388 std::nullopt));
389 return self->StopDaq(id, false);
390 })
391 .unwrap();
392}
393
394boost::future<std::shared_ptr<::daqif::DaqReply>>
395OcmDaqService::ForceStopDaq(const std::string& id) {
396 return boost::async(m_executor,
397 [self = shared_from_this(), id]() {
398 self->m_event_log->AddEvent(
400 fmt::format("Request received: "
401 "ForceStopDaq(id='{0}')",
402 id),
403 std::nullopt));
404 return self->StopDaq(id, true);
405 })
406 .unwrap();
407}
408
409boost::future<std::shared_ptr<::daqif::DaqReply>>
410OcmDaqService::StartDaq(daq::DaqContext const& context, char const* function) {
411 return m_mgr.StartDaqAsync(context).then(
412 m_executor,
413 [function, weak_self = std::weak_ptr(shared_from_this()), id = context.id](
414 boost::future<daq::State> f) -> std::shared_ptr<daqif::DaqReply> {
415 auto self = weak_self.lock();
416 if (!self) {
417 LOG4CPLUS_WARN(LOGGER_NAME,
418 fmt::format("{}(id='{}'): StartDaqAsync is "
419 "complete but MAL service has "
420 "been abandoned. Throwing exception.",
421 function,
422 id));
423 throw boost::enable_current_exception(
424 daqif::DaqException(id, "Service has been abandoned"));
425 }
426 try {
427 f.get();
428 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
429 assert(rep);
430 rep->setId(id);
431 rep->setError(false);
432 return rep;
433 } catch (...) {
434 auto what = self->MakeExceptionMessageWithStatus(id, std::current_exception());
435 LOG4CPLUS_ERROR(self->m_logger,
436 fmt::format("{}(id='{}'): StartDaqAsync "
437 "completed with failure: {}",
438 function,
439 id,
440 what));
441 throw boost::enable_current_exception(
442 daqif::DaqException(id, fmt::format("{}() failed: {}", function, what)));
443 }
444 });
445}
446
447boost::future<std::shared_ptr<::daqif::DaqReply>>
448OcmDaqService::StopDaq(const std::string& id, bool forced) {
449 return boost::async(
450 m_executor,
451 [self = shared_from_this(), id, forced]() {
452 return self->m_mgr
453 .StopDaqAsync(id,
455 .then(
456 self->m_executor,
457 [weak_self = std::weak_ptr(self->shared_from_this()),
458 id](boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
459 auto self = weak_self.lock();
460 if (!self) {
461 LOG4CPLUS_WARN(LOGGER_NAME,
462 fmt::format("StopDaq(id='{}'): StopDaqAsync is "
463 "complete but MAL service has "
464 "been abandoned. Throwing exception.",
465 id));
466 throw boost::enable_current_exception(
467 daqif::DaqException(id, "Service has been abandoned"));
468 }
469 try {
470 f.get();
471 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
472 assert(rep);
473 rep->setId(id);
474 rep->setError(false);
475 return rep;
476 } catch (...) {
477 auto what = self->MakeExceptionMessageWithStatus(
478 id, std::current_exception());
479 LOG4CPLUS_INFO(self->m_logger,
480 fmt::format("StopDaq(id='{}'): "
481 "completed with failure: {}",
482 id,
483 what));
484 throw boost::enable_current_exception(daqif::DaqException(
485 id, fmt::format("Stop failed\n\n{}", what)));
486 }
487 });
488 })
489 // unwrap outer future from async() to get the future we want from m_mgr.StopDaqAsync()
490 .unwrap();
491}
492
493boost::future<std::shared_ptr<::daqif::DaqReply>> OcmDaqService::AbortDaq(const std::string& id) {
494 return boost::async(m_executor,
495 [self = shared_from_this(), id]() {
496 self->m_event_log->AddEvent(
498 fmt::format("Request received: "
499 "AbortDaq(id='{0}')",
500 id),
501 std::nullopt));
502 return self->AbortDaq(id, false);
503 })
504 .unwrap();
505}
506
507boost::future<std::shared_ptr<::daqif::DaqReply>>
508OcmDaqService::ForceAbortDaq(const std::string& id) {
509 return boost::async(m_executor,
510 [self = shared_from_this(), id]() {
511 self->m_event_log->AddEvent(
513 fmt::format("Request received: "
514 "ForceAbortDaq(id='{0}')",
515 id),
516 std::nullopt));
517 return self->AbortDaq(id, true);
518 })
519 .unwrap();
520}
521
522boost::future<std::shared_ptr<::daqif::DaqReply>>
523OcmDaqService::AbortDaq(const std::string& id, bool forced) {
524 return boost::async(
525 m_executor,
526 [self = shared_from_this(), id, forced]() {
527 return self->m_mgr
528 .AbortDaqAsync(
530 .then(
531 self->m_executor,
532 [weak_self = std::weak_ptr(self->shared_from_this()), id, forced](
533 boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
534 auto self = weak_self.lock();
535 if (!self) {
536 LOG4CPLUS_WARN(
537 LOGGER_NAME,
538 fmt::format("AbortDaq(id='{}', forced={}): AbortDaqAsync is "
539 "complete but MAL service has "
540 "been abandoned. Throwing exception.",
541 id,
542 forced));
543 throw boost::enable_current_exception(
544 daqif::DaqException(id, "Service has been abandoned"));
545 }
546 try {
547 auto result = f.get();
548 LOG4CPLUS_INFO(
549 self->m_logger,
550 fmt::format("AbortDaq(id='{}', forced={}): "
551 "AbortDaqAsync Completed successfully",
552 id,
553 forced));
554 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
555 assert(rep);
556 rep->setId(id);
557 rep->setError(HasError(result));
558 LOG4CPLUS_DEBUG(
559 self->m_logger,
560 fmt::format("AbortDaq(id='{}', forced={}): "
561 "AbortDaqAsync Completed, returning reply now.",
562 id,
563 forced));
564 return rep;
565 } catch (...) {
566 auto what = self->MakeExceptionMessageWithStatus(
567 id, std::current_exception());
568 LOG4CPLUS_ERROR(self->m_logger,
569 fmt::format("AbortDaq(id='{}', forced={}): "
570 "AbortDaqAsync Completed "
571 "with fatal error:\n{}",
572 id,
573 forced,
574 what));
575 throw boost::enable_current_exception(daqif::DaqException(
576 id, fmt::format("Abort failed\n\n{}", what)));
577 }
578 }); // m_msg.AbortDaqAsync cont
579 }) // boost::async()
580 // unwrap outer future from async() to get the future we want from m_mgr.AbortDaqAsync()
581 .unwrap();
582}
583
584boost::future<std::shared_ptr<::daqif::DaqReply>>
585OcmDaqService::UpdateKeywords(const std::string& id, const std::string& keywords) {
586 return boost::async(
587 m_executor,
588 [self = shared_from_this(), id, keywords]() -> std::shared_ptr<::daqif::DaqReply> {
589 self->m_event_log->AddEvent(
591 fmt::format("Request received: "
592 "UpdateKeywords(id='{0}', keywords='{1}')",
593 id,
594 keywords),
595 std::nullopt));
596 daq::fits::KeywordVector parsed_keywords;
597 try {
598 daq::fits::ParseJsonKeywords(keywords.c_str()).swap(parsed_keywords);
599 } catch (nlohmann::json::exception const& e) {
600 LOG4CPLUS_ERROR(
601 self->m_logger,
602 fmt::format("UpdateKeywords(id='{}', ...): Failed to parse JSON", id));
603 throw boost::enable_current_exception(
604 daqif::DaqException(id, fmt::format("Invalid JSON string: {}", e.what())));
605 } catch (std::invalid_argument const& e) {
606 LOG4CPLUS_ERROR(
607 self->m_logger,
608 fmt::format(
609 "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid "
610 "schema",
611 id));
612 throw boost::enable_current_exception(
613 daqif::DaqException(id, fmt::format("Invalid JSON schema: {}", e.what())));
614 } catch (std::exception const& e) {
615 LOG4CPLUS_ERROR(
616 self->m_logger,
617 fmt::format(
618 "UpdateKeywords(id='{}', ...): std::exception: '{}'", id, e.what()));
619 throw boost::enable_current_exception(
620 daqif::DaqException(id, fmt::format("std::exception: {}", e.what())));
621 } catch (...) {
622 throw boost::enable_current_exception(daqif::DaqException(id, "unknown error"));
623 }
624 try {
625 self->m_mgr.UpdateKeywords(id, parsed_keywords);
626 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
627 rep->setId(id);
628 rep->setError(false);
629 return rep;
630 } catch (daq::fits::InvalidKeyword const& e) {
631 LOG4CPLUS_ERROR(self->m_logger,
632 fmt::format("UpdateKeywords(id='{}'): {}", id, e.what()));
633 throw boost::enable_current_exception(daqif::DaqException(id, e.what()));
634 } catch (daq::InvalidDaqId const& e) {
635 LOG4CPLUS_ERROR(
636 self->m_logger,
637 fmt::format(
638 "UpdateKeywords(id='{}'): Invalid data acquisition id: ", id, e.what()));
639 throw boost::enable_current_exception(daqif::DaqException(id, e.what()));
640 } catch (std::exception const& e) {
641 LOG4CPLUS_ERROR(
642 self->m_logger,
643 fmt::format(
644 "UpdateKeywords(id='{}', ...): std::exception: '{}'", id, e.what()));
645 throw boost::enable_current_exception(
646 daqif::DaqException(id, fmt::format("std::exception: {}", e.what())));
647 } catch (...) {
648 throw boost::enable_current_exception(daqif::DaqException(id, "unknown error"));
649 }
650 });
651}
652
653boost::future<std::shared_ptr<::daqif::DaqStatus>> OcmDaqService::GetStatus(const std::string& id) {
654 return boost::async(
655 m_executor,
656 [self = shared_from_this(), id]() {
657 self->m_event_log->AddEvent(
659 fmt::format("Request received: "
660 "GetStatus(id='{0}')",
661 id),
662 std::nullopt));
663 try {
664 LOG4CPLUS_INFO(self->m_logger, fmt::format("GetStatus(id='{}'): Enter", id));
665 auto status = self->m_mgr.GetStatus(id);
666 auto rep = self->m_mal.createDataEntity<daqif::DaqStatus>();
667 assert(rep);
668 *rep << status;
669 LOG4CPLUS_INFO(
670 self->m_logger,
671 fmt::format("GetStatus(id='{}'): Set result -> {}", id, status.state));
672 return boost::make_ready_future<std::shared_ptr<daqif::DaqStatus>>(rep);
673 } catch (std::invalid_argument const&) {
674 LOG4CPLUS_ERROR(
675 self->m_logger,
676 fmt::format("GetStatus(id='{}'): Invalid data acquisition id", id));
677 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqStatus>>(
678 boost::enable_current_exception(daqif::DaqException(
679 id, fmt::format("No data acquisition with id='{}'", id))));
680 }
681 })
682 .unwrap();
683}
684
685boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> OcmDaqService::GetActiveList() {
686 return boost::async(
687 m_executor,
688 [self = shared_from_this()]() -> std::vector<std::shared_ptr<::daqif::DaqStatus>> {
689 self->m_event_log->AddEvent(daq::UserActionEvent("",
690 "Request received: "
691 "GetActiveList()",
692 std::nullopt));
693 auto daqs = self->m_mgr.GetDaqControllers();
694 std::vector<std::shared_ptr<daq::DaqController const>> active;
695 std::vector<std::shared_ptr<daqif::DaqStatus>> reply;
696 std::copy_if(daqs.begin(), daqs.end(), std::back_inserter(active), [](auto daq_ctl) {
697 return !daq::IsFinalState(daq_ctl->GetState());
698 });
699 std::transform(active.begin(),
700 active.end(),
701 std::back_inserter(reply),
702 [&mal = self->m_mal](auto daq_ctl) {
703 auto mal_status = mal.createDataEntity<daqif::DaqStatus>();
704 *mal_status << daq_ctl->GetStatus()->GetStatus();
705 return mal_status;
706 });
707 return reply;
708 });
709}
710
711boost::future<std::shared_ptr<::daqif::AwaitDaqReply>> OcmDaqService::AwaitDaqState(
712 const std::string& id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) {
713 using Seconds = std::chrono::duration<double>;
714
715 return boost::async(
716 m_executor,
717 [=, self = shared_from_this()]() {
718 self->m_event_log->AddEvent(daq::UserActionEvent(id,
719 format("Request received: "
720 "AwaitDaqState(id='{}', "
721 "state={}, substate={}, "
722 "timeout={})",
723 id,
724 daq::ToString(state),
725 daq::ToString(substate),
726 timeout),
727 std::nullopt));
728 if (timeout <= 0) {
729 return boost::make_exceptional_future<daq::Result<daq::Status>>(
730 std::invalid_argument(
731 format("Invalid argument `timeout`. Must be > 0", timeout)));
732 }
733 if (!daqif::IsStateValid(state, substate)) {
734 return boost::make_exceptional_future<daq::Result<daq::Status>>(
735 std::invalid_argument(fmt::format(
736 "Invalid state combination: {} and {}", state, substate)));
737 }
738 auto daq_state = daq::MakeState({state, substate});
739 return self->m_mgr.AwaitDaqStateAsync(
740 id, daq_state, duration_cast<milliseconds>(Seconds(timeout)));
741 })
742 .unwrap()
743 .then(
744 m_executor,
745 [id, self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
746 try {
747 auto [timeout, status] = fut.get();
748 auto mal_reply = self->m_mal.createDataEntity<daqif::AwaitDaqReply>();
749 assert(mal_reply);
750 mal_reply->setTimeout(timeout);
751 auto mal_status_ptr = mal_reply->getStatus();
752 assert(mal_status_ptr);
753 *mal_status_ptr << status;
755 fmt::format("Request completed: {}",
756 (timeout ? "condition not yet satisfied (timeout)"
757 : "condition satisfied")),
758 std::nullopt);
759 return mal_reply;
760 } catch (std::exception const& e) {
761 auto what = self->MakeExceptionMessageWithStatus(id, std::current_exception());
762 self->m_event_log->AddEvent(daq::ActionEvent(
763 id,
764 fmt::format("Await state completed exceptionally\n\n{}", what),
765 std::nullopt));
766 throw boost::enable_current_exception(
767 daqif::DaqException(id, fmt::format("Await state failed\n\n{}", what)));
768 } catch (...) {
770 id, "Request completed exceptionally: Unknown exception", std::nullopt);
771 throw boost::enable_current_exception(
772 daqif::DaqException(id, "Uknown exception"));
773 }
774 });
775}
776
777std::string
778OcmDaqService::MakeExceptionMessageWithStatus(std::string const& id,
779 std::exception_ptr const& exception) const {
780 auto nested_msg = NestedExceptionReporter(exception).Str();
781 auto alerts_msg = std::string("n/a");
782 try {
783 auto status = m_mgr.GetStatus(id);
784 alerts_msg = fmt::format("{}", status.alerts);
785 } catch (std::exception const& e) {
786 LOG4CPLUS_WARN(m_logger, fmt::format("GetStatus({}) failed:", id, e.what()));
787 }
788 return fmt::format("Errors(s):\n{}\nActive alert(s):\n{}", nested_msg, alerts_msg);
789}
790
791void OcmDaqService::UpdateFrom(daq::DaqContext& context, daq::json::StartDaqV2Spec const& spec) {
792 using PrimaryDataSource = daq::json::StartDaqV2Spec::PrimaryDataSource;
793 using MetadataSource = daq::json::StartDaqV2Spec::MetadataSource;
794 using FitsKeywordsSource = daq::json::FitsKeywordsSource;
795 using FitsFileSource = daq::json::FitsFileSource;
796
797 context.file_id = m_mgr.MakeDaqId(&context.creation_time);
798 if (context.id.empty()) {
799 context.id = !spec.id.empty() ? spec.id : context.file_id;
800 }
801
802 if (spec.await_completion_interval.has_value()) {
804 }
805
806 context.process_name = m_proc_name;
807 context.dp_name_prefix = spec.file_prefix;
808 context.specification = spec;
809
810 // Create sources
811 for (auto const& variant : spec.sources) {
812 if (auto const* ds = std::get_if<PrimaryDataSource>(&variant); ds != nullptr) {
813 context.prim_sources.push_back({ds->source_name, ds->rr_uri});
814 } else if (auto const* ds = std::get_if<MetadataSource>(&variant); ds != nullptr) {
815 context.meta_sources.push_back({ds->source_name, ds->rr_uri});
816 } else if (auto const* ds = std::get_if<FitsKeywordsSource>(&variant); ds != nullptr) {
817 context.results.emplace_back(ds->source_name, ds->keywords);
818 } else if (auto const* ds = std::get_if<FitsFileSource>(&variant); ds != nullptr) {
819 context.results.emplace_back(ds->source_name, ds->location);
820 } else {
821 LOG4CPLUS_ERROR(m_logger, "Unknown variant encountered");
822 }
823 }
824
825 // Check that an instance does not already exist with id
826 if (m_mgr.HaveDaq(context.id)) {
827 auto msg = fmt::format("DAQ with id='{0}' already exist", context.id);
828 LOG4CPLUS_INFO(m_logger, msg);
829 throw boost::enable_current_exception(std::invalid_argument(msg));
830 }
831}
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:135
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
virtual Status GetStatus(std::string_view id) const =0
Get status.
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: report.cpp:97
Indicates keyword is invalid for some reason.
Definition: keyword.hpp:534
Contains data structure for FITS keywords.
Contains support functions for daqif.
Contains State support functions for daqif.
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveList() override
OcmDaqService(boost::asio::io_context &io_ctx, mal::Mal &mal, daq::Manager &mgr, std::string proc_name, std::string output_path, std::shared_ptr< daq::ObservableEventLog > event_log)
boost::future< std::shared_ptr<::daqif::DaqReply > > StopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::AwaitDaqReply > > AwaitDaqState(const std::string &id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) override
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaqV2(const std::string &specification) override
boost::future< std::shared_ptr<::daqif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceStopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetStatus(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceAbortDaq(const std::string &id) override
Default logger name.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:423
std::vector< KeywordVariant > ParseJsonKeywords(char const *keywords)
Parse and return FITS keywords.
Definition: json.cpp:124
std::optional< std::chrono::milliseconds > await_completion_interval
Definition: startDaqV2.hpp:55
DpSpec::SourceTypes ParseSource(Json const &json, JsonPointer const &breadcrumb)
Definition: dpSpec.cpp:34
StartDaqV2Spec ParseStartDaqV2Spec(nlohmann::json const &json)
Parse StartDaqSpec.
Definition: startDaqV2.cpp:46
std::vector< DataSourceTypes > sources
Definition: startDaqV2.hpp:51
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:77
std::string_view ToString(daqif::DaqState state) noexcept
Definition: conversion.cpp:160
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
bool IsStateValid(DaqState state, DaqSubState substate)
Validate state combination.
Definition: state.cpp:16
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
daq::DaqContext ParseStartDaqContext(std::string const &json_properties)
Parse the JSON properties user provides with StartDaq.
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
Declaration of OcmDaqService.
daq::DaqContext ParseStartDaqContext(std::string const &properties)
Parse the JSON properties user provides with StartDaq.
Contains declaration for DaqController.
bool operator==(ParsedSource const &rhs) const
std::string name
ParsedSource()=default
std::string rr_uri
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:42
std::vector< Source > meta_sources
Definition: daqContext.hpp:75
DpParts results
Results from Data Acquisition (FITS files and keywords).
Definition: daqContext.hpp:100
std::string process_name
User defined process name.
Definition: daqContext.hpp:68
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:85
std::vector< Source > prim_sources
Definition: daqContext.hpp:74
std::chrono::milliseconds await_interval
Interval (and thus duration) of the requests sent to primary sources to await end of recording.
Definition: daqContext.hpp:92
std::optional< json::StartDaqV2Spec > specification
Optional specification, if DAQ was started using StartDaqV2.
Definition: daqContext.hpp:114
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:63
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:73
std::chrono::system_clock::time_point creation_time
Time when DAQ was created.
Definition: daqContext.hpp:106
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:58
Exception indicating the DAQ id was invalid.
Definition: manager.hpp:38
Event directly related to user action, such as a command to do something.
Definition: eventLog.hpp:65
JSON Schema error.
Definition: schemaError.hpp:18