ifw-daq  3.0.1
IFW Data Acquisition modules
requestor.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_serverctl
4  * @copyright ESO - European Southern Observatory
5  *
6  * @brief
7  */
8 #include <chrono>
9 #include <string>
10 
11 #include <fmt/format.h>
12 #include <nlohmann/json.hpp>
13 
14 #include <daq/conversion.hpp>
15 #include <daqif/parsing.hpp>
16 
17 #include "requestor.hpp"
18 
19 using daq::ToString;
20 
21 namespace daqif {
22 // NOLINTNEXTLINE
23 void to_json(nlohmann::json& j, daqif::DaqStatus const& status) {
24  j = nlohmann::json{{"id", status.getId()},
25  {"fileId", status.getFileId()},
26  {"state", ToString(status.getState())},
27  {"substate", ToString(status.getSubState())},
28  {"error", status.getError()},
29  {"result", status.getResult()},
30  {"message", status.getMessage()},
31  {"timestamp", status.getTimestamp()}};
32 }
33 
34 } // namespace daqif
35 
36 std::ostream& operator<<(std::ostream& os, std::shared_ptr<daqif::DaqStatus> const& s) {
37  os << std::setprecision(std::numeric_limits<long double>::digits10 + 1);
38  os << "DaqStatus:\n"
39  << " id: " << s->getId() << "\n"
40  << " file_id: " << s->getFileId() << "\n"
41  << " state: \"" << ToString(s->getState()) << "\"\n"
42  << " substate: \"" << ToString(s->getSubState()) << "\"\n"
43  << " error: \"" << (s->getError() ? "true" : "false") << "\n"
44  << " message: " << s->getMessage() << "\n"
45  << " result: " << s->getResult() << "\n"
46  << " timestamp: " << s->getTimestamp();
47  os << "\n";
48 
49  return os;
50 }
51 
52 std::ostream& operator<<(std::ostream& os, std::shared_ptr<daqif::DaqReply> const& s) {
53  os << "DaqReply:\n"
54  << " id: " << s->getId() << "\n"
55  << " error: " << (s->getError() ? "true" : "false") << "\n";
56  return os;
57 }
58 
59 std::ostream& operator<<(std::ostream& os, std::shared_ptr<daqif::AwaitDaqReply> const& s) {
60  os << "AwaitDaqReply:\n"
61  << " timeout: " << (s->getTimeout() ? "true" : "false") << "\n"
62  << " status: " << s->getStatus() << "\n";
63  return os;
64 }
65 
66 std::ostream&
67 operator<<(std::ostream& os, std::vector<std::shared_ptr<daqif::DaqStatus>> const& vec) {
68  for (auto const& s : vec) {
69  os << "----\n";
70  os << s;
71  }
72  return os;
73 }
74 
75 std::ostream& operator<<(std::ostream& os, JsonPrint<std::shared_ptr<daqif::DaqStatus>> const& s) {
76  nlohmann::json j_status;
77  daqif::to_json(j_status, *s.t);
78  os << j_status.dump(2);
79  return os;
80 }
81 
82 std::ostream&
83 operator<<(std::ostream& os, JsonPrint<std::vector<std::shared_ptr<daqif::DaqStatus>>> const& vec) {
84  nlohmann::json j_array = nlohmann::json::array();
85  for (auto const& s : vec.t) {
86  nlohmann::json j_status;
87  daqif::to_json(j_status, *s);
88  j_array.push_back(j_status);
89  }
90  os << j_array.dump(2);
91  return os;
92 }
93 
94 std::ostream& operator<<(std::ostream& os, JsonPrint<std::shared_ptr<daqif::DaqReply>> const& s) {
95  os << "{\n"
96  " \"id\": \""
97  << s.t->getId()
98  << "\",\n"
99  " \"error\": "
100  << (s.t->getError() ? "true" : "false") << "\n";
101  os << "}";
102 
103  return os;
104 }
105 
106 std::ostream&
107 operator<<(std::ostream& os, JsonPrint<std::shared_ptr<daqif::AwaitDaqReply>> const& s) {
108  auto const& reply = *s.t;
109  auto const& rep = *reply.getStatus();
110  nlohmann::json j_status;
111  daqif::to_json(j_status, rep);
112  nlohmann::json j{{"timeout", reply.getTimeout()}, {"status", j_status}};
113  os << j.dump(2);
114  return os;
115 }
116 
117 std::ostream& operator<<(std::ostream& os, JsonPrint<std::string> const& s) {
118  os << "\"" << s.t << "\"";
119  return os;
120 }
121 
122 Requestor::Requestor(CommonArgs& args) : m_args(args) {
123 }
124 
125 void Requestor::AddOptions(CLI::App* app) {
126 }
127 
128 void SetLogLevelRequestor::AddOptions(CLI::App* sub) {
129  sub->add_option("logger", m_logger, "Logger name");
130  sub->add_option("level", m_level, "Log level");
131 }
132 
133 void SetLogLevelRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
134  ::daqif::OcmDaqControlSync& daq_cmds,
135  CLI::App* app) {
136  auto log_info = m_args.mal->createDataEntity<stdif::LogInfo>();
137  log_info->setLogger(m_logger);
138  log_info->setLevel(m_level);
139  auto reply = std_cmds.SetLogLevel(log_info);
140  if (m_args.json) {
141  std::cout << JsonPrint(reply);
142  } else {
143  std::cout << reply << std::endl;
144  }
145 }
146 
147 void StartDaqRequestor::AddOptions(CLI::App* sub) {
148  sub->add_option("--id", m_id, "DAQ ID");
149  sub->add_option("--prefix", m_prefix, "File prefix");
150  sub->add_option("--properties", m_properties, "JSON properties");
151  sub->add_option("primary",
153  "Space separated list of primary data sources, each in the form `name@uri`, "
154  "e.g. \"dcs1@zpb.rr://10.207.214.220:1234/RecCmds\"");
155  sub->add_option("metadata",
157  "Space separated list of metadata data sources, each in the form `name@uri`, "
158  "e.g. \"fcf1@zpb.rr://10.207.214.218:1234/MetaDaq\"");
159 }
160 
161 void StartDaqRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
162  ::daqif::OcmDaqControlSync& daq_cmds,
163  CLI::App* app) {
164  auto reply =
166  if (m_args.json) {
167  std::cout << JsonPrint(reply);
168  } else {
169  std::cout << reply << std::endl;
170  }
171 }
172 
173 void StartDaqV2Requestor::AddOptions(CLI::App* sub) {
174  auto file_or_json = [](std::string const& arg) -> std::string {
175  if (arg.size() > 1 && arg.front() == '@') {
176  auto path = arg.substr(1);
177  if (path == "-") {
178  std::string file_body((std::istreambuf_iterator<char>(std::cin)),
179  (std::istreambuf_iterator<char>()));
180  return file_body;
181  } else {
182  std::ifstream f(path);
183  if (!f.is_open()) {
184  throw std::invalid_argument(
185  fmt::format("File not found or readable: {}", arg.substr(1)));
186  }
187  std::string file_body((std::istreambuf_iterator<char>(f)),
188  (std::istreambuf_iterator<char>()));
189  return file_body;
190  }
191  } else {
192  return arg;
193  }
194  };
195  sub->add_option("specification",
196  m_spec,
197  "JSON specification. Load from file by prefixing with '@', e.g. @file.json\n"
198  "or from stdin '-' with @-")
199  ->transform(file_or_json)
200  ->required();
201 }
202 
203 void StartDaqV2Requestor::Handle(::stdif::StdCmdsSync& std_cmds,
204  ::daqif::OcmDaqControlSync& daq_cmds,
205  CLI::App* app) {
206  auto reply = daq_cmds.StartDaqV2(m_spec);
207  if (m_args.json) {
208  std::cout << JsonPrint(reply);
209  } else {
210  std::cout << reply << std::endl;
211  }
212 }
213 
215  sub->add_option("id", m_id, "DAQ ID");
216  sub->add_option("keywords", m_keywords, "JSON encoded list of keywords");
217 }
218 
219 void UpdateKeywordsRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
220  ::daqif::OcmDaqControlSync& daq_cmds,
221  CLI::App* app) {
222  auto reply = daq_cmds.UpdateKeywords(m_id, m_keywords);
223  if (m_args.json) {
224  std::cout << JsonPrint(reply);
225  } else {
226  std::cout << reply << std::endl;
227  }
228 }
229 
230 void AwaitStateRequestor::AddOptions(CLI::App* sub) {
231  sub->add_option("id", m_id, "DAQ ID");
232  sub->add_option("state", m_state, "State to await");
233  sub->add_option("substate", m_substate, "Substate to await");
234  sub->add_option("timeout", m_timeout, "timeout in [fractional] seconds");
235  sub->callback([&] {
236  auto min_timeout = m_timeout + 2;
237  if (m_args.timeout < min_timeout) {
238  std::cerr << "Note: Setting request timeout to " << min_timeout
239  << "s due to await timeout exceeding request timeout\n";
240  m_args.timeout = min_timeout;
241  }
242  });
243 }
244 
245 void AwaitStateRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
246  ::daqif::OcmDaqControlSync& daq_cmds,
247  CLI::App* app) {
248  auto state = daqif::FromString<daqif::DaqState>(m_state);
249  auto substate = daqif::FromString<daqif::DaqSubState>(m_substate);
250  auto reply = daq_cmds.AwaitDaqState(m_id, state, substate, m_timeout);
251  if (m_args.json) {
252  std::cout << JsonPrint(reply);
253  } else {
254  std::cout << reply << std::endl;
255  }
256 }
Contains support functions for daqif.
std::string_view ToString(daqif::DaqState state) noexcept
Definition: conversion.cpp:146
void to_json(nlohmann::json &j, daqif::DaqStatus const &status)
Definition: requestor.cpp:23
Contains parse functions for daqif.
std::ostream & operator<<(std::ostream &os, std::shared_ptr< daqif::DaqStatus > const &s)
Definition: requestor.cpp:36
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that include the options added with AddOptions.
Definition: requestor.cpp:245
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:230
unsigned timeout
Definition: requestor.hpp:47
elt::mal::Mal * mal
Definition: requestor.hpp:50
T const & t
Definition: requestor.hpp:33
virtual void AddOptions(CLI::App *app)
Add arguments to command.
Definition: requestor.cpp:125
Requestor(CommonArgs &args)
Definition: requestor.cpp:122
CommonArgs & m_args
Definition: requestor.hpp:73
virtual void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that include the options added with AddOptions.
Definition: requestor.cpp:133
virtual void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:128
std::string m_primary_sources
Definition: requestor.hpp:174
std::string m_id
Definition: requestor.hpp:172
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:147
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that include the options added with AddOptions.
Definition: requestor.cpp:161
std::string m_prefix
Definition: requestor.hpp:173
std::string m_metadata_sources
Definition: requestor.hpp:175
std::string m_properties
Definition: requestor.hpp:176
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that include the options added with AddOptions.
Definition: requestor.cpp:203
std::string m_spec
Definition: requestor.hpp:189
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:173
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:214
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that include the options added with AddOptions.
Definition: requestor.cpp:219