ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_serverctl
4  * @copyright ESO - European Southern Observatory
5  *
6  * @brief
7  */
8 #include <daq/config.hpp>
9 
10 #include <memory>
11 #include <thread>
12 #include <unordered_map>
13 #include <vector>
14 
15 #include <CLI/CLI.hpp>
16 #include <boost/format.hpp>
17 
18 #include <mal/Cii.hpp>
19 #include <mal/Mal.hpp>
20 #include <mal/rr/qos/ConnectionTime.hpp>
21 #include <mal/rr/qos/ReplyTime.hpp>
22 #include <mal/utility/LoadMal.hpp>
23 #include <rad/logger.hpp>
24 #include <log4cplus/hierarchy.h>
25 
26 #include <Daqif.hpp>
27 #include <Stdif.hpp>
28 
29 #include <daq/conversion.hpp>
30 #include <daqif/subscription.hpp>
31 #include <daqif/uri.hpp>
32 
33 #include "requestor.hpp"
34 
35 volatile std::sig_atomic_t g_signal_status;
36 void SignalHandler(int signal) {
37  g_signal_status = signal;
38 }
39 
40 
41 int main(int argc, char** argv) {
42  using elt::mal::Uri;
43 
44  log4cplus::initialize();
45  // Delete mal configuration that writes to stdout, interfering with JSON output
46  log4cplus::Logger::getDefaultHierarchy().resetConfiguration();
47 
48  // Log to stderr
49  log4cplus::BasicConfigurator(log4cplus::Logger::getDefaultHierarchy(), true).configure();
50  RAD_LOG_TO_CONSOLE(true);
51  RAD_LOG_SETLEVEL("TRACE");
52 
53  std::signal(SIGINT, SignalHandler);
54  std::signal(SIGTERM, SignalHandler);
55 
56  CommonArgs args;
57 
58  CLI::App app("daqOcmCtl");
59  app.add_option(
60  "--rep",
61  args.req_addr,
62  "request endpoint without service stem. e.g. zpb://127.0.0.1:4020, defaults to env. "
63  "$OCM_REQUEST_EP")
64  ->envname("OCM_REQUEST_EP");
65  auto* pep =
66  app.add_option("--pep", args.pub_addr, "publish endpoint, defaults to env $OCM_PUBLISH_EP")
67  ->envname("OCM_PUBLISH_EP");
68  app.add_flag("--json", args.json, "output in JSON format");
69  app.add_flag("--status",
70  args.status,
71  "subscribe to server topics (requires --pep or $OCM_PUBLISH_EP to be set)")
72  ->needs(pep);
73  app.add_option("--timeout,-t", args.timeout, "request timeout in seconds", true);
74  app.add_flag_callback(
75  "--version",
76  [] {
77  std::cerr << "daqOcmCtl " << VERSION
78 #ifdef DEBUG
79  << " (debug build)"
80 #endif
81  << std::endl;
82  throw CLI::Success(VERSION, 0);
83  },
84  "Print version and exit");
85 
86  std::vector<std::string> subargs;
87 
88  std::map<std::string, std::unique_ptr<Requestor>> requestors;
89  requestors.emplace(
90  std::make_pair("std.getstatus",
91  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::GetStatus)>>(
92  &stdif::StdCmds::GetStatus, args)));
93  requestors.emplace(
94  std::make_pair("std.getstate",
95  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::GetState)>>(
96  &stdif::StdCmds::GetState, args)));
97  requestors.emplace(
98  std::make_pair("std.getversion",
99  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::GetVersion)>>(
100  &stdif::StdCmds::GetVersion, args)));
101  requestors.emplace(
102  std::make_pair("std.setloglevel", std::make_unique<SetLogLevelRequestor>(args)));
103  requestors.emplace(
104  std::make_pair("std.init",
105  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Init)>>(
106  &stdif::StdCmds::Init, args)));
107  requestors.emplace(
108  std::make_pair("std.enable",
109  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Enable)>>(
110  &stdif::StdCmds::Enable, args)));
111  requestors.emplace(
112  std::make_pair("std.disable",
113  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Disable)>>(
114  &stdif::StdCmds::Disable, args)));
115  requestors.emplace(
116  std::make_pair("std.exit",
117  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Exit)>>(
118  &stdif::StdCmds::Exit, args)));
119 
120  requestors.emplace(std::make_pair(
121  "daq.stop",
122  std::make_unique<SimpleDaqRequestor<decltype(&daqif::OcmDaqControl::StopDaq)>>(
123  &daqif::OcmDaqControl::StopDaq, args)));
124  requestors.emplace(std::make_pair(
125  "daq.forcestop",
126  std::make_unique<SimpleDaqRequestor<decltype(&daqif::OcmDaqControl::ForceStopDaq)>>(
127  &daqif::OcmDaqControl::ForceStopDaq, args)));
128  requestors.emplace(std::make_pair(
129  "daq.abort",
130  std::make_unique<SimpleDaqRequestor<decltype(&daqif::OcmDaqControl::AbortDaq)>>(
131  &daqif::OcmDaqControl::AbortDaq, args)));
132  requestors.emplace(std::make_pair(
133  "daq.forceabort",
134  std::make_unique<SimpleDaqRequestor<decltype(&daqif::OcmDaqControl::ForceAbortDaq)>>(
135  &daqif::OcmDaqControl::ForceAbortDaq, args)));
136 
137  requestors.emplace(std::make_pair(
138  "daq.getstatus",
139  std::make_unique<SimpleDaqRequestor<decltype(&daqif::OcmDaqControl::GetStatus)>>(
140  &daqif::OcmDaqControl::GetStatus, args)));
141  requestors.emplace(std::make_pair(
142  "daq.getactivelist",
143  std::make_unique<NoArgRequestor<decltype(&daqif::OcmDaqControl::GetActiveList)>>(
144  &daqif::OcmDaqControl::GetActiveList, args)));
145 
146  requestors.emplace(std::make_pair("daq.start", std::make_unique<StartDaqRequestor>(args)));
147  requestors.emplace(std::make_pair("daq.startv2", std::make_unique<StartDaqV2Requestor>(args)));
148  requestors.emplace(
149  std::make_pair("daq.updatekeywords", std::make_unique<UpdateKeywordsRequestor>(args)));
150  requestors.emplace(
151  std::make_pair("daq.awaitstate", std::make_unique<AwaitStateRequestor>(args)));
152 
153  // Allow each command to add their arguments
154  for (auto& r : requestors) {
155  auto& command = r.first;
156  CLI::App* sub = app.add_subcommand(command);
157  r.second->AddOptions(sub);
158  }
159 
160  try {
161  CLI11_PARSE(app, argc, argv);
162 
163  // Load ZPB and register MAL
164  auto mal = elt::mal::loadMal("zpb", {});
165  args.mal = mal.get();
166  elt::mal::CiiFactory::getInstance().registerMal("zpb", mal);
167 
168  std::optional<daqif::Subscription<stdif::Status>> std_status;
169  std::optional<daqif::Subscription<daqif::DaqStatus>> daq_status;
170 
171  if (args.status) {
172  if (args.pub_addr.back() != '/') {
173  args.pub_addr.push_back('/');
174  }
175  std_status.emplace(daqif::MakeSubscription<stdif::Status>(
176  *mal,
177  daqif::MakeServiceUri(args.pub_addr, "std/status"),
178  [](auto& sub, auto const& event) {
179  auto const& sample = event.getData();
180  std::cerr << "status: " << sample->getStatus() << std::endl;
181  }));
182  daq_status.emplace(daqif::MakeSubscription<daqif::DaqStatus>(
183  *mal, Uri(args.pub_addr + "daq/status"), [](auto& sub, auto const& event) {
184  auto const& sample = event.getData();
185  std::cerr << "daq: id=" << sample->getId()
186  << ", file_id=" << sample->getFileId()
187  << ", state=" << daq::ToString(sample->getState())
188  << ", substate=" << daq::ToString(sample->getSubState())
189  << ", error=" << (sample->getError() ? "true" : "false")
190  << ", message=" << sample->getMessage()
191  << std::endl;
192  }));
193  }
194 
195  auto command = app.get_subcommands();
196  if (command.empty()) {
197  if (args.status) {
198  std::cerr << "no command provided -> will subscribe indefinitely\n";
199  while (!g_signal_status) {
200  std::this_thread::sleep_for(std::chrono::milliseconds(100));
201  }
202  return 0;
203  }
204  return 1;
205  }
206 
207  auto std_cmds = elt::mal::CiiFactory::getInstance().getClient<stdif::StdCmdsSync>(
208  daqif::MakeServiceUri(args.req_addr, "std"),
209  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(args.timeout))},
210  {});
211  auto daq_cmds = elt::mal::CiiFactory::getInstance().getClient<::daqif::OcmDaqControlSync>(
212  daqif::MakeServiceUri(args.req_addr, "daq"),
213  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(args.timeout))},
214  {});
215 
216  // Look up the requestor in the known commands map
217  auto req_it = requestors.find(command[0]->get_name());
218  if (req_it != requestors.end()) {
219  req_it->second->Handle(*std_cmds, *daq_cmds, command[0]);
220  std::cerr << "Done\n";
221  return 0;
222  } else {
223  std::cerr << "error: Unknown command: '" << command[0]->get_name() << "'\n";
224  return 1;
225  }
226  } catch (daqif::DaqException const& e) {
227  std::cerr << "DaqException: id: '" << e.getId() << "', message: " << e.getMessage()
228  << std::endl;
229  return 1;
230  } catch (CLI::ParseError const& e) {
231  std::cerr << e.what() << std::endl;
232  return 1;
233  } catch (std::exception const& e) {
234  std::cerr << "error: " << e.what() << std::endl;
235  return 1;
236  } catch (...) {
237  std::cerr << "unknown error" << std::endl;
238  return 1;
239  }
240 }
Contains support functions for daqif.
int main(int argc, char **argv)
Definition: main.cpp:68
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
void SignalHandler(int signal)
Definition: main.cpp:36
volatile std::sig_atomic_t g_signal_status
Definition: main.cpp:35
unsigned timeout
Definition: requestor.hpp:47
std::string req_addr
Definition: requestor.hpp:48
elt::mal::Mal * mal
Definition: requestor.hpp:50
std::string pub_addr
Definition: requestor.hpp:49
bool status
Definition: requestor.hpp:46
Simple Daq commands that accepts a single argument id and returns a shared_ptr type that can be forma...
Definition: requestor.hpp:128
Simple requestor for commands without argument.
Definition: requestor.hpp:96
Contains URI support functions for daqif.
Contains URI support functions for daqif.