ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
main.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup daq_dpm_server
4 * @copyright (c) Copyright ESO 2022
5 * All Rights Reserved
6 * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7 *
8 * @brief daqDpmServer entrypoint.
9 */
10#include <daq/config.hpp>
11#include <iostream>
12
13#include <boost/asio.hpp>
14#include <boost/program_options.hpp>
15#include <ciiLogManager.hpp>
16#include <log4cplus/configurator.h>
17#include <log4cplus/logger.h>
18#include <log4cplus/loggingmacros.h>
19#include <rad/ioExecutor.hpp>
20#include <rad/logger.hpp>
21#include <rad/mal/publisher.hpp>
22#include <rad/mal/replier.hpp>
23#include <rad/oldbAsyncWriter.hpp>
24
25#include <daq/conversion.hpp>
27#include <daq/dpm/scheduler.hpp>
28#include <daq/error/report.hpp>
29#include <daq/status.hpp>
30
31#include <daqif/uri.hpp>
32
33#include "configManager.hpp"
34
namespace error {
/**
 * Error codes.
 *
 * Used as process exit codes returned from main().
 */
enum {
    /** Invalid program arguments */
    // NOTE(review): this enumerator was missing from the listing; the value 2 is restored by
    // assumption and must be confirmed against the repository.
    InvalidProgramArgument = 2,
    /** Any other failure, e.g. an unhandled exception. */
    Unknown = 255,
};
}  // namespace error
45
46namespace po = boost::program_options;
47namespace daq::dpm {
48
49int Entrypoint(log4cplus::Logger const& logger, ConfigManager& cfg_mgr) {
50 boost::asio::io_context io_ctx;
51 rad::IoExecutor exec(io_ctx);
52 Configuration const& config = cfg_mgr.GetConfig();
53
54 auto ws_path = config.workspace;
55 if (ws_path.is_relative()) {
56 ws_path = config.dataroot / ws_path;
57 }
58
59 // Construction also initializes/verifies workspace and may throw.
60 WorkspaceImpl workspace(ws_path);
61
62 // Scheduler
63 SchedulerOptions scheduler_options;
64 scheduler_options.concurrency_limits.daq = config.limit_daq;
65 scheduler_options.concurrency_limits.merge = config.limit_merge;
66 scheduler_options.concurrency_limits.net_send = config.limit_net_send;
67 scheduler_options.concurrency_limits.net_receive = config.limit_net_receive;
68
69 DaqControllerOptions daq_controller_options;
70 daq_controller_options.merge_bin = config.merge_bin;
71 daq_controller_options.rsync_bin = config.rsync_bin;
72
74 [](boost::asio::io_context& io_ctx,
75 std::string source, // source
76 std::string dest, // dest
77 RsyncOptions const& opts,
78 RsyncAsyncProcess::DryRun dry_run) -> std::unique_ptr<RsyncAsyncProcessIf> {
79 return std::make_unique<RsyncAsyncProcess>(
80 io_ctx, std::move(source), std::move(dest), opts, dry_run);
81 };
82
83 DaqControllerImpl::ProcFactory proc_factory = [](boost::asio::io_context& io_ctx,
84 std::vector<std::string> args) {
85 return std::make_unique<AsyncProcess>(io_ctx, std::move(args));
86 };
87
88 SchedulerImpl scheduler(
89 exec,
90 workspace,
91 [&](std::unique_ptr<DaqWorkspace> workspace,
92 Resources& resources) -> std::unique_ptr<DaqController> {
93 return std::make_unique<DaqControllerImpl>(exec,
94 std::move(workspace),
95 resources,
96 rsync_factory,
97 proc_factory,
98 daq_controller_options);
99 },
100 scheduler_options);
101
102 // MAL variables that have an empty value if --no-ipc is used.
103 std::optional<rad::cii::Replier> replier;
104 std::shared_ptr<elt::mal::Mal> mal;
105 std::unique_ptr<rad::cii::Publisher<daqif::DaqStatus>> daq_status_publisher;
106 std::unique_ptr<rad::cii::Publisher<daqif::InternalDaqStatus>> daq_dpm_status_publisher;
107 std::optional<rad::OldbAsyncWriter> oldb_writer;
108 if (!config.no_ipc) {
109 mal = elt::mal::loadMal("zpb", {});
110 auto dpm_service = std::make_shared<DpmService>(exec, *mal, workspace, scheduler);
111 /*
112 * Load CII/MAL middleware here because it resets
113 * the log4cplus configuration!
114 */
115 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
116 factory.registerMal("zpb", mal);
117
118 /*
119 * Register CII/MAL replier
120 */
121 replier.emplace(elt::mal::Uri(config.rr_uri));
122 replier->RegisterService<daqif::AsyncDpmControl>("dpm", dpm_service);
123 replier->RegisterService<daqif::AsyncDpmDaqControl>("daq", dpm_service);
124 daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
125 daqif::MakeServiceUri(config.ps_uri, "daq/status"));
126 daq_dpm_status_publisher = std::make_unique<rad::cii::Publisher<daqif::InternalDaqStatus>>(
127 daqif::MakeServiceUri(config.ps_uri, "internal/daq/status"));
128 {
129 auto daq_status_sample = daq_status_publisher->CreateTopic();
130 auto daq_dpm_status_sample = daq_dpm_status_publisher->CreateTopic();
131 scheduler.ConnectStatus([&,
132 daq_status_sample = std::move(daq_status_sample),
133 daq_dpm_status_sample =
134 std::move(daq_dpm_status_sample)](Status const& status) {
135 try {
136 LOG4CPLUS_TRACE(logger, "Publishing DAQ status update: " << status);
137 *daq_status_sample << status;
138 daq_status_publisher->Publish(*daq_status_sample);
139 *daq_dpm_status_sample << status;
140 daq_dpm_status_publisher->Publish(*daq_dpm_status_sample);
141 } catch (std::exception const& e) {
142 LOG4CPLUS_ERROR(logger, fmt::format("Failed to publish status: {}", e.what()));
143 }
144 });
145 }
146 // Write configuration
147 {
148 oldb_writer.emplace(config.db_timeout, std::chrono::milliseconds(250));
149 auto prefix = config.db_prefix;
150 if (prefix.empty()) {
151 throw std::invalid_argument("OLDB prefix is invalid (empty)");
152 }
153 if (prefix.back() != '/') {
154 prefix.push_back('/');
155 }
156
157 cfg_mgr.Visit([&](daq::config::Manager::CiiValue const& param) {
158 oldb_writer->Set(prefix + param.metadata.canonical_name, param.value);
159 });
160 oldb_writer->StartWriter();
161 }
162 } else {
163 LOG4CPLUS_INFO(logger,
164 "Note: Running in standalone mode without interprocess communication."
165 " Stop process using CTRL-C/SIGINT");
166 }
167
168 // Run until requested to exit
169 LOG4CPLUS_INFO(logger, fmt::format("Application daqDpmServer started as '{}'", config.name));
170 scheduler.Start();
171 if (config.poll_once) {
172 // First poll may initiate async activities, after which we Stop scheduler and let it run to
173 // completion.
174 LOG4CPLUS_INFO(logger,
175 "Note: Polling once due to (--poll-once) to start operations, wait for "
176 "those to complete and then then exit");
177 io_ctx.poll();
178 scheduler.Stop();
179 io_ctx.restart();
180 } else {
181 io_ctx.run();
182 }
183 return 0;
184}
185} // namespace daq::dpm
186
187int main(int argc, char* argv[]) {
188 try {
189 auto log_initializer = rad::LogInitializer();
190 log4cplus::Logger logger = log4cplus::Logger::getInstance(daq::dpm::LOGGER_NAME);
191 try {
193 // @todo Uses default configuration at the moment. Load from command line and
194 // configuration file and environment variables as well.
195 daq::dpm::ConfigManager cfg_mgr(logger);
196 if (!cfg_mgr.ParseArguments(argc, argv)) {
197 // --help was invoked
198 return 0;
199 }
200
201 if (!cfg_mgr.GetConfig().config_file.empty()) {
202 cfg_mgr.LoadConfig();
203 }
204 elt::log::CiiLogManager::SetApplicationName(cfg_mgr.GetConfig().name);
205 auto level = log4cplus::getLogLevelManager().toString(cfg_mgr.GetConfig().log_level);
206 auto* daq_logs = std::getenv("DAQ_LOGS");
207 elt::log::CiiLogManager::Configure(rad::GetDefaultLogProperties(
208 daq_logs == nullptr ? std::string()
209 : fmt::format("{}/{}.log", daq_logs, cfg_mgr.GetConfig().name),
210 level));
211
212 if (auto log_cfg = cfg_mgr.GetConfig().log_config; !log_cfg.empty()) {
213 auto resolved = rad::Helper::FindFile(log_cfg);
214 if (resolved.empty()) {
215 LOG4CPLUS_ERROR(logger,
216 "Configured log property file not found: '"
217 << log_cfg
218 << "', $CFGPATH=" << rad::Helper::GetEnvVar("CFGPATH"));
219 return EXIT_FAILURE;
220 }
221 elt::log::CiiLogManager::Configure(resolved);
222 } else {
223 log4cplus::Logger::getRoot().setLogLevel(cfg_mgr.GetConfig().log_level);
224 }
225
226 int rc = daq::dpm::Entrypoint(logger, cfg_mgr);
227 LOG4CPLUS_INFO(logger, "Application terminating with code " << rc);
228 return rc;
229 } catch (po::error const& e) {
232 } catch (std::exception const& ex) {
233 LOG4CPLUS_ERROR(logger, daq::error::NestedExceptionReporter(ex));
234 return error::Unknown;
235 }
236 } catch (...) {
237 std::cerr << "Unknown error during log initialization\n";
238 return error::Unknown;
239 }
240}
rad::cii::OldbType value
Definition: manager.hpp:210
Metadata const & metadata
Definition: manager.hpp:212
CII representation of real value.
Definition: manager.hpp:209
DPM Server specific configuration manager.
void LoadConfig()
Load configuration file and update configuration.
bool ParseArguments(int argc, char *argv[])
Parse configuration from command line arguments.
Configuration const & GetConfig() const
void Visit(Func &&func)
Visit all configuration parameters.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:442
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:439
Implementation of daq::dpm::Workspace.
Definition: workspace.hpp:219
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
DPM server config.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
int main(int argc, char **argv)
Definition: main.cpp:102
std::string canonical_name
Definition: manager.hpp:74
unsigned short daq
Limits how many DAQs overall can be scheduled concurrently.
Definition: scheduler.hpp:142
std::string name
Process instance name.
Definition: config.hpp:37
log4cplus::LogLevel log_level
Definition: config.hpp:68
std::string rsync_bin
rsync application name.
Definition: config.hpp:82
unsigned short net_receive
Maximum number of concurrent input transfers = 0.
Definition: scheduler.hpp:152
struct daq::dpm::SchedulerOptions::ConcurrencyLimits concurrency_limits
std::filesystem::path dataroot
Dataroot, normally this should be configured from environment variable $DATAROOT.
Definition: config.hpp:42
std::string merge_bin
Merge application name.
Definition: config.hpp:77
std::filesystem::path workspace
Workspace.
Definition: config.hpp:49
int Entrypoint(log4cplus::Logger const &logger, ConfigManager &cfg_mgr)
Definition: main.cpp:49
unsigned short net_send
Maximum number of concurrent output transfers.
Definition: scheduler.hpp:147
std::chrono::seconds db_timeout
Definition: config.hpp:67
bool no_ipc
If true (set by command line option only) it disables MAL service registration.
Definition: config.hpp:64
std::string rr_uri
Request/reply URI.
Definition: config.hpp:54
std::string ps_uri
Pub/sub URI.
Definition: config.hpp:59
unsigned short merge
Maximum number of concurrent transfers.
Definition: scheduler.hpp:157
const std::string LOGGER_NAME
Definition: config.hpp:22
std::string db_prefix
Definition: config.hpp:66
std::string log_config
Log file, defaults to nothing.
Definition: config.hpp:72
bool poll_once
Run scheduler once.
Definition: config.hpp:94
uint16_t limit_net_send
Definition: config.hpp:98
std::filesystem::path config_file
Configuration file.
Definition: config.hpp:89
uint16_t limit_net_receive
Definition: config.hpp:99
Represents active configuration.
Definition: config.hpp:33
Options for DaqController.
Definition: scheduler.hpp:164
Limited resources.
Definition: scheduler.hpp:172
Options controlling scheduler operations.
Definition: scheduler.hpp:134
void ReportNestedExceptions(std::ostream &os) noexcept
Definition: report.cpp:50
Options controlling rsync invocation.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
Definition: main.cpp:24
@ Unknown
Definition: main.cpp:47
@ InvalidProgramArgument
Invalid program arguments.
Definition: main.cpp:30
daq::dpm::Scheduler and related class declarations.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:164
Contains URI support functions for daqif.