ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
rsyncAsyncProcess.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_libdaq
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief daq::RsyncAsyncProcess class definition
9 */
11
12#include <array>
13#include <string_view>
14
15#include <fmt/format.h>
16
17namespace daq {
18namespace {
19
20/**
21 * Literal speed units used by rsync.
22 */
23static constexpr std::array<std::string_view, 4> RSYNC_UNITS = {"B/s", "kB/s", "MB/s", "GB/s"};
24
25static std::vector<std::string>
26MakeAsyncProcessArgs(std::string source,
27 std::string dest,
28 RsyncOptions const& opts,
30 std::vector<std::string> args;
31 args.emplace_back(opts.rsync.value_or("rsync"));
32 if (opts.bw_limit) {
33 args.emplace_back(fmt::format("--bwlimit={}", *opts.bw_limit));
34 }
35 if (opts.whole_file) {
36 args.emplace_back(*opts.whole_file ? "--whole-file" : "--no-whole-file");
37 }
38 if (opts.inplace) {
39 args.emplace_back(*opts.whole_file ? "--inplace" : "--no-inplace");
40 }
41 if (opts.timeout) {
42 args.emplace_back(fmt::format("--timeout={}", opts.timeout->count()));
43 }
44 if (opts.logfile) {
45 args.emplace_back(fmt::format("--log-file={}", opts.logfile->native()));
46 }
48 args.emplace_back("--dry-run");
49 }
50 // preseve modification times
51 args.emplace_back("--times");
52 // Mandatory to have functioning progress updates.
53 args.emplace_back("--info=progress2");
54 // No human readable gets rid of thousand separators in number of transferred bytes.
55 args.emplace_back("--no-human-readable");
56 args.emplace_back(std::move(source));
57 args.emplace_back(std::move(dest));
58 return args;
59}
60
61} // namespace
62
63std::optional<RsyncProgress> ParseRsyncProgress(std::string const& line) noexcept {
64 using std::chrono::hours;
65 using std::chrono::minutes;
66 using std::chrono::seconds;
67
68 // Format in the form:
69 // 0 0% 0.00kB/s 0:00:00
70 // 367656960 34% 350.62MB/s 0:00:01
71 // 735313920 68% 350.80MB/s 0:00:00
72 // 1073741824 100% 351.29MB/s 0:00:02 (xfr#1 to-chk=0/1)
73 RsyncProgress progress;
74 unsigned percent;
75 float speed;
76 char unit[5];
77 unsigned hrs;
78 unsigned mins;
79 unsigned secs;
80
81 auto ret = std::sscanf(line.c_str(),
82 " %lu %u%% %f%4s %u:%u:%u",
83 &progress.transferred,
84 &percent,
85 &speed,
86 &unit[0],
87 &hrs,
88 &mins,
89 &secs);
90 if (ret != 7) {
91 return {};
92 }
93 progress.progress = percent / 100.0f;
94 bool found = false;
95 for (auto u : RSYNC_UNITS) {
96 if (u == unit) {
97 found = true;
98 break;
99 }
100 speed *= 1024.0f;
101 }
102 if (!found) {
103 return {};
104 }
105 progress.speed = speed;
106 progress.remaining = hours(hrs) + minutes(mins) + seconds(secs);
107 return progress;
108}
109
110RsyncAsyncProcess::RsyncAsyncProcess(boost::asio::io_context& ctx,
111 std::string source,
112 std::string dest,
113 RsyncOptions const& opts,
114 DryRun flag)
115 : AsyncProcess(ctx, MakeAsyncProcessArgs(std::move(source), std::move(dest), opts, flag)) {
116}
117
119}
120
121boost::future<int> RsyncAsyncProcess::Initiate() {
122 AsyncProcess::ConnectStdout([this](pid_t pid, std::string const& line) mutable {
123 // Note: parse failures are ignored
124 auto maybe_progress = ParseRsyncProgress(line);
125 if (maybe_progress) {
126 m_progress(pid, *maybe_progress);
127 }
128 });
129 return AsyncProcess::Initiate();;
130}
131
132boost::signals2::connection RsyncAsyncProcess::ConnectProgress(SigProgress::slot_type const& slot) {
133 return m_progress.connect(slot);
134}
135
136} // namespace daq
Represents a subprocess as an asynchronous operation.
boost::future< int > Initiate() override
Starts process and asynchronous operations that read stdout and stderr.
boost::signals2::connection ConnectStdout(SigOutStream::slot_type const &slot) override
Signal type for stdout/stderr signals.
boost::future< int > Initiate() override
Progress update signal.
RsyncAsyncProcess(boost::asio::io_context &ctx, std::string source, std::string dest, RsyncOptions const &opts={}, DryRun flag=DryRun::Disabled)
Construct async operation.
boost::signals2::connection ConnectProgress(SigProgress::slot_type const &slot) override
Connect to progress signal.
std::chrono::seconds remaining
Estimated remaining time.
std::optional< RsyncProgress > ParseRsyncProgress(std::string const &line) noexcept
Parse progress update from rsync.
float progress
Progress as fraction of 1 (complete == 1.0)
uint64_t transferred
Number of transferred bytes.
float speed
Transfer speed in bytes/sec.
Options controlling rsync invocation.
Describes file transfer progress,.
daq::RsyncAsyncProcess and related class declarations.