ifw-daq  3.0.1
IFW Data Acquisition modules
asyncProcess.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::AsyncProcess class definition
9  */
10 #ifndef DAQ_ASYNC_PROCESS_HPP
11 #define DAQ_ASYNC_PROCESS_HPP
12 #include <daq/config.hpp>
13 
14 #include <iosfwd>
15 #include <optional>
16 #include <string>
17 #include <vector>
18 
19 #include <boost/circular_buffer.hpp>
20 #include <boost/process.hpp>
21 #include <boost/signals2/signal.hpp>
22 #include <boost/thread/future.hpp>
23 #include <log4cplus/logger.h>
24 
25 namespace daq {
26 
27 /**
28  * Interface to asynchronous process.
29  */
31 public:
32  virtual ~AsyncProcessIf(){};
33 
34  /**
35  * Initiates async operation by executing the specified process.
36  *
37  * This can only be called once.
38  */
39  [[nodiscard]] virtual boost::future<int> Initiate() = 0;
40 
41  /**
42  * Get PID.
43  *
44  * @returns pid of process if process is running, otherwise nullopt.
45  */
46  virtual std::optional<pid_t> GetPid() const noexcept = 0;
47 
48  /**
49  * Aborts the operation by terminating process which completes the operation.
50  * If process is not running this will not do anything.
51  *
52  * @returns Error code of operation. If process is not running, because it has not been started
53  * for with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
54  */
55  virtual std::error_code Abort() noexcept = 0;
56 
57  /**
58  * Send signal to process. Unlike Abort() this can be used to gracefully terminate application
59  * by sending e.g. SIGTERM.
60  *
61  * @param sig Signal to send.
62  * @returns Error code of operation. If process is not running, because it has not been started
63  * for with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
64  */
65  virtual std::error_code Signal(int sig) noexcept = 0;
66 
67  /**
68  * @return arguments used when launching process.
69  */
70  virtual std::vector<std::string> const& GetArguments() const noexcept = 0;
71 
72  /**
73  * @returns true if process is running.
74  * @returns false otherwise.
75  */
76  virtual bool IsRunning() const noexcept = 0;
77 
78  /**
79  * @name Signals
80  */
81  /// @{
82  /**
83  * Signal type for stdout/stderr signals.
84  */
85  using SigOutStream = boost::signals2::signal<void(pid_t, std::string const&)>;
86 
87  /**
88  * Connect slot to line-buffered stdout signal.
89  *
90  * Signal is invoked for every line read from process.
91  *
92  * @returns connection object.
93  */
94  virtual boost::signals2::connection ConnectStdout(SigOutStream::slot_type const& slot) = 0;
95 
96  /**
97  * Connect slot to line-buffered stderr signal.
98  *
99  * Signal is invoked for every line read from process.
100  *
101  * @returns connection object.
102  */
103  virtual boost::signals2::connection ConnectStderr(SigOutStream::slot_type const& slot) = 0;
104  /// @}
105 };
106 
107 /**
108  * Formats proc representation in the form `[<pid>] <args>`
109  *
110  * @param os Ostream to format to.
111  * @param proc instance to format.
112  */
113 std::ostream& operator<<(std::ostream& os, AsyncProcessIf const& proc);
114 
115 /**
116  * Represents a subprocess as an asynchronous operation.
117  *
118  * Once constructed the operation is initiated (only once) with `Initiate()` which starts the
119  * process and returns a boost::future object that will receive exit code when process terminates
120  * *and* all output has been read.
121  *
122  * @note All completion handlers have been executed and no signals will be emitted after future has
123  * received the value or exception, it is safe to delete AsyncProcess after this.
124  *
125  * Operation can be aborted with `Abort()` which will terminate process and set future with
126  * exceptional result.
127  *
128  * boost::process is pretty buggy so be very careful making changes to this.
129  * Examples:
130  * - Do not check if process is running with child::is_running() after `on_exit` has been executed.
131  * This cause exit codes to be lost [https://github.com/boostorg/process/issues/187]
132  *
133  * @ingroup daq_common_libdaq
134  */
135 class AsyncProcess : public virtual AsyncProcessIf {
136 public:
137  /**
138  * Constructor.
139  *
140  * @note Does not start the process or any other asynchronous operations. This is done in
141  * AsyncProcess::Initiate().
142  *
143  * @param ctx io_context instance to use.
144  * @param args Command line arguments. First argument specify the file to be executed.
145  */
146  explicit AsyncProcess(boost::asio::io_context& ctx, std::vector<std::string> args);
147  virtual ~AsyncProcess() noexcept;
148 
149  /**
150  * Starts process and asynchronous operations that read stdout and stderr.
151  *
152  * This can only be called once.
153  *
154  * @note Caller is responsible for keeping AsyncProcess alive until result is set on future.
155  *
156  * @returns Future that will receive process exit code @a after process has terminated and all
157  * output has been read.
158  */
159  [[nodiscard]] boost::future<int> Initiate() override;
160  std::optional<pid_t> GetPid() const noexcept override;
161 
162  /**
163  * Aborts the operation by terminating process which completes the operation.
164  * If process is not running this will not do anything.
165  *
166  * @returns Error code of operation. If process is not running, because it has not been started
167  * for with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
168  */
169  std::error_code Abort() noexcept override;
170 
171  /**
172  * Send signal to process. Unlike Abort() this can be used to gracefully terminate application
173  * by sending e.g. SIGTERM.
174  *
175  * @param sig Signal to send.
176  * @returns Error code of operation. If process is not running, because it has not been started
177  * for with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
178  */
179  std::error_code Signal(int sig) noexcept override;
180 
181  /**
182  * @returns true if process is running.
183  * @returns false otherwise.
184  */
185  bool IsRunning() const noexcept override;
186 
187  std::vector<std::string> const& GetArguments() const noexcept override {
188  return m_args;
189  }
190 
191  /**
192  * @name Signals
193  */
194  /// @{
196  boost::signals2::connection ConnectStdout(SigOutStream::slot_type const& slot) override {
197  return m_stdout.signal.connect(slot);
198  }
199  boost::signals2::connection ConnectStderr(SigOutStream::slot_type const& slot) override {
200  return m_stderr.signal.connect(slot);
201  }
202  /// @}
203 
204 protected:
205 private:
206  /**
207  * Describes process result.
208  */
209  struct Result {
210  /**
211  * If process was started this contains the exit_code.
212  */
213  int exit_code;
214  /**
215  * If process start fails this will be non-zero.
216  */
217  std::error_code ec;
218  };
219  struct Pipe {
220  boost::process::async_pipe pipe;
221  boost::asio::streambuf buffer = boost::asio::streambuf();
222  SigOutStream signal = {};
223  };
224  /**
225  * Checks if operation is completed and sets promise if it is.
226  *
227  * Three async operations may complete in any order and the future should be set only when all
228  * three complete.
229  *
230  * - stdout PIPE reads. Completes when read get EOF.
231  * - stderr PIPE reads. Completes when read get EOF.
232  * - on_exit handler, completed automatically when process exits.
233  *
234  * Each of these must invoke CheckCompleted() which will trigger completion of future only after
235  * all operations have completed.
236  */
237  void CheckCompleted();
238  /**
239  * Recursive async read initator */
240  void AsyncReadStream(Pipe&);
241 
242  boost::asio::io_context& m_io_ctx;
243  std::vector<std::string> m_args;
244  /** @name PIPEs and buffers
245  * @{
246  */
247  Pipe m_stdout;
248  Pipe m_stderr;
249  /** @} */
250 
251  boost::process::child m_proc;
252  pid_t m_pid;
253  std::optional<Result> m_result;
254  boost::promise<int> m_promise;
255 };
256 
257 /**
258  * Logs output to logger and keeps last N lines in circular buffer for later retrival.
259  */
261 public:
262  LogCaptureLast(log4cplus::Logger logger, std::size_t num_lines);
263  void operator()(pid_t pid, std::string const& line) noexcept;
264 
265  auto Lines() const -> boost::circular_buffer<std::string> {
266  return m_buffer;
267  }
268 
269 
270 private:
271  log4cplus::Logger m_logger;
272  boost::circular_buffer<std::string> m_buffer;
273 };
274 
275 std::ostream& operator<<(std::ostream& os, LogCaptureLast const& lines);
276 
277 } // namespace daq
278 
279 #endif // #ifndef DAQ_ASYNC_PROCESS_HPP
Interface to asynchronous process.
virtual boost::future< int > Initiate()=0
Initiates async operation by executing the specified process.
virtual std::error_code Abort() noexcept=0
Aborts the operation by terminating process which completes the operation.
virtual std::vector< std::string > const & GetArguments() const noexcept=0
virtual boost::signals2::connection ConnectStdout(SigOutStream::slot_type const &slot)=0
Connect slot to line-buffered stdout signal.
virtual ~AsyncProcessIf()
boost::signals2::signal< void(pid_t, std::string const &)> SigOutStream
Signal type for stdout/stderr signals.
virtual std::error_code Signal(int sig) noexcept=0
Send signal to process.
virtual boost::signals2::connection ConnectStderr(SigOutStream::slot_type const &slot)=0
Connect slot to line-buffered stderr signal.
virtual bool IsRunning() const noexcept=0
virtual std::optional< pid_t > GetPid() const noexcept=0
Get PID.
Represents a subprocess as an asynchronous operation.
boost::signals2::connection ConnectStdout(SigOutStream::slot_type const &slot) override
Signal type for stdout/stderr signals.
std::vector< std::string > const & GetArguments() const noexcept override
boost::signals2::connection ConnectStderr(SigOutStream::slot_type const &slot) override
Signal type for stdout/stderr signals.
Logs output to logger and keeps last N lines in circular buffer for later retrival.
auto Lines() const -> boost::circular_buffer< std::string >
std::ostream & operator<<(std::ostream &os, AsyncProcessIf const &proc)
Formats proc representation in the form [<pid>] <args>
Utility class that represents a result and an error.
Definition: utility.hpp:17