ifw-daq 3.1.0
IFW Data Acquisition modules
Loading...
Searching...
No Matches
main.cpp
Go to the documentation of this file.
1/**
2 * @file
3 * @ingroup server
4 * @copyright ESO - European Southern Observatory
5 * @author
6 *
7 * @brief main source file.
8 */
9
10#include "actionMgr.hpp"
11#include "actionsStd.hpp"
12#include "dataContext.hpp"
13#include "dbInterface.hpp"
15#include "logger.hpp"
16#include "ocmDaqService.hpp"
17#include "stdCmdsImpl.hpp"
18
19#include <Stdif.hpp>
20#include <events.rad.hpp>
21
22#include <ciiLogManager.hpp>
23#include <rad/actionCallback.hpp>
24#include <rad/exceptions.hpp>
25#include <rad/logger.hpp>
26#include <rad/mal/publisher.hpp>
27#include <rad/mal/replier.hpp>
28#include <rad/mal/utils.hpp>
29#include <rad/smAdapter.hpp>
30
31#include <scxml4cpp/Context.h>
32#include <scxml4cpp/EventQueue.h>
33
34#include <boost/asio.hpp>
35#include <boost/exception/diagnostic_information.hpp>
36#include <boost/filesystem.hpp>
37#include <log4cplus/hierarchy.h>
38
39#include <fmt/format.h>
40
41#include <functional>
42#include <memory>
43
44#include <daq/conversion.hpp>
45#include <daq/daqController.hpp>
46#include <daq/dpmClient.hpp>
47#include <daq/error/report.hpp>
48#include <daq/fits/keyword.hpp>
49#include <daq/manager.hpp>
50#include <daq/workspace.hpp>
51#include <daqif/uri.hpp>
52
54public:
55 DaqService(std::string name,
56 daq::ManagerParams manager_params,
57 std::filesystem::path const& workspace,
58 daq::fits::KeywordFormatter const& kw_formatter,
59 std::string const& output_path,
60 rad::IoExecutor& executor,
61 std::shared_ptr<mal::Mal> mal,
62 rad::cii::Replier& replier,
63 rad::cii::Publisher<daqif::DaqStatus>& publisher,
64 rad::SMAdapter& state_machine,
65 daq::DpmClientParams const& dpm_params)
66 : m_name(std::move(name))
67 , m_workspace(workspace)
68 , m_mal(std::move(mal))
69 , m_event_log(std::make_shared<daq::ObservableEventLog>())
70 , m_dpm_client(
71 std::make_shared<daq::DpmClientImpl>(executor.get_io_context(), *m_mal, dpm_params))
72 , m_daq_factory(executor.get_io_context(), *m_mal, m_dpm_client)
73 , m_manager(executor,
74 std::move(manager_params),
75 m_workspace,
76 kw_formatter,
77 m_event_log,
78 m_daq_factory,
79 m_dpm_client,
80 log4cplus::Logger::getInstance(server::LOGGER_NAME_MANAGER))
81 , m_service(std::make_shared<OcmDaqService>(
82 executor.get_io_context(), *m_mal, m_manager, m_name, output_path, m_event_log))
83 , m_replier(replier)
84 , m_publisher(publisher)
85 , m_state_machine(state_machine)
86 , m_is_active(false) {
87 m_sample = m_publisher.CreateTopic();
88 m_connection = m_manager.GetStatusSignal().ConnectObserver(
89 [this](auto const& status) { this->DaqStatusUpdate(status); });
90
91 // Try restoring from workspace
92 try {
93 m_manager.RestoreFromWorkspace();
94 } catch (...) {
95 daq::error::NestedExceptionReporter r(std::current_exception());
96 LOG4CPLUS_ERROR(server::GetLogger(),
97 "Failed to restore state from OCM workspace at "
98 << m_workspace.GetPath().native() << ":\n"
99 << r);
100 }
101 }
102
103 void Register() {
104 LOG4CPLUS_DEBUG(server::GetLogger(), "RegisterService");
105 m_replier.RegisterService(m_name,
106 std::static_pointer_cast<daqif::AsyncOcmDaqControl>(m_service));
107 }
108 void Unregister() {
109 LOG4CPLUS_DEBUG(server::GetLogger(), "UnregisterService");
110 m_replier.UnregisterService(m_name);
111 }
112
113 /**
114 * Is notified of any DAQ status change and will post events to SM on DAQ activity flank
115 * changes.
116 *
117 * @note Current implementation assumes all DAQs begin in state NotStarted. When ocm supports
118 * loading old status from persistent storage this will no longer work as number of active daqs
119 * is not starting at zero and state is not starting at NotStarted.
120 */
122 try {
123 LOG4CPLUS_DEBUG(server::GetLogger(),
124 fmt::format("Publishing DAQ status for DAQ {}", status.GetId()));
125 auto state = status.GetState();
126 if (state == daq::State::NotStarted && !m_is_active) {
127 // New data acquisition
128 m_is_active = true;
129 LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent");
130 m_state_machine.ProcessEvent(Events::AnyDaqActive{});
131 LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent done");
132 } else if (m_is_active && daq::IsFinalState(status.GetState())) {
133 // At least one DAQ is completed. Check if all DAQs are completed.
134 auto daqs = m_manager.GetDaqControllers();
135 auto all_final =
136 std::all_of(daqs.begin(), daqs.end(), [](auto const shrd_daq) -> bool {
137 assert(shrd_daq);
138 return daq::IsFinalState(shrd_daq->GetState());
139 });
140 if (all_final) {
141 m_is_active = false;
142 // Active -> Idle
143 LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent");
144 m_state_machine.ProcessEvent(Events::AllDaqInactive{});
145 LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent done");
146 }
147 }
148 // publish state
149 *m_sample << status.GetStatus();
150 m_publisher.Publish(*m_sample);
151 LOG4CPLUS_DEBUG(server::GetLogger(),
152 fmt::format("Publishing DAQ status for DAQ {} done", status.GetId()));
153 } catch (std::exception const& e) {
154 LOG4CPLUS_ERROR(server::GetLogger(),
155 fmt::format("Failed to publish status: {}", e.what()));
156 }
157 }
158
159private:
160 std::string m_name;
161 daq::WorkspaceImpl m_workspace;
162 std::shared_ptr<mal::Mal> m_mal;
163 std::shared_ptr<daq::ObservableEventLog> m_event_log;
164 std::shared_ptr<daq::DpmClientImpl> m_dpm_client;
166 daq::ManagerImpl m_manager;
167 std::shared_ptr<OcmDaqService> m_service;
168 rad::cii::Replier& m_replier;
169 rad::cii::Publisher<daqif::DaqStatus>& m_publisher;
170 rad::SMAdapter& m_state_machine;
171 bool m_is_active;
172
173 boost::signals2::connection m_connection;
174 std::shared_ptr<daqif::DaqStatus> m_sample;
175};
176
177/**
178 * Application main.
179 *
180 * @param[in] argc Number of command line options.
181 * @param[in] argv Command line options.
182 */
183int main(int argc, char* argv[]) {
184 namespace fs = boost::filesystem;
185
186 auto log_initializer = rad::LogInitializer();
187 LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server started.");
188 try {
189 /* Read only configuration */
190 server::Config config;
191 if (config.ParseOptions(argc, argv) == false) {
192 // request for help
193 return EXIT_SUCCESS;
194 }
195 /*
196 * Load CII/MAL middleware here because it resets
197 * the log4cplus configuration!
198 */
199 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
200 auto mal = elt::mal::loadMal("zpb", {});
201 factory.registerMal("zpb", mal);
202
203 config.LoadConfig();
204
205 elt::log::CiiLogManager::SetApplicationName(config.GetProcName());
206 auto* daq_logs = std::getenv("DAQ_LOGS");
207 elt::log::CiiLogManager::Configure(rad::GetDefaultLogProperties(
208 daq_logs == nullptr ? std::string()
209 : fmt::format("{}/{}.log", daq_logs, config.GetProcName()),
210 config.GetLogLevel()));
211 if (auto const& file = config.GetLogProperties(); !file.empty()) {
212 auto resolved = rad::Helper::FindFile(config.GetLogProperties());
213 if (resolved.empty()) {
214 LOG4CPLUS_ERROR(server::GetLogger(),
215 "Configured log property file not found: '"
216 << file << "', $CFGPATH=" << rad::Helper::GetEnvVar("CFGPATH"));
217 return EXIT_FAILURE;
218 }
219 elt::log::CiiLogManager::Configure(resolved);
220 }
221
222 if (config.m_out_path.empty()) {
223 auto const msg =
224 "Output path not configured. Define environment "
225 "variable $DATAROOT or configuration " +
227 LOG4CPLUS_ERROR(server::GetLogger(), msg);
228 std::cerr << msg << '\n';
229 return -1;
230 }
231 // Create out path if necessary with permissive permissions
232 if (!fs::is_directory(config.m_out_path)) {
233 LOG4CPLUS_INFO(server::GetLogger(),
234 fmt::format("Output dir '{}' is not a directory. Creating it now",
235 config.m_out_path));
236 fs::create_directories(config.m_out_path);
237 fs::permissions(config.m_out_path, fs::others_read | fs::owner_all | fs::group_all);
238 }
239
240 auto workspace_root = config.GetWorkspace();
241 LOG4CPLUS_INFO(server::GetLogger(),
242 fmt::format("Using workspace directory: {}", workspace_root.native()));
243
244 /*
245 * LAN 2020-07-09 EICSSW-717
246 * Create CII/MAL replier as soon as possible to avoid problems when
247 * an exceptions is thrown from and Action/Guard.
248 */
249 rad::cii::Replier mal_replier(daqif::MakeServerUri(config.GetMsgReplierEndpoint()));
250 auto std_status_publisher = std::make_unique<rad::cii::Publisher<stdif::Status>>(
251 daqif::MakeServiceUri(config.GetPubEndpoint(), "std/status"));
252 auto daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
253 daqif::MakeServiceUri(config.GetPubEndpoint(), "daq/status"));
254
255 /* Runtime DB */
256 rad::OldbAsyncWriter oldb_writer(config.GetDbTimeout(), std::chrono::milliseconds(100));
257
258 /* Runtime data context */
259 server::DataContext data_ctx(config, oldb_writer);
260 data_ctx.UpdateDb();
261
262 /*
263 * Create event loop
264 */
265
266 // event loop
267 boost::asio::io_context io_ctx;
268 rad::IoExecutor executor(io_ctx);
269
270 /*
271 * State Machine related objects
272 */
273 // SM event queue and context
274 scxml4cpp::EventQueue external_events;
275 scxml4cpp::Context state_machine_ctx;
276
277 // State Machine facade
278 rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
279
280 daq::ManagerParams manager_params;
281 manager_params.instrument_id = config.m_instrument_id;
282 manager_params.acquiring_stale_age = config.m_stale_acquiring;
283 manager_params.merging_stale_age = config.m_stale_merging;
284
285 std::unique_ptr<daq::fits::KeywordFormatter> kw_formatter;
286 if (auto const& dicts = config.GetDictionaryNames(); !dicts.empty()) {
287 auto ptr = std::make_unique<daq::DictKeywordFormatter>();
288 ptr->Load(dicts);
289 LOG4CPLUS_DEBUG(server::GetLogger(),
290 "Loaded the following dictionaries: " << ptr->GetDictionariesNames());
291 kw_formatter = std::move(ptr);
292 } else {
293 kw_formatter = std::make_unique<daq::fits::StandardKeywordFormatter>();
294 }
295 // @todo: Load DPM RR URI and timeout
296 DaqService daq_service("daq",
297 manager_params,
298 workspace_root,
299 *kw_formatter,
300 config.m_out_path,
301 executor,
302 mal,
303 mal_replier,
304 *daq_status_publisher,
305 state_machine,
306 config.GetDpmClientParams());
307
308 // actions and activities
309 server::ActionMgr action_mgr;
310 action_mgr.CreateActions(io_ctx, state_machine, data_ctx);
311 action_mgr.CreateActivities(state_machine, data_ctx);
312 action_mgr.AddAction(new rad::ActionCallback("OcmDaq.RegisterService",
313 [&](auto) { daq_service.Register(); }));
314 action_mgr.AddAction(new rad::ActionCallback("OcmDaq.UnregisterService",
315 [&](auto) { daq_service.Unregister(); }));
316
317 // Load SM model
318 state_machine.Load(
319 config.GetSmScxmlFilename(), &action_mgr.GetActions(), &action_mgr.GetActivities());
320
321 // Register handlers to reject events
322 state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
323 state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
324 state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
325
326 // Register publisher to export state information
327 state_machine.SetStatusPublisher([&](std::string const& status) {
328 auto ics_status = server::MakeStatusString(state_machine.GetActiveStates())
329 .value_or("NotOperational;Undefined");
330
331 auto sample = std_status_publisher->CreateTopic();
332 sample->setStatus(ics_status);
333 sample->setSource(config.GetProcName());
334 std_status_publisher->Publish(*sample);
335 data_ctx.GetDbInterface().SetControlState(status);
336 });
337 /*
338 * Register CII/MAL replier
339 */
340 mal_replier.RegisterService<stdif::AsyncStdCmds>(
341 "std", std::make_shared<server::StdCmdsImpl>(state_machine));
342
343 /*
344 * Start event loop
345 */
346 state_machine.Start();
347 oldb_writer.StartWriter();
348 try {
349 io_ctx.run();
350 } catch (std::exception const& e) {
351 LOG4CPLUS_ERROR(
353 "Exception from asio context: " << daq::error::NestedExceptionReporter(e));
354 }
355 state_machine.Stop();
356 } catch (std::exception const& e) {
357 LOG4CPLUS_ERROR(server::GetLogger(),
358 "Exception in application main: \n"
360
361 std::cout << "Exit main()\n";
362 return EXIT_FAILURE;
363 } catch (...) {
364 LOG4CPLUS_ERROR(server::GetLogger(),
365 "Unknown exception in application main: "
366 << boost::current_exception_diagnostic_information());
367 std::cout << "Exit main()\n";
368 return EXIT_FAILURE;
369 }
370
371 // to avoid valgrind warnings on potential memory loss
372 // google::protobuf::ShutdownProtobufLibrary();
373
374 LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server terminated.");
375 std::cout << "Exit main()\n";
376 return EXIT_SUCCESS;
377}
ActionMgr class header file.
ActionsStd class header file.
DaqService(std::string name, daq::ManagerParams manager_params, std::filesystem::path const &workspace, daq::fits::KeywordFormatter const &kw_formatter, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< daqif::DaqStatus > &publisher, rad::SMAdapter &state_machine, daq::DpmClientParams const &dpm_params)
Definition: main.cpp:55
void Register()
Definition: main.cpp:103
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
Definition: main.cpp:121
void Unregister()
Definition: main.cpp:108
Default factory producing "real" implementations.
Implements daq::Manager.
Definition: manager.hpp:264
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:139
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:615
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:619
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:224
State GetState() const noexcept
Definition: status.cpp:297
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:371
std::string const & GetId() const noexcept
Definition: status.cpp:281
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:93
Implementation of daq::Workspace.
Definition: workspace.hpp:99
auto GetPath() const -> std::filesystem::path override
Definition: workspace.hpp:118
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Formats keyword against e.g.
Definition: keyword.hpp:551
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
This class is responsible for the life-cycle management of actions and activities.
Definition: actionMgr.hpp:28
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
Definition: actionMgr.cpp:107
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
Definition: actionMgr.cpp:31
This class provide access to the command line options and the configuration parameters stored in the ...
Definition: config.hpp:61
const std::string & GetLogLevel() const
Definition: config.cpp:335
std::filesystem::path GetWorkspace() const
Definition: config.hpp:151
std::string m_out_path
Definition: config.hpp:177
const std::string & GetSmScxmlFilename() const
Definition: config.cpp:320
bool ParseOptions(int argc, char *argv[])
Disable assignment operator.
Definition: config.cpp:90
daq::DpmClientParams const & GetDpmClientParams() const
Definition: config.cpp:349
const std::string & GetMsgReplierEndpoint() const
Definition: config.cpp:297
void LoadConfig(const std::string &filename="")
This method load from a configuration file the application configuration overriding the initializatio...
Definition: config.cpp:166
std::chrono::seconds GetDbTimeout() const
Definition: config.cpp:316
std::chrono::hours m_stale_merging
Definition: config.hpp:181
std::chrono::hours m_stale_acquiring
Definition: config.hpp:180
std::string m_instrument_id
Definition: config.hpp:168
const std::string & GetLogProperties() const
Definition: config.cpp:340
const std::string & GetProcName() const
Definition: config.cpp:330
std::vector< std::string > const & GetDictionaryNames() const
Definition: config.cpp:345
const std::string & GetPubEndpoint() const
Definition: config.cpp:302
This class provide access to the application run-time data including the in-memory DB.
Definition: dataContext.hpp:21
void UpdateDb()
Try to connect to the DB and update the application configuration.
Definition: dataContext.cpp:37
DbInterface & GetDbInterface()
Definition: dataContext.cpp:50
void SetControlState(const std::string &value)
Definition: dbInterface.cpp:35
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
DataContext class header file.
DbInterface class header file.
daq::DpmClient
int main(int argc, char **argv)
Definition: main.cpp:102
Implements the MAL interface daqif::OcmDaq (async version).
Contains data structure for FITS keywords.
Default logger name.
Declaration of daq::Manager
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:63
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:50
@ NotStarted
Initial state of data acquisition.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:57
Connection parameters for DPM.
Definition: dpmClient.hpp:93
Configurations parameters directly related to manager.
Definition: manager.hpp:46
network::uri MakeServerUri(std::string uri)
Creates a server URI.
Definition: uri.cpp:13
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
std::optional< std::string > MakeStatusString(std::list< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
Definition: actionsStd.cpp:29
const std::string KEY_CONFIG_DATAROOT
Definition: config.hpp:28
log4cplus::Logger & GetLogger()
Definition: logger.cpp:14
Declaration of OcmDaqService.
Contains declaration for for DaqController.
StdCmds Interface implementation header file.
Contains URI support functions for daqif.