11 #include <fmt/format.h> 
   12 #include <fmt/ostream.h> 
   13 #include <log4cplus/loggingmacros.h> 
   14 #include <mal/rr/qos/ConnectionTime.hpp> 
   15 #include <nlohmann/json.hpp> 
   23 using boost::enable_current_exception;
 
   24 using boost::make_exceptional_future;
 
   27 using std::chrono::duration_cast;
 
   28 using std::chrono::milliseconds;
 
   32 std::vector<daq::DaqContext::Source> 
ParseSource(std::string 
const& str) {
 
   33     std::vector<daq::DaqContext::Source> sources;
 
   36     sources.reserve(raw_sources.size());
 
   37     for (
auto const& raw : raw_sources) {
 
   38         sources.push_back({raw.name, raw.rr_uri});
 
   47     using std::chrono::duration_cast;
 
   48     using std::chrono::milliseconds;
 
   49     using Seconds = std::chrono::duration<double>;
 
   52     if (json_properties.empty()) {
 
   57     auto json = nlohmann::json::parse(json_properties);
 
   58     if (!
json.is_object()) {
 
   59         throw boost::enable_current_exception(std::invalid_argument(
 
   60             fmt::format(
"expected type object but got type {}", 
json.type_name())));
 
   62     if (
json.contains(
"keywords")) {
 
   65     if (
json.contains(
"awaitInterval")) {
 
   66         auto& value = 
json[
"awaitInterval"];
 
   67         if (!value.is_number()) {
 
   68             throw boost::enable_current_exception(std::invalid_argument(
 
   69                 fmt::format(
"'awaitInterval': unsupported type: {}", value.type_name())));
 
   71         auto await_interval = value.get<
double>();
 
   72         if (await_interval < 0.0) {
 
   73             throw boost::enable_current_exception(std::invalid_argument(
 
   74                 fmt::format(
"'awaitInterval' must be positive number, got {}", await_interval)));
 
   76         properties.
await_interval = duration_cast<milliseconds>(Seconds(value.get<
double>()));
 
   86     : name(std::move(name)), rr_uri(std::move(rr_uri)) {
 
   89     os << 
"name: '" << s.
name << 
"', rr_uri='" << s.
rr_uri << 
"'";
 
   94     auto start = s.find_first_not_of(
' ');
 
   95     auto name_end_pos = s.find_first_of(
'@');
 
   97     if (name_end_pos == std::string_view::npos) {
 
   98         throw boost::enable_current_exception(
 
   99             std::invalid_argument(
"separator '@' not found in expected format 'name@rr-uri'"));
 
  101     auto name = s.substr(start, name_end_pos - start);
 
  103         throw boost::enable_current_exception(
 
  104             std::invalid_argument(
"name part in 'name@rr-uri' is empty"));
 
  107     start = name_end_pos + 1;
 
  108     if (start >= s.size()) {
 
  109         throw boost::enable_current_exception(
 
  110             std::invalid_argument(
"invalid format string, expected format 'name@rr-uri'"));
 
  112     auto rr_uri_end_pos = s.find_first_of(
" ,", start);
 
  113     if (name_end_pos == std::string_view::npos) {
 
  114         throw boost::enable_current_exception(std::invalid_argument(
"separator ',' not found"));
 
  117     auto rr_uri = s.substr(start, rr_uri_end_pos - start);
 
  118     if (rr_uri.empty()) {
 
  119         throw boost::enable_current_exception(
 
  120             std::invalid_argument(
"rr_uri part in 'name@rr-uri' is empty"));
 
  122     return ParsedSource(std::string(name), std::string(rr_uri));
 
  131     std::vector<ParsedSource> result;
 
  134     while (begin < s.size()) {
 
  135         const auto end = s.find_first_of(
' ', begin);
 
  138             result.emplace_back(
ParseSourceUri(s.substr(begin, end - begin)));
 
  141         if (end == std::string_view::npos) {
 
  154                              std::string proc_name,
 
  155                              std::string output_path,
 
  156                              std::shared_ptr<daq::ObservableEventLog> event_log)
 
  158     , m_executor(m_io_ctx)
 
  161     , m_proc_name(std::move(proc_name))
 
  162     , m_output_path(std::move(output_path))
 
  163     , m_event_log(std::move(event_log))
 
  164     , m_log_observer_connection()
 
  165     , m_log_observer(log4cplus::Logger::getInstance(
"daq.eventlog"))
 
  166     , m_logger(log4cplus::Logger::getInstance(
LOGGER_NAME)) {
 
  167     m_log_observer_connection =
 
  168         m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
 
  169     if (m_proc_name.empty()) {
 
  170         throw boost::enable_current_exception(
 
  171             std::invalid_argument(
"OcmDaqService: Process name cannot be empty"));
 
  176     m_log_observer_connection.disconnect();
 
  179 boost::future<std::shared_ptr<::daqif::DaqReply>>
 
  181                         const std::string& file_prefix,
 
  182                         const std::string& primary_sources,
 
  183                         const std::string& metadata_sources,
 
  184                         const std::string& json_properties) {
 
  187                [=, 
self = shared_from_this()]() 
mutable {
 
  190                        fmt::format(
"Request received: " 
  191                                    "StartDaq(id='{0}', file_prefix='{1}', " 
  192                                    "primary_sources='{2}', metadata_sources='{3}', " 
  193                                    "json_properties='{4}'",
 
  200                    std::filesystem::path prefix(file_prefix);
 
  201                    if (prefix.has_parent_path()) {
 
  202                        return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
 
  205                                fmt::format(
"file_prefix \"{}\" may not contain parent paths",
 
  213                    } 
catch (std::exception 
const& e) {
 
  215                            fmt::format(
"Failed to parse StartDaq JSON properties: {}", e.what());
 
  216                        self->m_event_log->AddEvent(
daq::ErrorEvent(
id, msg, std::nullopt, 
"user"));
 
  217                        return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
 
  218                            daqif::DaqException(
id, msg));
 
  222                    auto validated_id = id;
 
  223                    if (validated_id.empty()) {
 
  225                        validated_id = context.
file_id;
 
  228                            fmt::format(
"StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
 
  232                        if (self->m_mgr.HaveDaq(validated_id)) {
 
  235                                fmt::format(
"StartDaq(id='{0}'): DAQ with id='{0}' already exist",
 
  237                            return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
 
  238                                daqif::DaqException(
id,
 
  239                                                    "Data acquisition with same id already exist"));
 
  244                        context.
id = validated_id;
 
  253                        return self->m_mgr.StartDaqAsync(context).then(
 
  255                            [weak_self = std::weak_ptr(self->shared_from_this()), 
id = validated_id](
 
  256                                boost::future<daq::State> f) -> std::shared_ptr<daqif::DaqReply> {
 
  257                                auto self = weak_self.lock();
 
  259                                    LOG4CPLUS_WARN(LOGGER_NAME,
 
  260                                                   fmt::format(
"StartDaq(id='{}'): StartDaqAsync is " 
  261                                                               "complete but MAL service has " 
  262                                                               "been abandoned. Throwing exception.",
 
  264                                    throw boost::enable_current_exception(
 
  265                                        daqif::DaqException(id, 
"Service has been abandoned"));
 
  269                                    auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
 
  272                                    rep->setError(false);
 
  277                                    LOG4CPLUS_ERROR(self->m_logger,
 
  278                                                    fmt::format(
"StartDaq(id='{}'): StartDaqAsync " 
  279                                                                "completed with failure: {}",
 
  283                                    throw boost::enable_current_exception(daqif::DaqException(
 
  284                                        id, fmt::format(
"Start failed: {}", what)));
 
  287                    } 
catch (std::invalid_argument 
const& e) {
 
  288                        LOG4CPLUS_INFO(self->m_logger,
 
  289                                       fmt::format(
"StartDaq(id='{}'): Invalid argument error while " 
  290                                                   "processing request: {}",
 
  293                        return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
 
  294                            daqif::DaqException(validated_id, e.what()));
 
  295                    } 
catch (std::exception 
const& e) {
 
  296                        LOG4CPLUS_INFO(self->m_logger,
 
  297                                       fmt::format(
"StartDaq(id='{}'): Error while" 
  298                                                   "processing request: {}",
 
  301                        return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
 
  302                            daqif::DaqException(validated_id, e.what()));
 
  306                            fmt::format(
"StartDaq(id='{}'): Unknown error while processing request",
 
  308                        return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
 
  309                            daqif::DaqException(validated_id, 
"Uknown error"));
 
  319                [
self = shared_from_this(), 
id]() {
 
  321                                                                     fmt::format(
"Request received: " 
  325                    return self->StopDaq(
id, 
false);
 
  330 boost::future<std::shared_ptr<::daqif::DaqReply>>
 
  332     return boost::async(m_executor,
 
  333                         [
self = shared_from_this(), 
id]() {
 
  334                             self->m_event_log->AddEvent(
 
  336                                                      fmt::format(
"Request received: " 
  337                                                                  "ForceStopDaq(id='{0}')",
 
  340                             return self->StopDaq(
id, 
true);
 
  345 boost::future<std::shared_ptr<::daqif::DaqReply>>
 
  349                [
self = shared_from_this(), 
id, forced]() {
 
  355                            [weak_self = std::weak_ptr(self->shared_from_this()),
 
  356                             id](boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
 
  357                                auto self = weak_self.lock();
 
  359                                    LOG4CPLUS_WARN(LOGGER_NAME,
 
  360                                                   fmt::format(
"StopDaq(id='{}'): StopDaqAsync is " 
  361                                                               "complete but MAL service has " 
  362                                                               "been abandoned. Throwing exception.",
 
  364                                    throw boost::enable_current_exception(
 
  365                                        daqif::DaqException(id, 
"Service has been abandoned"));
 
  369                                    auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
 
  372                                    rep->setError(false);
 
  377                                    LOG4CPLUS_INFO(self->m_logger,
 
  378                                                   fmt::format(
"StopDaq(id='{}'): " 
  379                                                               "completed with failure: {}",
 
  382                                    throw boost::enable_current_exception(daqif::DaqException(
 
  383                                        id, fmt::format(
"Stop failed: {}", what)));
 
  392     return boost::async(m_executor,
 
  393                         [
self = shared_from_this(), 
id]() {
 
  394                             self->m_event_log->AddEvent(
 
  396                                                      fmt::format(
"Request received: " 
  397                                                                  "AbortDaq(id='{0}')",
 
  400                             return self->AbortDaq(
id, 
false);
 
  405 boost::future<std::shared_ptr<::daqif::DaqReply>>
 
  407     return boost::async(m_executor,
 
  408                         [
self = shared_from_this(), 
id]() {
 
  409                             self->m_event_log->AddEvent(
 
  411                                                      fmt::format(
"Request received: " 
  412                                                                  "ForceAbortDaq(id='{0}')",
 
  415                             return self->AbortDaq(
id, 
true);
 
  420 boost::future<std::shared_ptr<::daqif::DaqReply>>
 
  424                [
self = shared_from_this(), 
id, forced]() {
 
  430                            [weak_self = std::weak_ptr(self->shared_from_this()), 
id, forced](
 
  431                                boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
 
  432                                auto self = weak_self.lock();
 
  436                                        fmt::format(
"AbortDaq(id='{}', forced={}): AbortDaqAsync is " 
  437                                                    "complete but MAL service has " 
  438                                                    "been abandoned. Throwing exception.",
 
  441                                    throw boost::enable_current_exception(
 
  442                                        daqif::DaqException(id, 
"Service has been abandoned"));
 
  445                                    auto result = f.get();
 
  448                                        fmt::format(
"AbortDaq(id='{}', forced={}): " 
  449                                                    "AbortDaqAsync Completed successfully",
 
  452                                    auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
 
  455                                    rep->setError(result.error);
 
  458                                        fmt::format(
"AbortDaq(id='{}', forced={}): " 
  459                                                    "AbortDaqAsync Completed, returning reply now.",
 
  466                                    LOG4CPLUS_ERROR(self->m_logger,
 
  467                                                    fmt::format(
"AbortDaq(id='{}', forced={}): " 
  468                                                                "AbortDaqAsync Completed " 
  469                                                                "with fatal error:\n{}",
 
  473                                    throw boost::enable_current_exception(
 
  474                                        daqif::DaqException(
id, what));
 
  482 boost::future<std::shared_ptr<::daqif::DaqReply>>
 
  486         [
self = shared_from_this(), 
id, keywords]() -> std::shared_ptr<::daqif::DaqReply> {
 
  487             self->m_event_log->AddEvent(
 
  489                                      fmt::format(
"Request received: " 
  490                                                  "UpdateKeywords(id='{0}', keywords='{1}')",
 
  497             } 
catch (nlohmann::json::exception 
const& e) {
 
  500                     fmt::format(
"UpdateKeywords(id='{}', ...): Failed to parse JSON", 
id));
 
  501                 throw boost::enable_current_exception(
 
  502                     daqif::DaqException(
id, fmt::format(
"Invalid JSON string: {}", e.what())));
 
  503             } 
catch (std::invalid_argument 
const& e) {
 
  507                         "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid " 
  510                 throw boost::enable_current_exception(
 
  511                     daqif::DaqException(
id, fmt::format(
"Invalid JSON schema: {}", e.what())));
 
  512             } 
catch (std::exception 
const& e) {
 
  516                         "UpdateKeywords(id='{}', ...): std::exception: '{}'", 
id, e.what()));
 
  517                 throw boost::enable_current_exception(
 
  518                     daqif::DaqException(
id, fmt::format(
"std::exception: {}", e.what())));
 
  520                 throw boost::enable_current_exception(daqif::DaqException(
id, 
"unknown error"));
 
  523                 self->m_mgr.UpdateKeywords(
id, parsed_keywords);
 
  524                 auto rep = 
self->m_mal.createDataEntity<daqif::DaqReply>();
 
  526                 rep->setError(
false);
 
  528             } 
catch (std::invalid_argument 
const& e) {
 
  531                     fmt::format(
"UpdateKeywords(id='{}'): Invalid data acquisition id", 
id));
 
  532                 throw boost::enable_current_exception(
 
  533                     daqif::DaqException(
id, fmt::format(
"No data acquisition with id='{}'", 
id)));
 
  534             } 
catch (std::exception 
const& e) {
 
  538                         "UpdateKeywords(id='{}', ...): std::exception: '{}'", 
id, e.what()));
 
  539                 throw boost::enable_current_exception(
 
  540                     daqif::DaqException(
id, fmt::format(
"std::exception: {}", e.what())));
 
  542                 throw boost::enable_current_exception(daqif::DaqException(
id, 
"unknown error"));
 
  550                [
self = shared_from_this(), 
id]() {
 
  551                    self->m_event_log->AddEvent(
 
  553                                             fmt::format(
"Request received: " 
  554                                                         "GetStatus(id='{0}')",
 
  558                        LOG4CPLUS_INFO(self->m_logger, fmt::format(
"GetStatus(id='{}'): Enter", 
id));
 
  559                        auto status = 
self->m_mgr.GetStatus(
id);
 
  560                        auto rep = 
self->m_mal.createDataEntity<daqif::DaqStatus>();
 
  565                            fmt::format(
"GetStatus(id='{}'): Set result -> {}", 
id, status.state));
 
  566                        return boost::make_ready_future<std::shared_ptr<daqif::DaqStatus>>(rep);
 
  567                    } 
catch (std::invalid_argument 
const&) {
 
  570                            fmt::format(
"GetStatus(id='{}'): Invalid data acquisition id", 
id));
 
  571                        return boost::make_exceptional_future<std::shared_ptr<daqif::DaqStatus>>(
 
  572                            boost::enable_current_exception(daqif::DaqException(
 
  573                                id, fmt::format(
"No data acquisition with id='{}'", 
id))));
 
  582         [
self = shared_from_this()]() -> std::vector<std::shared_ptr<::daqif::DaqStatus>> {
 
  587             auto daqs = 
self->m_mgr.GetDaqControllers();
 
  588             std::vector<std::shared_ptr<daq::DaqController const>> active;
 
  589             std::vector<std::shared_ptr<daqif::DaqStatus>> reply;
 
  590             std::copy_if(daqs.begin(), daqs.end(), std::back_inserter(active), [](
auto daq_ctl) {
 
  591                 return !daq::IsFinalState(daq_ctl->GetState());
 
  593             std::transform(active.begin(),
 
  595                            std::back_inserter(reply),
 
  596                            [&
mal = self->m_mal](
auto daq_ctl) {
 
  597                                auto mal_status = mal.createDataEntity<daqif::DaqStatus>();
 
  598                                *mal_status << daq_ctl->GetStatus()->GetStatus();
 
  606     const std::string& 
id, daqif::DaqState state, daqif::DaqSubState substate, 
double timeout) {
 
  607     using Seconds = std::chrono::duration<double>;
 
  611                [=, 
self = shared_from_this()]() {
 
  613                                                                     format(
"Request received: " 
  614                                                                            "AwaitDaqState(id='{}', " 
  615                                                                            "state={}, substate={}, " 
  623                        return boost::make_exceptional_future<daq::Result<daq::Status>>(
 
  624                            std::invalid_argument(
 
  625                                format(
"Invalid argument `timeout`. Must be > 0", timeout)));
 
  628                        return boost::make_exceptional_future<daq::Result<daq::Status>>(
 
  629                            std::invalid_argument(fmt::format(
 
  630                                "Invalid state combination: {} and {}", state, substate)));
 
  633                    return self->m_mgr.AwaitDaqStateAsync(
 
  634                        id, daq_state, duration_cast<milliseconds>(Seconds(timeout)));
 
  638               [
id, 
self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
 
  640                       auto [timeout, status] = fut.get();
 
  641                       auto mal_reply = 
self->m_mal.createDataEntity<daqif::AwaitDaqReply>();
 
  643                       mal_reply->setTimeout(timeout);
 
  644                       auto mal_status_ptr = mal_reply->getStatus();
 
  645                       assert(mal_status_ptr);
 
  646                       *mal_status_ptr << status;
 
  649                           fmt::format(
"Request completed: {}",
 
  650                                       (timeout ? 
"condition not yet satisfied (timeout)" 
  651                                                : 
"condition satisfied")),
 
  654                   } 
catch (std::exception 
const& e) {
 
  657                           fmt::format(
"Request completed exceptionally: {}", e.what()),
 
  659                       throw boost::enable_current_exception(daqif::DaqException(
id, e.what()));
 
  662                           id, 
"Request completed exceptionally: Unknown exception", std::nullopt);
 
  663                       throw boost::enable_current_exception(
 
  664                           daqif::DaqException(
id, 
"Uknown exception"));