ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup server
4  * @copyright ESO - European Southern Observatory
5  * @author
6  *
7  * @brief main source file.
8  */
9 
10 #include "actionMgr.hpp"
11 #include "actionsStd.hpp"
12 #include "dataContext.hpp"
13 #include "dbInterface.hpp"
14 #include "logger.hpp"
15 #include "ocmDaqService.hpp"
16 #include "stdCmdsImpl.hpp"
17 
18 #include <Stdif.hpp>
19 #include <events.rad.hpp>
20 
21 #include <ciiLogManager.hpp>
22 #include <rad/actionCallback.hpp>
23 #include <rad/exceptions.hpp>
24 #include <rad/logger.hpp>
25 #include <rad/mal/publisher.hpp>
26 #include <rad/mal/replier.hpp>
27 #include <rad/mal/utils.hpp>
28 #include <rad/smAdapter.hpp>
29 
30 #include <scxml4cpp/Context.h>
31 #include <scxml4cpp/EventQueue.h>
32 
33 #include <boost/asio.hpp>
34 #include <boost/exception/diagnostic_information.hpp>
35 #include <boost/filesystem.hpp>
36 #include <log4cplus/hierarchy.h>
37 
38 #include <fmt/format.h>
39 
40 #include <functional>
41 #include <memory>
42 
43 #include <daq/conversion.hpp>
44 #include <daq/daqController.hpp>
45 #include <daq/dpmClient.hpp>
46 #include <daq/error/report.hpp>
47 #include <daq/manager.hpp>
48 #include <daq/workspace.hpp>
49 #include <daqif/uri.hpp>
50 
51 class DaqService {
52 public:
53  DaqService(std::string name,
54  daq::ManagerParams manager_params,
55  std::filesystem::path const& workspace,
56  std::string const& output_path,
57  rad::IoExecutor& executor,
58  std::shared_ptr<mal::Mal> mal,
59  rad::cii::Replier& replier,
60  rad::cii::Publisher<daqif::DaqStatus>& publisher,
61  rad::SMAdapter& state_machine,
62  daq::DpmClientParams const& dpm_params)
63  : m_name(std::move(name))
64  , m_workspace(workspace)
65  , m_mal(std::move(mal))
66  , m_event_log(std::make_shared<daq::ObservableEventLog>())
67  , m_dpm_client(
68  std::make_shared<daq::DpmClientImpl>(executor.get_io_context(), *m_mal, dpm_params))
69  , m_daq_factory(executor.get_io_context(), *m_mal, m_dpm_client)
70  , m_manager(executor,
71  std::move(manager_params),
72  m_workspace,
73  m_event_log,
74  m_daq_factory,
75  log4cplus::Logger::getInstance(server::LOGGER_NAME_MANAGER))
76  , m_service(std::make_shared<OcmDaqService>(
77  executor.get_io_context(), *m_mal, m_manager, m_name, output_path, m_event_log))
78  , m_replier(replier)
79  , m_publisher(publisher)
80  , m_state_machine(state_machine)
81  , m_is_active(false) {
82  m_sample = m_publisher.CreateTopic();
83  m_connection = m_manager.GetStatusSignal().ConnectObserver(
84  [this](auto const& status) { this->DaqStatusUpdate(status); });
85 
86  // Try restoring from workspace
87  try {
88  m_manager.RestoreFromWorkspace();
89  } catch (...) {
90  daq::error::NestedExceptionReporter r(std::current_exception());
91  LOG4CPLUS_ERROR(server::GetLogger(),
92  "Failed to restore state from OCM workspace at "
93  << m_workspace.GetPath().native() << ":\n"
94  << r);
95  }
96  }
97 
98  void Register() {
99  LOG4CPLUS_DEBUG(server::GetLogger(), "RegisterService");
100  m_replier.RegisterService(m_name,
101  std::static_pointer_cast<daqif::AsyncOcmDaqControl>(m_service));
102  }
103  void Unregister() {
104  LOG4CPLUS_DEBUG(server::GetLogger(), "UnregisterService");
105  m_replier.UnregisterService(m_name);
106  }
107 
108  /**
109  * Is notified of any DAQ status change and will post events to SM on DAQ activity flank
110  * changes.
111  *
112  * @note Current implementation assumes all DAQs begin in state NotStarted. When ocm supports
113  * loading old status from persistent storage this will no longer work as number of active daqs
114  * is not starting at zero and state is not starting at NotStarted.
115  */
117  try {
118  LOG4CPLUS_DEBUG(server::GetLogger(),
119  fmt::format("Publishing DAQ status for DAQ {}", status.GetId()));
120  auto state = status.GetState();
121  if (state == daq::State::NotStarted && !m_is_active) {
122  // New data acquisition
123  m_is_active = true;
124  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent");
125  m_state_machine.ProcessEvent(Events::AnyDaqActive{});
126  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent done");
127  } else if (m_is_active && daq::IsFinalState(status.GetState())) {
128  // At least one DAQ is completed. Check if all DAQs are completed.
129  auto daqs = m_manager.GetDaqControllers();
130  auto all_final =
131  std::all_of(daqs.begin(), daqs.end(), [](auto const shrd_daq) -> bool {
132  assert(shrd_daq);
133  return daq::IsFinalState(shrd_daq->GetState());
134  });
135  if (all_final) {
136  m_is_active = false;
137  // Active -> Idle
138  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent");
139  m_state_machine.ProcessEvent(Events::AllDaqInactive{});
140  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent done");
141  }
142  }
143  // publish state
144  *m_sample << status.GetStatus();
145  m_publisher.Publish(*m_sample);
146  LOG4CPLUS_DEBUG(server::GetLogger(),
147  fmt::format("Publishing DAQ status for DAQ {} done", status.GetId()));
148  } catch (std::exception const& e) {
149  LOG4CPLUS_ERROR(server::GetLogger(),
150  fmt::format("Failed to publish status: {}", e.what()));
151  }
152  }
153 
154 private:
155  std::string m_name;
156  daq::WorkspaceImpl m_workspace;
157  std::shared_ptr<mal::Mal> m_mal;
158  std::shared_ptr<daq::ObservableEventLog> m_event_log;
159  std::shared_ptr<daq::DpmClientImpl> m_dpm_client;
160  daq::DaqControllerFactoryDefault m_daq_factory;
161  daq::ManagerImpl m_manager;
162  std::shared_ptr<OcmDaqService> m_service;
163  rad::cii::Replier& m_replier;
164  rad::cii::Publisher<daqif::DaqStatus>& m_publisher;
165  rad::SMAdapter& m_state_machine;
166  bool m_is_active;
167 
168  boost::signals2::connection m_connection;
169  std::shared_ptr<daqif::DaqStatus> m_sample;
170 };
171 
172 /**
173  * Application main.
174  *
175  * @param[in] argc Number of command line options.
176  * @param[in] argv Command line options.
177  */
178 int main(int argc, char* argv[]) {
179  namespace fs = boost::filesystem;
180 
181  auto log_initializer = rad::LogInitializer();
182  LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server started.");
183  try {
184  /* Read only configuration */
185  server::Config config;
186  if (config.ParseOptions(argc, argv) == false) {
187  // request for help
188  return EXIT_SUCCESS;
189  }
190  /*
191  * Load CII/MAL middleware here because it resets
192  * the log4cplus configuration!
193  */
194  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
195  auto mal = elt::mal::loadMal("zpb", {});
196  factory.registerMal("zpb", mal);
197 
198  config.LoadConfig();
199 
200  elt::log::CiiLogManager::Configure(
201  rad::GetDefaultLogProperties(config.GetProcName() + ".log", config.GetLogLevel()));
202  if (auto const& file = config.GetLogProperties(); !file.empty()) {
203  auto resolved = rad::Helper::FindFile(config.GetLogProperties());
204  if (resolved.empty()) {
205  LOG4CPLUS_ERROR(server::GetLogger(),
206  "Configured log property file not found: '"
207  << file << "', $CFGPATH=" << rad::Helper::GetEnvVar("CFGPATH"));
208  return EXIT_FAILURE;
209  }
210  elt::log::CiiLogManager::Configure(resolved);
211  }
212 
213  if (config.m_out_path.empty()) {
214  auto const msg =
215  "Output path not configured. Define environment "
216  "variable $DATAROOT or configuration " +
218  LOG4CPLUS_ERROR(server::GetLogger(), msg);
219  std::cerr << msg << std::endl;
220  return -1;
221  }
222  // Create out path if necessary with permissive permissions
223  if (!fs::is_directory(config.m_out_path)) {
224  LOG4CPLUS_INFO(server::GetLogger(),
225  fmt::format("Output dir '{}' is not a directory. Creating it now",
226  config.m_out_path));
227  fs::create_directories(config.m_out_path);
228  fs::permissions(config.m_out_path, fs::others_read | fs::owner_all | fs::group_all);
229  }
230 
231  auto workspace_root = config.GetWorkspace();
232  LOG4CPLUS_INFO(server::GetLogger(),
233  fmt::format("Using workspace directory: {}", workspace_root.native()));
234 
235  /*
236  * LAN 2020-07-09 EICSSW-717
237  * Create CII/MAL replier as soon as possible to avoid problems when
238  * an exceptions is thrown from and Action/Guard.
239  */
240  rad::cii::Replier mal_replier(daqif::MakeServerUri(config.GetMsgReplierEndpoint()));
241  auto std_status_publisher = std::make_unique<rad::cii::Publisher<stdif::Status>>(
242  daqif::MakeServiceUri(config.GetPubEndpoint(), "std/status"));
243  auto daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
244  daqif::MakeServiceUri(config.GetPubEndpoint(), "daq/status"));
245 
246  /* Runtime DB */
247  rad::OldbAsyncWriter oldb_writer(config.GetDbTimeout(), std::chrono::milliseconds(100));
248 
249  /* Runtime data context */
250  server::DataContext data_ctx(config, oldb_writer);
251  data_ctx.UpdateDb();
252 
253  /*
254  * Create event loop
255  */
256 
257  // event loop
258  boost::asio::io_context io_ctx;
259  rad::IoExecutor executor(io_ctx);
260 
261  /*
262  * State Machine related objects
263  */
264  // SM event queue and context
265  scxml4cpp::EventQueue external_events;
266  scxml4cpp::Context state_machine_ctx;
267 
268  // State Machine facade
269  rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
270 
271  daq::ManagerParams manager_params;
272  manager_params.instrument_id = config.m_instrument_id;
273  manager_params.acquiring_stale_age = config.m_stale_acquiring;
274  manager_params.merging_stale_age = config.m_stale_merging;
275 
276  // @todo: Load DPM RR URI and timeout
277  DaqService daq_service("daq",
278  manager_params,
279  workspace_root,
280  config.m_out_path,
281  executor,
282  mal,
283  mal_replier,
284  *daq_status_publisher,
285  state_machine,
286  config.GetDpmClientParams());
287 
288  // actions and activities
289  server::ActionMgr action_mgr;
290  action_mgr.CreateActions(io_ctx, state_machine, data_ctx);
291  action_mgr.CreateActivities(state_machine, data_ctx);
292  action_mgr.AddAction(new rad::ActionCallback("OcmDaq.RegisterService",
293  [&](auto) { daq_service.Register(); }));
294  action_mgr.AddAction(new rad::ActionCallback("OcmDaq.UnregisterService",
295  [&](auto) { daq_service.Unregister(); }));
296 
297  // Load SM model
298  state_machine.Load(
299  config.GetSmScxmlFilename(), &action_mgr.GetActions(), &action_mgr.GetActivities());
300 
301  // Register handlers to reject events
302  state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
303  state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
304  state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
305 
306  // Register publisher to export state information
307  state_machine.SetStatusPublisher([&](std::string const& status) {
308  auto ics_status = server::MakeStatusString(state_machine.GetActiveStates())
309  .value_or("NotOperational;Undefined");
310 
311  auto sample = std_status_publisher->CreateTopic();
312  sample->setStatus(ics_status);
313  sample->setSource(config.GetProcName());
314  std_status_publisher->Publish(*sample);
315  data_ctx.GetDbInterface().SetControlState(status);
316  });
317  /*
318  * Register CII/MAL replier
319  */
320  mal_replier.RegisterService<stdif::AsyncStdCmds>(
321  "std", std::make_shared<server::StdCmdsImpl>(state_machine));
322 
323  /*
324  * Start event loop
325  */
326  state_machine.Start();
327  oldb_writer.StartWriter();
328  io_ctx.run();
329  state_machine.Stop();
330  } catch (rad::Exception& e) {
331  LOG4CPLUS_ERROR(server::GetLogger(), e.what());
332 
333  std::cout << "Exit main()\n";
334  return EXIT_FAILURE;
335  } catch (...) {
336  LOG4CPLUS_ERROR(server::GetLogger(), boost::current_exception_diagnostic_information());
337  std::cout << "Exit main()\n";
338  return EXIT_FAILURE;
339  }
340 
341  // to avoid valgrind warnings on potential memory loss
342  // google::protobuf::ShutdownProtobufLibrary();
343 
344  LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server terminated.");
345  std::cout << "Exit main()\n";
346  return EXIT_SUCCESS;
347 }
ActionMgr class header file.
ActionsStd class header file.
void Register()
Definition: main.cpp:98
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
Definition: main.cpp:116
DaqService(std::string name, daq::ManagerParams manager_params, std::filesystem::path const &workspace, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< daqif::DaqStatus > &publisher, rad::SMAdapter &state_machine, daq::DpmClientParams const &dpm_params)
Definition: main.cpp:53
void Unregister()
Definition: main.cpp:103
Default factory producing "real" implementations.
Implements daq::Manager.
Definition: manager.hpp:253
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:125
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:533
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:529
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
State GetState() const noexcept
Definition: status.cpp:215
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:274
std::string const & GetId() const noexcept
Definition: status.cpp:203
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:82
Implementation of daq::Workspace.
Definition: workspace.hpp:90
auto GetPath() const -> std::filesystem::path override
Definition: workspace.hpp:109
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
This class is responsible for the life-cycle management of actions and activities.
Definition: actionMgr.hpp:28
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
Definition: actionMgr.cpp:107
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
Definition: actionMgr.cpp:31
This class provides access to the command line options and the configuration parameters stored in the ...
Definition: config.hpp:60
const std::string & GetLogLevel() const
Definition: config.cpp:327
std::filesystem::path GetWorkspace() const
Definition: config.hpp:145
std::string m_out_path
Definition: config.hpp:171
const std::string & GetSmScxmlFilename() const
Definition: config.cpp:312
bool ParseOptions(int argc, char *argv[])
Disable assignment operator.
Definition: config.cpp:90
daq::DpmClientParams const & GetDpmClientParams() const
Definition: config.cpp:337
const std::string & GetMsgReplierEndpoint() const
Definition: config.cpp:289
void LoadConfig(const std::string &filename="")
This method loads from a configuration file the application configuration overriding the initializatio...
Definition: config.cpp:166
std::chrono::seconds GetDbTimeout() const
Definition: config.cpp:308
std::chrono::hours m_stale_merging
Definition: config.hpp:175
std::chrono::hours m_stale_acquiring
Definition: config.hpp:174
std::string m_instrument_id
Definition: config.hpp:162
const std::string & GetLogProperties() const
Definition: config.cpp:332
const std::string & GetProcName() const
Definition: config.cpp:322
const std::string & GetPubEndpoint() const
Definition: config.cpp:294
This class provides access to the application run-time data including the in-memory DB.
Definition: dataContext.hpp:21
void UpdateDb()
Try to connect to the DB and update the application configuration.
Definition: dataContext.cpp:37
DbInterface & GetDbInterface()
Definition: dataContext.cpp:50
void SetControlState(const std::string &value)
Definition: dbInterface.cpp:35
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
DataContext class header file.
DbInterface class header file.
daq::DpmClient
int main(int argc, char **argv)
Definition: main.cpp:68
Implements the MAL interface daqif::OcmDaq (async version).
Default logger name.
Declaration of daq::Manager
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:52
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:39
@ NotStarted
Initial state of data acquisition.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:46
Connection parameters for DPM.
Definition: dpmClient.hpp:74
Configurations parameters directly related to manager.
Definition: manager.hpp:35
network::uri MakeServerUri(std::string uri)
Creates a server URI.
Definition: uri.cpp:13
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
std::optional< std::string > MakeStatusString(std::list< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
Definition: actionsStd.cpp:29
const std::string KEY_CONFIG_DATAROOT
Definition: config.hpp:28
const std::string LOGGER_NAME_MANAGER
Definition: logger.hpp:18
log4cplus::Logger & GetLogger()
Definition: logger.cpp:14
Declaration of OcmDaqService.
Contains declaration for DaqController.
StdCmds Interface implementation header file.
Contains URI support functions for daqif.