ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
requestor.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_serverctl
4  * @copyright ESO - European Southern Observatory
5  *
6  * @brief Stream output helpers for daqif replies and implementation of the Requestor commands.
7  */
8 #include <chrono>
9 #include <string>
10 
11 #include <fmt/format.h>
12 
13 #include <daq/conversion.hpp>
14 #include <daqif/parsing.hpp>
15 
16 #include "requestor.hpp"
17 
18 using daq::ToString;
19 
20 std::ostream& operator<<(std::ostream& os, std::shared_ptr<daqif::DaqStatus> const& s) {
21  os << std::setprecision(std::numeric_limits<long double>::digits10 + 1);
22  os << "DaqStatus:\n"
23  << " id: " << s->getId() << "\n"
24  << " file_id: " << s->getFileId() << "\n"
25  << " state: \"" << ToString(s->getState()) << "\"\n"
26  << " substate: \"" << ToString(s->getSubState()) << "\"\n"
27  << " error: \"" << (s->getError() ? "true" : "false") << "\n"
28  << " message: " << s->getMessage() << "\n"
29  << " result: " << s->getResult() << "\n"
30  << " timestamp: " << s->getTimestamp();
31  os << "\n";
32 
33  return os;
34 }
35 
36 std::ostream& operator<<(std::ostream& os, std::shared_ptr<daqif::DaqReply> const& s) {
37  os << "DaqReply:\n"
38  << " id: " << s->getId() << "\n"
39  << " error: " << (s->getError() ? "true" : "false") << "\n";
40  return os;
41 }
42 
43 std::ostream& operator<<(std::ostream& os, std::shared_ptr<daqif::AwaitDaqReply> const& s) {
44  os << "AwaitDaqReply:\n"
45  << " timeout: " << (s->getTimeout() ? "true" : "false") << "\n"
46  << " status: " << s->getStatus() << "\n";
47  return os;
48 }
49 
50 std::ostream&
51 operator<<(std::ostream& os, std::vector<std::shared_ptr<daqif::DaqStatus>> const& vec) {
52  for (auto const& s : vec) {
53  os << JsonPrint(s);
54  os << "\n";
55  }
56  return os;
57 }
58 
59 std::ostream& operator<<(std::ostream& os, JsonPrint<std::shared_ptr<daqif::DaqStatus>> const& s) {
60  os << std::setprecision(std::numeric_limits<long double>::digits10 + 1);
61  os << "{\n"
62  " \"id\": \""
63  << s.t->getId()
64  << "\",\n"
65  " \"file_id\": \""
66  << s.t->getFileId()
67  << "\",\n"
68  " \"state\": \""
69  << ToString(s.t->getState())
70  << "\",\n"
71  " \"substate\": \""
72  << ToString(s.t->getSubState())
73  << "\",\n"
74  " \"error\": "
75  << (s.t->getError() ? "true" : "false")
76  << ",\n"
77  " \"result\": \""
78  << s.t->getResult()
79  << "\",\n"
80  " \"timestamp\": "
81  << s.t->getTimestamp() << "\n"
82  << "}";
83  return os;
84 }
85 
86 std::ostream&
87 operator<<(std::ostream& os, JsonPrint<std::vector<std::shared_ptr<daqif::DaqStatus>>> const& vec) {
88  os << "[\n";
89  size_t num = 0;
90  for (auto const& s : vec.t) {
91  num++;
92  os << JsonPrint(s);
93  if (num < vec.t.size()) {
94  os << ",\n";
95  }
96  }
97  os << "\n]";
98  return os;
99 }
100 
101 std::ostream& operator<<(std::ostream& os, JsonPrint<std::shared_ptr<daqif::DaqReply>> const& s) {
102  os << "{\n"
103  " \"id\": \""
104  << s.t->getId()
105  << "\",\n"
106  " \"error\": "
107  << (s.t->getError() ? "true" : "false") << "\n";
108  os << "}";
109 
110  return os;
111 }
112 
113 std::ostream&
114 operator<<(std::ostream& os, JsonPrint<std::shared_ptr<daqif::AwaitDaqReply>> const& s) {
115  os << "{\n"
116  " \"timeout\": "
117  << (s.t->getTimeout() ? "true" : "false")
118  << ",\n"
119  " \"status\": "
120  << JsonPrint(s.t->getStatus()) << "\n";
121  os << "}";
122 
123  return os;
124 }
125 
126 std::ostream& operator<<(std::ostream& os, JsonPrint<std::string> const& s) {
127  os << "\"" << s.t << "\"";
128  return os;
129 }
130 
131 Requestor::Requestor(CommonArgs& args) : m_args(args) {
132 }
133 
134 void Requestor::AddOptions(CLI::App* app) {
135 }
136 
137 void SetLogLevelRequestor::AddOptions(CLI::App* sub) {
138  sub->add_option("logger", m_logger, "Logger name");
139  sub->add_option("level", m_level, "Log level");
140 }
141 
142 void SetLogLevelRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
143  ::daqif::OcmDaqControlSync& daq_cmds,
144  CLI::App* app) {
145  auto log_info = m_args.mal->createDataEntity<stdif::LogInfo>();
146  log_info->setLogger(m_logger);
147  log_info->setLevel(m_level);
148  auto reply = std_cmds.SetLogLevel(log_info);
149  if (m_args.json) {
150  std::cout << JsonPrint(reply);
151  } else {
152  std::cout << "Reply: " << reply << std::endl;
153  }
154 }
155 
156 void StartDaqRequestor::AddOptions(CLI::App* sub) {
157  sub->add_option("--id", m_id, "DAQ ID");
158  sub->add_option("--prefix", m_prefix, "File prefix");
159  sub->add_option("--properties", m_properties, "JSON properties");
160  sub->add_option("primary", m_primary_sources, "Primary data sources");
161  sub->add_option("metadata", m_metadata_sources, "Metadata sources");
162 }
163 
164 void StartDaqRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
165  ::daqif::OcmDaqControlSync& daq_cmds,
166  CLI::App* app) {
167  auto reply =
169  if (m_args.json) {
170  std::cout << JsonPrint(reply);
171  } else {
172  std::cout << "Reply: " << reply << std::endl;
173  }
174 }
175 
176 void StartDaqV2Requestor::AddOptions(CLI::App* sub) {
177  auto file_or_json = [](std::string const& arg) -> std::string {
178  if (arg.size() > 1 && arg.front() == '@') {
179  auto path = arg.substr(1);
180  if (path == "-") {
181  std::string file_body((std::istreambuf_iterator<char>(std::cin)),
182  (std::istreambuf_iterator<char>()));
183  return file_body;
184  } else {
185  std::ifstream f(path);
186  if (!f.is_open()) {
187  throw std::invalid_argument(
188  fmt::format("File not found or readable: {}", arg.substr(1)));
189  }
190  std::string file_body((std::istreambuf_iterator<char>(f)),
191  (std::istreambuf_iterator<char>()));
192  return file_body;
193  }
194  } else {
195  return arg;
196  }
197  };
198  sub->add_option("specification",
199  m_spec,
200  "JSON specification. Load from file by prefixing with '@', e.g. @file.json\n"
201  "or from stdin '-' with @-")
202  ->transform(file_or_json)
203  ->required();
204 }
205 
206 void StartDaqV2Requestor::Handle(::stdif::StdCmdsSync& std_cmds,
207  ::daqif::OcmDaqControlSync& daq_cmds,
208  CLI::App* app) {
209  auto reply = daq_cmds.StartDaqV2(m_spec);
210  if (m_args.json) {
211  std::cout << JsonPrint(reply);
212  } else {
213  std::cout << "Reply: " << reply << std::endl;
214  }
215 }
216 
218  sub->add_option("id", m_id, "DAQ ID");
219  sub->add_option("keywords", m_keywords, "JSON encoded list of keywords");
220 }
221 
222 void UpdateKeywordsRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
223  ::daqif::OcmDaqControlSync& daq_cmds,
224  CLI::App* app) {
225  auto reply = daq_cmds.UpdateKeywords(m_id, m_keywords);
226  if (m_args.json) {
227  std::cout << JsonPrint(reply);
228  } else {
229  std::cout << "Reply: " << reply << std::endl;
230  }
231 }
232 
233 void AwaitStateRequestor::AddOptions(CLI::App* sub) {
234  sub->add_option("id", m_id, "DAQ ID");
235  sub->add_option("state", m_state, "State to await");
236  sub->add_option("substate", m_substate, "Substate to await");
237  sub->add_option("timeout", m_timeout, "timeout in [fractional] seconds");
238  sub->callback([&] {
239  auto min_timeout = m_timeout + 2;
240  if (m_args.timeout < min_timeout) {
241  std::cerr << "Note: Setting request timeout to " << min_timeout
242  << "s due to await timeout exceeding request timeout\n";
243  m_args.timeout = min_timeout;
244  }
245  });
246 }
247 
248 void AwaitStateRequestor::Handle(::stdif::StdCmdsSync& std_cmds,
249  ::daqif::OcmDaqControlSync& daq_cmds,
250  CLI::App* app) {
251  auto state = daqif::FromString<daqif::DaqState>(m_state);
252  auto substate = daqif::FromString<daqif::DaqSubState>(m_substate);
253  auto reply = daq_cmds.AwaitDaqState(m_id, state, substate, m_timeout);
254  if (m_args.json) {
255  std::cout << JsonPrint(reply);
256  } else {
257  std::cout << "Reply: " << reply << std::endl;
258  }
259 }
Contains support functions for daqif.
std::string_view ToString(daqif::DaqState state) noexcept
Definition: conversion.cpp:142
Contains parse functions for daqif.
std::ostream & operator<<(std::ostream &os, std::shared_ptr< daqif::DaqStatus > const &s)
Definition: requestor.cpp:20
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that includes the options added with AddOptions.
Definition: requestor.cpp:248
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:233
unsigned timeout
Definition: requestor.hpp:47
elt::mal::Mal * mal
Definition: requestor.hpp:50
T const & t
Definition: requestor.hpp:33
virtual void AddOptions(CLI::App *app)
Add arguments to command.
Definition: requestor.cpp:134
Requestor(CommonArgs &args)
Definition: requestor.cpp:131
CommonArgs & m_args
Definition: requestor.hpp:73
virtual void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that includes the options added with AddOptions.
Definition: requestor.cpp:142
virtual void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:137
std::string m_primary_sources
Definition: requestor.hpp:174
std::string m_id
Definition: requestor.hpp:172
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:156
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that includes the options added with AddOptions.
Definition: requestor.cpp:164
std::string m_prefix
Definition: requestor.hpp:173
std::string m_metadata_sources
Definition: requestor.hpp:175
std::string m_properties
Definition: requestor.hpp:176
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that includes the options added with AddOptions.
Definition: requestor.cpp:206
std::string m_spec
Definition: requestor.hpp:189
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:176
void AddOptions(CLI::App *app) override
Add arguments to command.
Definition: requestor.cpp:217
void Handle(::stdif::StdCmdsSync &std_cmds, ::daqif::OcmDaqControlSync &daq_cmds, CLI::App *arg) override
Receives a parsed variables_map that includes the options added with AddOptions.
Definition: requestor.cpp:222