ifw-daq  3.0.1
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm_server
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daqDpmServer entrypoint.
9  */
10 #include <daq/config.hpp>
11 #include <iostream>
12 
13 #include <boost/asio.hpp>
14 #include <boost/program_options.hpp>
15 #include <ciiLogManager.hpp>
16 #include <log4cplus/configurator.h>
17 #include <log4cplus/logger.h>
18 #include <log4cplus/loggingmacros.h>
19 #include <rad/ioExecutor.hpp>
20 #include <rad/logger.hpp>
21 #include <rad/mal/publisher.hpp>
22 #include <rad/mal/replier.hpp>
23 #include <rad/oldbAsyncWriter.hpp>
24 
25 #include <daq/conversion.hpp>
26 #include <daq/dpm/dpmService.hpp>
27 #include <daq/dpm/scheduler.hpp>
28 #include <daq/error/report.hpp>
29 #include <daq/status.hpp>
30 
31 #include <daqif/uri.hpp>
32 
33 #include "configManager.hpp"
34 
35 namespace error {
36 /**
37  * Error codes
38  */
39 enum {
40  /** Invalid program arguments */
42  Unknown = 255,
43 };
44 } // namespace error
45 
46 namespace po = boost::program_options;
47 namespace daq::dpm {
48 
49 int Entrypoint(log4cplus::Logger const& logger, ConfigManager& cfg_mgr) {
50  boost::asio::io_context io_ctx;
51  rad::IoExecutor exec(io_ctx);
52  Configuration const& config = cfg_mgr.GetConfig();
53 
54  auto ws_path = config.workspace;
55  if (ws_path.is_relative()) {
56  ws_path = config.dataroot / ws_path;
57  }
58 
59  // Construction also initializes/verifies workspace and may throw.
60  WorkspaceImpl workspace(ws_path);
61 
62  // Scheduler
63  SchedulerOptions scheduler_options;
64  scheduler_options.concurrency_limits.daq = config.limit_daq;
65  scheduler_options.concurrency_limits.merge = config.limit_merge;
66  scheduler_options.concurrency_limits.net_send = config.limit_net_send;
67  scheduler_options.concurrency_limits.net_receive = config.limit_net_receive;
68 
69  DaqControllerOptions daq_controller_options;
70  daq_controller_options.merge_bin = config.merge_bin;
71  daq_controller_options.rsync_bin = config.rsync_bin;
72 
73  DaqControllerImpl::RsyncFactory rsync_factory =
74  [](boost::asio::io_context& io_ctx,
75  std::string source, // source
76  std::string dest, // dest
77  RsyncOptions const& opts,
78  RsyncAsyncProcess::DryRun dry_run) -> std::unique_ptr<RsyncAsyncProcessIf> {
79  return std::make_unique<RsyncAsyncProcess>(
80  io_ctx, std::move(source), std::move(dest), opts, dry_run);
81  };
82 
83  DaqControllerImpl::ProcFactory proc_factory = [](boost::asio::io_context& io_ctx,
84  std::vector<std::string> args) {
85  return std::make_unique<AsyncProcess>(io_ctx, std::move(args));
86  };
87 
88  SchedulerImpl scheduler(
89  exec,
90  workspace,
91  [&](std::unique_ptr<DaqWorkspace> workspace,
92  Resources& resources) -> std::unique_ptr<DaqController> {
93  return std::make_unique<DaqControllerImpl>(exec,
94  std::move(workspace),
95  resources,
96  rsync_factory,
97  proc_factory,
98  daq_controller_options);
99  },
100  scheduler_options);
101 
102  // MAL variables that have an empty value if --no-ipc is used.
103  std::optional<rad::cii::Replier> replier;
104  std::shared_ptr<elt::mal::Mal> mal;
105  std::unique_ptr<rad::cii::Publisher<daqif::DaqStatus>> daq_status_publisher;
106  std::optional<rad::OldbAsyncWriter> oldb_writer;
107  if (!config.no_ipc) {
108  mal = elt::mal::loadMal("zpb", {});
109  auto dpm_service = std::make_shared<DpmService>(exec, *mal, workspace, scheduler);
110  /*
111  * Load CII/MAL middleware here because it resets
112  * the log4cplus configuration!
113  */
114  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
115  factory.registerMal("zpb", mal);
116 
117  /*
118  * Register CII/MAL replier
119  */
120  replier.emplace(elt::mal::Uri(config.rr_uri));
121  replier->RegisterService<daqif::AsyncDpmControl>("dpm", dpm_service);
122  replier->RegisterService<daqif::AsyncDpmDaqControl>("daq", dpm_service);
123  daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
124  daqif::MakeServiceUri(config.ps_uri, "daq/status"));
125  {
126  auto sample = daq_status_publisher->CreateTopic();
127  scheduler.ConnectStatus([&, sample = std::move(sample)](Status const& status) {
128  try {
129  LOG4CPLUS_TRACE(logger, "Publishing DAQ status update: " << status);
130  *sample << status;
131  daq_status_publisher->Publish(*sample);
132  } catch (std::exception const& e) {
133  LOG4CPLUS_ERROR(logger, fmt::format("Failed to publish status: {}", e.what()));
134  }
135  });
136  }
137  // Write configuration
138  {
139  oldb_writer.emplace(config.db_timeout, std::chrono::milliseconds(250));
140  auto prefix = config.db_prefix;
141  if (prefix.empty()) {
142  throw std::invalid_argument("OLDB prefix is invalid (empty)");
143  }
144  if (prefix.back() != '/') {
145  prefix.push_back('/');
146  }
147 
148  cfg_mgr.Visit([&](daq::config::Manager::CiiValue const& param) {
149  oldb_writer->Set(prefix + param.metadata.canonical_name, param.value);
150  });
151  oldb_writer->StartWriter();
152  }
153  } else {
154  LOG4CPLUS_INFO(logger,
155  "Note: Running in standalone mode without interprocess communication."
156  " Stop process using CTRL-C/SIGINT");
157  }
158 
159  // Run until requested to exit
160  LOG4CPLUS_INFO(logger, fmt::format("Application daqDpmServer started as '{}'", config.name));
161  scheduler.Start();
162  if (config.poll_once) {
163  // First poll may initiate async activities, after which we Stop scheduler and let it run to
164  // completion.
165  LOG4CPLUS_INFO(logger,
166  "Note: Polling once due to (--poll-once) to start operations, wait for "
167  "those to complete and then then exit");
168  io_ctx.poll();
169  scheduler.Stop();
170  io_ctx.restart();
171  } else {
172  io_ctx.run();
173  }
174  return 0;
175 }
176 } // namespace daq::dpm
177 
178 int main(int argc, char* argv[]) {
179  try {
180  auto log_initializer = rad::LogInitializer();
181  log4cplus::Logger logger = log4cplus::Logger::getInstance(daq::dpm::LOGGER_NAME);
182  try {
184  // @todo Uses default configuration at the moment. Load from command line and
185  // configuration file and environment variables as well.
186  daq::dpm::ConfigManager cfg_mgr(logger);
187  if (!cfg_mgr.ParseArguments(argc, argv)) {
188  // --help was invoked
189  return 0;
190  }
191 
192  if (!cfg_mgr.GetConfig().config_file.empty()) {
193  cfg_mgr.LoadConfig();
194  }
195  auto level = log4cplus::getLogLevelManager().toString(cfg_mgr.GetConfig().log_level);
196  elt::log::CiiLogManager::Configure(
197  rad::GetDefaultLogProperties(cfg_mgr.GetConfig().name + ".log", level));
198 
199  if (auto log_cfg = cfg_mgr.GetConfig().log_config; !log_cfg.empty()) {
200  auto resolved = rad::Helper::FindFile(log_cfg);
201  if (resolved.empty()) {
202  LOG4CPLUS_ERROR(logger,
203  "Configured log property file not found: '"
204  << log_cfg
205  << "', $CFGPATH=" << rad::Helper::GetEnvVar("CFGPATH"));
206  return EXIT_FAILURE;
207  }
208  elt::log::CiiLogManager::Configure(resolved);
209  } else {
210  log4cplus::Logger::getRoot().setLogLevel(cfg_mgr.GetConfig().log_level);
211  }
212 
213  int rc = daq::dpm::Entrypoint(logger, cfg_mgr);
214  LOG4CPLUS_INFO(logger, "Application terminating with code " << rc);
215  return rc;
216  } catch (po::error const& e) {
219  } catch (std::exception const& ex) {
220  LOG4CPLUS_ERROR(logger, daq::error::NestedExceptionReporter(ex));
221  return error::Unknown;
222  }
223  } catch (...) {
224  std::cerr << "Unknown error during log initialization\n";
225  return error::Unknown;
226  }
227 }
rad::cii::OldbType value
Definition: manager.hpp:197
Metadata const & metadata
Definition: manager.hpp:199
CII representation of real value.
Definition: manager.hpp:196
DPM Server specific configuration manager.
Configuration const & GetConfig() const
void LoadConfig()
Load configuration file and update configuration.
bool ParseArguments(int argc, char *argv[])
Parse configuration from command line arguments.
void Visit(Func &&func)
Visit all configuration parameters.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:443
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:440
Implementation of daq::dpm::Workspace.
Definition: workspace.hpp:215
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
DPM server config.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
int main(int argc, char **argv)
Definition: main.cpp:68
std::string canonical_name
Definition: manager.hpp:71
unsigned short daq
Limits how many DAQs overall can be scheduled concurrently.
Definition: scheduler.hpp:141
std::string name
Process instance name.
Definition: config.hpp:37
log4cplus::LogLevel log_level
Definition: config.hpp:68
std::string rsync_bin
rsync application name.
Definition: config.hpp:82
unsigned short net_receive
Maximum number of concurrent input transfers = 0.
Definition: scheduler.hpp:151
struct daq::dpm::SchedulerOptions::ConcurrencyLimits concurrency_limits
std::filesystem::path dataroot
Dataroot, normally this should be configured from environment variable $DATAROOT.
Definition: config.hpp:42
std::string merge_bin
Merge application name.
Definition: config.hpp:77
std::filesystem::path workspace
Workspace.
Definition: config.hpp:49
int Entrypoint(log4cplus::Logger const &logger, ConfigManager &cfg_mgr)
Definition: main.cpp:49
unsigned short net_send
Maximum number of concurrent output transfers.
Definition: scheduler.hpp:146
std::chrono::seconds db_timeout
Definition: config.hpp:67
bool no_ipc
If true (set by command line option only) it disables MAL service registration.
Definition: config.hpp:64
std::string rr_uri
Request/reply URI.
Definition: config.hpp:54
std::string ps_uri
Pub/sub URI.
Definition: config.hpp:59
unsigned short merge
Maximum number of concurrent transfers.
Definition: scheduler.hpp:156
const std::string LOGGER_NAME
Definition: config.hpp:22
std::string db_prefix
Definition: config.hpp:66
std::string log_config
Log file, defaults to nothing.
Definition: config.hpp:72
bool poll_once
Run scheduler once.
Definition: config.hpp:94
uint16_t limit_net_send
Definition: config.hpp:98
std::filesystem::path config_file
Configuration file.
Definition: config.hpp:89
uint16_t limit_net_receive
Definition: config.hpp:99
Represents active configuration.
Definition: config.hpp:33
Options for DaqController.
Definition: scheduler.hpp:163
Limited resources.
Definition: scheduler.hpp:171
Options controlling scheduler operations.
Definition: scheduler.hpp:133
void ReportNestedExceptions(std::ostream &os) noexcept
Definition: report.cpp:50
Options controlling rsync invocation.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
Definition: main.cpp:23
@ Unknown
Definition: main.cpp:46
@ InvalidProgramArgument
Invalid program arguments.
Definition: main.cpp:29
daq::dpm::Scheduler and related class declarations.
Contains declaration for Status and ObservableStatus.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:153
Contains URI support functions for daqif.