ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
asyncProcess.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_libdaq
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief daq::AsyncProcess class declaration
9 */
11
#include <cerrno>
#include <ostream>

#include <boost/asio/read_until.hpp>
#include <boost/process/args.hpp>
#include <log4cplus/loggingmacros.h>

#include <daq/log4cplus.hpp>
19
20namespace daq {
21
22namespace bp = boost::process;
23
24std::ostream& operator<<(std::ostream& os, AsyncProcessIf const& proc) {
25 if (auto pid = proc.GetPid(); pid) {
26 os << '[' << *pid << ']';
27 } else {
28 os << "[ <not running> ]";
29 }
30
31 for (auto const& arg : proc.GetArguments()) {
32 os << ' ' << arg;
33 }
34 return os;
35}
36
37AsyncProcess::AsyncProcess(boost::asio::io_context& ctx, std::vector<std::string> args)
38 : m_io_ctx(ctx), m_args(std::move(args)), m_stdout{ctx}, m_stderr{ctx} {
39 if (m_args.size() < 1) {
40 throw std::invalid_argument("No arguments provided");
41 }
42 m_args[0] = bp::search_path(m_args[0]).native();
43}
44
46}
47
48boost::future<int> AsyncProcess::Initiate() {
49 assert(!m_proc.valid());
50
51 // on_exit handler has the critical role of triggering the clean shutdown of in-progress
52 // async operations.
53 //
54 // There may be scheduled, but not yet executed, completion handlers for PIPE reads so we cannot
55 // simply cancel or close PIPES without risk loosing data.
56 //
57 // To ensure all output is read we close PIPE write end so that when read is exhausted EOF will
58 // be emitted.
59 //
60 // Observed: on_exit handler not invoked.
61 m_proc = bp::child(
62 bp::args = m_args,
63 bp::std_out > m_stdout.pipe,
64 bp::std_err > m_stderr.pipe,
65 m_io_ctx, // io_context is required for on_exit handler to function.
66 bp::on_exit = [this](int exit, const std::error_code& ec_in) {
67 // After process is terminated PIPEs will eventually reach EOF which triggers
68 // the completion AsyncProcess future. For now we signal the completion of the process
69 // by setting value.
70 m_result = {exit, ec_in};
71 CheckCompleted();
72 });
73 m_pid = m_proc.id();
74 AsyncReadStream(m_stdout);
75 AsyncReadStream(m_stderr);
76 return m_promise.get_future();
77}
78
79std::error_code AsyncProcess::Abort() noexcept {
80 if (!IsRunning()) {
81 return std::make_error_code(std::errc::no_such_process);
82 }
83 // Terminate process hard, this triggers even handlers and normal unwind.
84 std::error_code ec;
85 m_proc.terminate(ec);
86 return ec;
87}
88
89std::optional<pid_t> AsyncProcess::GetPid() const noexcept {
90 if (m_proc.valid()) {
91 return m_pid;
92 }
93 return std::nullopt;
94}
95
96std::error_code AsyncProcess::Signal(int sig) noexcept {
97 if (!IsRunning()) {
98 return std::make_error_code(std::errc::no_such_process);
99 }
100
101 pid_t id = m_proc.id();
102 int err = kill(id, sig);
103 return std::make_error_code(static_cast<std::errc>(err));
104}
105
106bool AsyncProcess::IsRunning() const noexcept {
107 // warning: It is not safe to invoke child::running() due to bug in boost so we infer the
108 // running state based on whether exit_handler has been invoked or not. This is not 100%
109 // accurate but will work for most use-cases.
110 return m_proc.valid() && !m_result.has_value();
111}
112
113void AsyncProcess::AsyncReadStream(AsyncProcess::Pipe& pipe) {
114 // Process pipe content 'line-buffered'
115 boost::asio::async_read_until(pipe.pipe,
116 pipe.buffer,
117 '\n',
118 [this, &pipe](const boost::system::error_code& ec, std::size_t) {
119 if (ec) {
120 // Expected error is 'EOF', but we don't make a
121 // distinction at this point.
122 //
123 // There might still be data in buffer from previous reads
124 // that do not end in newline, so we read all of it and
125 // send it out to connected slots.
126 std::istream is(&pipe.buffer);
127 std::string remaining(pipe.buffer.size(), '\0');
128 is.read(remaining.data(), remaining.size());
129 pipe.signal(m_pid, remaining);
130
131 // Close pipe to signal we're done.
132 pipe.pipe.close();
133 CheckCompleted();
134
135 // We're done, do not schedule another read.
136 return;
137 }
138
139 // Signal any slots.
140 std::istream is(&pipe.buffer);
141 std::string line;
142 std::getline(is, line);
143 // note: getline consumes '\n' which we don't want since we
144 // don't want to modify source at all.
145 line.push_back('\n');
146 pipe.signal(m_pid, line);
147
148 AsyncReadStream(pipe);
149 });
150}
151
152void AsyncProcess::CheckCompleted() {
153 // Note that we cannot check if process is alive with m_proc.is_running() as this
154 if (!m_result.has_value() || m_stdout.pipe.is_open() || m_stderr.pipe.is_open()) {
155 // Not all read operations have completed.
156 return;
157 }
158 if (m_result->ec) {
159 m_promise.set_exception(std::system_error(m_result->ec));
160 } else {
161 m_promise.set_value(m_result->exit_code);
162 }
163}
164
165LogCaptureLast::LogCaptureLast(log4cplus::Logger logger, std::size_t num_lines)
166 : m_logger(logger), m_buffer(num_lines) {
167}
168
169void LogCaptureLast::operator()(pid_t pid, std::string const& line) noexcept {
170 LOG4CPLUS_INFO(m_logger, '[' << pid << "]: " << Trim(line));
171 m_buffer.push_back(line);
172}
173
174std::ostream& operator<<(std::ostream& os, LogCaptureLast const& lines) {
175 for (auto const& line : lines.Lines()) {
176 // note: Line is unmodified from program and ends with \n
177 os << line;
178 }
179 return os;
180}
181
182} // namespace daq
daq::AsyncProcess class definition
Interface to asynchronous process.
virtual std::optional< pid_t > GetPid() const noexcept=0
Get PID.
virtual std::vector< std::string > const & GetArguments() const noexcept=0
virtual ~AsyncProcess() noexcept
boost::future< int > Initiate() override
Starts process and asynchronous operations that read stdout and stderr.
AsyncProcess(boost::asio::io_context &ctx, std::vector< std::string > args)
Constructor.
Logs output to logger and keeps last N lines in circular buffer for later retrieval.
void operator()(pid_t pid, std::string const &line) noexcept
auto Lines() const -> boost::circular_buffer< std::string >
Trim string from whitespace (' ', ' ')
Definition: log4cplus.hpp:40
Declaration of log4cplus helpers.
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>