13#include <boost/asio.hpp>
14#include <boost/program_options.hpp>
15#include <ciiLogManager.hpp>
16#include <log4cplus/configurator.h>
17#include <log4cplus/logger.h>
18#include <log4cplus/loggingmacros.h>
20#include <rad/logger.hpp>
21#include <rad/mal/publisher.hpp>
22#include <rad/mal/replier.hpp>
23#include <rad/oldbAsyncWriter.hpp>
46namespace po = boost::program_options;
50 boost::asio::io_context io_ctx;
55 if (ws_path.is_relative()) {
74 [](boost::asio::io_context& io_ctx,
79 return std::make_unique<RsyncAsyncProcess>(
80 io_ctx, std::move(source), std::move(dest), opts, dry_run);
84 std::vector<std::string> args) {
85 return std::make_unique<AsyncProcess>(io_ctx, std::move(args));
91 [&](std::unique_ptr<DaqWorkspace> workspace,
92 Resources& resources) -> std::unique_ptr<DaqController> {
93 return std::make_unique<DaqControllerImpl>(exec,
98 daq_controller_options);
103 std::optional<rad::cii::Replier> replier;
104 std::shared_ptr<elt::mal::Mal>
mal;
105 std::unique_ptr<rad::cii::Publisher<daqif::DaqStatus>> daq_status_publisher;
106 std::unique_ptr<rad::cii::Publisher<daqif::InternalDaqStatus>> daq_dpm_status_publisher;
107 std::optional<rad::OldbAsyncWriter> oldb_writer;
109 mal = elt::mal::loadMal(
"zpb", {});
110 auto dpm_service = std::make_shared<DpmService>(exec, *
mal, workspace, scheduler);
115 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
116 factory.registerMal(
"zpb",
mal);
121 replier.emplace(elt::mal::Uri(config.
rr_uri));
122 replier->RegisterService<daqif::AsyncDpmControl>(
"dpm", dpm_service);
123 replier->RegisterService<daqif::AsyncDpmDaqControl>(
"daq", dpm_service);
124 daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
126 daq_dpm_status_publisher = std::make_unique<rad::cii::Publisher<daqif::InternalDaqStatus>>(
129 auto daq_status_sample = daq_status_publisher->CreateTopic();
130 auto daq_dpm_status_sample = daq_dpm_status_publisher->CreateTopic();
131 scheduler.ConnectStatus([&,
132 daq_status_sample = std::move(daq_status_sample),
133 daq_dpm_status_sample =
134 std::move(daq_dpm_status_sample)](
Status const& status) {
136 LOG4CPLUS_TRACE(logger,
"Publishing DAQ status update: " << status);
137 *daq_status_sample << status;
138 daq_status_publisher->Publish(*daq_status_sample);
139 *daq_dpm_status_sample << status;
140 daq_dpm_status_publisher->Publish(*daq_dpm_status_sample);
141 }
catch (std::exception
const& e) {
142 LOG4CPLUS_ERROR(logger, fmt::format(
"Failed to publish status: {}", e.what()));
148 oldb_writer.emplace(config.
db_timeout, std::chrono::milliseconds(250));
150 if (prefix.empty()) {
151 throw std::invalid_argument(
"OLDB prefix is invalid (empty)");
153 if (prefix.back() !=
'/') {
154 prefix.push_back(
'/');
160 oldb_writer->StartWriter();
163 LOG4CPLUS_INFO(logger,
164 "Note: Running in standalone mode without interprocess communication."
165 " Stop process using CTRL-C/SIGINT");
169 LOG4CPLUS_INFO(logger, fmt::format(
"Application daqDpmServer started as '{}'", config.
name));
174 LOG4CPLUS_INFO(logger,
175 "Note: Polling once due to (--poll-once) to start operations, wait for "
176 "those to complete and then then exit");
187int main(
int argc,
char* argv[]) {
189 auto log_initializer = rad::LogInitializer();
204 elt::log::CiiLogManager::SetApplicationName(cfg_mgr.
GetConfig().
name);
206 auto* daq_logs = std::getenv(
"DAQ_LOGS");
207 elt::log::CiiLogManager::Configure(rad::GetDefaultLogProperties(
208 daq_logs ==
nullptr ? std::string()
209 : fmt::format(
"{}/{}.log", daq_logs, cfg_mgr.
GetConfig().
name),
213 auto resolved = rad::Helper::FindFile(log_cfg);
214 if (resolved.empty()) {
215 LOG4CPLUS_ERROR(logger,
216 "Configured log property file not found: '"
218 <<
"', $CFGPATH=" << rad::Helper::GetEnvVar(
"CFGPATH"));
221 elt::log::CiiLogManager::Configure(resolved);
227 LOG4CPLUS_INFO(logger,
"Application terminating with code " << rc);
229 }
catch (po::error
const& e) {
232 }
catch (std::exception
const& ex) {
237 std::cerr <<
"Unknown error during log initialization\n";
Metadata const & metadata
CII representation of real value.
DPM Server specific configuration manager.
void LoadConfig()
Load configuration file and update configuration.
bool ParseArguments(int argc, char *argv[])
Parse configuration from command line arguments.
Configuration const & GetConfig() const
void Visit(Func &&func)
Visit all configuration parameters.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Implementation of daq::dpm::Workspace.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
int main(int argc, char **argv)
std::string canonical_name
unsigned short daq
Limits how many DAQs overall can be scheduled concurrently.
std::string name
Process instance name.
log4cplus::LogLevel log_level
std::string rsync_bin
rsync application name.
unsigned short net_receive
Maximum number of concurrent input transfers = 0.
struct daq::dpm::SchedulerOptions::ConcurrencyLimits concurrency_limits
std::filesystem::path dataroot
Dataroot, normally this should be configured from environment variable $DATAROOT.
std::string merge_bin
Merge application name.
std::filesystem::path workspace
Workspace.
int Entrypoint(log4cplus::Logger const &logger, ConfigManager &cfg_mgr)
unsigned short net_send
Maximum number of concurrent output transfers.
std::chrono::seconds db_timeout
bool no_ipc
If true (set by command line option only) it disables MAL service registration.
std::string rr_uri
Request/reply URI.
std::string ps_uri
Pub/sub URI.
unsigned short merge
Maximum number of concurrent transfers.
const std::string LOGGER_NAME
std::string log_config
Log file, defaults to nothing.
bool poll_once
Run scheduler once.
std::filesystem::path config_file
Configuration file.
uint16_t limit_net_receive
Represents active configuration.
Options for DaqController.
Options controlling scheduler operations.
void ReportNestedExceptions(std::ostream &os) noexcept
Options controlling rsync invocation.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
@ InvalidProgramArgument
Invalid program arguments.
daq::dpm::Scheduler and related class declarations.
Contains declaration for Status and ObservableStatus.
Non observable status object that keeps stores status of data acquisition.
Contains URI support functions for daqif.