ifw-daq  3.0.1
IFW Data Acquisition modules
asyncProcess.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::AsyncProcess class declaration
9  */
11 
12 #include <ostream>
13 
14 #include <boost/asio/read_until.hpp>
15 #include <boost/process/args.hpp>
16 #include <log4cplus/loggingmacros.h>
17 
18 #include <daq/log4cplus.hpp>
19 
20 namespace daq {
21 
22 namespace bp = boost::process;
23 
24 std::ostream& operator<<(std::ostream& os, AsyncProcessIf const& proc) {
25  if (auto pid = proc.GetPid(); pid) {
26  os << '[' << *pid << ']';
27  } else {
28  os << "[ <not running> ]";
29  }
30 
31  for (auto const& arg : proc.GetArguments()) {
32  os << ' ' << arg;
33  }
34  return os;
35 }
36 
37 AsyncProcess::AsyncProcess(boost::asio::io_context& ctx, std::vector<std::string> args)
38  : m_io_ctx(ctx), m_args(std::move(args)), m_stdout{ctx}, m_stderr{ctx} {
39  if (m_args.size() < 1) {
40  throw std::invalid_argument("No arguments provided");
41  }
42  m_args[0] = bp::search_path(m_args[0]).native();
43 }
44 
46 }
47 
48 boost::future<int> AsyncProcess::Initiate() {
49  assert(!m_proc.valid());
50 
51  // on_exit handler has the critical role of triggering the clean shutdown of in-progress
52  // async operations.
53  //
54  // There may be scheduled, but not yet executed, completion handlers for PIPE reads so we cannot
55  // simply cancel or close PIPES without risk loosing data.
56  //
57  // To ensure all output is read we close PIPE write end so that when read is exhausted EOF will
58  // be emitted.
59  //
60  // Observed: on_exit handler not invoked.
61  m_proc = bp::child(
62  bp::args = m_args,
63  bp::std_out > m_stdout.pipe,
64  bp::std_err > m_stderr.pipe,
65  m_io_ctx, // io_context is required for on_exit handler to function.
66  bp::on_exit = [this](int exit, const std::error_code& ec_in) {
67  // After process is terminated PIPEs will eventually reach EOF which triggers
68  // the completion AsyncProcess future. For now we signal the completion of the process
69  // by setting value.
70  m_result = {exit, ec_in};
71  CheckCompleted();
72  });
73  m_pid = m_proc.id();
74  AsyncReadStream(m_stdout);
75  AsyncReadStream(m_stderr);
76  return m_promise.get_future();
77 }
78 
79 std::error_code AsyncProcess::Abort() noexcept {
80  if (!IsRunning()) {
81  return std::make_error_code(std::errc::no_such_process);
82  }
83  // Terminate process hard, this triggers even handlers and normal unwind.
84  std::error_code ec;
85  m_proc.terminate(ec);
86  return ec;
87 }
88 
89 std::optional<pid_t> AsyncProcess::GetPid() const noexcept {
90  if (m_proc.valid()) {
91  return m_pid;
92  }
93  return std::nullopt;
94 }
95 
96 std::error_code AsyncProcess::Signal(int sig) noexcept {
97  if (!IsRunning()) {
98  return std::make_error_code(std::errc::no_such_process);
99  }
100 
101  pid_t id = m_proc.id();
102  int err = kill(id, sig);
103  return std::make_error_code(static_cast<std::errc>(err));
104 }
105 
106 bool AsyncProcess::IsRunning() const noexcept {
107  // warning: It is not safe to invoke child::running() due to bug in boost so we infer the
108  // running state based on whether exit_handler has been invoked or not. This is not 100%
109  // accurate but will work for most use-cases.
110  return m_proc.valid() && !m_result.has_value();
111 }
112 
113 void AsyncProcess::AsyncReadStream(AsyncProcess::Pipe& pipe) {
114  // Process pipe content 'line-buffered'
115  boost::asio::async_read_until(pipe.pipe,
116  pipe.buffer,
117  '\n',
118  [this, &pipe](const boost::system::error_code& ec, std::size_t) {
119  if (ec) {
120  // Expected error is 'EOF', but we don't make a
121  // distinction at this point.
122  //
123  // There might still be data in buffer from previous reads
124  // that do not end in newline, so we read all of it and
125  // send it out to connected slots.
126  std::istream is(&pipe.buffer);
127  std::string remaining(pipe.buffer.size(), '\0');
128  is.read(remaining.data(), remaining.size());
129  pipe.signal(m_pid, remaining);
130 
131  // Close pipe to signal we're done.
132  pipe.pipe.close();
133  CheckCompleted();
134 
135  // We're done, do not schedule another read.
136  return;
137  }
138 
139  // Signal any slots.
140  std::istream is(&pipe.buffer);
141  std::string line;
142  std::getline(is, line);
143  // note: getline consumes '\n' which we don't want since we
144  // don't want to modify source at all.
145  line.push_back('\n');
146  pipe.signal(m_pid, line);
147 
148  AsyncReadStream(pipe);
149  });
150 }
151 
152 void AsyncProcess::CheckCompleted() {
153  // Note that we cannot check if process is alive with m_proc.is_running() as this
154  if (!m_result.has_value() || m_stdout.pipe.is_open() || m_stderr.pipe.is_open()) {
155  // Not all read operations have completed.
156  return;
157  }
158  if (m_result->ec) {
159  m_promise.set_exception(std::system_error(m_result->ec));
160  } else {
161  m_promise.set_value(m_result->exit_code);
162  }
163 }
164 
165 LogCaptureLast::LogCaptureLast(log4cplus::Logger logger, std::size_t num_lines)
166  : m_logger(logger), m_buffer(num_lines) {
167 }
168 
169 void LogCaptureLast::operator()(pid_t pid, std::string const& line) noexcept {
170  LOG4CPLUS_INFO(m_logger, '[' << pid << "]: " << Trim(line));
171  m_buffer.push_back(line);
172 }
173 
174 std::ostream& operator<<(std::ostream& os, LogCaptureLast const& lines) {
175  for (auto const& line : lines.Lines()) {
176  // note: Line is unmodified from program and ends with \n
177  os << line;
178  }
179  return os;
180 }
181 
182 } // namespace daq
daq::AsyncProcess class definition
Interface to asynchronous process.
virtual std::vector< std::string > const & GetArguments() const noexcept=0
virtual std::optional< pid_t > GetPid() const noexcept=0
Get PID.
virtual ~AsyncProcess() noexcept
boost::future< int > Initiate() override
Starts process and asynchronous operations that read stdout and stderr.
AsyncProcess(boost::asio::io_context &ctx, std::vector< std::string > args)
Constructor.
Logs output to logger and keeps last N lines in circular buffer for later retrival.
void operator()(pid_t pid, std::string const &line) noexcept
auto Lines() const -> boost::circular_buffer< std::string >
Trim string from whitespace (' ', ' ')
Definition: log4cplus.hpp:39
Declaration of log4cplus helpers.
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>